De Runnable & synchronized à parallele() et atomically()

Preview:

DESCRIPTION

 

Citation preview

Quel est l’objet

de la programmation

concurrente ?

Tirer parti de la

puissance des

processeurs

1) De quoi cette puissance est-elle faite ?

Architectures muticœurs

2) Conséquences sur la façon de coder ?

3) Quels sont les problèmes des applications « parallèles » ?

Synchronisation

4) Comment s’y prendre pour « synchroniser » les opérations

5) De quels outils dispose-t-on ?

Java 6

Java 7

Java 8

6) Importance de l’algorithmique ?

MULTITHREADING

1995

1995

Si un thread a « asser travaillé »

S’il est en attente d’une ressource lente

S’il est en attente d’un autre thread

1995

Parallèle

SIMD : chaque CPU travaille sur ses propres données

1995

API Java

• Thread, Runnable

• Threads « natifs », « Green threads »

• Moniteur

• synchronized, volatile

1995

API Java

• Thread, Runnable

• Threads « natifs », « Green threads »

• Moniteur

• synchronized, volatile

1995

2004

2004 - 2005

Software Hardware

Processeurs multicœurs

• Fin de la « soupe gratuite »

• La puissance de calcul augmente par le nombre de cœurs, et non plus par l’augmentation de la fréquence de chaque cœur

• Apparition des GPGPU (2007)

Processeurs multicœurs

• Aujourd’hui

– Smartphone : 2 à 4 cœurs

– PC de bureau : 4 à 8 cœurs

– Serveur pro : 128 cœurs

• Demain

– Plateforme exascale : de 100k à 1M nœuds

– Chaque nœud : de 1k à 10k cœurs

Java.util.concurrent

• API de concurrence avancée

– Dispo en Java 4 EDU.oswego

• Nouvelle approche du multithreading

– Nouvelle façon de lancer des threads

– Nouvelle façon de synchroniser les opérations

Problèmes du

multithread ?

1) Il est nécessaire !

2) Data race

Deux threads modifient une variable

« en même temps »

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

Interruption =

catastrophe

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static synchronized Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

public class Singleton {

private static Singleton instance ;

private Singleton() {}

public static synchronized Singleton getInstance() {

if (instance == null) {

instance = new Singleton() ;

}

return instance ; }

}

Lecture ralentie

public class Singleton {

private static Singleton instance ;

public static Singleton getInstance() {

if (instance == null) {

synchronized (Singleton.class) {

if (instance == null) {

instance = new Singleton() ;

}

}

}

return instance ;

}

}

Double check locking

public class Singleton {

private static Singleton instance ;

public static Singleton getInstance() {

if (instance == null) {

synchronized (Singleton.class) {

if (instance == null) {

instance = new Singleton() ;

}

}

}

return instance ;

}

}

Double check locking

BUG !

public enum Singleton {

instance ;

}

Solutions aux « data races »

• Synchronisation de la modification des variables

« garder les variables »

• Et si on ne modifie jamais nos variables ?

Systèmes immutables

SYNCHRONISATION

Java Memory Model

• Objet : définir la valeur d’une variable à un instant donné

• Règle : « toute lecture d’une variable retourne la dernière valeur écrite dans cette variable »

Java Memory Model

• Objet : définir la valeur d’une variable à un instant donné

• Règle : « toute lecture d’une variable retourne la dernière valeur écrite dans cette variable »

• Monothread : contrainte sur le fonctionnement des compilateurs & de la JVM

Java Memory Model

• Objet : définir la valeur d’une variable à un instant donné

• Règle : « toute lecture d’une variable retourne la dernière valeur écrite dans cette variable »

• Multithread multicœur : vrai problème !

« toute lecture … retourne la dernière valeur écrite … »

Java Memory Model

• Multithread multicœur : vrai problème !

« toute lecture … retourne la dernière valeur écrite … »

• Chronologie entre les threads / cœurs

Happens before

• Chronologie = créer des liens entre les lectures et les écritures

• Valeur de r1 ?

– Même thread ⇒ évident

– Threads différents : impossible de prévoir

x = 1

r1 = x

Happens before

• Chronologie = créer des liens entre les lectures et les écritures

• Valeur de r1 ?

– Imposée par la JLS : r1 = 1

x = 1

r1 = x

happens before

Happens before

• Règle :

Un lien « happens before » est établi entre toute écriture synchronisée ou volatile et toute lecture synchronisée ou volatile qui suit

• Existence de ce lien ⇒

Javadoc (java.util.concurrent)

private int index ;

public void incrementonsGaiement() {

index++ ;

}

