MA0979 - BigDatacosy.univ-reims.fr/~lsteffenel/cours/Master2/INFO0939-BigData/Cours... · M2 SEP...

Preview:

Citation preview

L.A. Steffenel

MA0979 - BigDataCours 2 – Hadoop et MapReduce

M2 SEPMA0979

L.A. SteffenelObjectifs

u Faire un tour des caractéristiques de la plate-forme Hadoop

u Créer et exécuter un premier programme

u En Java

u En Python

M2 SEPMA0979

L.A. SteffenelLes origines de Hadoop

u Les origines de Hadoop datent de 2001. À l’époque, le projet Apache Nutchcherchait à faire un moteur de recherche libre

u Le plus grand défi était le passage à l’échelle des opérations d’indexation

u Un article de Google a donné l’élan nécessaire

u Distribution des tâches + système de fichiers associé

u Hadoop lancé comme projet open-source en 2005

u Deux principaux composants

u MapReduce – un framework pour le calcul réparti

u Division des tâches entre les nœuds

u Ordonnancement et surveillance de l’exécution des tâches

u HDFS – un système de fichiers distribué

u Accent sur la « data locality » pour accélérer les opérations d’E/S

M2 SEPMA0979

L.A. SteffenelDes racines dans la programmation fonctionnelle

u Map

u Applique une fonction sur chaque élément d’une liste

u Retourne une liste de résultats

u Map(f(x),X[1:n]) à [f(X[1]),...,f(X[n])]

u Exemple :

u Map(x2,[0,1,2,3,4,5])=[0,1,4,9,16,25]

u Reduce/fold

u Fait l’itération d’une fonction sur une liste d’éléments

u Applique la fonction sur les résultats précédents et l’élément courant

u Retourne un seul résultat

u Exemple :

u Reduce(x+y,[0,1,2,3,4,5])=(((((0+1)+2)+3)+4)+5)=15

M2 SEPMA0979

L.A. SteffenelModèle de Calcul Parallèle

u Un algorithme Map-Reduce = job

u Opère avec des pairs clé-valeur : (k,V)

u Des types primitifs, Strings ou des structures de données complexes

u Les entrées et sorties d’un job Map-Reduce ont la forme de pairs {(k,V)}

u Un job MR est défini par deux fonctions

u map: (k1;v1)→{(k2;v2)}

u reduce: (k2;{v2})→{(k3;v3)}

M2 SEPMA0979

L.A. SteffenelL’exemple du WordCount

u Presque tous les tutoriaux Hadoop utilisent l’exemple du WordCount

u Simple pour comprendre

u Pas vraiment intéressant (et très lent)

u Entrée : un grand fichier texte

u Sortie : le nombre d’occurences de chaque mot dans le fichier

u Exemple :

u Pour le texte "Robur the Coqueror" de Jules Verne, nous avons

11 fois le mot "corpuscles", une fois le mot "susceptible", 5 fois "clear", etc.

M2 SEPMA0979

L.A. SteffenelWordCount – comment penser en MR

u Dans un code séquentiel, WordCount peut être implémenté avec un parseur String et un HashList <k, V> où :

u La clé est le "mot"

u La valeur est un entier incrémenté à chaque fois que ce mo apparaît

u Même dans un environnement distribué, la procédure est presque la même :

u Etape 1 : chaque machine parse une partie du fichier et enregistre le résultat partiel

u Etape 2 : à la fin, nous faisons la somme des valeurs associées à chaque clé

M2 SEPMA0979

L.A. SteffenelLa partie Map

u L'entrée de Map est un ensemble de mots {w} d'une partition du texte

u Clé=w Valeur=null

u La fonction Map calcule

u Le nombre de fois qu'une clé w apparaît dans la partition

u La sortie de la fonction map (pour une machine) est une liste sous la forme

u <w, nombre de mots>

u Si nous avons plusieurs machines calculant Map, la sortie "finale" est plutôt une liste d'entrées

u {<w, {valeur1, valeur2, ...}>}

M2 SEPMA0979

L.A. SteffenelLa partie Reduce

u L'entrée de reduce correspond à la sortie de la fonction map

u {<clé, {valeur}>}, où

u cle= “mot”

u Chaque valeur est un entier

u La fonction Reduce calcule

u Le nombre total d'occurrences d'un mot k :

u La somme de toutes les valeurs avec la clé k

u La sortie de la fonction Reduce

u <clé, N>

M2 SEPMA0979

L.A. SteffenelLe flux des données

Input Map Shuffle & Sort Reduce Output Finish

dogs

like

dogs

map

map

map

reduce

reduce

dogs, 2

like, 1

