MUG Nantes - MongoDB et son connecteur pour hadoop

Preview:

Citation preview

User Group Nantes

MongoDB et son connecteur pour Hadoop

Bruno Bonnin - @_bruno_b_

AGENDA

• Rappels sur Hadoop et son éco-système• Un connecteur, pour quoi faire ?• Démo avec MongoDB, Hive, Spark (version Java en

300 lignes ou version Python en 8 lignes)

Nantes User Group

HADOOPNantes User Group

Source : http://docs.hortonworks.com/

HADOOPNantes User Group

Source : http://docs.hortonworks.com/

CONNECTEUR MONGODB POUR HADOOP

• Pour quoi faire ?• Les entreprises ont des données stockées dans

MongoDB, … et dans Hadoop• Besoin d’intégration• Traiter les données à partir de plusieurs

sources (en éliminant des étapes d’import/export)

• …

Nantes User Group

QUELQUES CAS D’UTILISATIONNantes User Group

Pour des analyses complexes faites dans Hadoop (avec des jobs Map/Reduce)

Traitement de données de plusieurs sources faites dans Hadoop (repo central de données)

Hadoop extrait les données de données de diverses sources, les traite et pousse les résultats dans MongoDB

CONNECTEUR MONGODB POUR HADOOP

• Support pour :• Hive : langage de requêtes SQL-like• Pig : langage de scripts pour définir des

workflows de traitement• Spark• Flume• …

Nantes User Group

DEMO : HIVENantes User Group

Valeur actions par minute

Calcul des min/max dans Hadoop (M/R) +Jointure avec données dans Hadoop

1

3

2

Sociétés

Valeur min/max actions par jour

CREATE EXTERNAL TABLESELECT MIN (), MAX()

INSERT

collectionTable Hive

Table Hive

DEMO : HIVE

ADD JAR mongo-java-driver-3.0.3.jar;ADD JAR mongo-hadoop-core-1.4.0.jar;ADD JAR mongo-hadoop-hive-1.4.0.jar;

CREATE EXTERNAL TABLE stock_prices ( id STRUCT<oid:STRING, bsontype:INT>, symbol STRING, timestamp STRING, volume INT )STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'

WITH SERDEPROPERTIES ( 'mongo.columns.mapping'='{"id":"_id", "symbol":"Symbol", "timestamp":"Timestamp", "volume":"Volume"}' )

TBLPROPERTIES( 'mongo.uri'='mongodb://localhost:27017/marketdata.stock_prices' );

Nantes User Group

DEMO : SPARKNantes User Group

Traitement des données par Spark (groupBy, aggregateByKey)

1

3’

2

Résultat #2 : Stockage dans des

fichiersSource :

Valeur actions par minute

collection HDFS

Résultat #1 : Valeur min/max actions par jour

collection

3 SAVEINSERT

FIND

DEMO : HIVE

from pyspark import SparkContext

sc = SparkContext("local", "Test MongoDB Connector")

# Config MongoDB inputConfig = { "mongo.input.uri" : "mongodb://localhost:27017/marketdata.stock_prices" }

# Config pour RDD qui va lire les data dans MongoDB inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat" keyClassName = "java.lang.Object" valueClassName = "org.bson.BSONObject"

stockPricesRDD = sc.newAPIHadoopRDD( inputFormatClassName, keyClassName, valueClassName, None, None, inputConfig )

# Les traitements... prices = stockPricesRDD.values()

# ... groupby sur (symbol, day) groupByRDD = prices.groupBy(lambda doc: (doc["Symbol"], doc["Day"]))

Nantes User Group

LIENS

• Plus d’infos:– http://docs.mongodb.org/ecosystem/tools/hadoop/ – https://github.com/mongodb/mongo-hadoop

• Code:– Démo :

• https://github.com/bbonnin/MUG-Nantes-Demo-Hadoop – Complément :

• https://github.com/bbonnin/hadoop-mongodb

Nantes User Group

MERCI !Nantes User Group