public void testonsJoyeusement() {

if (index > 10) {

System.out.println("Index est grand !") ;

} }

private int index ;

public void incrementonsGaiement() {

index++ ;

}

public void testonsJoyeusement() {

if (index > 10) {

System.out.println("Index est grand !") ;

} }

Pas de lien « happens before »

private volatile int index ;

public void incrementonsGaiement() {

index++ ;

}

public void testonsJoyeusement() {

if (index > 10) {

System.out.println("Index est grand !") ;

} }

private volatile int index ;

public void incrementonsGaiement() {

index++ ;

}

public void testonsJoyeusement() {

if (index > 10) {

System.out.println("Index est grand !") ;

} }

Lien « happens before »

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

Cas pathologique

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T1 prend le lock

1

2

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T1 prend le lock

1

2

3

4

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T1 prend le lock

1

2

3

4

r2 = 1

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T2 prend le lock

1

4

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T2 prend le lock

1

? ?

4

private int x, y, r1, r2 ;

private Object lock = new Object() ;

void maMethode() {

x = 1 ;

synchronized(lock) {

y = 1 ;

}

}

void maMethodeAussi() {

synchronized(lock) {

r1 = y ;

}

r2 = x ;

}

1) T2 prend le lock

1

? ?

4

r2 = 0 ? r2 = 1 ?

public class Singleton {

private static Singleton instance ;

public static Singleton getInstance() {

if (instance == null) {

synchronized (Singleton.class) {

if (instance == null) {

instance = new Singleton() ;

}

}

}

return instance ;

}

}

Double check locking

Lecture non synchronisée

public class Singleton {

private volatile static Singleton instance ;

public static Singleton getInstance() {

if (instance == null) {

synchronized (Singleton.class) {

if (instance == null) {

instance = new Singleton() ;

}

}

}

return instance ;

}

}

Variable volatile !

PROCESSEUR

private Object o = new Object() ;

private int index = 0 ;

public void uneMethode() {

synchronized (o) {

index++ ;

}

}

private Object o = new Object() ;

private int index = 0 ;

public void uneMethode() {

synchronized (o) {

index++ ;

}

}

private Object o = new Object() ;

private int index = 0 ;

public void uneMethode() {

synchronized (o) {

index++ ;

}

}

« Visibilité »

Barrière de mémoire

• Une instruction assembleur

• Permet de « propager » une modification d’un cœur à l’autre

• Invalide le cache L1, force sa relecture dans L3 ou la mémoire

Optimisation 1

Concevoir des traitements tels que :

• Les données tiennent dans 32kO

• Le code tient dans 32kO

• Et pas de barrière de mémoire

⇒ Exécution à 1GHz

Optimisation 2

• S’arranger pour placer ses données dans des zones contigües de la mémoire

⇒ Divise pas 8 les temps de chargement

Cadeau empoisonné de L3

• Le cache invalide les données ligne par ligne

• Cas de deux threads :

– Le premier exécute une méthode, accède à a

– Le second exécute une autre méthode, accède à b

• Pas de concurrence d’accès sur a et b

Cadeau empoisonné de L3

• Catastrophique pour les perf !

• Solution : recours au « variable padding »

• Utilisé dans le disruptor (LMAX)

public class Sequence {

private static final AtomicLongFieldUpdater<Sequence> updater =

AtomicLongFieldUpdater.newUpdater(Sequence.class, "value");

private volatile long p1 = 7L, p2 = 7L, p3 = 7L, p4 = 7L,

p5 = 7L, p6 = 7L, p7 = 7L ;

private volatile long value = Sequencer.INITIAL_CURSOR_VALUE ;

private volatile long q1 = 7L, q2 = 7L, q3 = 7L, q4 = 7L,

q5 = 7L, q6 = 7L, q7 = 7L;

// ...

public long sumPaddingToPreventOptimisation() {

return p1 + p2 + p3 + p4 + p5 + p6 + p7 + value + q1 +

q2 + q3 + q4 + q5 + q6 + q7;

}

public void setPaddingValue(final long value)

{

p1 = p2 = p3 = p4 = p5 = p6 = p7 = q1 = q2 = q3 =

q4 = q5 = q6 = q7 = value;

}

}

Conclusion

• Prendre en compte le processeur

– Permet de belles optimisations

– Délicat à utiliser !

• Tenir compte de l’ordre des variables en mémoire

Conclusion

• Prendre en compte le processeur

– Permet de belles optimisations

– Délicat à utiliser !

• Tenir compte de l’ordre des variables en mémoire

• Veut-on le faire ?

• Le langage est-il prêt ?

java.util.concurrent