dogs, 2like, 1

M2 SEPMA0979

L.A. SteffenelExemple 2 : la longueur moyenne des mots

u Entrée : un grand fichier texte

u Sortie : la longueur moyenne des mots dans le fichier

u Exemple :

u µ({dogs,like,cats})=4

u µ({parrot,cow,cat})=4

µ =1n

xi1

n

M2 SEPMA0979

L.A. SteffenelLa partie Map

u L'entrée de Map est un ensemble de mots {w} d'une partition du texte

u Clé=w Valeur=null

u La fonction Map calcule

u Le nombre de mots dans la partition

u La longueur totale des mots ∑length(w)

u La sortie de la fonction map (pour une machine) contient deux entrées

u <“count”, #mots>

u <“length”, longueur totale>

M2 SEPMA0979

L.A. SteffenelLa partie Reduce

u L'entrée Reduce

u {<clé, {valeur}>}, où

u clé = “count” ou “length”

u valeur est un entier

u La fonction Reduce calcule

u Le nombre total de mots : N = somme de toutes les valeurs “count”

u La longueur totale des mots : L = somme de toutes les valeurs “length”

u La sortie Reduce

u <“count”, N>

u <“length”, L>

u Le résultat

u Le résultat est obtenu en faisant la division µ=L/N

M2 SEPMA0979

L.A. SteffenelLe flux de données

Input Map Shuffle & Sort Reduce Output Finish

dogs

like

cats

map

map

map

reduce

reduce

count, 3

length,12

mean = length/count

M2 SEPMA0979

L.A. SteffenelExemple 3 : le nombre de voyelles et consonnes

u Entrée : un grand fichier de texte

u Sortie : le nombre de voyelles (nV) et de consonnes (nC) dans le texte

u Exemple :

u nV({dogs,like,cats})=4

u nC({dogs,like,cats})=8

M2 SEPMA0979

L.A. SteffenelLa partie Map

u L'entrée de Map est un ensemble de mots {w} d'une partition du texte

u Clé=null Valeur=w

u La fonction Map calcule

u Le nombre de consonnes dans le mot

u Le nombre de voyelles dans le mot

u La sortie Map

u <“nV”, #voyelles>

u <“nC”, #consonnes>

M2 SEPMA0979

L.A. SteffenelLa partie Reduce

u La partie Reduce

u {<clé, {valeur}>}, où

u clé = “nC”, “nV”

u Valeur est un entier

u La fonction Reduce calcule

u Le nombre total de voyelles = somme de toutes les valeurs "nV"

u Le nombre total de consonnes = somme de toutes les valeurs "nC"

u La sortie de Reduce

u <"V", #Voyelles>

u <“C”, #Consonnes>

M2 SEPMA0979

L.A. SteffenelLe Flux des Données

Parallel Computation

dogs C, 3 V, 1

C, 8

15

like

cats

C, 2 V, 2

C, 3 V, 1

V, 4

M2 SEPMA0979

L.A. SteffenelHDFS

u Le framework Hadoop travaille sur deux clusters virtuels :

u Un cluster pour les données (HDFS) et un autre pour le calcul (MapReduce)

u Ces deux clusters ont été conçus pour être couplés (travailler ensemble)

u Le Hadoop Distributed File System

u Système de fichiers distribué (comme NFS, Lustre, etc)

u Organise les données comme des fichiers et répertoires

u MapReduce

u Ordonnancement et exécution des jobs

Parallel Computation Framework

• Two virtual clusters: HDFS and MapReduce

• Physically tightly coupled. Designed to work together

• Hadoop Distributed File System. View data as files and directories

• MapReduce. Job scheduling and execution framework

17

M2 SEPMA0979

L.A. SteffenelHDFS

u HDFS est un système de fichiers distribue, extensible et portable u Ecrit en Java

u Permet de stocker de très gros volumes de données sur un grand nombre de nœuds

u Quand un fichier mydata.txt est enregistre dans HDFS,il est décomposé en grands blocs

u Taille par défaut 64Mo

u Chaque bloc a un nom unique: blk_1, blk_2...

!  HDFS est un système de fichiers distribué,

extensible et portable

!  Ecrit en Java

!  Permet de stocker de très gros volumes de

données sur un grand nombre de

machines (nœuds) équipées de disques

durs banalisés # Cluster

!  Quand un fichier mydata.txt est enregistré

dans HDFS, il est décomposé en grands

blocs (par défaut 64Mo), chaque bloc

ayant un nom unique: blk_1, blk_2…

8 HDFS : Hadoop Distributed File System

64 Mo

64 Mo

22 Mo

mydata.txt (150 Mo)

blk_1

