Luigi Paris.py meetup presentation

Preview:

Citation preview

Moi

~2 ans de pythonFull Stack dev chez AlephDjonasbru.com

DATA PIPELINES MADE EASY

Le problème

Des scripts......qui dépendent du résultat d’autres scripts...qui dépendent eux aussi d’autres scripts...qui dépendent des données d’un service

externe...qui plante une fois sur trois...le tout lancé depuis douze CRON

$> pip install luigi

Luigi à la rescousse

Luigi s’occupe....de gérer les dépendances entre les tâches

..de gérer la reprise en cas d’erreur

..d'éviter d’avoir des tâches qui tournent en double

..de visualiser l’état de tout ça

Luigi à la rescousse.. mais pas trop

Il ne va pas....lancer les tâches..repartir les tâches sur votre cluster

..faire le café

La base

Task & Target

Task

Task == N’importe quoiTrois méthodes à implémenter :

requires() --> Taskrun()output() --> Target

Target

Target == Un résultat d’une tâcheun fichier sur disque..ou une partition HDFS..ou une ligne dans votre BDD..ou n’importe quoi qui implémente exists() -->

bool

Example time!

Mail journalier des chansons les + écoutés

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

class DownloadSongs(luigi.Task): date = luigi.DateParameter() def output(self): return luigi.LocalTarget(self.date.strftime('songs_%Y_%m_%d.tsv')) def run(self): song_list = get_songs(self.date) # api call with self.output().open('w') as out_file: for song in song_list: out_file.write('{}\n'.format(song))

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

class TopTen(luigi.Task): date = luigi.DateParameter() def requires(self): return DownloadSongs(date=self.date) def output(self): return luigi.LocalTarget(self.date.strftime('top_%Y_%m_%d.tsv')) def run(self): top = [] with self.input().open('r') as in_file: top = Counter(in_file.readlines()).most_common(10) with self.output().open('w') as out_file: for song, count in top: out_file.write('{}\t{}\n'.format(song, count))

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

class SendMailTopSongs(luigi.Task): date = luigi.DateParameter() def requires(self): return TopTen(date=self.date) def run(self): with self.input().open('r') as top_ten_file: send_email(top_ten_file)

Télécharger la liste des

chansons écoutées

Calculer le top 10 Envoyer le mail

Et pour lancer tout ça..

$> python topSongs.py SendMailTopSongs --date 2015-12-09

Mieux dans un CRON*/10 * * * * python topSongs.py SendMailTopSongs --days 10

Central Scheduler

Organise et coordonne les différents workersUne tâche n’est lancée qu’une seule fois

Central Scheduler

Et bien plus !

Très bien intégré à HadoopMultiprocessing basiqueGestion des prioritésRessources Code simpleMainteneurs très actifs

Des questions ?