Nouvelle façon de

lancer les threads

La manière du JDK 1.1

Runnable r = new Runnable() {

public void run() {

while (true) {

System.out.println("Vogue la galère !") ;

}

}

} ;

Thread t = new Thread(r) ;

t.start() ;

La manière du JDK 1.1

Runnable r = new Runnable() {

public void run() {

while (true) {

System.out.println("Vogue la galère !") ;

}

}

} ;

Thread t = new Thread(r) ;

t.start() ; 1) Pas de type de retour 2) Pas d’exception 3) Création / destruction

La manière du JDK 5 ExecutorService service =

new ScheduledThreadPoolExecutor(10) ;

Callable<Boolean> task = new Callable<Boolean>() {

public Boolean call() throws Exception {

int i = 0 ;

while (i++ < 1000) {

System.out.println("Vogue la galère !") ;

}

return true ;

}

};

Future<Boolean> f = service.submit(task) ;

Boolean b = f.get(100, TimeUnit.MILLISECONDS) ;

1) Type de retour, exception 2) Cycle de vie géré par le pool

Nouvelle façon de

synchroniser

Interface Lock

private Lock lock = new ReentrantLock() ;

public void maMethode() {

lock.lock() ; // bloque « comme » synchronized

// ...

lock.unlock() ;

} ;

Interface Lock

private Lock lock = new ReentrantLock() ;

public void maMethode() {

lock.tryLock(10, TimeUnit.MILLISECONDS) ; // timeout

// ...

lock.unlock() ;

} ;

Classe Semaphore

private Semaphore s = new Semaphore(5) ;

public void maMethode() {

s.acquire() ; // bloque « comme » synchronized

// ...

s.release() ;

} ;

Classe Semaphore

private Semaphore s = new Semaphore(5) ;

public void maMethode() {

s.tryAcquire(10, TimeUnit.MILLISECONDS) ; // timeout

// ...

s.release() ;

} ;

Classe Semaphore

• Deux méthodes supplémentaires :

availablePermits()

getQueueLength()

Classe CountDownLatch

• Utilisable en initialisation (pas de reset)

private CountDownLatch latch = new CountDownLatch() ;

public void init() {

db.connect() ; // étape « lente »

latch.countDown() ; // ouverture du latch

} ;

public void process() {

latch.await() ; // attente de l’ouverture

} ;

Classe CyclicBarrier

• Synchronisation sur la fin du traitement de plusieurs tâches

public void processOne() {

// traitements divers

barrier.await() ;

} ;

private CyclicBarrier barrier = new CyclicBarrier(4) ;

public void processTwo() {

// traitements divers

barrier.await() ;

} ;

Interface ReadWriteLock

• Gère des paires de Lock

– Un pour la lecture

– Un pour l’écriture

• Lecture : concurrence possible

• Écriture : exclusive

• Visibilité des modifications garantie

Interface ReadWriteLock

private ReadWriteLock rwLock = new ReentrantReadWriteLock() ;

• Pas de contrainte sur les lectures

private ReadWriteLock rwLock = new ReentrantReadWriteLock() ;

public void maMethodeQuiLit() {

Lock readLock = rwLock.readLock() ;

readLock.lock() ;

// traitements de lecture

readLock.unlock() ;

} ;

Interface ReadWriteLock

• Les écritures stoppent les lectures

• Une seule écriture à la fois

private ReadWriteLock rwLock = new ReentrantReadWriteLock() ;

public void maMethodeQuiEcrit() {

Lock writeLock = rwLock.writeLock() ;

writeLock.lock() ;

// traitements de lecture

writeLock.unlock() ;

} ;

Types atomiques

Types atomiques

• AtomicLong, AtomicFloat, etc…

• Compile en une unique instruction assembleur

private AtomicInteger index = new AtomicInteger(0) ;

public void uneMethode() {

long newValue = index.incrementAndGet() ;

}

Types atomiques

• Principe du CAS « compare and swap »

• Prend en entrée :

– Une adresse mémoire

– Deux valeurs « ancienne » & « nouvelle »

• Si la valeur lue à l’adresse correspond à l’ancienne alors la nouvelle est recopiée

• Sinon, retourne false

• Pas de synchronisation !

Types atomiques

// classe AtomicLong

public final long incrementAndGet() {

for (;;) {

long current = get() ;

long next = current + 1 ;

if (compareAndSet(current, next))

return next ;

}

}

Files d’attente

• BlockingQueue

– File d’attente de taille fixe

– add(e), offer(e), put(e) avec timeout

– remove(e), poll(e), take(e)

– element(e), examine(e)