blk_2

blk_3

Cluster

Nœuds

M2 SEPMA0979

L.A. SteffenelHDFS

u Chaque bloc est enregistre dans un nœud différent du cluster

u DataNode : démon sur chaque nœud du cluster

u NameNode : démon s’exécutant sur une machine séparée

u Contient des métadonnées

u Permet de retrouver les nœuds qui exécutent les blocs d’un fichier

!  Chaque bloc est enregistré dans un nœud

différent du cluster

!  DataNode : démon sur chaque nœud du

cluster

!  NameNode :

o  Démon s’exécutant sur une machine

séparée

o  Contient des méta-données

o  Permet de retrouver les nœuds qui

exécutent les blocs d’un fichier

9 HDFS : Hadoop Distributed File System

64 Mo

64 Mo

22 Mo

mydata.txt (150 Mo)

blk_1

blk_2

blk_3

DataNode

DN

DN DN

DN DN

NN NameNode

!  Chaque bloc est enregistré dans un nœud

différent du cluster

!  DataNode : démon sur chaque nœud du

cluster

!  NameNode :

o  Démon s’exécutant sur une machine

séparée

o  Contient des méta-données

o  Permet de retrouver les nœuds qui

exécutent les blocs d’un fichier

9 HDFS : Hadoop Distributed File System

64 Mo

64 Mo

22 Mo

mydata.txt (150 Mo)

blk_1

blk_2

blk_3

DataNode

DN

DN DN

DN DN

NN NameNode

M2 SEPMA0979

L.A. SteffenelHDFS - Réplication

u Si l’un des nœuds a un problème, les données seront perdues u Hadoop réplique chaque bloc 3 fois

u Il choisit 3 nœuds au hasard, et placeune copie du bloc dans chacun d’eux

u En cas de panne, le système régénère des copies pour garantir le taux de réplication

u Concept de Rack Awareness (rack = baie de stockage)

u Possibilité d'avoir un namenode "backup"

!  Chaque bloc est enregistré dans un nœud

différent du cluster

!  DataNode : démon sur chaque nœud du

cluster

!  NameNode :

o  Démon s’exécutant sur une machine

séparée

o  Contient des méta-données

o  Permet de retrouver les nœuds qui

exécutent les blocs d’un fichier

9 HDFS : Hadoop Distributed File System

64 Mo

64 Mo

22 Mo

mydata.txt (150 Mo)

blk_1

blk_2

blk_3

DataNode

DN

DN DN

DN DN

NN NameNode

!  Si l’un des nœuds a un problème, les données seront perdues

o  Hadoop réplique chaque bloc 3 fois

o  Il choisit 3 nœuds au hasard, et place une copie du bloc dans chacun d’eux

o  Si le nœud est en panne, le NN le détecte, et s’occupe de répliquer encore les blocs qui y étaient hébergés pour avoir toujours 3 copies stockées

o  Concept de Rack Awareness (rack = baie de stockage)

!  Si le NameNode a un problème ?

11 HDFS : Hadoop Distributed File System

64 Mo

64 Mo

22 Mo

mydata.txt (150 Mo)

blk_1

blk_2

blk_3

DN

NN

DN DN

DN DN

M2 SEPMA0979

L.A. SteffenelHDFS + MapReduce

u L'entrée de MapReduce doit être fait à partir de fichiers dans HDFS

u Chaque bloc contient une liste de pairs clé-valeur

u Les tâches Map sont préférentiellement assignées aux nœuds contenant un block

u L'entrée des tâches Map sont des fichiers locaux, tout comme la sortie

u Les résultats des maps seront groupés : un groupe par reducer

u Chaque groupe est ordonnée (sorted)

u Les tâches Reduce sont assignées à un ou plusieurs nœuds

u Les tâches Reduce récupèrent ses entrées à partir de tous les nœuds Map associées à son groupe

u Le résultat est calculé et stocké dans un fichier HDFS

u La sortie est un ensemble de fichiers dans HDFS (un fichier par reducer)

M2 SEPMA0979

L.A. SteffenelHadoop 1 vs Hadoop 2

u Jusqu'à 2012, Hadoop reposait entièrement sur le couplage MapReduce-HDFSu Ce sont les versions 0.2x et 1.x

u EN 2012 on a vu l'arrivée de Hadoop 2.x, qui déconnecte HDFS de MapReduceu Possibilité d'exécuter des applications autres que MapReduce

u Possible grâce au nouveau mécanisme d'ordonnancement YARN

u Ce qui changeu Une redistribution des rôles (pas très compliqué)

u Une gestion des ressources plus structurée

