Les Streams sont parmi nous

Preview:

Citation preview

@JosePaumard

Les

sont parmi nous

@JosePaumard

Les

sont parmi nouspour le meilleur !

#J8Stream @JosePaumard

Pourquoi s’intéresser aux API Streams ?

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• De 1998 à 2014… seul outil : l’API Collection Common collections

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• De 1998 à 2014… seul outil : l’API Collection Common collections

• À partir de 2014… un foisonnement !

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Pourquoi autant d’offres ?

• Parce que le traitement de données devient central…

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Pourquoi autant d’offres ?

• Parce que le traitement de données devient central… et complexe !

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Pourquoi autant d’offres ?

• Parce que le traitement de données devient central… et complexe ! Volumes de plus en plus importants

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Pourquoi autant d’offres ?

• Parce que le traitement de données devient central… et complexe ! Volumes de plus en plus importants Temps de réponse maîtrisés

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Pourquoi autant d’offres ?

• Parce que le traitement de données devient central… et complexe ! Volumes de plus en plus importants Temps de réponse maîtrisés Algorithmes complexes

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Le traitement de données a besoin de « primitives » de haut niveau

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Le traitement de données a besoin de « primitives » de haut niveau Qui permettent d’accéder aux données où qu’elles

soient

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Le traitement de données a besoin de « primitives » de haut niveau Qui permettent d’accéder aux données où qu’elles

soient Qui exposent des fonctions (map / filter / reduce)

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Le traitement de données a besoin de « primitives » de haut niveau Qui permettent d’accéder aux données où qu’elles

soient Qui exposent des fonctions (map / filter / reduce) Qui soient efficaces !

#J8Stream @JosePaumard

Traitement de donnéesTraitement de données

• Le traitement de données a besoin de « primitives » de haut niveau Qui permettent d’accéder aux données où qu’elles

soient Qui exposent des fonctions (map / filter / reduce) Qui soient efficaces ! Éventuellement parallèles

#J8Stream @JosePaumard

Objet de la présentationObjet de la présentation

• Présenter trois API Exposer les concepts fondamentaux Présenter les fonctions implémentées Montrer des patterns de code (Java 8, lambdas)

• Comparer ces API Point de vue de l’utilisateur Performances !

#J8Stream @JosePaumard

Objet de la présentationObjet de la présentation

• Proposer une grille de lecture et d’analyse Comparer des outils qui proposent des solutions

différentes pour un même problème

#J8Stream @JosePaumard

Objet de la présentationObjet de la présentation

• Proposer une grille de lecture et d’analyse Comparer des outils qui proposent des solutions

différentes pour un même problème

• Du point de vue du développeur

#J8Stream @JosePaumard

Objet de la présentationObjet de la présentation

• Proposer une grille de lecture et d’analyse Comparer des outils qui proposent des solutions

différentes pour un même problème

• Du point de vue du développeur Performance Patterns écriture / lecture

#J8Stream @JosePaumard

Questions ?

#J8Stream

Questions ?

#J8Stream

#J8Stream @JosePaumard

Les forces en présenceLes forces en présence

• Les 3 API sont : L’API Stream de Java 8

#J8Stream @JosePaumard

Les forces en présenceLes forces en présence

• Les 3 API sont : L’API Stream de Java 8 GS Collections

#J8Stream @JosePaumard

Les forces en présenceLes forces en présence

• Les 3 API sont : L’API Stream de Java 8 GS Collections RxJava

#J8Stream @JosePaumard

Les forces en présenceLes forces en présence

• Les 3 API sont : L’API Stream de Java 8 GS Collections RxJava

• Comparaison des performances sur un exemple commun

#J8Stream @JosePaumard

API Stream

#J8Stream @JosePaumard

API StreamAPI Stream

• Qu’est-ce qu’un Stream Java 8 ? Un objet que l’on connecte à une source

#J8Stream @JosePaumard

API StreamAPI Stream

• Qu’est-ce qu’un Stream Java 8 ? Un objet que l’on connecte à une source Un objet qui ne porte pas les données qu’il traite

#J8Stream @JosePaumard

API StreamAPI Stream

• Qu’est-ce qu’un Stream Java 8 ? Un objet que l’on connecte à une source Un objet qui ne porte pas les données qu’il traite Un objet qui expose le pattern map / filter / reduce

#J8Stream @JosePaumard

API StreamAPI Stream

• Qu’est-ce qu’un Stream Java 8 ? Un objet que l’on connecte à une source Un objet qui ne porte pas les données qu’il traite Un objet qui expose le pattern map / filter / reduce

• Nouveau concept introduit dans le JDK

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream() .map(s ‐> s.toUpperCase()).max(Comparator.comparing(s ‐> s.length())).ifPresent(s ‐> System.out.println(s)) ;

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream()  // ouverture d’un stream de String.map(s ‐> s.toUpperCase()) .max(Comparator.comparing(s ‐> s.length())).ifPresent(s ‐> System.out.println(s)) ;

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream().map(s ‐> s.toUpperCase()) // mise en majuscules des éléments.max(Comparator.comparing(s ‐> s.length())).ifPresent(s ‐> System.out.println(s)) ;

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream().map(s ‐> s.toUpperCase()).max(Comparator.comparing(s ‐> s.length())) // plus longue.ifPresent(s ‐> System.out.println(s)) ;

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream().map(s ‐> s.toUpperCase()).max(Comparator.comparing(s ‐> s.length())).ifPresent(s ‐> System.out.println(s)) ; // Optional !

#J8Stream @JosePaumard

API StreamAPI Stream

• ExempleList<String> liste = Arrays.asList("un", "deux", "trois") ;