• Thread safe (locks internes)

– Utilisable pour le pattern producteur / consommateur

Tableau immutable

• CopyOnWriteArrayList (et version LinkedList)

– Lecture synchronisée

– Écriture par duplication

• Le tableau lu n’est jamais modifié, donc on peut le partager sans synchronisation

Structures immutables

• Peut-on construire d’autres structures immutables dans l’API Collection ?

– Tables de hachage

• Réponse : oui, et de façon efficace

– Ce qui n’est pas le cas des tableaux

Alternatives à

synchronized

STM Software Transactionnal memory

Philisophie

• Base de données :

• Sur le commit :

– Succès : on continue

– Échec : on annule les modifications

– Règles de visibilité

• Idée de 1986, implémentations 1995

begin

update ... set ... where ...

commit

Même chose dans la JVM ?

• Plusieurs implémentations

• Akka 1.3.1

<dependency>

<groupId>se.scalablesolutions.akka</groupId>

<artifactId>akka-kernel</artifactId>

<version>1.3.1</version>

</dependency>

<repository>

<id>Akka</id>

<name>Akka Maven2 Repository</name>

<url>http://akka.io/repository/</url>

</repository>

Pattern STM

final Ref<Integer> source = new Ref<Integer>(500) ;

final Atomic<Object> atom = new Atomic<Object>() {

@Override

public Object atomically() {

source.swap(source.get() + 1) ;

return null ;

}

} ;

atom.execute() ;

Effectué dans une transaction

Pattern STM

final Ref<Integer> source = new Ref<Integer>(500) ;

final Atomic<Object> atom = new Atomic<Object>() {

@Override

public Object atomically() {

source.swap(source.get() + 1) ;

return null ;

}

} ;

atom.execute() ;

Exécuté dans le thread courant

Pattern STM

• Une transaction qui échoue est rejouée (idem AtomicLong)

– Utilisation du processeur différente de la synchronisation

• Cas applicatifs non accessibles aux Locks

Object o = queue1.remove() ;

queue2.add(o) ;

Pattern STM

• Bons cas applicatifs : faible concurrence d’accès

• Forte concurrence : les cœurs passent leur temps à rejouer le code

Implémentation des STM

• Peut utiliser les Locks

• Peut ne pas les utiliser

Dans ce cas : quid de la visibilité (pas de barrière de mémoire)

Visibilité : comment l’intégrité des données est-elle garantie ?

Implémentation des STM

• Consomme des ressources (mémoire)

– Fonction du nombre de transactions

– Peut être en O(n2) (très mauvais)

• Concurrence avec les Locks, qui n’en consomment pas

Futur des STM

• Architecture Haswell d’Intel (2013)

– Support de TSX (Transactional Synchronisation Extension)

– TSX permet de marquer des portions de code

– La mémoire modifiée n’est visible que du thread qui modifie cette mémoire

– Elle est publiée « instantanément » sur commit

Acteurs

Modèle d’acteurs

1973

• Un acteur réalise un traitement

• Il reçoit un message immutable

• Effectue un traitement, et retourne éventuellement un résultat

Non synchronisé car sans accès concurrent

Modèle d’acteurs

• Utilisé en Erlang

• Plusieurs API Java et sur la JVM (Scala, Clojure)

Akka : écrit en Scala

Akka : fonctionnement

• Akka gère un pool de threads

• On crée des acteurs, on leur envoie des messages

• Un message reçu est traité dans un thread

• Un acteur ne peut pas être exécuté dans deux threads à la fois

• Pas de concurrence d’accès sur son éventuel état

4000! – Akka

public class PrimeFinderActor extends UntypedActor {

public void onReceive(Object o) throws Exception {

List<Integer> bounds = (List<Integer>)o ;

int debut = bounds.get(0) ;

int fin = bounds.get(1) ;

PrimeFactors pfs = new PrimeFactors() ;

for (int i = debut ; i < fin ; i++) {

PrimeFactors pfi = pfs.getPrimeFactors(i) ;

pfs.add(pfi) ;

} // return pfs ;

getContext().reply(pfs) ;

}

}

4000! - ExecutorService public class PrimeFactorCallable

implements Callable<PrimeFactors> {

private int debut, fin ;

public PrimeFactorCallable(int debut, int fin) {

this.debut = debut ; this.fin = fin ;

}

public PrimeFactors call() throws Exception {

PrimeFactors pfs = new PrimeFactors() ;

for (int i = debut ; i < fin ; i++) {

PrimeFactors pfi = pfs.getPrimeFactors(i) ;

pfs.add(pfi) ;

}

return pfs ;

}

}