u Une API légèrement plus souple pour la programmation MapReduceu On garde l'ancienne API quand même

u La réplication des services clés (Haute Disponibilité)

M2 SEPMA0979

L.A. SteffenelLes Rôles dans Hadoop 1 et 2

u On crée le rôle de RessourceManager qui travaille au dessus des ApplicationManager (ancien JobTracker)

u Les exécutions se font dans des "containers"

Client

ResourceManagerNameNode

NodeManagerApplicationMaster

Client

DataNode

NodeManagerContainerDataNode

NodeManager

DataNode

NodeManager

DataNode

NodeManagerContainer

DataNode

NodeManager

DataNodeContainer

ApplicationMaster

Container Container

YARN / AppHDFS

YARN

Hadoop 2.xdaemon

architectureClient

JobTrackerNameNode

TaskTrackerDataNode

TaskTrackerDataNode

TaskTrackerDataNode

Hadoop 1 Hadoop 2

M2 SEPMA0979

L.A. SteffenelL'API MapReduce en Java

u Chaque programme MapReduce doit spécifier un Mapper et un Reducer (pas obligatoire)u Le Mapper a une méthode map qui transforme une entrée (clé,

valeur) en un nombre arbitraire de paires intermédiaires (clé’, valeur’)

u Par défaut, une instance Mapper sera lancée par fichier d'entrée

u Le Reducer a une méthode reduce qui transforme les clés intermédiaires (clé’, valeur’*) et les agrège dans un nombre arbitraire de paires de sortie (clé’’, valeur’’)

u Par défaut, un reducer par noeud

M2 SEPMA0979

L.A. SteffenelWritable et WritableComparable

u Hadoop utilisez des types de données propres pour la sérialisation et les appels via RPC. Cela oblige que les types de données soient compatibles avec les interfaces Writable et WritableComparable

u Plusieurs types de données sont déjà offerts par l'API :

u Types primitifs : IntWritable, Text, FloatWritable, . . .

u Arrays et dictionaires : MapWritable, ArrayWritable, . . .

M2 SEPMA0979

L.A. SteffenelDe retour à notre exemple : la longueur moyenneu Entrée : un grand fichier texte

u Le Map calcule

u Le nombre de mots dans la partition à (clé "count")

u La longueur totale des mots à (clé "length")

u Le Reduce calcule

u Le nombre total de mots : N = somme de toutes les valeurs “count”

u La longueur totale des mots : L = somme de toutes les valeurs “length”

u Finalisationu Après le Reduce, notre application calcule la longueur moyenne

des mots dans le fichier

M2 SEPMA0979

L.A. SteffenelLe Flux des Données

29

Input Map Shuffle & Sort Reduce Output Finish

dogs

like

cats

map

map

map

reduce

reduce

count, 3

length,12

mean = length/count

M2 SEPMA0979

L.A. SteffenelLa classe Mapper

La classe Mapper avec la

méthode map

M2 SEPMA0979

L.A. SteffenelLa classe Reducer

Le reduceparcourt les clés et fait la somme

M2 SEPMA0979

L.A. SteffenelPour faire la moyenne

Cette méthode doit être executée

après map-reduce

M2 SEPMA0979

L.A. SteffenelConfiguration d'un Jobu Afin de lancer une application, nous devons définir un Job

u Job mrjob = Job.getInstance(conf, "nom");

u Les Jobs sont "guidés" par une classe Configuration