liste.stream().map(String::toUpperCase).max(Comparator.comparing(String::length).ifPresent(System.out::println) ; // Optional !

#J8Stream @JosePaumard

CollectorsCollectors

• On peut collecter des donnéesList<Person> liste = ... ;

liste.stream().filter(person ‐> person.getAge() > 30).collect(Collectors.groupingBy(

Person::getAge,       // key extractorCollectors.counting() // downstream collector

)) ;

#J8Stream @JosePaumard

CollectorsCollectors

• On peut collecter des donnéesList<Person> liste = ... ;

liste.stream().filter(person ‐> person.getAge() > 30).collect(Collectors.groupingBy(

Person::getAge,       // key extractorCollectors.counting() // downstream collector

)) ;

#J8Stream @JosePaumard

CollectorsCollectors

• On peut collecter des donnéesList<Person> liste = ... ;

Map<Integer, Long> map = liste.stream()

.filter(person ‐> person.getAge() > 30)

.collect(Collectors.groupingBy(

Person::getAge,       // key extractorCollectors.counting() // downstream collector

)) ;

#J8Stream @JosePaumard

CollectorsCollectors

• Et on peut paralléliser les traitementsList<Person> liste = ... ;

Map<Integer, Long> map = liste.stream().parallel()

.filter(person ‐> person.getAge() > 30)

.collect(Collectors.groupingBy(

Person::getAge,       // key extractorCollectors.counting() // downstream collector

)) ;

#J8Stream @JosePaumard

Particularisation d’un StreamParticularisation d’un Stream

• On peut le connecter à sa propre source de données Implémenter un Spliterator (~ Iterator)

#J8Stream @JosePaumard

Particularisation d’un StreamParticularisation d’un Stream

• On peut le connecter à sa propre source de données Implémenter un Spliterator (~ Iterator)

• On peut implémenter son propre Collector

#J8Stream @JosePaumard

GS Collections

#J8Stream @JosePaumard

IntroductionIntroduction

• API Open source, Github• Développé par Goldman Sachs• Contribution importante

https://github.com/goldmansachs/gs-collections

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Une alternative à l’API Collection Parfois très ressemblante…

• Des interfaces supplémentaires Beaucoup ! Des méthodes supplémentaires sur les interfaces

existantes

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Oui et non…

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Oui et non… Bag

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Oui et non… Bag MultiMap

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Oui et non… Bag MultiMap MutableSet

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Plutôt des extensions des concepts de l’API Collection

#J8Stream @JosePaumard

GS CollectionsGS Collections

• Nouveaux concepts ?

• Plutôt des extensions des concepts de l’API Collection Certains sont dans Common collections ou Guava

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag = Collection• Existe en plusieurs couleurs !

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag = Collection• Existe en plusieurs couleurs !

• Primitifs : IntBag, LongBag, …

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag = Collection• Existe en plusieurs couleurs !

• Primitifs : IntBag, LongBag, …• ImmutableBag, MutableBag

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag = Collection• Existe en plusieurs couleurs !

• Primitifs : IntBag, LongBag, …• ImmutableBag, MutableBag• Sorted, Unsorted

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag = Collection• Existe en plusieurs couleurs !

• Primitifs : IntBag, LongBag, …• ImmutableBag, MutableBag• Sorted, Unsorted• Synchronized

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag, List, Set, Stack, Map, Multimap

#J8Stream @JosePaumard

Structures fondamentalesStructures fondamentales

• Bag, List, Set, Stack, Map, Multimap• Nombreuses implémentations

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

int numberOfPaul = bag.occurenceOf(paul) ;

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

int numberOfPaul = bag.occurenceOf(paul) ;int numberOfDistinct = bag.sizeDistinct() ;

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

int numberOfPaul = bag.occurenceOf(paul) ;int numberOfDistinct = bag.sizeDistinct() ;

Bag<Customer> customers = bag.instanceOf(Customer.class) ;

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

Partition<Person> partition = bag.partition(person ‐> person.getAge() > 30) ;

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

Partition<Person> partition = bag.partition(person ‐> person.getAge() > 30) ;

Bag<Person> selected = partition.getSelected() ;Bag<Person> rejected = partition.getRejected() ;

#J8Stream @JosePaumard

BagBag

• Méthodes originales type CollectionBag<Person> bag = ... ;

Partition<Person> partition = bag.partition(person ‐> person.getAge() > 30) ;

Bag<Person> selected = partition.getSelected() ;Bag<Person> rejected = partition.getRejected() ;

Bag<Person> selected = bag.select(person ‐> person.getAge() > 30) ;Bag<Person> rejected = bag.reject(person ‐> person.getAge() > 30) ;

#J8Stream @JosePaumard

BagBag

• Méthode forEach()Bag<Person> bag = ... ;

bag.forEach(System.out::println) ;

#J8Stream @JosePaumard

BagBag

• Méthode forEach()Bag<Person> bag = ... ;

bag.forEach(System.out::println) ; // compiler error

#J8Stream @JosePaumard

BagBag

• Méthode forEach()Bag<Person> bag = ... ;

bag.forEach(System.out::println) ; // compiler error

public interface Iterable<T> { // java.util

default void forEach(Consumer<? super T> action) {...}}

#J8Stream @JosePaumard

BagBag

• Méthode forEach()Bag<Person> bag = ... ;

bag.forEach(System.out::println) ; // compiler error

public interface InternalIterable<T> { // GS Collection 

void forEach(Procedure<? super T> procedure) ;}

#J8Stream @JosePaumard

BagBag

• Méthode forEach()Bag<Person> bag = ... ;

bag.forEach((Procedure<String>)System.out::println) ;

#J8Stream @JosePaumard

BagBag

• Méthodes map & filterBag<Person> bag = ... ;

Bag<String> names = bag.collect(Person::getAge) ;      // eq. map()Bag<String> names = bag.select(p ‐> p.getAge() > 30) ; // eq filter()

#J8Stream @JosePaumard

BagBag

• Méthodes map & filterBag<Person> bag = ... ;

Bag<String> names = bag.collect(Person::getAge) ;      // eq. map()Bag<String> names = bag.select(p ‐> p.getAge() > 30) ; // eq filter()

Bag<T> tap(Procedure<? super T> procedure) ;

#J8Stream @JosePaumard

BagBag

• Méthodes map & flatMapBag<T> collect( // eq. map()

Function<? super T, ? extends V> function) {...}

Bag<T> flatCollect( // eq. flatMap()Function<? super T, ? extends Iterable<V>> function) {...}

#J8Stream @JosePaumard

BagBag

• Méthodes map & filter fusionnéesBag<Person> bag = ... ;

names = bag.collectIf(person ‐> person.getAge() > 30, Peson::getName) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de réductionBag<Person> bag = ... ;

boolean b1 = bag.allSatisfy(person ‐> person.getAge() > 30) ;

boolean b2 = bag.anySatisfy(person ‐> person.getAge() > 100) ;

boolean b3 = bag.noneSatisfy(person ‐> person.getAge() < 18) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de réductionBag<Person> bag = ... ;

Person first = bag.detect(person ‐> person.getAge() > 30) ;

Person p = bag.detectIfNone(person ‐> person.getAge() > 100, () ‐> Person.NOBODY) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de réductionBag<Person> bag = ... ;

Person first = bag.detectWith((person, param) ‐> person.getAge() > param, 30) ;

Person p = bag.detectWithIfNone((person, param) ‐> person.getAge() > param, 100, () ‐> Person.NOBODY) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de foldingBag<Person> bag = ... ;

long sum = bag.injectInto(0L, (sum, person) ‐> sum + person.getAge()) ;

long count = bag.injectInto(0L, (sum, person) ‐> sum + 1L) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de foldingBag<Person> bag = ... ;

long sum = bag.injectInto(0L, (l, person) ‐> l + person.getAge()) ;

long sumOfAge = bag.sumOfInt(Person::getAge)) ;

#J8Stream @JosePaumard

BagBag

• Méthodes de foldingBag<Person> bag = ... ;

String names = bag.collect(Person::getName).makeString() ;

Charles, Michelle, Paul, Barbara

#J8Stream @JosePaumard

BagBag

• Réduction dans une chaîne de caractèresBag<Person> bag = ... ;

String names = bag.collect(Person::getAge).makeString() ;

String names = bag.collect(Person::getAge).makeString(" ; ") ;

Charles ; Michelle ; Paul ; Barbara

#J8Stream @JosePaumard

BagBag

• Réduction dans une chaîne de caractèresBag<Person> bag = ... ;

String names = bag.collect(Person::getAge).makeString() ;

String names = bag.collect(Person::getAge).makeString(" ; ") ;

String names = bag.collect(Person::getAge).makeString("{", " ; ", "}") ;

{Charles ; Michelle ; Paul ; Barbara}

#J8Stream @JosePaumard

BagBag

• Réduction dans une chaîne de caractèresBag<Person> bag = ... ;

bag.collect(Person::getAge).appendString(StringBuilder::new) ;

#J8Stream @JosePaumard

BagBag

• Réduction dans une table de hachageBag<Person> bag = ... ;

MapIterable<Integer, Bag<Person>> map = bag.aggregateBy(

Person::getAge,       // key extractorFastList::new,        // zero factory(list, person) ‐> {   //

list.add(person) ; // merge operationreturn list ;      //

}) ;

#J8Stream @JosePaumard

BagBag

• Réduction dans une table de hachageBag<Person> bag = ... ;

MultiMap<Integer, Person> map = bag.groupBy(Person::getAge) ;

#J8Stream @JosePaumard

MutableMapMutableMap

• Méthodes de MapV getIfAbsentPut(K key, 

Function0<? extends V> function) ;

V getIfAbsentPutWithKey(K key, Function<? super K, ? extends V> function) ;

V getIfAbsentPutWith(K key, Function<? super P, ? extends V> functionP parameter) ;

#J8Stream @JosePaumard

MutableMapMutableMap

• Méthodes forEach()void forEachKeyValue(Procedure2<? super K, ? super V> procedure) ;

void forEachKey(Procedure<? super T> procedure) ;

void forEachValue(Procedure<? super T> procedure) ;

#J8Stream @JosePaumard

MutableMapMutableMap

• Méthodes map

• Une table de hachage est aussi une liste de valeurs

MapIterable<K2, V2> collect(Function2<? super K, ? super V, Pair<K2, V2>> function) ;

public interface MapIterable<K, V> extends RichIterable<V> {}

#J8Stream @JosePaumard

PairPair

• Modélisation d’un tuplepublic interface Pair<T1, T2>

extends Serializable, Comparable<Pair<T1, T2>> {

T1 getOne();T2 getTwo();

void put(Map<T1, T2> map);

Map.Entry<T1, T2> toEntry(); // pont avec API Collection Pair<T2, T1> swap();

}

#J8Stream @JosePaumard

MutableMapMutableMap

• Les méthodes map, filter et méthodes de réduction sont définies sur RichIterable Donc disponibles sur les tables de hachage Elles opèrent sur les valeurs

• Méthodes groupBy, retournent de nouvelles tables

reject(Predicate2<? super K, ? super V> predicate) ;

select(Predicate2<? super K, ? super V> predicate) ;

#J8Stream @JosePaumard

ImmutableMapImmutableMap

• Méthodes pour « ajouter » des clés / valeursMutableMapIterable<K, V> withKeyValue(K key, V value) ; // put

MutableMapIterable<K, V> withoutKey(K key) ; // remove

#J8Stream @JosePaumard

MultiMapMultiMap

• Concept qui ne fait pas partie de l’API Collection• Map qui associe plusieurs valeurs à une même clé Gère une collection de valeurs en interne

MultiMap<Integer, Person> map ;  // GS Collections

Map<Integer, List<Person>> map ; // API Collection

#J8Stream @JosePaumard

MultiMapMultiMap

• La plupart des méthodes sont les mêmes• Méthodes get différentesRichIterable<V> values = multiMap.get(K key) ;

MutableMap<K, RichIterable<V>> map = multiMap.toMap() ;

#J8Stream @JosePaumard

Structures concurrentesStructures concurrentes

• ConcurrentHashMap Construite sur un tableau de Entry

(AtomicReferenceArray) Accès via des AtomicReference Même structure que java.util.HashMap

#J8Stream @JosePaumard

Opérations de type ZipOpérations de type Zip

• Permet d’appairer deux ensemblesBag<Person> bag1 = ImmutableBagImpl.of(anna, charles) ;Bag<Person> bag2 = ImmutableBagImpl.of(paul, barbara) ;

Bag<Pair<Person, Person>> friends = bag1.zip(bag2) ;