public static void main(String[] args) {

Future [] futures = new Future [400] ;

for (int i = 0 ; i < futures.length ; i++) {

List<Integer> bound = Collections.unmodifiableList(

Arrays.asList(10*i, 10*(i + 1))) ;

ActorRef primeFactorFinder =

Actors.actorOf(PrimeFinderActor.class).start() ;

futures[i] =

primeFactorFinder.sendRequestReplyFuture(bound) ;

}

PrimeFactors pfs = new PrimeFactors() ;

for (int i = 0 ; i < futures.length ; i++) {

pfs.add((PrimeFactors)futures[i].get()) ;

}

Actors.registry().shutdownAll() ;

}

public static void main(String... args)

throws ExecutionException, InterruptedException {

ExecutorService es = new ScheduledThreadPoolExecutor(10) ;

Future [] futures = new Future [400] ;

for (int i = 0 ; i < futures.length ; i++) {

PrimeFactorCallable callable =

new PrimeFactorCallable(10*i, 10*(i + 1)) ;

futures[i] = es.submit(callable) ;

}

PrimeFactors pfs = new PrimeFactors() ;

for (int i = 0 ; i < futures.length ; i++) {

pfs.add((PrimeFactors)futures[i].get()) ;

}

es.shutdown() ;

}

[2^3989][3^1996][5^996][7^664][11^399][13^331][17^248][19^221][23^180][29^141][31^133][37^110][41^99][43^

95][47^86][53^76][59^68][61^66][67^59][71^56][73^54][79^50][83^48][89^44][97^41][101^39][103^38][107^37][

109^36][113^35][127^31][131^30][137^29][139^28][149^26][151^26][157^25][163^24][167^23][173^23][179^22][1

81^22][191^20][193^20][197^20][199^20][211^18][223^17][227^17][229^17][233^17][239^16][241^16][251^15][25

7^15][263^15][269^14][271^14][277^14][281^14][283^14][293^13][307^13][311^12][313^12][317^12][331^12][337

^11][347^11][349^11][353^11][359^11][367^10][373^10][379^10][383^10][389^10][397^10][401^9][409^9][419^9]

[421^9][431^9][433^9][439^9][443^9][449^8][457^8][461^8][463^8][467^8][479^8][487^8][491^8][499^8][503^7]

[509^7][521^7][523^7][541^7][547^7][557^7][563^7][569^7][571^7][577^6][587^6][593^6][599^6][601^6][607^6]

[613^6][617^6][619^6][631^6][641^6][643^6][647^6][653^6][659^6][661^6][673^5][677^5][683^5][691^5][701^5]

[709^5][719^5][727^5][733^5][739^5][743^5][751^5][757^5][761^5][769^5][773^5][787^5][797^5][809^4][811^4]

[821^4][823^4][827^4][829^4][839^4][853^4][857^4][859^4][863^4][877^4][881^4][883^4][887^4][907^4][911^4]

[919^4][929^4][937^4][941^4][947^4][953^4][967^4][971^4][977^4][983^4][991^4][997^4][1009^3][1013^3][1019

^3][1021^3][1031^3][1033^3][1039^3][1049^3][1051^3][1061^3][1063^3][1069^3][1087^3][1091^3][1093^3][1097^

3][1103^3][1109^3][1117^3][1123^3][1129^3][1151^3][1153^3][1163^3][1171^3][1181^3][1187^3][1193^3][1201^3

][1213^3][1217^3][1223^3][1229^3][1231^3][1237^3][1249^3][1259^3][1277^3][1279^3][1283^3][1289^3][1291^3]

[1297^3][1301^3][1303^3][1307^3][1319^3][1321^3][1327^3][1361^2][1367^2][1373^2][1381^2][1399^2][1409^2][

1423^2][1427^2][1429^2][1433^2][1439^2][1447^2][1451^2][1453^2][1459^2][1471^2][1481^2][1483^2][1487^2][1

489^2][1493^2][1499^2][1511^2][1523^2][1531^2][1543^2][1549^2][1553^2][1559^2][1567^2][1571^2][1579^2][15

83^2][1597^2][1601^2][1607^2][1609^2][1613^2][1619^2][1621^2][1627^2][1637^2][1657^2][1663^2][1667^2][166

9^2][1693^2][1697^2][1699^2][1709^2][1721^2][1723^2][1733^2][1741^2][1747^2][1753^2][1759^2][1777^2][1783

^2][1787^2][1789^2][1801^2][1811^2][1823^2][1831^2][1847^2][1861^2][1867^2][1871^2][1873^2][1877^2][1879^

2][1889^2][1901^2][1907^2][1913^2][1931^2][1933^2][1949^2][1951^2][1973^2][1979^2][1987^2][1993^2][1997^2

][1999^2][2003^1][2011^1][2017^1][2027^1][2029^1][2039^1][2053^1][2063^1][2069^1][2081^1][2083^1][2087^1]

