In che modo il framework fork / join è migliore di un pool di thread?

Quali sono i vantaggi dell’utilizzo del nuovo framework fork / join semplicemente semplicemente suddividendo l’attività principale in N attività secondarie all’inizio, inviandole a un pool di thread memorizzato nella cache (da Executor ) e attendendo il completamento di ciascuna attività? Non riesco a capire come utilizzare l’astrazione fork / join semplifica il problema o rende la soluzione più efficiente da quella che abbiamo avuto per anni.

Ad esempio, l’algoritmo di sfocatura parallela nell’esempio tutorial potrebbe essere implementato in questo modo:

public class Blur implements Runnable { private int[] mSource; private int mStart; private int mLength; private int[] mDestination; private int mBlurWidth = 15; // Processing window size, should be odd. public ForkBlur(int[] src, int start, int length, int[] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } public void run() { computeDirectly(); } protected void computeDirectly() { // As in the example, omitted for brevity } } 

Dividi all’inizio e invia le attività a un pool di thread:

 // source image pixels are in src // destination image pixels are in dst // threadPool is a (cached) thread pool int maxSize = 100000; // analogous to FJ's "sThreshold" List futures = new ArrayList(); // Send stuff to thread pool: for (int i = 0; i < src.length; i+= maxSize) { int size = Math.min(maxSize, src.length - i); ForkBlur task = new ForkBlur(src, i, size, dst); Future f = threadPool.submit(task); futures.add(f); } // Wait for all sent tasks to complete: for (Future future : futures) { future.get(); } // Done! 

Le attività passano alla coda del pool di thread, da cui vengono eseguite man mano che i thread di lavoro diventano disponibili. Finché la suddivisione è abbastanza granulare (per evitare di dover attendere in particolare l’ultima attività) e il pool di thread dispone di thread sufficienti (almeno N di processori), tutti i processori funzionano a piena velocità fino al completamento dell’intero calcolo.

Mi sto perdendo qualcosa? Qual è il valore aggiunto dell’uso del framework fork / join?

    Penso che l’incomprensione fondamentale sia che gli esempi di Fork / Join NON mostrano il furto del lavoro, ma solo una specie di divisione e conquista standard.

    Il furto di lavoro sarebbe come questo: l’operaio B ha finito il suo lavoro. È un tipo gentile, quindi si guarda intorno e vede che Worker A sta ancora lavorando molto duramente. Si avvicina e chiede: “Ehi ragazzo, potrei darti una mano”. Una risposta “Fantastico, ho questo compito di 1000 unità. Finora ho finito 345 lasciando 655. Potresti lavorare sul numero da 673 a 1000, farò il 346 a 672.” B dice “OK, iniziamo così possiamo andare al pub in precedenza.”

    Vedete, i lavoratori devono comunicare tra loro anche quando hanno iniziato il vero lavoro. Questa è la parte mancante negli esempi.

    Gli esempi d’altra parte mostrano solo qualcosa come “usa i subappaltatori”:

    Lavoratore A: “Dang, ho 1000 unità di lavoro, troppo per me, farò 500 me stesso e subappalterò 500 a qualcun altro”. Questo va avanti fino a quando il grande compito è suddiviso in piccoli pacchetti di 10 unità ciascuno. Questi saranno eseguiti dai lavoratori disponibili. Ma se un pacchetto è una sorta di pillola velenosa e richiede molto più tempo degli altri pacchetti – sfortuna, la fase di divisione è finita.

    L’unica differenza rimanente tra Fork / Join e la suddivisione dell’attività in anticipo è questa: quando si divide in anticipo, la coda di lavoro è completa fin dall’inizio. Esempio: 1000 unità, la soglia è 10, quindi la coda ha 100 voci. Questi pacchetti sono distribuiti ai membri del threadpool.

    Fork / Join è più complesso e cerca di mantenere il numero di pacchetti in coda più piccolo:

    • Passo 1: Mettere un pacchetto contenente (1 … 1000) in coda
    • Passaggio 2: un worker apre il pacchetto (1 … 1000) e lo sostituisce con due pacchetti: (1 … 500) e (501 … 1000).
    • Passaggio 3: un lavoratore apre il pacchetto (500 … 1000) e spinge (500 … 750) e (751 … 1000).
    • Passaggio n: lo stack contiene questi pacchetti: (1..500), (500 … 750), (750 … 875) … (991..1000)
    • Passo n + 1: Il pacchetto (991..1000) viene estratto ed eseguito
    • Passo n + 2: Il pacchetto (981..990) viene spuntato ed eseguito
    • Passo n + 3: Il pacchetto (961..980) viene scoppiato e suddiviso in (961 … 970) e (971..980). ….

