Upload
bruno-bonnin
View
760
Download
1
Embed Size (px)
Citation preview
User Group Nantes
MongoDB et son connecteur pour Hadoop
Bruno Bonnin - @_bruno_b_
AGENDA
• Rappels sur Hadoop et son éco-système• Un connecteur, pour quoi faire ?• Démo avec MongoDB, Hive, Spark (version Java en
300 lignes ou version Python en 8 lignes)
Nantes User Group
HADOOPNantes User Group
Source : http://docs.hortonworks.com/
HADOOPNantes User Group
Source : http://docs.hortonworks.com/
CONNECTEUR MONGODB POUR HADOOP
• Pour quoi faire ?• Les entreprises ont des données stockées dans
MongoDB, … et dans Hadoop• Besoin d’intégration• Traiter les données à partir de plusieurs
sources (en éliminant des étapes d’import/export)
• …
Nantes User Group
QUELQUES CAS D’UTILISATIONNantes User Group
Pour des analyses complexes faites dans Hadoop (avec des jobs Map/Reduce)
Traitement de données de plusieurs sources faites dans Hadoop (repo central de données)
Hadoop extrait les données de données de diverses sources, les traite et pousse les résultats dans MongoDB
CONNECTEUR MONGODB POUR HADOOP
• Support pour :• Hive : langage de requêtes SQL-like• Pig : langage de scripts pour définir des
workflows de traitement• Spark• Flume• …
Nantes User Group
DEMO : HIVENantes User Group
Valeur actions par minute
Calcul des min/max dans Hadoop (M/R) +Jointure avec données dans Hadoop
1
3
2
Sociétés
Valeur min/max actions par jour
CREATE EXTERNAL TABLESELECT MIN (), MAX()
INSERT
collectionTable Hive
Table Hive
DEMO : HIVE
ADD JAR mongo-java-driver-3.0.3.jar;ADD JAR mongo-hadoop-core-1.4.0.jar;ADD JAR mongo-hadoop-hive-1.4.0.jar;
CREATE EXTERNAL TABLE stock_prices ( id STRUCT<oid:STRING, bsontype:INT>, symbol STRING, timestamp STRING, volume INT )STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES ( 'mongo.columns.mapping'='{"id":"_id", "symbol":"Symbol", "timestamp":"Timestamp", "volume":"Volume"}' )
TBLPROPERTIES( 'mongo.uri'='mongodb://localhost:27017/marketdata.stock_prices' );
Nantes User Group
DEMO : SPARKNantes User Group
Traitement des données par Spark (groupBy, aggregateByKey)
1
3’
2
Résultat #2 : Stockage dans des
fichiersSource :
Valeur actions par minute
collection HDFS
Résultat #1 : Valeur min/max actions par jour
collection
3 SAVEINSERT
FIND
DEMO : HIVE
from pyspark import SparkContext
sc = SparkContext("local", "Test MongoDB Connector")
# Config MongoDB inputConfig = { "mongo.input.uri" : "mongodb://localhost:27017/marketdata.stock_prices" }
# Config pour RDD qui va lire les data dans MongoDB inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat" keyClassName = "java.lang.Object" valueClassName = "org.bson.BSONObject"
stockPricesRDD = sc.newAPIHadoopRDD( inputFormatClassName, keyClassName, valueClassName, None, None, inputConfig )
# Les traitements... prices = stockPricesRDD.values()
# ... groupby sur (symbol, day) groupByRDD = prices.groupBy(lambda doc: (doc["Symbol"], doc["Day"]))
Nantes User Group
LIENS
• Plus d’infos:– http://docs.mongodb.org/ecosystem/tools/hadoop/ – https://github.com/mongodb/mongo-hadoop
• Code:– Démo :
• https://github.com/bbonnin/MUG-Nantes-Demo-Hadoop – Complément :
• https://github.com/bbonnin/hadoop-mongodb
Nantes User Group
MERCI !Nantes User Group