[2089^1][2099^1][2111^1][2113^1][2129^1][2131^1][2137^1][2141^1][2143^1][2153^1][2161^1][2179^1][2203^1][

2207^1][2213^1][2221^1][2237^1][2239^1][2243^1][2251^1][2267^1][2269^1][2273^1][2281^1][2287^1][2293^1][2

297^1][2309^1][2311^1][2333^1][2339^1][2341^1][2347^1][2351^1][2357^1][2371^1][2377^1][2381^1][2383^1][23

89^1][2393^1][2399^1][2411^1][2417^1][2423^1][2437^1][2441^1][2447^1][2459^1][2467^1][2473^1][2477^1][250

3^1][2521^1][2531^1][2539^1][2543^1][2549^1][2551^1][2557^1][2579^1][2591^1][2593^1][2609^1][2617^1][2621

^1][2633^1][2647^1][2657^1][2659^1][2663^1][2671^1][2677^1][2683^1][2687^1][2689^1][2693^1][2699^1][2707^

1][2711^1][2713^1][2719^1][2729^1][2731^1][2741^1][2749^1][2753^1][2767^1][2777^1][2789^1][2791^1][2797^1

][2801^1][2803^1][2819^1][2833^1][2837^1][2843^1][2851^1][2857^1][2861^1][2879^1][2887^1][2897^1][2903^1]

[2909^1][2917^1][2927^1][2939^1][2953^1][2957^1][2963^1][2969^1][2971^1][2999^1][3001^1][3011^1][3019^1][

3023^1][3037^1][3041^1][3049^1][3061^1][3067^1][3079^1][3083^1][3089^1][3109^1][3119^1][3121^1][3137^1][3

163^1][3167^1][3169^1][3181^1][3187^1][3191^1][3203^1][3209^1][3217^1][3221^1][3229^1][3251^1][3253^1][32

57^1][3259^1][3271^1][3299^1][3301^1][3307^1][3313^1][3319^1][3323^1][3329^1][3331^1][3343^1][3347^1][335

9^1][3361^1][3371^1][3373^1][3389^1][3391^1][3407^1][3413^1][3433^1][3449^1][3457^1][3461^1][3463^1][3467

^1][3469^1][3491^1][3499^1][3511^1][3517^1][3527^1][3529^1][3533^1][3539^1][3541^1][3547^1][3557^1][3559^

1][3571^1][3581^1][3583^1][3593^1][3607^1][3613^1][3617^1][3623^1][3631^1][3637^1][3643^1][3659^1][3671^1

][3673^1][3677^1][3691^1][3697^1][3701^1][3709^1][3719^1][3727^1][3733^1][3739^1][3761^1][3767^1][3769^1]

[3779^1][3793^1][3797^1][3803^1][3821^1][3823^1][3833^1][3847^1][3851^1][3853^1][3863^1][3877^1][3881^1][

3889^1][3907^1][3911^1][3917^1][3919^1][3923^1][3929^1][3931^1][3943^1][3947^1][3967^1][3989^1]

Performances

• Les codes se ressemblent

• Utilisation CPU : idem

• Temps de calcul :

– Akka : 2,7s

– ExecutorService : 2,3s

• Différence non significative

Exécution transactionnelle

public class Balance {

public final int montant ;

public Balance(int montant) {

this.montant = montant ;

}

}

public class Retrait {

public final int montant ;

public Retrait(int montant) {

this.montant = montant ;

}

}

Exécution transactionnelle

public class Depot {

public final int montant ;

public Depot(int montant) {

this.montant = montant ;

}

}

public class GetBalance {

}

public class CompteBancaire extends UntypedTransactor {

private final Ref<Integer> balance = new Ref<Integer>(0) ;

public void atomically(final Object message) {

if (message instanceof Depot) {

int montant = ((Depot)message).montant ;

if (montant > 0)

balance.swap(balance.get() + montant) ;

}

if (message instanceof Retrait) {

int montant = ((Retrait)message).montant ;

if (montant > 0) {

if (balance.get() < montant)

throw new IllegalStateException("...") ;

balance.swap(balance.get() - montant) ;

}

}

if (message instanceof GetBalance)

getContext().replySafe((new Balance(balance.get()))) ;

}

}

Exécuté dans une transaction