    Vedete: in Fork / Join la coda è più piccola (6 nell’esempio) e le fasi “split” e “work” sono interlacciate.

    Quando più lavoratori stanno scoppiando e spingendo simultaneamente le interazioni non sono così chiare, ovviamente.

    Se hai n thread occupati che funzionano indipendentemente al 100% in modo indipendente, sarà meglio di n thread in un pool Fork-Join (FJ). Ma non funziona mai così.

    Potrebbe non essere in grado di suddividere il problema in parti uguali. Anche se lo fai, la programmazione dei thread è in qualche modo equa. Finirai per aspettare il thread più lento. Se si dispone di più attività, ciascuna può essere eseguita con un parallelismo inferiore a N (in genere più efficiente), ma andare fino a n quando le altre attività sono terminate.

    Quindi, perché non limitiamo il problema a pezzi di dimensioni FJ e facciamo un pool di thread su questo. L’uso tipico di FJ riduce il problema in piccoli pezzi. Fare questi in un ordine casuale richiede molto coordinamento a livello hardware. Le spese generali sarebbero un assassino. In FJ, le attività vengono messe su una coda che il thread legge nell’ordine Last In First Out (LIFO / stack) e il furto del lavoro (nel lavoro di base, in generale) viene eseguito First In First Out (FIFO / “queue”). Il risultato è che l’elaborazione di array lunghi può essere eseguita in gran parte in sequenza, anche se è suddivisa in piccoli blocchi. (È anche il caso che potrebbe non essere banale infrangere il problema in piccoli blocchi di dimensioni uguali in un big bang. Dire di trattare con una qualche forma di gerarchia senza bilanciamento).

    Conclusione: FJ consente un uso più efficiente dei thread hardware in situazioni non uniformi, che saranno sempre se si dispone di più thread.

    Fork / join è diverso da un pool di thread perché implementa il furto del lavoro. Da Fork / Join

    Come con qualsiasi ExecutorService, il framework fork / join distribuisce le attività ai thread worker in un pool di thread. Il framework fork / join è distinto perché utilizza un algoritmo di work-stealing. I thread di lavoro che esauriscono le cose da fare possono rubare le attività da altri thread ancora occupati.

    Di ‘che hai due thread e 4 compiti a, b, c, d che prendono rispettivamente 1, 1, 5 e 6 secondi. Inizialmente, aeb sono assegnati al thread 1 e c e d al thread 2. In un pool di thread, sarebbero necessari 11 secondi. Con fork / join, il thread 1 termina e può rubare il lavoro dal thread 2, quindi l’attività d verrebbe eseguita dal thread 1. Il thread 1 esegue a, beid, thread 2 solo c. Tempo complessivo: 8 secondi, non 11.

    EDIT: come sottolinea Joonas, le attività non sono necessariamente pre-allocate a un thread. L’idea di fork / join è che un thread può scegliere di suddividere un’attività in più sotto-pezzi. Quindi per riaffermare quanto sopra:

    Abbiamo due compiti (ab) e (cd) che prendono rispettivamente 2 e 11 secondi. Thread 1 inizia ad eseguire ab e dividerlo in due sotto-task a & b. Analogamente al thread 2, si divide in due sotto-attività c & d. Quando il thread 1 ha terminato a & b, può rubare d dal thread 2.

    Tutti sopra sono corretti i benefici sono raggiunti dal lavoro rubare, ma per espandere il motivo per cui questo è.

    Il vantaggio principale è la coordinazione efficiente tra i thread di lavoro. Il lavoro deve essere suddiviso e ricomposto, il che richiede un coordinamento. Come puoi vedere nella risposta di AH sopra ogni thread ha la sua lista di lavoro. Una proprietà importante di questa lista è che è ordinata (grandi compiti nella parte superiore e piccoli compiti nella parte inferiore). Ogni thread esegue le attività nella parte inferiore dell’elenco e ruba le attività dalla parte superiore degli altri elenchi di thread.

    Il risultato di questo è:

