Flussi paralleli, collettori e sicurezza del filo

Vedere il semplice esempio qui sotto che conta il numero di occorrenze di ogni parola in una lista:

Stream words = Stream.of("a", "b", "a", "c"); Map wordsCount = words.collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

Alla fine, wordsCount è {a=2, b=1, c=1} .

Ma il mio stream è molto grande e voglio parallelizzare il lavoro, quindi scrivo:

 Map wordsCount = words.parallel() .collect(toMap(s -> s, s -> 1, (i, j) -> i + j)); 

Tuttavia ho notato che wordsCount è una semplice HashMap quindi mi chiedo se ho bisogno di chiedere esplicitamente una mappa concorrente per garantire la sicurezza dei thread:

 Map wordsCount = words.parallel() .collect(toConcurrentMap(s -> s, s -> 1, (i, j) -> i + j)); 

I collector non concorrenti possono essere tranquillamente utilizzati con un stream parallelo o devo usare le versioni simultanee solo quando si raccolgono da un stream parallelo?

I collector non concorrenti possono essere tranquillamente utilizzati con un stream parallelo o devo usare le versioni simultanee solo quando si raccolgono da un stream parallelo?

È sicuro utilizzare un raccoglitore non simultaneo in un’operazione di collect di un stream parallelo.

Nelle specifiche dell’interfaccia Collector , nella sezione con una mezza dozzina di punti elenco, è questo:

Per i collector non concorrenti, qualsiasi risultato restituito dal fornitore del risultato, dall’accumulatore o dalle funzioni del combinatore deve essere limitato in serie. Ciò consente alla raccolta di verificarsi in parallelo senza che il Collector debba implementare alcuna sincronizzazione aggiuntiva. L’implementazione della riduzione deve gestire che l’input sia partizionato correttamente, che le partizioni siano elaborate in modo isolato e che la combinazione avvenga solo dopo che l’accumulo è stato completato.

Ciò significa che le varie implementazioni fornite dalla class Collectors possono essere utilizzate con flussi paralleli, anche se alcune di tali implementazioni potrebbero non essere collector concorrenti. Questo vale anche per i tuoi collezionisti non concorrenti che potresti implementare. Possono essere utilizzati in sicurezza con flussi paralleli, a condizione che i raccoglitori non interferiscano con la fonte del stream, che siano privi di effetti collaterali, indipendenti dall’ordine, ecc.

Raccomando anche di leggere la sezione Riduzione mutevole della documentazione del pacchetto java.util.stream. Nel mezzo di questa sezione c’è un esempio che è dichiarato essere parallelizzabile, ma che raccoglie i risultati in un ArrayList , che non è thread-safe.

Il modo in cui funziona è che un stream parallelo che termina con un raccoglitore non simultaneo garantisce che i diversi thread funzionino sempre su istanze diverse delle raccolte di risultati intermedi. Ecco perché un raccoglitore ha una funzione Supplier , per creare tutte le raccolte intermedie quante sono le discussioni, in modo che ogni thread possa accumularsi nel proprio. Quando i risultati intermedi devono essere uniti, vengono trasferiti in modo sicuro tra i thread e in un dato momento solo un singolo thread sta unendo una coppia di risultati intermedi.

Tutti i collezionisti, se seguono le regole nella specifica, sono sicuri di correre in parallelo o in sequenza. La prontezza parallela è una parte fondamentale del design qui.

La distinzione tra collezionisti concorrenti e non concorrenti ha a che fare con l’approccio alla parallelizzazione.

Un collettore ordinario (non simultaneo) opera unendo i risultati secondari. Quindi la sorgente viene suddivisa in un gruppo di blocchi, ognuno dei quali viene raccolto in un contenitore dei risultati (come un elenco o una mappa), quindi i risultati secondari vengono uniti in un contenitore dei risultati più grande. Ciò è sicuro e preserva l’ordine, ma per alcuni tipi di contenitori, in particolare le mappe, può essere costoso, poiché la fusione di due mappe per chiave è spesso costosa.

Un programma di raccolta simultaneo crea invece un contenitore di risultati, le cui operazioni di inserimento sono garantite come thread-safe, e scansiona elementi da più thread. Con un contenitore di risultati altamente concorrente come ConcurrentHashMap, questo approccio potrebbe funzionare meglio di unire gli HashMaps ordinari.

Quindi, i collezionisti concorrenti sono strettamente ottimizzati rispetto alle loro controparti ordinarie. E non vengono senza un costo; poiché gli elementi vengono lanciati da molti thread, i collettori concorrenti generalmente non possono conservare l’ordine dell’incontro. (Ma spesso non ti interessa – quando crei un istogramma di conteggio delle parole, non ti interessa quale istanza di “pippo” hai contato per prima.)

È sicuro utilizzare collezioni non concorrenti e contatori non atomici con flussi paralleli.

Se dai un’occhiata alla documentazione di Stream :: collect , trovi il seguente paragrafo:

Come reduce(Object, BinaryOperator) , le operazioni di raccolta possono essere parallelizzate senza richiedere ulteriore sincronizzazione.

E per il metodo Stream :: reduce :

Mentre questo può sembrare un modo più arrotondato per eseguire un’aggregazione rispetto alla semplice mutazione di un totale parziale in un ciclo, le operazioni di riduzione si sovrappongono in modo più aggraziato, senza richiedere ulteriore sincronizzazione e con un rischio notevolmente ridotto di razze di dati.

Questo potrebbe essere un po ‘sorprendente. Tuttavia, si noti che gli stream paralleli si basano su un modello fork-join . Ciò significa che l’esecuzione simultanea funziona come segue:

  • sequenza divisa in due parti con circa la stessa dimensione
  • elaborare ciascuna parte singolarmente
  • raccogliere i risultati di entrambe le parti e combinarle in un unico risultato

Nella seconda fase, i tre passaggi vengono applicati in modo ricorsivo alle sottosequenze.

Un esempio dovrebbe chiarirlo. Il

 IntStream.range(0, 4) .parallel() .collect(Trace::new, Trace::accumulate, Trace::combine); 

L’unico scopo della class Trace è registrare le chiamate del costruttore e del metodo. Se si esegue questa istruzione, stampa le seguenti righe:

 thread: 9 / operation: new thread: 10 / operation: new thread: 10 / operation: accumulate thread: 1 / operation: new thread: 1 / operation: accumulate thread: 1 / operation: combine thread: 11 / operation: new thread: 11 / operation: accumulate thread: 9 / operation: accumulate thread: 9 / operation: combine thread: 9 / operation: combine 

Puoi vedere, che quattro oggetti Trace sono stati creati, accumulare è stato chiamato una volta su ogni object e combinare è stato usato tre volte per unire i quattro oggetti in uno. Ogni object può essere accessibile solo da un thread alla volta. Ciò rende il codice thread-safe, e lo stesso vale per il metodo Collectors :: toMap .