public class CompteBancaire extends UntypedTransactor {

private final Ref<Integer> balance = new Ref<Integer>(0) ;

public void atomically(final Object message) {

if (message instanceof Depot) {

int montant = ((Depot)message).montant ;

if (montant > 0)

balance.swap(balance.get() + montant) ;

}

if (message instanceof Retrait) {

int montant = ((Retrait)message).montant ;

if (montant > 0) {

if (balance.get() < montant)

throw new IllegalStateException("...") ;

balance.swap(balance.get() - montant) ;

}

}

if (message instanceof GetBalance)

getContext().replySafe((new Balance(balance.get()))) ;

}

}

Fera échouer la transaction

public class Transfert {

public final ActorRef source ;

public final ActorRef destination ;

public final int montant ;

public Transfert(final ActorRef source,

final ActorRef destination,

final int montant) {

this.source = source ;

this.destination = destination ;

this.montant = montant ;

}

}

public class ServiceBancaire extends UntypedTransactor {

public Set<SendTo> coordinate(final Object message) {

if (message instanceof Transfert) {

Set<SendTo> s = new HashSet<SendTo>() ;

Transfert t = (Transfert)message ;

if (t.montant > 0) { // validation

s.add(sendTo(t.destination, new Depot(t.montant))) ;

s.add(sendTo(t.source, new Retrait(t.montant))) ;

return Collections.unmodifiableSet(s) ;

}

}

return nobody() ;

}

}

Ensemble d’appels d’acteurs

public class ServiceBancaire extends UntypedTransactor {

public Set<SendTo> coordinate(final Object message) {

if (message instanceof Transfert) {

Set<SendTo> s = new HashSet<SendTo>() ;

Transfert t = (Transfert)message ;

if (t.montant > 0) { // validation

s.add(sendTo(t.destination, new Depot(t.montant))) ;

s.add(sendTo(t.source, new Retrait(t.montant))) ;

return Collections.unmodifiableSet(s) ;

}

}

return nobody() ;

}

}

Ensemble d’appels d’acteurs

public class ServiceBancaire extends UntypedTransactor {

public Set<SendTo> coordinate(final Object message) {

if (message instanceof Transfert) {

Set<SendTo> s = new HashSet<SendTo>() ;

Transfert t = (Transfert)message ;

if (t.montant > 0) { // validation

s.add(sendTo(t.destination, new Depot(t.montant))) ;

s.add(sendTo(t.source, new Retrait(t.montant))) ;

return Collections.unmodifiableSet(s) ;

}

}

return nobody() ;

}

}

Appel dans une transaction

Exécution transactionnelle

• Chaque acteur gère un compte

• Il possède un état, mais qui n’est jamais partagé

• Les transferts sont gérés par un acteur sans état

• Toutes les opérations sont transactionnelles

Bilan STM & acteurs

• STM & acteurs : deux approches alternatives à synchronized

• Offrent des fonctionnalités supplémentaires

• Vont bénéficier du support de l’assembleur

• Pas de miracle au niveau performance

Parallélisation des

calculs

Nouveautés Java 7 & 8

• Pattern Fork / Join

• API non incluse en Java 7 : parallel arrays

• Méthode en Java 8 : parallel()

Objet :

• Automatiser la parallélisation sur les tableaux et les collections

• Ne plus avoir à écrire de code de parallélisation

Java 7 : fork / join

Fork / Join

• Traite des calculs sur de grandes quantités de données, numériques ou non

• Gère des tâches

– Si elle est trop grosse elle se divise : Fork

– Sinon elle s’exécute et retourne son résultat

• Une tâche qui s’est divisée récupère les résultats de ses sous-tâches : Join

Fork / Join

• Gère un pool de threads

• Chaque thread possède une file d’attente qui stocke les tâches

• Quand sa file d’attente est vide, il va voler du travail à son voisin

Fork / Join

• Chaque tâche travaille sur ses propres données

• Pas de bloc synchronisé

• Pas de variables partagées

• On doit donc « dupliquer » des informations

• Cas du traitement des tableaux de grande taille ?

Parallel Arrays

Java 6 &7

Parallel Arrays

• API non incluse dans le JDK

• Peut être utilisée en Java 6

• Fork / Join dans un package jsr166y.*

• Avantage : API auto-porteuse

Pourquoi les parallel arrays

ne sont-ils pas dans

l’API Java 7 ?

Java 8

Sept. 2013

Syntaxe

• Du light !

int maxAge =

persons.map(p -> p.getAge()).reduce(0, Integer::max) ;

Collection<Person> oldies =

persons.filter(p -> p.age > 40).into(new ArrayList()) ;