    • La testa e la coda degli elenchi di attività possono essere sincronizzati in modo indipendente, riducendo la contesa sull’elenco.
    • Sottolivelli significativi del lavoro sono suddivisi e riassemblati dallo stesso thread, quindi non è richiesto alcun coordinamento tra thread per questi sottoalberi.
    • Quando un thread ruba il lavoro, prende un pezzo grande che poi suddivide nella propria lista
    • La lavorazione del ferro significa che i fili sono quasi completamente utilizzati fino alla fine del processo.

    La maggior parte degli altri schemi di divisione e conquista che utilizzano pool di thread richiedono una maggiore comunicazione e coordinamento tra thread.

    In questo esempio Fork / Join non aggiunge alcun valore perché la foratura non è necessaria e il carico di lavoro è suddiviso equamente tra i thread di lavoro. Fork / Join aggiunge solo overhead.

    Ecco un bell’articolo sull’argomento. Citazione:

    Nel complesso, possiamo dire che è preferibile utilizzare ThreadPoolExecutor in cui il carico di lavoro è suddiviso in modo uniforms tra i thread di lavoro. Per essere in grado di garantire ciò, è necessario conoscere con precisione l’aspetto dei dati di input. Al contrario, ForkJoinPool offre buone prestazioni indipendentemente dai dati di input ed è quindi una soluzione significativamente più robusta.

    L’objective finale dei pool di thread e Fork / Join sono simili: entrambi vogliono utilizzare la potenza della CPU disponibile il meglio che possono per il massimo rendimento. Il throughput massimo significa che quante più attività possibili dovrebbero essere completate in un lungo periodo di tempo. Cosa è necessario per farlo? (Per quanto segue, supponiamo che non ci sia carenza di compiti di calcolo: c’è sempre abbastanza da fare per il 100% di utilizzo della CPU.Inoltre io uso “CPU” in modo equivalente per core o core virtuali in caso di hyper-threading).

    1. Almeno ci devono essere tanti thread in esecuzione quante sono le CPU disponibili, perché l’esecuzione di meno thread lascerà un core inutilizzato.
    2. Al massimo ci devono essere tanti thread in esecuzione quante sono le CPU disponibili, perché l’esecuzione di più thread creerà un carico aggiuntivo per l’Utilità di pianificazione che assegna le CPU ai diversi thread che fa sì che parte del tempo di CPU passi allo scheduler piuttosto che al nostro compito computazionale.

    Così abbiamo capito che per il throughput massimo abbiamo bisogno di avere lo stesso numero di thread rispetto alle CPU. Nell’esempio sfocato di Oracle è ansible sia prendere un pool di thread di dimensioni fisse con il numero di thread uguale al numero di CPU disponibili o utilizzare un pool di thread. Non farà la differenza, hai ragione!

    Quindi quando ti metteresti nei guai con un pool di thread? Questo è se un thread si blocca , perché il thread è in attesa di un’altra attività da completare. Supponi il seguente esempio:

     class AbcAlgorithm implements Runnable { public void run() { Future aFuture = threadPool.submit(new ATask()); StepBResult bResult = stepB(); StepAResult aResult = aFuture.get(); stepC(aResult, bResult); } } 

    Quello che vediamo qui è un algoritmo che consiste di tre passi A, B e C. A e B possono essere eseguiti indipendentemente l’uno dall’altro, ma il passo C ha bisogno del risultato del passo A AND B. Ciò che questo algoritmo fa è inviare l’attività A a il threadpool ed eseguire direttamente l’attività b. Successivamente, il thread attenderà che l’attività A sia eseguita correttamente e proseguirà con il passaggio C. Se A e B sono completati allo stesso tempo, allora tutto è a posto. Ma cosa succede se A impiega più tempo di B? Ciò può essere dovuto al fatto che la natura del compito A lo impone, ma potrebbe anche essere il caso in quanto non esiste un thread per l’attività A disponibile all’inizio e l’attività A deve attendere. (Se c’è solo una singola CPU disponibile e quindi il tuo threadpool ha solo un singolo thread questo causerà anche un deadlock, ma per ora questo è oltre il punto). Il punto è che il thread che ha appena eseguito l’attività B blocca l’intero thread . Poiché abbiamo lo stesso numero di thread delle CPU e un thread è bloccato, significa che una CPU è intriggers .