u Configuration récupère des pairs de propriétés (par défaut ou crées par l'utilisateur)

u Ceci permet de gérer certaines propriétés de l'exécution

u conf.set(“mapred.job.name”, “MyApp”);

u conf.set(“my.string”, “foo”);

u conf.setInteger(“my.integer”, 12);

u Avec Job, on peut définir les classes mapper, reducer, etc.

33

M2 SEPMA0979

L.A. SteffenelMaintenant, tout ensemble

Ici on indique les classes pour

le map et le reduce

Cette partie s'exécute après le map-reduce

M2 SEPMA0979

L.A. SteffenelMise en Route

u Compilationu Il faut inclure plusieurs jars de Hadoop dans votre classpath. Le plus simple

est de définir une variable HADOOP_CLASSPATH comme suit :

export HADOOP_CLASSPATH=$(bin/hadoop classpath)

u Ensuite, vous compilez votre code

javac -classpath ${HADOOP_CLASSPATH}:. -d Average/ AverageLength.java

u Pour finir, il faut produire un jar avec votre code

jar -cvf AverageLength.jar -C Average/ .

u Les programmes MapReduce dans le “jar” seront placés dans le HDFS et récupérés par chaque noeud exécutant Hadoop

M2 SEPMA0979

L.A. SteffenelModes d'Exécution Hadoop

u Hadoop propose trois modes d'exécution :u Local (standalone)

u Bon pour des petits tests et debugage

u Local (pseudo-distributed)u Les tâches sont lancées dans différentes VM Java

u La configuration est presque comme celle d'un cluster

u Fully-distributed (cluster)u Toutes les machines du cluster sont configurées avec Hadoop

u Une topologie maître-esclave est mise en place

M2 SEPMA0979

L.A. SteffenelMode Standalone

u Dans le mode standalone, il suffit d'une configuration minimale:

u Télécharger Hadoop à partir de http://hadoop.apache.org

u Dézipper et éditer etc/hadoop/hadoop-env.sh

u Il faut définir JAVA_HOME avec le chemin vers votre executable java

u Est-ce que ça marche ?

u Essayez d'exécuter bin/hadoop

u Il doit vous montrer la liste de paramètres acceptés par Hadoop

M2 SEPMA0979

L.A. Steffenel

M2 SEPMA0979

L.A. SteffenelLancer notre App en mode Standalone

u Si on considère que:

u Répertoire d'input : /home/toto/Average/input

u Répertoire de sortie : /home/toto/Average/output

u Ne le créez pas ! Hadoop fait ça

u Il suffit de mettre quelques fichiers texte dans le répertoire input et d'exécuter l'application :

u $ bin/hadoop jar AverageLength.jar AverageLength/home/toto/Average/input /home/toto/Average/output

M2 SEPMA0979

L.A. SteffenelVérifier les résultats

u Check the results:

u $ cat /home/toto/Average/output/part-r-00000

count 245

length 2354

u Hey, où est la moyenne ? Le fichier part-r-00000 n'a que les entrées count et length ?u La moyenne est dans la sortie de stdout. Souvenez-vous,

on a calculé la moyenne après le dernier reduce

u ATTENTION : les "println" à l'intérieur des map/reduce ne sont pas affichées sur la console (mais on les trouve dans le fichier de log)

M2 SEPMA0979

L.A. SteffenelEt si je ne veux pas coder en Java ?

u Hadoop permet la composition de jobs map-reduce avec d'autres langages de programmationu Hadoop Streaming – très utilisé pour l'exécution de mappers et

reducers codés avec des langages de programmation interprétées

u Python

u Ruby

u Scripts shell !!!

u Hadoop Pipes – interface "réseau" (utilise des sockets) pour l'interfaçage avec du code C et C++

u Pas trop de succès, cette interface n'a pas été portée dans Hadoop 2 (mais la compatibilité de code est garantie)

M2 SEPMA0979

L.A. SteffenelHadoop Streaming

u C'est en réalité une application disponible dans hadoop-streaming-*.jar

u Accepte comme paramètres le chemin de exécutables système qui feront le rôle de mapper et reducer

u Les données seront passées par des streams système (genre cat file1 > file2)

u Pour l'utilisation de base, il suffit de renseigner quatre paramètres

u hadoop jar hadoop-streaming-*.jar -input myInputDirs -output myOutputDir-mapper /bin/cat -reducer /usr/bin/wc

u Il n'est pas rare d'utiliser du code Python ou Ruby

M2 SEPMA0979

L.A. SteffenelExemple : Wordcount en Python

u Mapper

#!/usr/bin/env pythonimport sys

for line in sys.stdin: # remove leading and trailing whitespaceline = line.strip() # split the line into wordswords = line.split() for word in words: print '%s\t%s' % (word, 1)

M2 SEPMA0979

L.A. SteffenelExemple : Wordcount en Python

u Reducer

#!/usr/bin/env pythonfrom operator import itemgetterimport sys

current_word = None current_count = 0 word = None

for line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try:

count = int(count) except ValueError:

continue if current_word == word:

current_count += count else:

if current_word: print '%s\t%s' % (current_word, current_count)

current_count = count current_word = word

if current_word == word: print '%s\t%s' % (current_word, current_count)

Raccourci car Hadoop trie les clés

Le dernier mot est toujours imprimé

M2 SEPMA0979

L.A. SteffenelExemple : Wordcount en Python

u Pour l'exécution, il suffit de passer les exécutables Python (chmod +x, au cas où) et les fichiers d'entrée et sortie

u bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \-file /home/hduser/mapper.py \-mapper /home/hduser/mapper.py \-file /home/hduser/reducer.py \-reducer /home/hduser/reducer.py \-input /user/hduser/gutenberg/* \-output /user/hduser/gutenberg-output

u Autre option : on enregistre les exécutables aussi sur HDFS et donc pas besoin d'appeler les paramètres -file

Recommended