Syntaxe

• Du light !

int maxAge =

persons.map(p -> p.getAge()).reduce(0, Integer::max) ;

Collection<Person> oldies =

persons.filter(p -> p.age > 40).into(new ArrayList()) ;

Type Collection

Syntaxe

• Du light !

int maxAge =

persons.map(p -> p.getAge()).reduce(0, Integer::max) ;

Collection<Person> oldies =

persons.filter(p -> p.age > 40).into(new ArrayList()) ;

D’où viennent les méthodes map() et filter() ??

Syntaxe

• Du light !

• Évolution de l’API Collection, méthodes vituelles d’extension

int maxAge =

persons.map(p -> p.getAge()).reduce(0, Integer::max) ;

Collection<Person> oldies =

persons.filter(p -> p.age > 40).into(new ArrayList()) ;

Méthodes virtuelles d’ext.

• On ajoute des méthodes dans les interfaces, avec leurs implémentation par défaut

public interface Collection<E> {

public boolean add(E e) ;

public boolean addAll(Collection<? extends E> c) ;

// la litanie des méthodes de Collection

// méthode virtuelle d'extension

public void sort(Comparator<? super E> comp)

default Collections.sort(comp) ;

}

Syntaxe

• Du light !

• Support du parallélisme

int maxAge =

persons.map(p -> p.getAge()).reduce(0, Integer::max) ;

Collection<Person> oldies =

persons.filter(p -> p.age > 40).into(new ArrayList()) ;

Collection<Person> oldies =

persons.parallel().filter(p -> p.age > 40)

.into(new ArrayList()) ;

Interface spliterable()

• Extension d’iterable()

• Type de retour de parallel()

• Le fork / join devient un framework système

• Les fonctionnalités des parallel arrays sont intégrées via les lambdas

• On gagne java.util.parallel

Algorithmique

Une anecdote

Le 24/2/2012 Heinz Kabutz lance un défi :

• Déterminer les 10 premiers et 10 derniers octets du nombre Fibonacci (1 milliard)

• Taille du nombre écrit dans un fichier = ~180Mo

Une anecdote

• Record à battre : 5600s sur un CPU 8 cœurs

Une anecdote

• Record à battre : 5600s sur un CPU 8 cœurs

• 1ère amélioration : 3300s sur un CPU 4 cœurs

Une anecdote

• Record à battre : 5600s sur un CPU 8 cœurs

• 1ère amélioration : 3300s sur un CPU 4 cœurs

• 2ème amélioration : 51s sur un cœur

Une anecdote

Par quel miracle ?

• Utilisation de GNU Multiple Precision Arithmetic Lib (GMP)

• Changement d’implémentation de BigInteger

• Changement d’algorithme de calcul de Fibonacci

• Suppression de l’utilisation de la récursivité

Par quel miracle ?

• Utilisation de GNU Multiple Precision Arithmetic Lib (GMP)

• Changement d’implémentation de BigInteger

• Changement d’algorithme de calcul de Fibonacci

• Suppression de l’utilisation de la récursivité

Quel rapport avec

Fork / Join ou la

parallélisation ?

Algorithmique

L’optimisation des fonctions linéaires a gagné un facteur 43M sur les temps de traitement en 15 ans :

• 1k est dû à la vitesse des processeurs

• 43k est dû à l’amélioration des algorithmes

Moralité

• Utiliser les bons algorithmes de traitement avant de penser à la parallélisation

Moralité

• Utiliser les bons algorithmes de traitement avant de penser à la parallélisation

• S’assurer que les algorithmes sont bien parallélisables !!

Moralité

• Utiliser les bons algorithmes de traitement avant de penser à la parallélisation

• S’assurer que les algorithmes sont bien parallélisables !!

Quick sort : se parallélise mal

• Merge sort consomme de la mémoire

Moralité

• Utiliser les bons algorithmes de traitement avant de penser à la parallélisation

• S’assurer que les algorithmes sont bien parallélisables !!

Recuit simulé : ne se parallélise pas, perd ses propriétés de convergence

Conclusion

Conclusion

Une nouvelle période charnière se déroule en ce moment

En 1995 : la mémoire est devenue une ressource gérée par la JVM

En 2012 : le CPU est en train de devenir une ressource gérée par la JVM (et le serveur aussi !)

Les langages et les processeurs s’adaptent les uns aux autres

Conclusion

Les applications vont utiliser des traitements en parallèle de façon transparente

Mais les pièges se trouvent au niveau des algorithmes, ils seront plus durs à éviter

Nouveaux challenges pour les développeurs, et de nouvelles opportunités !

Recommended