    Fork / Join risolve questo problema: nel framework fork / join scriverebbe lo stesso algoritmo come segue:

     class AbcAlgorithm implements Runnable { public void run() { ATask aTask = new ATask()); aTask.fork(); StepBResult bResult = stepB(); StepAResult aResult = aTask.join(); stepC(aResult, bResult); } } 

    Sembra lo stesso, vero? Comunque l’indizio è che aTask.join non bloccherà . Invece qui è dove entra in gioco il furto del lavoro: il thread cercherà altri compiti che sono stati biforcati in passato e continueranno con quelli. In primo luogo controlla se le attività che ha forked stesso hanno iniziato l’elaborazione. Quindi se A non è stato ancora avviato da un altro thread, lo farà A successivo, altrimenti controllerà la coda di altri thread e ruberà il loro lavoro. Una volta completata quest’altra attività di un altro thread, verrà verificato se A è completato ora. Se è l’algoritmo sopra può chiamare stepC . Altrimenti cercherà un altro compito da rubare. Pertanto, i pool fork / join possono raggiungere il 100% di utilizzo della CPU, anche a fronte di azioni di blocco .

    Tuttavia c’è una trappola: il furto del lavoro è ansible solo per la chiamata di ForkJoinTask di ForkJoinTask s. Non può essere fatto per azioni di blocco esterne come aspettare un altro thread o aspettare un’azione I / O. Quindi che dire di ciò, attendere che l’I / O sia completato è un compito comune? In questo caso, se potessimo aggiungere una discussione aggiuntiva a Fork / Join pool che verrà interrotta di nuovo non appena l’azione di blocco sarà completata, sarà la seconda cosa migliore da fare. E ForkJoinPool può fare proprio questo se stiamo usando ManagedBlocker s.

    Fibonacci

    Nel JavaDoc per RecursiveTask è un esempio per il calcolo dei numeri di Fibonacci usando Fork / Join. Per una classica soluzione ricorsiva vedi:

     public static int fib(int n) { if (n <= 1) { return n; } return fib(n - 1) + fib(n - 2); } 

    Come spiegato nel JavaDocs, questo è un modo abbastanza facile di calcolare i numeri di Fibonacci, poiché questo algoritmo ha una complessità O (2 ^ n) mentre sono possibili modi più semplici. Tuttavia questo algoritmo è molto semplice e facile da capire, quindi ci atteniamo ad esso. Supponiamo di voler accelerare con Fork / Join. Un'implementazione ingenua sarebbe simile a questa:

     class Fibonacci extends RecursiveTask { private final long n; Fibonacci(long n) { this.n = n; } public Long compute() { if (n <= 1) { return n; } Fibonacci f1 = new Fibonacci(n - 1); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); return f2.compute() + f1.join(); } } 

    I passaggi in cui questa attività è suddivisa sono troppo brevi e quindi ciò si comporterà in modo orribile, ma puoi vedere come il framework funziona in generale molto bene: i due summit possono essere calcolati in modo indipendente, ma poi abbiamo bisogno di entrambi per build il finale risultato. Quindi una metà è fatta in un altro thread. Divertiti a fare lo stesso con i pool di thread senza ottenere un deadlock (ansible, ma non altrettanto semplice).

    Solo per completezza: se vuoi veramente calcolare i numeri di Fibonacci usando questo approccio ricorsivo ecco una versione ottimizzata:

     class FibonacciBigSubtasks extends RecursiveTask { private final long n; FibonacciBigSubtasks(long n) { this.n = n; } public Long compute() { return fib(n); } private long fib(long n) { if (n <= 1) { return 1; } if (n > 10 && getSurplusQueuedTaskCount() < 2) { final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1); final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2); f1.fork(); return f2.compute() + f1.join(); } else { return fib(n - 1) + fib(n - 2); } } } 

    Ciò mantiene le attività secondarie molto più piccole perché sono divise solo quando n > 10 && getSurplusQueuedTaskCount() < 2 è vero, il che significa che ci sono significativamente più di 100 chiamate al metodo da fare ( n > 10 ) e non ci sono già task molto man waiting ( getSurplusQueuedTaskCount() < 2 ).

    Sul mio computer (4 core (8 contando Hyper-threading), Intel (R) Core (TM) i7-2720QM CPU @ 2.20 GHz) il fib(50) impiega 64 secondi con l'approccio classico e solo 18 secondi con il Fork / Unire l'approccio che è un guadagno notevole, anche se non tanto quanto teoricamente ansible.

    Sommario

    • Sì, nel tuo esempio Fork / Join non ha alcun vantaggio sui classici pool di thread.
    • Fork / Join può migliorare drasticamente le prestazioni in caso di blocco
    • Fork / Join elude alcuni problemi di deadlock

    Un’altra importante differenza sembra essere che con FJ, puoi fare più fasi “Join” complesse. Si consideri l’ordinamento di unione da http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html , ci sarebbe troppa orchestrazione necessaria per suddividere questo lavoro. ad esempio, devi fare le seguenti cose:

    • ordina il primo trimestre
    • ordina il secondo trimestre
    • unire i primi 2 trimestri
    • ordina il terzo trimestre
    • ordina il quarto quarto
    • unire gli ultimi 2 trimestri
    • unire le 2 metà

    Come specifichi che devi fare il genere prima delle unioni che li riguardano ecc.

    Ho cercato il modo migliore per fare una determinata cosa per ognuno di un elenco di elementi. Penso che mi limiterò a suddividere l’elenco e utilizzare un ThreadPool standard. FJ sembra molto utile quando il lavoro non può essere pre-suddiviso in un numero sufficiente di compiti indipendenti, ma può essere suddiviso ricorsivamente in attività indipendenti tra loro (ad esempio l’ordinamento delle due metà è indipendente, ma non è ansible unire le due metà ordinate in un intero ordinato).

    F / J ha anche un netto vantaggio quando si hanno costose operazioni di unione. Poiché si divide in una struttura ad albero, si fa solo l’unione di log2 (n) invece di n si fonde con la divisione del thread lineare. (Questo fa presupporre teoricamente di avere tanti processori quanti thread, ma ancora un vantaggio). Per un compito a casa dovevamo unire diverse migliaia di array 2D (tutte le stesse dimensioni) sumndo i valori di ogni indice. Con il fork join e i processori P il tempo si avvicina a log2 (n) quando P si avvicina all’infinito.

    1 2 3 .. 7 3 1 …. 8 5 4
    4 5 6 + 2 4 3 => 6 9 9
    7 8 9 .. 1 1 0 …. 8 9 9

    Se il problema è tale da dover attendere il completamento di altri thread (come nel caso dell’ordinamento dell’array o della sum dell’array), è necessario utilizzare fork join, in quanto Executor (Executors.newFixedThreadPool (2)) si strozzerà a causa di limitazioni numero di thread. Il forkjoin pool creerà più thread in questo caso per coprire il thread bloccato per mantenere lo stesso parallelismo

    Fonte: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

    Il problema con gli esecutori per l’implementazione degli algoritmi divide and conquer non è correlato alla creazione di attività secondarie, poiché un Callable è libero di inviare una nuova attività secondaria al relativo esecutore e attendere il risultato in modo sincrono o asincrono. Il problema è quello del parallelismo: quando un Callable attende il risultato di un altro Callable, viene messo in uno stato di attesa, perdendo così l’opportunità di gestire un altro Callable in coda per l’esecuzione.

    Il framework fork / join aggiunto al pacchetto java.util.concurrent in Java SE 7 attraverso gli sforzi di Doug Lea riempie questo vuoto

    Fonte: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

    Il pool tenta di mantenere un numero sufficiente di thread attivi (o disponibili) aggiungendo, sospendendo o riprendendo in modo dinamico i thread interni del worker, anche se alcune attività sono bloccate in attesa di unirsi agli altri. Tuttavia, non sono garantite tali regolazioni a fronte dell’IO bloccata o di altre sincronizzazioni non gestite

    public int getPoolSize () Restituisce il numero di thread worker avviati ma non ancora terminati. Il risultato restituito da questo metodo può differire da getParallelism () quando i thread vengono creati per mantenere il parallelismo quando altri sono bloccati in modo cooperativo.

    Sareste stupiti su ForkJoin prestazioni in applicazione come crawler. ecco il miglior tutorial da cui potresti imparare.

    La logica di Fork / Join è molto semplice: (1) separare (fork) ogni task di grandi dimensioni in attività più piccole; (2) elaborare ciascuna attività in un thread separato (separandole in compiti ancora più piccoli se necessario); (3) unire i risultati.