#J8Stream @JosePaumard

Opérations de type ZipOpérations de type Zip

• Permet d’appairer deux ensemblesBag<Person> bag1 = ImmutableBagImpl.of(anna, charles) ;Bag<Person> bag2 = ImmutableBagImpl.of(paul, barbara) ;Bag<City> bag3 = ImmutableBagImpl.of(paris, sanFrancisco) ;

Bag<Pair<Person, Person>> friends = bag1.zip(bag2) ;

Bag<Pair<Person, City>> living = bag1.zip(bag3) ;

#J8Stream @JosePaumard

Opérations de type ZipOpérations de type Zip

• Permet d’appairer deux ensemblesBag<Person> bag1 = ImmutableBagImpl.of(anna, charles) ;Bag<Person> bag2 = ImmutableBagImpl.of(paul, barbara) ;Bag<City> bag3 = ImmutableBagImpl.of(paris, sanFrancisco) ;

Bag<Pair<Person, Person>> friends = bag1.zip(bag2) ;

Bag<Pair<Person, City>> living = bag1.zip(bag3) ;

Bag<Pair<City, Integer>> cities = bag3.zipWithIndex() ;

#J8Stream @JosePaumard

Classes factoryClasses factory

• Factory pour Bag, List, Set, Stack, MapBag<Long> bagOfInteger = MutableBagFactoryImpl.of(1L, 2L, 5L) ;

Bag<Person> bagOfPerson = MutableBagFactoryImpl.of(persons) ;

#J8Stream @JosePaumard

Implémentations Implémentations

• Pas de miracle à attendre…// class FastListpublic boolean add(T newItem) {

if (this.items.length == this.size) {this.ensureCapacityForAdd();

}this.items[this.size++] = newItem;return true;

}

#J8Stream @JosePaumard

Implémentations Implémentations

• Pas de miracle à attendre…// class FastListpublic boolean add(T newItem) {

if (this.items.length == this.size) {this.ensureCapacityForAdd();

}this.items[this.size++] = newItem;return true;

}

// class ArrayListpublic boolean add(E e) {

ensureCapacityInternal(size + 1);elementData[size++] = e;return true;

}

#J8Stream @JosePaumard

Implémentations Implémentations

• Une implémentation est différente : celle des tables de hachage Fonctionne avec un tableau unique

#J8Stream @JosePaumard

Bizarreries Bizarreries

• Quelques bizarreriesMutableMap<K, V> map1 = map.asUnmodifiable() ; // UnmodifiableMap ?

#J8Stream @JosePaumard

Bilan sur l’APIBilan sur l’API

• Une API complète (complexe)• Concept de MultiMap• Des méthodes que l’on trouve sur Stream Ajoutées aux collections

• Des méthodes supplémentaires Que l’on aura peut-être sur Stream (zip)

#J8Stream @JosePaumard

RxJava

#J8Stream @JosePaumard

RxJavaRxJava

• API Open source, Github• Développé par Netflix• Version Java de ReactiveX .NET Python, Kotlin, JavaScript, Scala, Ruby, Groovy, Rust Android

https://github.com/ReactiveX/RxJava

#J8Stream @JosePaumard

Vue d’ensembleVue d’ensemble

• Approche différente de GS Collections• Il ne s’agit pas d’une implémentation alternative de

l’API Collection

• Mais plutôt de l’implémentation du pattern Reactor

#J8Stream @JosePaumard

Pattern ReactorPattern Reactor

• Set (fixed size, mutable, immutable, primitifs)• SortedSet (fixed size, mutable, immutable, primitifs)• Pool• Stack (mutable, immutable, primitifs)

http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Observable : source de données

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Observable : source de données Objet complexe ~100 méthodes statiques + ~150

méthodes non statiques

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Observable : source de données Objet complexe ~100 méthodes statiques + ~150

méthodes non statiques• Observer : permet d’observer un observable Objet simple : 3 méthodes

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Observable : source de données Objet complexe ~100 méthodes statiques + ~150

méthodes non statiques• Observer : permet d’observer un observable Objet simple : 3 méthodes

• Subscription : lien qui existe entre un observable et un observer

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Observerpublic interface Observer<T> {

public void onNext(T t);

public void onCompleted();

public void onError(Throwable e);}

#J8Stream @JosePaumard

Notions fondamentalesNotions fondamentales

• Associer un observer à un observableObservable<T> observable = ... ;

Subscription subscription = observable.subscribe(observer) ;

public interface Subscription {

public void unsubscribe();

public void isUnsubscribe();}

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Constructeur non vide protégé

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Constructeur non vide protégé• On peut construire un Observable par extension en utilisant une des méthodes statiques

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Constructeur non vide protégé• On peut construire un Observable par extension en utilisant une des méthodes statiques

• Prend un producer en paramètre

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Création à partir de collectionsObservable<String> obs1 = Observable.just("one",  "two", "three") ;

List<String> strings = Arrays.asList("one",  "two", "three") ;Observable<String> obs2 = Observable.from(strings) ;

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Création à partir de collectionsObservable<String> obs1 = Observable.just("one",  "two", "three") ;

List<String> strings = Arrays.asList("one",  "two", "three") ;Observable<String> obs2 = Observable.from(strings) ;

Observable<String> empty = Observable.empty() ;Observable<String> never = Observable.never() ;Observable<String> error = Observable.<String>error(exception) ;

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Création de sériesObservable<Long> longs = Observable.range(1L, 100L) ;

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Création de sériesObservable<Long> longs = Observable.range(1L, 100L) ;

// intervalObservable<Long> timeSerie1 = 

Observable.interval(1L, TimeUnit.MILLISECONDS) ; // a serie of longs

// initial delay, then intervalObservable<Long> timeSerie2 = 

Observable.timer(10L, 1L, TimeUnit.MILLISECONDS) ; // one 0

#J8Stream @JosePaumard

Création d’un ObservableCréation d’un Observable

• Méthode using()public final static <T, Resource> Observable<T> using(

final Func0<Resource> resourceFactory, // producerfinal Func1<Resource, Observable<T>> observableFactory, // functionfinal Action1<? super Resource> disposeAction // consumer

) { }

#J8Stream @JosePaumard

Notion de SchedulerNotion de Scheduler

• Ces méthodes peuvent prendre un autre paramètre

• Scheduler : interface• Associée à la factory Schedulers

Observable<Long> longs = Observable.range(0L, 100L, scheduler) ;

#J8Stream @JosePaumard

Notion de SchedulerNotion de Scheduler

