16
Utiliser Hadoop en Perl avec HadoopStreaming

Utiliser Hadoop en perl avec HadoopStreaming

Embed Size (px)

Citation preview

Utiliser Hadoop en Perl avec

HadoopStreaming

le plus simple : apprendre java

un language simple

facile à apprendre

relativement répandu

commence à être assez stable

et si je ne veux pas apprendre ?

hadoop fournit une classe HadoopStreaming pour lancer des jobs hors de java

utilise STDIN et STDOUT pour communiquer avec vos processus

prend 2 arguments principaux : un programme de map, et un autre de reduce

etape 1 : importation des données avec sqoop

/usr/bin/sqoop-import -m 64 --connect jdbc:mysql://db_host/db_name --username db_user --password db_password --table db_table --hive-import --hive-delims-replacement ' ' --null-string '\N' --null-non-string '\N' --hive-overwrite --hive-table db_table_name_in_hive --split-by my_column

etape 1 : importation des données avec sqoop

Les données vont être écrite sous forme tabulaire dans HDFS, dans /user/hive/warehouse/ db_table_whatever/part-000[0-9]{2}

Utilisable avec HiveQL directment

mais sinon : hadoop fs -put ./fichier.tsv /user/dmorel/

etape 2 : écriture des scripts map + reduce

while ( <STDIN> ) { chomp; my @fields = split /\t/; # make the key print join( '+', @fields[ ( 16, 25 ) ] ); # separate key and value print "+\t"; # make the value print join(",", @fields[ ( 28, 30, ... ) ] ) . "\n";}

le mapper peut être plus «complexe» !

while ( <STDIN> ) { chomp; my @fields = split /\t/; # make the key my $key = join( '+', @fields[ ( 16, 25 ) ] , q{} ); # make the valueS my @base_values = @fields[ ( 28 .. 150 ) ] ); my @final_vals = mybigcombinesub(@base_values); for ( @final_values ) { print $key, join(«\t», @$_), «\n»; }}

exemple d’utilisation (ouf, pas de code...)

trouver les combinaisons de 3, 4, 5 objets ou + les plus fréquemment achetées par les clients d’un site e-commerce

impossible en SQL, trop de combinaisons, jointures trop lourdes

facile avec Hadoop : juste produire toutes les combinaisons pour chaque client séquentiellement, réduire ensuite, et compter

reducer : analyser ligne à ligne, vérifier le key switch

my ($k_prev, $processed_lines_in_key, @output_lines, $value_to_print);while (<STDIN>) { chomp; my ($key, $value) = split /\t/; if ( $k_main ne $k_prev ) { $processed_lines_in_key = 0; _flush_to_stdout(\@output_lines); $value_to_print = ''; $k_prev = $k_main; } $value_to_print .= $value; push @output_lines, [ $k_main, $value_to_print ] if (int (rand()*3) == 2);}

etape 3 : test

/usr/bin/hadoop fs -cat \ /user/david/myfile.tsv | head -10000 | ./mapper.pl | sort | ./reducer.pl

etape 4 : lancement de hadoop

hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar \ -D stream.map.output.field.separator='+' \ -D stream.num.map.output.key.fields=2 \ -D map.output.key.field.separator='+' \ -D num.key.fields.for.partition=1 \ -D mapred.text.key.partitioner.options=-k1 \ -D mapred.compress.map.output=true \ -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \ -input /user/hive/warehouse/mytable/ \ -output myresult.tsv \ -mapper ./mapper.pl \ -reducer ./reducer.pl \ -file ./mapper.pl \ -file ./reducer.pl \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

/usr/bin/hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar

-D stream.map.output.field.separator='+' \-D stream.num.map.output.key.fields=2 \

-D map.output.key.field.separator='+' \-D num.key.fields.for.partition=1 \-D mapred.text.key.partitioner.options=-k1 \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

-D mapred.compress.map.output=true \ -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \

-input /user/hive/warehouse/mytable/ \ -output myresult.tsv \ -mapper ./mapper.pl \ -reducer ./reducer.pl \ -file ./mapper.pl \ -file ./reducer.pl \

etape 5 : récupération du fichier de sortie

hadoop fs -get /user/dmorel/myresult.tsv

Facile, non ?

Questions ?

[email protected]