Upload
jonas-bru-monserrat
View
205
Download
0
Embed Size (px)
Citation preview
Moi
~2 ans de pythonFull Stack dev chez AlephDjonasbru.com
DATA PIPELINES MADE EASY
Le problème
Des scripts......qui dépendent du résultat d’autres scripts...qui dépendent eux aussi d’autres scripts...qui dépendent des données d’un service
externe...qui plante une fois sur trois...le tout lancé depuis douze CRON
$> pip install luigi
Luigi à la rescousse
Luigi s’occupe....de gérer les dépendances entre les tâches
..de gérer la reprise en cas d’erreur
..d'éviter d’avoir des tâches qui tournent en double
..de visualiser l’état de tout ça
Luigi à la rescousse.. mais pas trop
Il ne va pas....lancer les tâches..repartir les tâches sur votre cluster
..faire le café
La base
Task & Target
Task
Task == N’importe quoiTrois méthodes à implémenter :
requires() --> Taskrun()output() --> Target
Target
Target == Un résultat d’une tâcheun fichier sur disque..ou une partition HDFS..ou une ligne dans votre BDD..ou n’importe quoi qui implémente exists() -->
bool
Example time!
Mail journalier des chansons les + écoutés
Télécharger la liste des
chansons écoutées
Calculer le top 10 Envoyer le mail
class DownloadSongs(luigi.Task): date = luigi.DateParameter() def output(self): return luigi.LocalTarget(self.date.strftime('songs_%Y_%m_%d.tsv')) def run(self): song_list = get_songs(self.date) # api call with self.output().open('w') as out_file: for song in song_list: out_file.write('{}\n'.format(song))
Télécharger la liste des
chansons écoutées
Calculer le top 10 Envoyer le mail
class TopTen(luigi.Task): date = luigi.DateParameter() def requires(self): return DownloadSongs(date=self.date) def output(self): return luigi.LocalTarget(self.date.strftime('top_%Y_%m_%d.tsv')) def run(self): top = [] with self.input().open('r') as in_file: top = Counter(in_file.readlines()).most_common(10) with self.output().open('w') as out_file: for song, count in top: out_file.write('{}\t{}\n'.format(song, count))
Télécharger la liste des
chansons écoutées
Calculer le top 10 Envoyer le mail
class SendMailTopSongs(luigi.Task): date = luigi.DateParameter() def requires(self): return TopTen(date=self.date) def run(self): with self.input().open('r') as top_ten_file: send_email(top_ten_file)
Télécharger la liste des
chansons écoutées
Calculer le top 10 Envoyer le mail
Et pour lancer tout ça..
$> python topSongs.py SendMailTopSongs --date 2015-12-09
Mieux dans un CRON*/10 * * * * python topSongs.py SendMailTopSongs --days 10
Central Scheduler
Organise et coordonne les différents workersUne tâche n’est lancée qu’une seule fois
Central Scheduler
Et bien plus !
Très bien intégré à HadoopMultiprocessing basiqueGestion des prioritésRessources Code simpleMainteneurs très actifs
Des questions ?