• Factory Schedulerspublic final class Schedulers {

public static Scheduler immediate() {...}   // immediate, same thread

public static Scheduler newThread() {...}   // new thread

public static Scheduler computation() {...} // computation ES

public static Scheduler io() {...}          // IO growing ES}

#J8Stream @JosePaumard

Notion de SchedulerNotion de Scheduler

• Factory Schedulerspublic final class Schedulers {

public static Scheduler trampoline() {...} // queued in the current// thread

public static Scheduler test() {...}

public static Scheduler fromExecutor(Executor executor) {...}}

#J8Stream @JosePaumard

Notion de SchedulerNotion de Scheduler

• RxJava permet de créer des Observable dans différents threads

• Certains pools sont spécialisés : IO, Computation• On peut passer ses propres pools

• Les observateurs sont appelés dans les threads des observables

#J8Stream @JosePaumard

Notion de SchedulerNotion de Scheduler

• La classe Scheduler n’est pas immédiate à étendre

• On utilise from() pour les threads IHM (Swing, JavaFX)

@JosePaumard

Pause café ?

#J8Stream

@JosePaumard

Pause café !

#J8Stream

Encore du RxJavaPatterns

Extensions de StreamPerformance

@JosePaumard

Les

sont parmi nous

#J8Stream @JosePaumard

Observable : méthodes statiques Observable : méthodes statiques

• Série de méthodes statiques de combinaisons d’Observable en un seul

#J8Stream @JosePaumard

Observable de listesObservable de listes

• amb() : prend une liste d’Observable et suit le premier qui parle

©RxJava

O1

O2

O1

#J8Stream @JosePaumard

Observable de listesObservable de listes

• combineLatest() : applique une fonction aux deux derniers éléments émis

©RxJava

O1

O2

F(O1, O2)

#J8Stream @JosePaumard

Observable de listesObservable de listes

• zip() : équivalent d’un combine, mais prend les éléments un par un

©RxJava

O1

O2

F(O1, O2)

#J8Stream @JosePaumard

Observable de listesObservable de listes

• concat() : émet O1 puis O2, sans les mélanger• merge() : émet O1 et O2, s’arrête sur erreur

#J8Stream @JosePaumard

Observable de listesObservable de listes

• concat() : émet O1 puis O2, sans les mélanger• merge() : émet O1 et O2, s’arrête sur erreur

©RxJava©RxJava

#J8Stream @JosePaumard

Observable de listesObservable de listes

• concat() : émet O1 puis O2, sans les mélanger• merge() : émet O1 et O2, s’arrête avec le premier• mergeDelayError() : reporte l’erreur à la fin

©RxJava

#J8Stream @JosePaumard

Observable de listesObservable de listes

• sequenceEqual() : compare deux séquences

©RxJava

O1

O2

boolean

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• switchOnNext() : un peu spécial…

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• Paramètres des méthodes précédentes :public final static <T> Observable<T> merge(

Iterable<Observable<T>> listOfSequences) { }

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• Paramètres des méthodes précédentes :public final static <T> Observable<T> merge(

Iterable<Observable<T>> listOfSequences) { }

public final static <T> Observable<T> merge(Observable<Observable<T>> sequenceOfSequences) { }

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• Paramètres des méthodes précédentes :public final static <T> Observable<T> merge(

Iterable<Observable<T>> listOfSequences) { }

©RxJava

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• Paramètres des méthodes précédentes :public final static <T> Observable<T> merge(

Observable<Observable<T>> sequenceOfSequences) { }

©RxJava

#J8Stream @JosePaumard

Observable d’ObservableObservable d’Observable

• switchOnNext() : prend le nouvel Observable

©RxJava

public final static <T> Observable<T> switchOnNext(Observable<Observable<T>> sequenceOfSequences) { }

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas simpleObservable<Integer> range1To100 = Observable.range(1L, 100L) ;manyStrings.subscribe(System.out::println) ;

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas simpleObservable<Integer> range1To100 = Observable.range(1L, 100L) ;manyStrings.subscribe(System.out::println) ;

> 1 2 3 4 ... 100

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas un peu plus durObservable<Integer> timer = Observable.timer(1, TimeUnit.SECONDS) ;timer.subscribe(System.out::println) ;

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas un peu plus dur

• N’affiche rien…

Observable<Integer> timer = Observable.timer(1, TimeUnit.SECONDS) ;timer.subscribe(System.out::println) ;

>

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas un peu plus durObservable<Integer> timer = Observable.timer(1, TimeUnit.SECONDS) ;timer.subscribe(() ‐> {

System.out.println(Thread.currentThread().getName() + " " + Thread.currentThread().isDaemon()) ;

}) ;Thread.sleep(2) ;

#J8Stream @JosePaumard

ExemplesExemples

• Un petit cas un peu plus durObservable<Integer> timer = Observable.timer(1, TimeUnit.SECONDS) ;timer.subscribe(() ‐> {

System.out.println(Thread.currentThread().getName() + " " + Thread.currentThread().isDaemon()) ;

}) ;Thread.sleep(2) ;

> RxComputationThreadPool‐1 ‐ true

#J8Stream @JosePaumard

ExemplesExemples

• Encore un peu plus durObservable<Integer> range1To100 = Observable.range(1, 100) ;

Observable<String> manyStrings = Observable.combineLatest(

range1To100, Observable.just("one"),(integer, string) ‐> string) ;

#J8Stream @JosePaumard

ExemplesExemples

• Encore un peu plus durObservable<Integer> range1To100 = Observable.range(1, 100) ;

Observable<String> manyStrings = Observable.combineLatest(

range1To100, Observable.just("one"),(integer, string) ‐> string) ;

> one (et c’est tout)

Combines two source Observables by emitting an item that aggregatesthe latest values of each of the source Observables each time an item is received from either of the source Observables, where this aggregation is defined by a specified function. 

#J8Stream @JosePaumard

ExemplesExemples

• Encore un peu plus durObservable<Integer> serie = Observable.interval(3, TimeUnit.MILLISECONDS) ;

Observable<String> manyStrings = Observable.combineLatest(

serie, Observable.just("one"),(integer, string) ‐> string) ;

#J8Stream @JosePaumard

ExemplesExemples

• Encore un peu plus durObservable<Integer> serie = Observable.interval(3, TimeUnit.MILLISECONDS) ;

Observable<String> manyStrings = Observable.combineLatest(

serie, Observable.just("one"),(integer, string) ‐> string) ;

> one one one one one one ...

#J8Stream @JosePaumard

Observables hot & coldObservables hot & cold

• Deux mécanismes d’Observable : Ceux qui émettent des données lorsqu’elles sont

consommées = cold observables Ceux qui émettent des données, qu’elles soient

consommées ou pas = hot observables• Un hot observable peut générer ses données

indépendamment de ses observateurs

#J8Stream @JosePaumard

Méthodes d’instance

#J8Stream @JosePaumard

Opérations callbackOpérations callback

• Retournent un Observable<T>

• ~10 versions doOn*

doOnNext(Action1<T> onNext) { } // consumer

#J8Stream @JosePaumard

Opérations booléennesOpérations booléennes

• Retournent un Observable<Boolean>

• Retournent un Observable<T> vide ou singleton

exists(Func1<T, Boolean> predicate) { }elementAt(int index) { }

ignoreElement() { }                     // returns an empty Observablesingle() { }                            // completes on error or transmitssingle(Func1<T, Boolean> predicate) { } // returns the item or error

#J8Stream @JosePaumard

Opérations de mappingOpérations de mapping

• Retournent un Observable<R>map(Func1<T, R> mapping) { } cast(Class<R> clazz) { } // casts the elementstimestamp() { }  // wraps the elements in a timestamped object

zipWith(Observable<U> other, Func2<T, U, R> zipFunction) { } // BiFunction

flatMap(Func1<T, Observable<R>> mapping) { }

#J8Stream @JosePaumard

Opérations de filtrageOpérations de filtrage

• Retourne un Observable<T>filter(Func1<T, Boolean> predicate) { } 

#J8Stream @JosePaumard

Opérations de sélectionOpérations de sélection

• Retournent un Observable<R>sample(long period, TimeUnit timeUnit) { }

©RxJava

O1

sampler

#J8Stream @JosePaumard

Opérations de sélectionOpérations de sélection

• Retournent un Observable<R>sample(Observable<U> sampler) { } // samples on emission & completion

©RxJava

O1

sampler

#J8Stream @JosePaumard

Remarque sur le tempsRemarque sur le temps

• RxJava peut mesurer le temps « en réel »• Peut aussi utiliser un Observable comme mesure du

temps Appels à onNext() et onComplete()

#J8Stream @JosePaumard

Sélection d’élémentsSélection d’éléments

• Retournent Observable<T>

• Exception en cas de timeout entre deux émissions

first() { }last() { }skip(int n) { }limit(int n) { }take(int n) { }

timeout(long n, TimeUnit timeUnit) { }timeout(Func0<Observable<U>> firstTimeoutSelector,   // producer

Func1<T, Observable<V>> timeoutSelector) { } // function

#J8Stream @JosePaumard

Méthodes de réductionMéthodes de réduction

• Retourne un Observable<> singleton

• Retourne un Observable<T>

all(Func1<T, Boolean> predicate) { }   // Observable<Boolean>count() { }  // Observable<Long>reduce(Func2<T, T, T> accumulator) { } // Observable<T>

scan(Func2<T, T, T> accumulator) { } // Observable<T>

forEach(Action1<T> consumer) { }  // void

#J8Stream @JosePaumard

Méthodes de collectionMéthodes de collection

• Méthode collectcollect(Func0<R> stateFactory, // producer

Action2<R, T> collector) { } // BiConsumer

#J8Stream @JosePaumard

Méthodes de collectionMéthodes de collection

• Méthode collect

• Construit une Map<T, List<String>>

collect(Func0<R> stateFactory, // producerAction2<R, T> collector) { } // BiConsumer

collect(() ‐> new ArrayList<String>(), // producerArrayList::add) { } // BiConsumer

#J8Stream @JosePaumard

Méthodes de collectionMéthodes de collection

• Retournent des Observable<> singletontoList() { }       // Observable<List<T>>toSortedList() { } // Observable<List<T>>

toMultimap(Func1<T, K> keySelector,       // Observable<Func1<T, V> valueSelector) { } //    Map<K, Collection<T>>

toMap(Func1<T, K> keySelector) { } // Observable<Map<K, T>>

#J8Stream @JosePaumard

Opérations join / groupJoinOpérations join / groupJoin

• Retourne un Observable<GroupedObservable<K, T>>

• GroupedObservable : observable avec une clé

groupBy(Func1<T, K> keySelector) { } // function

#J8Stream @JosePaumard

Opérations join / groupJoinOpérations join / groupJoin

• Combine deux observables, sur un timerpublic final <T2, D1, D2, R> Observable<R> groupJoin(

Observable<T2> right, Func1<T, Observable<D1>> leftDuration, // functionFunc1<T2, Observable<D2>> rightDuration, // functionFunc2<T, Observable<T2>, R> resultSelector // bifunction

) { }

#J8Stream @JosePaumard

Opérations join / groupJoinOpérations join / groupJoin

• Les fenêtres de temps sont définies par des observables Elles démarrent lorsque l’observable démarre Et se ferment lorsque l’observable émet un objet ou

s’arrête

#J8Stream @JosePaumard

Opérations join / groupJoinOpérations join / groupJoin

• Combine deux observables

#J8Stream @JosePaumard

Méthodes de debugMéthodes de debug

• Deux méthodes, font du mapping

• Notification : objet qui englobe des meta-données en plus

materialize() { } // Observable<Notification<T>>

#J8Stream @JosePaumard

Méthodes de debugMéthodes de debug

• Deux méthodes, font du mapping

• Notification : objet qui englobe des métadonnées en plus des objets de l’Observable

materialize() { } // Observable<Notification<T>>

dematerialize() { } // Observable<T>

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Associent des opérations avec une horlogedelay(long delay, TimeUnit timeUnit) ; // Observable<T>

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Associent des opérations avec une horloge

• Ou avec un autre Observable

delay(long delay, TimeUnit timeUnit) ; // Observable<T>

delay(Func1<T, Observable<U> func1) ; // Observable<T>

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Limite le nombre d’éléments émisdebounce(long delay, TimeUnit timeUnit) ; // Observable<T>

©RxJava

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Limite le nombre d’éléments émis

• Peut aussi se caler sur les événements d’un Observable

debounce(long delay, TimeUnit timeUnit) ; // Observable<T>

debounce(Func1<T, Observable<U> func1) ;  // Observable<T>

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Mesure du temps entre événements

• TimeInterval : wrapper pour la durée en ms et la valeur émise

timeInterval() { } // Observable<TimeInterval<T>>

#J8Stream @JosePaumard

Méthodes synchronesMéthodes synchrones

• Méthodes throttle()throttleFirst(long windowDuration, TimeUnit unit) { } // Observable<T>

throttleLast(long windowDuration, TimeUnit unit) { }

throttleWithTimeout(long windowDuration, TimeUnit unit) { }

#J8Stream @JosePaumard

Backpressure

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Problème : une source peut générer « trop » d’éléments Par rapport à la vitesse des consommateurs

• Certaines méthodes permettent de « sauter » des éléments

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Problème : une source peut émettre « trop » d’éléments Par rapport à la vitesse des consommateurs

• Ne peut pas arriver avec les cold observables

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Problème : une source peut émettre « trop » d’éléments Par rapport à la vitesse des consommateurs

• Ne peut pas arriver avec les cold observables

• Un observable cold peut devenir hot…

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Certaines méthodes permettent de « sauter » des éléments

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Certaines méthodes permettent de « sauter » des éléments

• Backpressure : consiste à ralentir le rythme d’émission

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Méthodes utiles : sample() et debounce()• Méthode buffer() : stocke les éléments avant de les

émettre

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Méthodes buffer()buffer(int size) { }                     // Observable<List<T>>

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Méthodes buffer()buffer(int size) { }                     // Observable<List<T>>

buffer(long timeSpan, TimeUnit unit) { } // Observable<List<T>>buffer(long timeSpan, TimeUnit unit, int maxSize) { }

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Méthodes buffer()buffer(int size) { }                     // Observable<List<T>>

buffer(long timeSpan, TimeUnit unit) { } // Observable<List<T>>buffer(long timeSpan, TimeUnit unit, int maxSize) { }

buffer(Observable<O> bufferOpenings,               // Openings eventsFunc1<O, Observable<C>> bufferClosings) { } // Closings events

#J8Stream @JosePaumard

Méthodes backpressureMéthodes backpressure

• Méthodes window()window(int size) { }                     // Observable<Observable<T>>

window(long timeSpan, TimeUnit unit) { } // Observable<Observable<T>>window(long timeSpan, TimeUnit unit, int maxSize) { }

window(Observable<O> bufferOpenings,               // Openings eventsFunc1<O, Observable<C>> bufferClosings) { } // Closings events

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• repeat() : répète l’émission d’objets indéfinimentrepeat() { }            // Observable<T>repeat(long times) { } // Observable<T>

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• repeat() : répète l’émission d’objets indéfiniment

• Quand l’observable appelle onComplete(), le notification handler est invoqué

repeat() { }            // Observable<T>repeat(long times) { } // Observable<T>

repeatWhen(Func1<Observable<Void>>, Observable<?>> notificationHandler

) { }

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• repeat() : répète l’émission d’objets indéfiniment

• Sur cet Observable, le notification handler peut alors invoquer : onComplete() ou onError(), ce qui déclenche le même

appel sur l’Observable source onNext(), ce qui déclenche la répétition

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• retry() : répète l’émission d’objets sur erreurretry() { }            // Observable<T>retry(long times) { } // Observable<T>

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• retry() : répète l’émission d’objets sur erreur

• Quand l’observable appelle onError(), le notification handler est invoqué avec l’exception

retry() { }            // Observable<T>retry(long times) { } // Observable<T>

retryWhen(Func1<Observable<Throwable>>, Observable<?>> notificationHandler

) { }

#J8Stream @JosePaumard

Méthodes repeat & retryMéthodes repeat & retry

• Sur cet Observable, le notification handler peut alors invoquer : onComplete() ou onError(), ce qui déclenche le même

appel sur l’Observable source onNext(), ce qui déclenche la répétition

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• À la différence des Stream de Java 8, un Observable peut avoir plusieurs observateurs

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• À la différence des Stream de Java 8, un Observable peut avoir plusieurs observateurs

• Ce qui peut poser des problèmes…

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• Cas d’un cold Observable : La consommation par plusieurs observateurs ne pose

pas de problème• Cas d’un hot Observable : Un deuxième observateur peut « manquer » les

premières valeurs

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• Méthode cache()

• Permet de ne pas « manquer » les premières valeurs

cache() { }cache(int capacity) { }

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• Notion de ConnectableObserverObservable<T> observable = ... ;ConnectableObservable<T> connectable = observable.publish() ;

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• Notion de ConnectableObserver

• L’enregistrement d’observateurs ne déclenche pas la production des éléments

Observable<T> observable = ... ;ConnectableObservable<T> connectable = observable.publish() ;

#J8Stream @JosePaumard

Plusieurs ObserverPlusieurs Observer

• Notion de ConnectableObserver

• L’enregistrement d’observateurs ne déclenche pas la production des éléments

• Pour cela : appeler connect()

Observable<T> observable = ... ;ConnectableObservable<T> connectable = observable.publish() ;

connectable.publish() ;

#J8Stream @JosePaumard

Bilan sur RxJavaBilan sur RxJava

• API Complète (complexe)• Permet de gérer le lancement de traitements dans

différents pools de threads

#J8Stream @JosePaumard

Bilan sur RxJavaBilan sur RxJava

• API Complète (complexe)• Permet de gérer le lancement de traitements dans

différents pools de threads• Permet de synchroniser les opérations Sur une horloge Sur des références applicatives

#J8Stream @JosePaumard

Ponts entre Java 8 Stream /

GS Collections / RxJava

#J8Stream @JosePaumard

Le meilleur des deux mondes ?Le meilleur des deux mondes ?

• Connecter GS Collections et Java 8 Stream ?= connecter Stream et Iterable

#J8Stream @JosePaumard

Le meilleur des deux mondes ?Le meilleur des deux mondes ?

• Connecter GS Collections et Java 8 Stream ?= connecter Stream et Iterable

• Connecter Java 8 Stream et RxJava ? = connecter Stream et Observable

#J8Stream @JosePaumard

Spliterator et IteratorSpliterator et Iterator

• Les collections sont construites sur des Iterator• Les Stream sont construits sur des Spliterator

Iterator<T> iterator = ... ;Spliterator<T> spliterator = 

Spliterators.spliteratorUnknownSize(iterator, 0) ;

Spliterator<T> spliterator = Spliterators.spliterator(iterator, size, 0) ;

#J8Stream @JosePaumard

Spliterator et IteratorSpliterator et Iterator

• Les collections sont construites sur des Iterator• Les Stream sont construits sur des Spliterator

Spliterator<T> spliterator = ... ;

Iterator<T> iterator = Spliterators.iterator(spliterator) ;

Iterator<T> iterator = ... ;

Iterable<T> iterable = () ‐> iterator ;

#J8Stream @JosePaumard

GS Collections & StreamGS Collections & Stream

• Le problème consiste à passer d’une Collection à un Stream (et réciproquement)

#J8Stream @JosePaumard

GS Collections & StreamGS Collections & Stream

• Le problème consiste à passer d’une Collection à un Stream (et réciproquement)

• Donc d’un Iterator à un Spliterator (et réciproquement)

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Si l’on a un Iterator : facile !Iterator<T> iterator = ... ;

Observable<T> observable = Observable.from(() ‐> iterator) ;

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Si l’on a un Iterator : facile !

• Si l’on a un Spliterator : facile !

Iterator<T> iterator = ... ;

Observable<T> observable = Observable.from(() ‐> iterator) ;

Spliterator<T> spliterator = ... ;

Observable<T> observable = Observable.from(() ‐> Spliterators.iterator(spliterator)) ;

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Donc si l’on a un Stream, on peut facilement construire un Observable

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Donc si l’on a un Stream, on peut facilement construire un Observable

• L’inverse ?

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Donc si l’on a un Stream, on peut facilement construire un Observable

• L’inverse ? On peut construire un Iterator sur un Observable Puis un Spliterator sur un Iterator

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Implémenter un Iterator : Deux méthodes next() et hasNext() La méthode remove() et une méthode par défaut

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Un Iterator « tire » les données d’une source• Alors qu’un Observable « pousse » les données vers

des callbacks

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Un Iterator « tire » les données d’une source• Alors qu’un Observable « pousse » les données vers

des callbacks

• Il nous faut donc une adaptation entre les deux

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDKpublic static<T> Iterator<T> iterator(Spliterator<? extends T> spliterator) {

class Adapter implements Iterator<T>, Consumer<T> {// implementation

}

return new Adapter() ;}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDK

class Adapter implements Iterator<T>, Consumer<T> {boolean valueReady = false ;T nextElement;

public void accept(T t) {valueReady = true ;nextElement = t ;

}

public boolean hasNext() {if (!valueReady)

spliterator.tryAdvance(this) ; // calls accept()return valueReady ;

}

public T next() {if (!valueReady && !hasNext())

throw new NoSuchElementException() ;else {

valueReady = false ;return nextElement ;

}}

}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDKpublic static<T> Iterator<T> of(Observable<? extends T> obsevable) {

class Adapter implements Iterator<T> {// implementation

}

return new Adapter() ;}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDK

class Adapter implements Iterator<T>, Consumer<T> {boolean valueReady = false ;T nextElement;

public void accept(T t) {valueReady = true ;nextElement = t ;

}

public boolean hasNext() {return valueReady ;

}

public T next() {if (!valueReady && !hasNext())

throw new NoSuchElementException() ;else {

valueReady = false ;return nextElement ;

}}

}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDKclass Adapter implements Iterator<T>, Consumer<T> {boolean valueReady = false ;T nextElement;

public void accept(T t) {observable.subscribe(

element ‐> nextElement = element, // onNextexception ‐> valueReady = false,    // onError() ‐> valueReady = false            // onComplete

) ;}

// next

// hasNext}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDKclass Adapter implements Iterator<T>, Consumer<T> {AtomicBoolean valueReady = new AtomicBoolean(false) ;AtomicReference<T> nextElement = new AtomicReference() ;

public void accept(T t) {observable.subscribe(

element ‐> nextElement.set(element), // onNextexception ‐> valueReady.set(false),    // onError() ‐> valueReady.set(false) // onComplete

) ;}

// next

// hasNext}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Peut mieux faire ? interface Wrapper<E> {

E get() ;

}

Wrapper<Boolean> wb = () ‐> true ;

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Peut mieux faire ? interface Wrapper<E> {

E get() ;

}

Wrapper<Boolean> wb = () ‐> true ;Action1<Boolean> onNext = b ‐> wb.set(b) ; // should return Wrapper<T>

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Peut mieux faire ? interface Wrapper<E> {

E get() ;

public default Wrapper<E> set(E e) {// should return e

}}

Wrapper<Boolean> wb = () ‐> true ;Action1<Boolean> onNext = b ‐> wb.set(b) ; // should return Wrapper<T>

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• Peut mieux faire ? interface Wrapper<E> {

E get() ;

public default Wrapper<E> set(E e) {return () ‐> e ;

}}

Wrapper<Boolean> wb = () ‐> true ;Action1<Boolean> onNext = b ‐> wb.set(b) ; // should return Wrapper<T>

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut s’inspirer du JDKclass Adapter implements Iterator<T>, Consumer<T> {Wrapper<Boolean> valueReady = () ‐> false ;Wrapper<T> nextElement ;

public void accept(T t) {observable.subscribe(

element ‐> nextElement.set(element), // onNextexception ‐> valueReady.set(false),    // onError() ‐> valueReady.set(false) // onComplete

) ;}

// next

// hasNext}

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut construire un Iterator sur un Observable Et donc un Spliterator sur un Observable

#J8Stream @JosePaumard

RxJava & StreamRxJava & Stream

• On peut construire un Iterator sur un Observable Et donc un Spliterator sur un Observable

• Fonctionne sur les cold Observable

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Partant d’un Stream [1, 2, 3, 4, 5, …]

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Partant d’un Stream [1, 2, 3, 4, 5, …] [[1, 2, 3], [4, 5, 6], [7, 8, 9], …] ?

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Partant d’un Stream [1, 2, 3, 4, 5, …] [[1, 2, 3], [4, 5, 6], [7, 8, 9], …] ? [[1, 2, 3], [2, 3, 4], [3, 4, 5], …] ?

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Deux Stream [10, 11, 12, …] et [50, 51, 52, …]

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Deux Stream [10, 11, 12, …] et [50, 51, 52, …] [10, 50, 11, 51, 12, 52, …] ?

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Peut-on construire des Stream à la façon des Observable ?

• Deux Stream [10, 11, 12, …] et [50, 51, 52, …] [10, 50, 11, 51, 12, 52, …] ? [Pair(10, 50), Pair(11, 51), Pair(12, 52), …] ?

#J8Stream @JosePaumard

Spliterators avancésSpliterators avancés

• Trois méthodes pour un Spliterator tryAdvance(Consumer) trySplit(), retourne un sous-spliterator estimateSize()

#J8Stream @JosePaumard

GroupingSpliteratorGroupingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public class GroupingSpliterator<E> implements Spliterator<Stream<E>> {

private final long grouping ;private final Spliterator<E> spliterator ;

// implementation}

#J8Stream @JosePaumard

GroupingSpliteratorGroupingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public boolean tryAdvance(Consumer<? super Stream<E>> action) {

boolean finished = false ;Stream.Builder<E> builder = Stream.builder() ;for (int i = 0 ; i < grouping ; i++) {

if (!spliterator.tryAdvance(element ‐> { builder.add(element) ; })) {finished = true ;

}}Stream<E> subStream = subBuilder.build() ;action.accept(subStream) ;return !finished ;

}

#J8Stream @JosePaumard

GroupingSpliteratorGroupingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public Spliterator<Stream<E>> trySplit() {

Spliterator<E> spliterator = this.spliterator.trySplit() ;return new GroupingSpliterator<E>(spliterator, grouping) ;

}

#J8Stream @JosePaumard

GroupingSpliteratorGroupingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public long estimateSize() {

return spliterator.estimateSize() / grouping ;}

#J8Stream @JosePaumard

RollingSpliteratorRollingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [2, 3, 4], [3, 4, 5], …]

#J8Stream @JosePaumard

RollingSpliteratorRollingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public class RollingSpliterator<E> implements Spliterator<Stream<E>> {

private final int grouping ;private final Spliterator<E> spliterator ;private Object [] buffer ; // we cant create arrays of Eprivate AtomicInteger bufferWriteIndex = new AtomicInteger(0) ;private AtomicInteger bufferReadIndex = new AtomicInteger(0) ;

// implementation}

#J8Stream @JosePaumard

RollingSpliteratorRollingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]

public boolean tryAdvance(Consumer<? super Stream<E>> action) {boolean finished = false ;

if (bufferWriteIndex.get() == bufferReadIndex.get()) {for (int i = 0 ; i < grouping ; i++) {

if (!advanceSpliterator()) {finished = true ;

}}

}if (!advanceSpliterator()) {

finished = true ;}

Stream<E> subStream = buildSubstream() ;action.accept(subStream) ;return !finished ;

}

#J8Stream @JosePaumard

RollingSpliteratorRollingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]private boolean advanceSpliterator() {

return spliterator.tryAdvance(element ‐> { 

buffer[bufferWriteIndex.get() % buffer.length] = element ; bufferWriteIndex.incrementAndGet() ;

});}

#J8Stream @JosePaumard

RollingSpliteratorRollingSpliterator

• [1, 2, 3, 4, 5, …] -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], …]public Spliterator<Stream<E>> trySplit() {

return new RollingSpliterator<E>(spliterator.trySplit(), grouping) ;}

public long estimateSize() {return spliterator.estimateSize() ‐ grouping ;

}

#J8Stream @JosePaumard

ZippingSpliteratorZippingSpliterator

• [1, 2, 3, …], [a, b, c, …] -> [F(1, a), F(2, b), F(3, c), …]public class ZippingSpliterator<E1, E2, R> implements Spliterator<R> {

private final Spliterator<E1> spliterator1 ;private final Spliterator<E2> spliterator2 ;private final BiFunction<E1, E2, R> tranform ;

// implementation}

#J8Stream @JosePaumard

ZippingSpliteratorZippingSpliterator

• [1, 2, 3, …], [a, b, c, …] -> [F(1, a), F(2, b), F(3, c), …]public boolean tryAdvance(Consumer<? super R> action) {

return spliterator1.tryAdvance(e1 ‐> {

spliterator2.tryAdvance(e2 ‐> {action.accept(tranform.apply(e1, e2)) ;

}) ;}) ;

}

#J8Stream @JosePaumard

ZippingSpliteratorZippingSpliterator

• [1, 2, 3, …], [a, b, c, …] -> [F(1, a), F(2, b), F(3, c), …]public Spliterator<R> trySplit() {

return new ZippingSpliterator<E1, E2, R>(spliterator1.trySplit(), spliterator2.trySplit(), tranform) ;

}

public long estimateSize() {return this.spliterator1.estimateSize() ;

}

#J8Stream @JosePaumard

PairingSpliteratorPairingSpliterator

• [1, 2, 3, …], [a, b, c, …] -> [(1, a), (2, b), (3, c), …]public class PairingSpliterator<E1, E2> extends ZippingSpliterator<E1, E2, Pair<E1, E2>> {

public PairingSpliterator(Spliterator<E1> spliterator1, Spliterator<E2> spliterator2) {

super(spliterator1, spliterator2, (e1, e2) ‐> new Pair<E1, E2>(e1, e2)) ;

}}

}

#J8Stream @JosePaumard

Stream vs RxJavaStream vs RxJava

• Pour la partie cold : on peut l’implémenter avec des Stream

• Pour la partie hot : on peut interfacer les deux API

#J8Stream @JosePaumard

Comparaisons

#J8Stream @JosePaumard

ComparaisonsComparaisons

• Cas applicatif unique• Implémentés avec les trois API

• Comparaisons des patterns• Mesure des temps de traitement avec JMH

#J8Stream @JosePaumard

Shakespeare joue an ScrabbleShakespeare joue an Scrabble

• « Shakespeare joue au Scrabble »• Ensemble de mots Les mots utilisés par Shakespeare Les mots autorisés au Scrabble

• Question : quel aurait été le meilleur mot que Shakespeare aurait pu jouer ?

#J8Stream @JosePaumard

Shakespeare joue an ScrabbleShakespeare joue an Scrabble

• Comparaison des principaux patterns• Puis comparaison des performances globales

#J8Stream @JosePaumard

Histogramme des lettresHistogramme des lettres

• Java Stream GS Collections Rx Java// Histogram of the letters in a given wordFunction<String, Map<Integer, Long>> histoOfLetters = 

word ‐> word.chars().boxed().collect(

Collectors.groupingBy(Function.identity(),Collectors.counting()

)) ;

#J8Stream @JosePaumard

Histogramme des lettresHistogramme des lettres

• Java Stream GS Collections Rx Java// Histogram of the letters in a given wordFunction<String, MutableMap<Integer, Long>> histoOfLetters = 

word ‐> new CharArrayList(word.toCharArray()).collect(c ‐> new Integer((int)c))// .groupBy(letter ‐> letter) ;.aggregateBy(

letter ‐> letter, () ‐> 0L, (value, letter) ‐> { return value + 1 ; }

) ;

#J8Stream @JosePaumard

Histogramme des lettresHistogramme des lettres

• Java Stream GS Collections Rx Java// Histogram of the letters in a given wordFunc1<String, Observable<HashMap<Integer, LongWrapper>>> histoOfLetters =

word ‐> toIntegerObservable.call(word).collect(

() ‐> new HashMap<Integer, LongWrapper>(), (HashMap<Integer, LongWrapper> map, Integer value) ‐> { 

LongWrapper newValue = map.get(value) ;if (newValue == null) {

newValue = () ‐> 0L ;}map.put(value, newValue.incAndSet()) ;

}) ;

#J8Stream @JosePaumard

Histogramme des lettresHistogramme des lettres

• Java Stream GS Collections Rx Javainterface LongWrapper {

long get() ;

public default LongWrapper set(long l) {return () ‐> l ;

}

public default LongWrapper incAndSet() {return () ‐> get() + 1L ;

}

public default LongWrapper add(LongWrapper other) {return () ‐> get() + other.get() ;

}}

#J8Stream @JosePaumard

Nombre de blancs pour une lettreNombre de blancs pour une lettre

• Java Stream GS Collections Rx Java// number of blanks for a given letterToLongFunction<Map.Entry<Integer, Long>> blank =

entry ‐> Long.max(

0L, entry.getValue() ‐

scrabbleAvailableLetters[entry.getKey() ‐ 'a']) ;

#J8Stream @JosePaumard

Nombre de blancs pour une lettreNombre de blancs pour une lettre

• Java Stream GS Collections Rx Java// number of blanks for a given letterLongFunction<Map.Entry<Integer, Long>> blank = 

entry ‐> Long.max(

0L, entry.getValue() ‐

scrabbleAvailableLetters[entry.getKey() ‐ 'a']) ;

#J8Stream @JosePaumard

Nombre de blancs pour une lettreNombre de blancs pour une lettre

• Java Stream GS Collections Rx Java// number of blanks for a given letterFunc1<Entry<Integer, LongWrapper>, Observable<Long>> blank =

entry ‐>Observable.just(

Long.max(0L, entry.getValue().get() ‐

scrabbleAvailableLetters[entry.getKey() ‐ 'a']

#J8Stream @JosePaumard

Nombre de blancs pour un motNombre de blancs pour un mot

• Java Stream GS Collections Rx Java// number of blanks for a given wordFunction<String, Long> nBlanks =

word ‐> histoOfLetters.apply(word).entrySet().stream().mapToLong(blank).sum();

#J8Stream @JosePaumard

Nombre de blancs pour un motNombre de blancs pour un mot

• Java Stream GS Collections Rx Java// number of blanks for a given wordFunction<String, Long> nBlanks =

word ‐> UnifiedSet.newSet(histoOfLetters.valueOf(word)

.entrySet()).sumOfLong(blank) ;

#J8Stream @JosePaumard

Nombre de blancs pour un motNombre de blancs pour un mot

• Java Stream GS Collections Rx Java// number of blanks for a given wordFunc1<String, Observable<Long>> nBlanks = 

word ‐> histoOfLetters.call(word).flatMap(map ‐> Observable.from(() ‐> map.entrySet().iterator())).flatMap(blank).reduce(Long::sum) ;

#J8Stream @JosePaumard

Prédicat sur 2 blancsPrédicat sur 2 blancs

• Java Stream GS Collections Rx Java// can a word be written with 2 blanks?Predicate<String> checkBlanks = word ‐> nBlanks.apply(word) <= 2 ;

// can a word be written with 2 blanks?Predicate<String> checkBlanks = word ‐> nBlanks.valueOf(word) <= 2 ;

// can a word be written with 2 blanks?Func1<String, Observable<Boolean>> checkBlanks = 

word ‐> nBlanks.call(word).flatMap(l ‐> Observable.just(l <= 2L)) ;

#J8Stream @JosePaumard

Bonus lettre doublée – 1Bonus lettre doublée – 1

• Java Stream GS Collections Rx Java// Placing the word on the board// Building the streams of first and last lettersFunction<String, IntStream> first3 = 

word ‐> word.chars().limit(3);

#J8Stream @JosePaumard

Bonus lettre doublée – 1Bonus lettre doublée – 1

• Java Stream GS Collections Rx Java// Placing the word on the board// Building the streams of first and last lettersFunction<String, MutableList<Integer>> first3 = 

word ‐>  new CharArrayList(word.toCharArray()).collect(c ‐> (int)c) ;.subList(0, Integer.min(list.size(), 3)) ;

#J8Stream @JosePaumard

Bonus lettre doublée – 1Bonus lettre doublée – 1

• Java Stream GS Collections Rx Java// Placing the word on the board// Building the streams of first and last lettersFunc1<String, Observable<Integer>> first3 = 

word ‐> Observable.from(

IterableSpliterator.of(word.chars().boxed().limit(3).spliterator()

)) ;

#J8Stream @JosePaumard

Bonus lettre doublée – 2Bonus lettre doublée – 2

• Java Stream GS Collections Rx Java// Bonus for double letterToIntFunction<String> bonusForDoubleLetter = 

word ‐> Stream.of(first3.apply(word), last3.apply(word)).flatMapToInt(Function.identity()).map(scoreOfALetter).max().orElse(0) ;

#J8Stream @JosePaumard

Bonus lettre doublée – 2Bonus lettre doublée – 2

• Java Stream GS Collections Rx Java// Bonus for double letterFunction<String, MutableList<Integer>> toBeMaxed = 

word ‐> { MutableList<Integer> list = first3.valueOf(word) ;list.addAll(last3.valueOf(word)) ;return list ; 

} ;

IntFunction<String> bonusForDoubleLetter =word ‐> toBeMaxed.valueOf(word)

.collect(scoreOfALetter)

.max() ;

#J8Stream @JosePaumard

Bonus lettre doublée – 2Bonus lettre doublée – 2

• Java Stream GS Collections Rx Java// Bonus for double letterFunc1<String, Observable<Integer>> bonusForDoubleLetter = 

word ‐> Observable.just(first3.call(word), last3.call(word)).flatMap(observable ‐> observable).flatMap(scoreOfALetter).reduce(Integer::max) ;

#J8Stream @JosePaumard

Score d’un motScore d’un mot

• Java Stream GS Collections Rx Java// score of the word put on the boardFunction<String, Integer> score3 =

word ‐> 2*(score2.apply(word) + bonusForDoubleLetter.applyAsInt(word))+ (word.length() == 7 ? 50 : 0);

#J8Stream @JosePaumard

Score d’un motScore d’un mot

• Java Stream GS Collections Rx Java// score of the word put on the boardFunction<String, Integer> score3 = 

word ‐>2*(score2.valueOf(word) + bonusForDoubleLetter.intValueOf(word))+ (word.length() == 7 ? 50 : 0);

#J8Stream @JosePaumard

Score d’un motScore d’un mot

• Java Stream GS Collections Rx Java// score of the word put on the boardFunc1<String, Observable<Integer>> score3 = 

word ‐>Observable.just(

score2.call(word), score2.call(word), bonusForDoubleLetter.call(word), bonusForDoubleLetter.call(word), Observable.just(word.length() == 7 ? 50 : 0)

).flatMap(observable ‐> observable).reduce(Integer::sum) ;

#J8Stream @JosePaumard

Histogramme des scoresHistogramme des scores

• Java Stream GS Collections Rx JavaFunction<Function<String, Integer>, Map<Integer, List<String>>>buildHistoOnScore = 

score ‐> shakespeareWords.stream().parallel().filter(scrabbleWords::contains).filter(checkBlanks).collect(

Collectors.groupingBy(score, () ‐> new TreeMap<>(Comparator.reverseOrder()), Collectors.toList()

)) ;

#J8Stream @JosePaumard

Histogramme des scoresHistogramme des scores

• Java Stream GS Collections Rx JavaFunction<

Function<String, Integer>, MutableMap<Integer, MutableList<String>>>buildHistoOnScore = 

score ‐> shakespeareWords.select(scrabbleWords::contains).select(checkBlanks).aggregateBy(

score, FastList::new, (list, value) ‐> { list.add(value) ; return list ; }

) ;

#J8Stream @JosePaumard

Histogramme des scoresHistogramme des scores

• Java Stream GS Collections Rx JavaFunc1<Func1<String, Observable<Integer>>, Observable<TreeMap<Integer, List<String>>>> buildHistoOnScore =

score ‐> Observable.from(() ‐> shakespeareWords.iterator()).filter(scrabbleWords::contains).filter(word ‐> checkBlanks.call(word).toBlocking().first()).collect(

() ‐> new TreeMap<Integer, List<String>>(Comparator.reverseOrder()), (TreeMap<Integer, List<String>> map, String word) ‐> {

Integer key = score.call(word).toBlocking().first() ;List<String> list = map.get(key) ;if (list == null) {

list = new ArrayList<String>() ;map.put(key, list) ;

}list.add(word) ;

}) ;

#J8Stream @JosePaumard

Meilleurs motsMeilleurs mots

• Java Stream GS Collections Rx Java// best key / value pairsList<Entry<Integer, List<String>>> finalList = 

buildHistoOnScore.apply(score3).entrySet().stream().limit(3).collect(Collectors.toList()) ;

#J8Stream @JosePaumard

Meilleurs motsMeilleurs mots

• Java Stream GS Collections Rx Java// best key / value pairsMutableList<Entry<Integer, MutableList<String>>> finalList = 

new FastList<Map.Entry<Integer,MutableList<String>>>(new TreeSortedMap<Integer, MutableList<String>>(

Comparator.reverseOrder(), buildHistoOnScore.valueOf(score3)

).entrySet()

).subList(0, 3) ;

#J8Stream @JosePaumard

Meilleurs motsMeilleurs mots

• Java Stream GS Collections Rx Java// best key / value pairsList<Entry<Integer, List<String>>> finalList2 =

buildHistoOnScore.call(score3).flatMap(map ‐> Observable.from(() ‐> map.entrySet().iterator())).take(3).collect(

() ‐> new ArrayList<Entry<Integer, List<String>>>(), (list, entry) ‐> { list.add(entry) ; }

).toBlocking().first() ;

#J8Stream @JosePaumard

Meilleurs motsMeilleurs mots

• Java Stream GS Collections Rx Java// best key / value pairsCountDownLatch latch = new CountDownLatch(3) ;

buildHistoOnScore.call(score3).flatMap(map ‐> Observable.from(() ‐> map.entrySet().iterator())).take(3).collect(

() ‐> new ArrayList<Entry<Integer, List<String>>>(), (list, entry) ‐> { list.add(entry) ; latch.countDown() ; }

).forEach(...) ;

latch.await() ;

#J8Stream @JosePaumard

1er bilan : patterns1er bilan : patterns

• Java 8 Stream donne les patterns les plus simples

#J8Stream @JosePaumard

1er bilan : patterns1er bilan : patterns

• Java 8 Stream donne les patterns les plus simples• Java 8 Stream & GS Collections se ressemblent

#J8Stream @JosePaumard

1er bilan : patterns1er bilan : patterns

• Java 8 Stream donne les patterns les plus simples• Java 8 Stream & GS Collections se ressemblent• RxJava fait le choix du « tout flatMap » ce qui mène à

des patterns inutilement lourds

#J8Stream @JosePaumard

Performances Performances

• Utilisation de JMH• Outil standard de mesure de performances du JDK• Développé dans le cadre de l’Open JDK• Aleksey Shipilev http://shipilev.net/• https://twitter.com/shipilev

http://openjdk.java.net/projects/code-tools/jcstress/

http://openjdk.java.net/projects/code-tools/jmh/

https://www.parleys.com/tutorial/java-microbenchmark-harness-the-lesser-two-evils

#J8Stream @JosePaumard

Performances Performances

• Utilisation de JMH• Outil standard de mesure de performances du JDK• Développé dans le cadre de l’Open JDK• Aleksey Shipilev http://shipilev.net/• https://twitter.com/shipilev

http://openjdk.java.net/projects/code-tools/jcstress/

http://openjdk.java.net/projects/code-tools/jmh/

https://www.parleys.com/tutorial/java-microbenchmark-harness-the-lesser-two-evils

Aleksey Shipilëv @shipilev

Чувак из ТВ-службы пришёл отключать антенну. Оказался масс-спектрометристом, сцепился языком с тестем: стоят, обсуждают девайсы. #наукоград

#J8Stream @JosePaumard

JMHJMH

• Simple à utiliser<dependency>

<groupId>org.openjdk.jmh</groupId><artifactId>jmh‐core</artifactId><version>1.7</version>

</dependency>

#J8Stream @JosePaumard

JMHJMH

• État partagé entre les exécutions@State(Scope.Benchmark)public class ShakespearePlaysScrabble {

Set<String> scrabbleWords = null ;Set<String> shakespeareWords = null ;

@Setuppublic void init() {

scrabbleWords = Util.readScrabbleWords() ;shakespeareWords = Util.readShakespeareWords() ;

}}

#J8Stream @JosePaumard

JMHJMH

• Simple à mettre en œuvre @Benchmark@BenchmarkMode(Mode.AverageTime)@OutputTimeUnit(TimeUnit.MILLISECONDS)@Warmup(iterations=5)@Measurement(iterations=5)@Fork(3)public List<Entry<Integer, List<String>>> measureAverage() {

// implementation to test}

#J8Stream @JosePaumard

JMHJMH

• Utilisation particulière> mvn clean install> java –jar target/benchmark.jar

#J8Stream @JosePaumard

JMHJMH

• 3 façons de mesurer les performances Temps moyen d’exécution Nombre d’exécutions par seconde Échantillonnage (diagramme par quantiles)

#J8Stream @JosePaumard

Performances Performances

• Mesure du temps moyenBenchmark           Mode  Cnt Score   Error UnitsGSCollections avgt 100   25,392 ± 0,253  ms/op

#J8Stream @JosePaumard

Performances Performances

• Mesure du temps moyenBenchmark           Mode  Cnt Score   Error UnitsGSCollections avgt 100   25,392 ± 0,253  ms/opNonParallelStreams avgt 100   29,027 ± 0,279  ms/op

#J8Stream @JosePaumard

Performances Performances

• Mesure du temps moyenBenchmark           Mode  Cnt Score   Error UnitsGSCollections avgt 100   25,392 ± 0,253  ms/opNonParallelStreams avgt 100   29,027 ± 0,279  ms/opRxJava avgt 100  253,788 ± 1,421  ms/op

#J8Stream @JosePaumard

Performances Performances

• Mesure du temps moyenBenchmark           Mode  Cnt Score   Error UnitsGSCollections avgt 100   25,392 ± 0,253  ms/opNonParallelStreams avgt 100   29,027 ± 0,279  ms/opRxJava avgt 100  253,788 ± 1,421  ms/opParallelStreams avgt 100    7,624 ± 0,055  ms/op

#J8Stream @JosePaumard

Performances Performances

• Mesure du temps moyen

• Attention à la mémoire avec GS Collections

Benchmark           Mode  Cnt Score   Error UnitsGSCollections avgt 100   25,392 ± 0,253  ms/opNonParallelStreams avgt 100   29,027 ± 0,279  ms/opRxJava avgt 100  253,788 ± 1,421  ms/opParallelStreams avgt 100    7,624 ± 0,055  ms/op

#J8Stream @JosePaumard

Conclusion

#J8Stream @JosePaumard

Conclusion Conclusion

• La programmation fonctionnelle est « à la mode »• Du point de vue performance, les choses ne sont pas

si simples• Le choix de l’API Stream : « une dose de fonctionnel »

est probablement le bon

#J8Stream @JosePaumard

Conclusion Conclusion

• GS Collections : bonne API Beaucoup de patterns en doublon des Streams Compatible Java 7

• RxJava : API riche et complexe Approche différente, patterns complémentaires Attention aux performances

#J8Stream @JosePaumard

Conclusion Conclusion

• Java 8 Stream Performante, efficace en mémoire Seule à offrir la parallélisation « gratuite » Extensible au travers des Spliterator

@JosePaumard#J8Stream

@JosePaumard#J8Stream

Recommended