20

Luigi Paris.py meetup presentation

Embed Size (px)

Citation preview

Page 1: Luigi Paris.py meetup presentation
Page 2: Luigi Paris.py meetup presentation

Moi

~2 ans de pythonFull Stack dev chez AlephDjonasbru.com

Page 3: Luigi Paris.py meetup presentation

DATA PIPELINES MADE EASY

Page 4: Luigi Paris.py meetup presentation

Le problème

Des scripts......qui dépendent du résultat d’autres scripts...qui dépendent eux aussi d’autres scripts...qui dépendent des données d’un service

externe...qui plante une fois sur trois...le tout lancé depuis douze CRON

Page 5: Luigi Paris.py meetup presentation
Page 6: Luigi Paris.py meetup presentation

$> pip install luigi

Page 7: Luigi Paris.py meetup presentation

Luigi à la rescousse

Luigi s’occupe....de gérer les dépendances entre les tâches

..de gérer la reprise en cas d’erreur

..d'éviter d’avoir des tâches qui tournent en double

..de visualiser l’état de tout ça

Page 8: Luigi Paris.py meetup presentation

Luigi à la rescousse.. mais pas trop

Il ne va pas....lancer les tâches..repartir les tâches sur votre cluster

..faire le café

Page 9: Luigi Paris.py meetup presentation

La base

Task & Target

Page 10: Luigi Paris.py meetup presentation

Task

Task == N’importe quoiTrois méthodes à implémenter :

requires() --> Taskrun()output() --> Target

Page 11: Luigi Paris.py meetup presentation

Target

Target == Un résultat d’une tâcheun fichier sur disque..ou une partition HDFS..ou une ligne dans votre BDD..ou n’importe quoi qui implémente exists() -->

bool

Page 12: Luigi Paris.py meetup presentation

Example time!

Mail journalier des chansons les + écoutés

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

Page 13: Luigi Paris.py meetup presentation

class DownloadSongs(luigi.Task): date = luigi.DateParameter() def output(self): return luigi.LocalTarget(self.date.strftime('songs_%Y_%m_%d.tsv')) def run(self): song_list = get_songs(self.date) # api call with self.output().open('w') as out_file: for song in song_list: out_file.write('{}\n'.format(song))

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

Page 14: Luigi Paris.py meetup presentation

class TopTen(luigi.Task): date = luigi.DateParameter() def requires(self): return DownloadSongs(date=self.date) def output(self): return luigi.LocalTarget(self.date.strftime('top_%Y_%m_%d.tsv')) def run(self): top = [] with self.input().open('r') as in_file: top = Counter(in_file.readlines()).most_common(10) with self.output().open('w') as out_file: for song, count in top: out_file.write('{}\t{}\n'.format(song, count))

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

Page 15: Luigi Paris.py meetup presentation

class SendMailTopSongs(luigi.Task): date = luigi.DateParameter() def requires(self): return TopTen(date=self.date) def run(self): with self.input().open('r') as top_ten_file: send_email(top_ten_file)

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

Page 16: Luigi Paris.py meetup presentation

Et pour lancer tout ça..

$> python topSongs.py SendMailTopSongs --date 2015-12-09

Mieux dans un CRON*/10 * * * * python topSongs.py SendMailTopSongs --days 10

Page 17: Luigi Paris.py meetup presentation

Central Scheduler

Organise et coordonne les différents workersUne tâche n’est lancée qu’une seule fois

Page 18: Luigi Paris.py meetup presentation

Central Scheduler

Page 19: Luigi Paris.py meetup presentation

Et bien plus !

Très bien intégré à HadoopMultiprocessing basiqueGestion des prioritésRessources Code simpleMainteneurs très actifs

Page 20: Luigi Paris.py meetup presentation

Des questions ?