Perché queste goroutine non scalano le loro prestazioni da più esecuzioni simultanee?

sfondo

Attualmente sto lavorando alla mia tesi di laurea e fondamentalmente il mio compito è quello di ottimizzare un determinato codice in Go, cioè farlo funzionare il più velocemente ansible. In primo luogo, ho ottimizzato la funzione seriale e poi ho provato a introdurre il parallelismo tramite le goroutine. Dopo aver fatto ricerche su internet ora capisco la differenza tra concorrenza e parallelismo grazie alle seguenti diapositive di talks.golang . Ho visitato alcuni corsi di programmazione parallela in cui abbiamo parallelizzato il codice ac / c ++ con l’aiuto di pthread / openmp, così ho provato ad applicare questi paradigmi in Go. Detto questo, in questo caso specifico sto ottimizzando una funzione che calcola la media mobile di una fetta con lunghezza len:=n+(window_size-1) (è uguale a 9393 o 10175), quindi abbiamo n windows di cui calcoliamo la corrispondente media aritmetica e salvarla correttamente nella sezione di output.

Si noti che questa attività è intrinsecamente imbarazzante in parallelo.

I miei tentativi di ottimizzazione e risultati

In moving_avg_concurrent2 ho diviso la slice in num_goroutines pezzi più piccoli e num_goroutines eseguito ognuno con una goroutine. Questa funzione è stata eseguita con una goroutine, per qualche motivo (non è stato ansible scoprire perché, ma stiamo diventando tangenti qui), meglio di moving_avg_serial4 ma con più di una goroutine ha iniziato a peggiorare di moving_avg_serial4 .
In moving_avg_concurrent3 ho adottato il paradigma master / worker. La performance era peggiore di moving_avg_serial4 quando si utilizzava una goroutine. Qui, almeno, ho ottenuto una performance migliore aumentando num_goroutines ma ancora non migliori di moving_avg_serial4 . Per confrontare le prestazioni di moving_avg_serial4 , moving_avg_concurrent2 e moving_avg_concurrent3 ho scritto un benchmark e ho tabulato i risultati:

 fct & num_goroutines | timing in ns/op | percentage --------------------------------------------------------------------- serial4 | 4357893 | 100.00% concur2_1 | 5174818 | 118.75% concur2_4 | 9986386 | 229.16% concur2_8 | 18973443 | 435.38% concur2_32 | 75602438 | 1734.84% concur3_1 | 32423150 | 744.01% concur3_4 | 21083897 | 483.81% concur3_8 | 16427430 | 376.96% concur3_32 | 15157314 | 347.81% 

Domanda

Dato che, come accennato sopra, questo problema è imbarazzantemente parallelo mi aspettavo di vedere un enorme aumento delle prestazioni, ma non era così.

Perché moving_avg_concurrent2 non scala affatto?
E perché moving_avg_concurrent3 è molto più lento di moving_avg_serial4 ?
So che le goroutine sono economiche ma non sono ancora libere, ma è ansible che ciò generi un sovraccarico tale che siamo ancora più lenti di moving_avg_serial4 ?

Codice

funzioni:

 // returns a slice containing the moving average of the input (given, ie not optimised) func moving_avg_serial(input []float64, window_size int) []float64 { first_time := true var output = make([]float64, len(input)) if len(input) > 0 { var buffer = make([]float64, window_size) // initialise buffer with NaN for i := range buffer { buffer[i] = math.NaN() } for i, val := range input { old_val := buffer[int((math.Mod(float64(i), float64(window_size))))] buffer[int((math.Mod(float64(i), float64(window_size))))] = val if !NaN_in_slice(buffer) && first_time { sum := 0.0 for _, entry := range buffer { sum += entry } output[i] = sum / float64(window_size) first_time = false } else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) { output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop } else { output[i] = math.NaN() } } } else { // empty input fmt.Println("moving_avg is panicking!") panic(fmt.Sprintf("%v", input)) } return output } // returns a slice containing the moving average of the input // reordering the control structures to exploid the short-circuit evaluation func moving_avg_serial4(input []float64, window_size int) []float64 { first_time := true var output = make([]float64, len(input)) if len(input) > 0 { var buffer = make([]float64, window_size) // initialise buffer with NaN for i := range buffer { buffer[i] = math.NaN() } for i := range input { // fmt.Printf("in mvg_avg4: i=%v\n", i) old_val := buffer[int((math.Mod(float64(i), float64(window_size))))] buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i] if first_time && !NaN_in_slice(buffer) { sum := 0.0 for j := range buffer { sum += buffer[j] } output[i] = sum / float64(window_size) first_time = false } else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer)*/ { output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop } else { output[i] = math.NaN() } } } else { // empty input fmt.Println("moving_avg is panicking!") panic(fmt.Sprintf("%v", input)) } return output } // returns a slice containing the moving average of the input // splitting up slice into smaller pieces for the goroutines but without using the serial version, ie we only have NaN's in the beginning, thus hope to reduce some overhead // still does not scale (decreasing performance with increasing size and num_goroutines) func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 { var output = make([]float64, window_size-1, len(input)) for i := 0; i  0 { num_items := len(input) - (window_size - 1) var barrier_wg sync.WaitGroup n := num_items / num_goroutines go_avg := make([][]float64, num_goroutines) for i := 0; i < num_goroutines; i++ { go_avg[i] = make([]float64, 0, num_goroutines) } for i := 0; i < num_goroutines; i++ { barrier_wg.Add(1) go func(go_id int) { defer barrier_wg.Done() // computing boundaries var start, stop int start = go_id*int(n) + (window_size - 1) // starting index // ending index if go_id != (num_goroutines - 1) { stop = start + n // Ending index } else { stop = num_items + (window_size - 1) // Ending index } loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size) loc_avg = make([]float64, stop-start) current_sum := 0.0 for i := start - (window_size - 1); i < start+1; i++ { current_sum += input[i] } loc_avg[0] = current_sum / float64(window_size) idx := 1 for i := start + 1; i < stop; i++ { loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size) idx++ } go_avg[go_id] = append(go_avg[go_id], loc_avg...) }(i) } barrier_wg.Wait() for i := 0; i < num_goroutines; i++ { output = append(output, go_avg[i]...) } } else { // empty input fmt.Println("moving_avg is panicking!") panic(fmt.Sprintf("%v", input)) } return output } // returns a slice containing the moving average of the input // change of paradigm, we opt for a master worker pattern and spawn all windows which each will be computed by a goroutine func compute_window_avg(input, output []float64, start, end int) { sum := 0.0 size := end - start for _, val := range input[start:end] { sum += val } output[end-1] = sum / float64(size) } func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 { var output = make([]float64, window_size-1, len(input)) for i := 0; i  0 { num_windows := len(input) - (window_size - 1) var output = make([]float64, len(input)) for i := 0; i < window_size-1; i++ { output[i] = math.NaN() } pending := make(chan *Work) done := make(chan *Work) // creating work go func() { for i := 0; i < num_windows; i++ { pending <- NewWork(compute_window_avg, input, output, i, i+window_size) } }() // start goroutines which work through pending till there is nothing left for i := 0; i < num_goroutines; i++ { go func() { Worker(pending, done) }() } // wait till every work is done for i := 0; i < num_windows; i++ { <-done } return output } else { // empty input fmt.Println("moving_avg is panicking!") panic(fmt.Sprintf("%v", input)) } return output } 

benchmark:

 //############### BENCHMARKS ############### var import_data_res11 []float64 func benchmarkMoving_avg_serial(b *testing.B, window int) { var r []float64 for n := 0; n < bN; n++ { r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window) } import_data_res11 = r } var import_data_res14 []float64 func benchmarkMoving_avg_serial4(b *testing.B, window int) { var r []float64 for n := 0; n < bN; n++ { r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window) } import_data_res14 = r } var import_data_res16 []float64 func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) { var r []float64 for n := 0; n < bN; n++ { r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines) } import_data_res16 = r } var import_data_res17 []float64 func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) { var r []float64 for n := 0; n < bN; n++ { r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines) } import_data_res17 = r } func BenchmarkMoving_avg_serial_261x10(b *testing.B) { benchmarkMoving_avg_serial(b, 261*10) } func BenchmarkMoving_avg_serial4_261x10(b *testing.B) { benchmarkMoving_avg_serial4(b, 261*10) } func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) { benchmarkMoving_avg_concurrent2(b, 261*10, 1) } func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) { benchmarkMoving_avg_concurrent2(b, 261*10, 8) } func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) { benchmarkMoving_avg_concurrent3(b, 261*10, 1) } func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) { benchmarkMoving_avg_concurrent3(b, 261*10, 8) } //############### BENCHMARKS end ############### 

Osservazioni:
Questo è il mio primo post, sto ancora imparando, quindi ogni critica costruttiva è anche benvenuta.

Fatto n. 0: gli sforzi di ottimizzazione pre-matura hanno spesso rendimenti negativi
dimostrando che sono solo una perdita di tempo e sforzi


Perché?
Un singolo SLOC “sbagliato” può devastare le prestazioni in più del + 37% circa
o può migliorare le prestazioni per spendere meno del -57% del tempo di elaborazione di base

 51.151µs on MA(200) [10000]float64 ~ 22.017µs on MA(200) [10000]int 70.325µs on MA(200) [10000]float64 

Perché []int -s?
Lo vedi da solo sopra – questo è il pane per le strategie di elaborazione sub- [us] efficienti HPC / fintech (e parliamo ancora in termini di pianificazione dei processi [SERIAL] ).

Questo può testare su qualsiasi scala – ma piuttosto testare prima (qui) le proprie implementazioni, sulla stessa scala – MA(200) [10000]float64 setup MA(200) [10000]float64 – e postare le durate di base in [us] per visualizzare il processo iniziale prestazioni e confrontare le mele con le mele , con la soglia 51.2 [us] da confrontare.

Poi arriva la parte più difficile:


Fatto n. 1: questo compito NON è imbarazzantemente parallelo

Sì, si può andare a implementare un calcolo della media mobile, in modo che proceda attraverso gli heap di dati usando un approccio di elaborazione “just” – [CONCURRENT] intenzionalmente indoctrineto (indipendentemente dal fatto che sia dovuto a qualche tipo di errore, consiglio di qualche autorità , cecità professionale o semplicemente da un’ignoranza dual-Socrates-fair) che ovviamente non significa che la natura dell’elaborazione del stream convoluzionale, presente all’interno della formulazione matematica Moving Average, abbia dimenticato di essere un puro processo [SERIAL] , solo a causa di un tentativo di farla rispettare viene calcasting all’interno di un certo grado di elaborazione “just” – [CONCURRENT] .

(Btw. Gli Hard Computer-Scientists e i nerd a doppio dominio obietteranno anche qui, che il Go-language è progettato utilizzando le migliori abilità di Rob Pike per avere un framework di coroutine simultanee, non una vera schedulazione del processo [PARALLEL] , anche sebbene gli strumenti CSP di Hoare, disponibili nel concetto di lingua, possano aggiungere sale e pepe e introdurre un tipo di blocco degli strumenti di comunicazione tra processi, che bloccherà le sezioni di codice “just” – [CONCURRENT] in qualche CSP-p2p cablato -sincronizzazione.)


Fatto n. 2: va distribuito (per qualsiasi tipo di accelerazione) solo ALLA FINE

Avere un livello scarso di prestazioni in [SERIAL] non ha alcun valore. Avendo una ragionevole quantità di ottimizzazione delle prestazioni in single-thread, solo allora si può trarre vantaggio dall’andare distribuito (ancora dover pagare costi seriali aggiuntivi, il che rende Amdahl Law (piuttosto Overhead-strict- Amdahl Law ) entrare nel gioco).

Se è ansible introdurre un livello così basso di overhead di configurazione aggiuntivi e ottenere comunque un notevole parallelismo, scalato nella parte non [SEQ] dell’elaborazione , lì e solo lì si ha la possibilità di aumentare il rendimento effettivo del processo.

Non è difficile perdere molto di più che guadagnare in questo, quindi confrontate sempre il puro [SEQ] con i potenziali compromessi tra un non-[SEQ] / N[PAR]_processes teorico, overhead-naive speedup, per il quale uno pagherà un costo di una sum di tutti gli add-on- [SEQ] -overheads, quindi se e solo se:

 ( pure-[SEQ]_processing [ns] + add-on-[SEQ]-setup-overheads [ns] + ( non-[SEQ]_processing [ns] / N[PAR]_processes ) ) << ( pure-[SEQ]_processing [ns] + ( non-[SEQ]_processing [ns] / 1 ) ) 

Non avendo questo jet-vantaggio il vantaggio sia dell'altezza surplus che del Sole dietro di te, non tentare mai di entrare in nessun tipo di tentativi di parallelismo / HPC - non pagheranno mai per sé stessi senza essere straordinariamente << migliori di un processo [SEQ] intelligente .


inserisci la descrizione dell'immagine qui

Epilogo: sull'intero esperimento interattivo di Amdahl's Law

Un'animazione vale milioni di parole.

Un'animazione intertriggers ancora migliore:

Così,
assumere un processo sotto test, che ha sia una parte [SERIAL] che una [PARALLEL] della pianificazione del processo.

Sia p la frazione [PARALLEL] della durata del processo ~ ( 0.0 .. 1.0 ) quindi la parte [SERIAL] non dura più a lungo di ( 1 - p ) , giusto?

Quindi, iniziamo la sperimentazione intertriggers da un tale caso di prova, dove p == 1.0 , che significa che tutta la durata del processo è spesa solo in una parte [PARALLEL] , e sia la parte iniziale seriale che le parti terminali del stream di processo ( che principalmente sono sempre [SERIAL] ) hanno zero durate ( ( 1 - p ) == 0. )

Supponiamo che il sistema non abbia particolari magie e quindi abbia bisogno di passare dei veri passi sull'intializzazione di ciascuna parte [PARALLEL] , in modo da eseguirlo su un processore differente ( (1), 2, .., N ) , quindi proviamo aggiungere alcuni overhead, se richiesto per riorganizzare il stream del processo e per maresciallo + distribuire + un-marshal tutte le istruzioni e i dati necessari, così come ora il processo previsto può essere avviato ed eseguito su processori N in parallelo.

Questi costi sono chiamati o (qui inizialmente si ipotizzava che la semplicità fosse semplicemente costante e invariabile a N , che non è sempre il caso nel reale, su silicio / su NUMA / su infrastrutture distribuite).

Facendo clic sul titolo Epilogo sopra, un ambiente interattivo si apre ed è gratuito per la propria sperimentazione.

Con p == 1. && o == 0. && N > 1 le prestazioni sono in forte crescita fino ai limiti attuali [PARALLEL] -hardware O / S per un'esecuzione di codice O / S ancora monolitico (dove ancora nessun costo di distribuzione aggiuntivo per MPI- e simili distribuzioni di unità di lavoro in modalità depeche (dove si dovrebbe aggiungere immediatamente un gran numero di [ms] , mentre la nostra migliore implementazione [SERIAL] ha ovviamente fatto tutto il lavoro in meno di ~ 22,1 [us] )).

Ma a parte questo caso artificiosamente ottimistico, il lavoro non sembra così economico da essere efficientemente parallelo.

  • Prova a non avere uno zero, ma solo circa ~ 0,01% i costi generali di installazione di o , e la linea inizia a mostrare una natura molto diversa del ridimensionamento overhead-aware anche per il caso estremo [PARALLEL] (avendo ancora p == 1.0 ), e avendo la potenziale accelerazione da qualche parte vicino alla metà del caso di speedup lineare inizialmente super-idealistico.

  • Ora, trasforma il p in qualcosa di più vicino alla realtà, in un luogo meno artificialmente impostato rispetto al caso super-idealistico iniziale di == 1.00 --> { 0.99, 0.98, 0.95 } e ... bingo, questa è la realtà, dove processo- la pianificazione dovrebbe essere testata e pre-validata.

Cosa significa?

Ad esempio, se un overhead (di lancio + finale che unisce un pool di coroutine) richiederebbe più del ~ 0.1% della durata effettiva della sezione di elaborazione [PARALLEL] , non ci sarebbe una maggiore velocità di 4x (circa un 1/4 di la durata originale nel tempo) per 5 coroutine (con p ~ 0,95), non più di 10x (una durata 10 volte più veloce) per 20 coroutine (tutto presupponendo che un sistema abbia 5 core CPU, rispettivamente 20-CPU- core liberi e disponibili e pronti (meglio con processori / thread mappati CPU-core-affinità O / S) per la fornitura ininterrotta di tutte quelle coroutine durante tutta la loro durata, in modo da raggiungere qualsiasi speedup previsto sopra.

Non avendo una tale quantità di risorse hardware libere e pronte per tutte quelle unità operative, destinate ad implementare la parte [PARALLEL] della schedulazione del processo, gli stati di blocco / attesa introdurranno stati di attesa assoluti aggiuntivi e le prestazioni risultanti aggiunge queste nuove sezioni [SERIAL] -blocking / waiting alla durata complessiva del processo e gli speedup inizialmente desiderati cessano improvvisamente di esistere e il fattore di performance scende sotto << 1.00 (il che significa che il tempo di esecuzione effettivo era dovuto più lentamente al modo di blocco degli stati, rispetto al stream di lavoro [SERIAL] non parallelizzato).

Questo può sembrare complicato per i nuovi sperimentatori appassionati, tuttavia possiamo metterlo in una prospettiva invertita. Dato l'intero processo di distribuzione, il pool di compiti [PARALLEL] previsto è noto per non essere più breve di, ad esempio, di un 10 [us] , i grafici rigidi in testa mostrano, ci deve essere almeno circa 1000 x 10 [us] di elaborazione intensiva di elaborazione non bloccante all'interno della sezione [PARALLEL] modo da non devastare l'efficienza dell'elaborazione parallela.

Se non vi è un pezzo di elaborazione sufficientemente "grasso", i costi generali (che vanno notevolmente al di sopra della soglia sopra citata di ~ 0.1% ) devastano brutalmente l'efficienza netta dell'elaborazione parallellizzata con successo (ma avendo eseguito tali costi relativi ingiustificatamente elevati della configurazione rispetto agli effetti netti limitati di qualsiasi N -processor, come è stato dimostrato nei grafici live disponibili).

Non c'è da sorprendersi per i nerd del computing distribuito, che il sovraccarico o deriva anche da ulteriori dipendenze - su N (più processi, più sforzi sono da spendere per distribuire pacchetti di lavoro), su dati marshalled - dimensioni dei BLOB (il più grandi i BLOB, più i dispositivi MEM / I / O rimangono bloccati, prima di servire il processo successivo per ricevere un BLOB distribuito su tale dispositivo / risorsa per ciascuno dei target 2..N -esimo processo di ricezione), su evitato / CSP -Comunicazione inter-processo globale, canalizzata (chiamarla blocco aggiuntivo per incidente, riducendo il p ulteriore e ulteriormente al di sotto dell'ottimo ideale di 1. ).

Quindi, la realtà del mondo reale è piuttosto molto lontana dall'idealizzata inizialmente, bella e promettente p == 1.0 , ( 1 - p ) == 0.0 e o == 0.0

Come ovvio fin dall'inizio, prova a battere piuttosto la soglia 22.1 [us] [SERIAL] , che cercare di superare questo, mentre peggiora sempre peggio, se si passa a [PARALLEL] dove overhead e ridimensionamenti realistici, utilizzando approcci già poco performanti , non aiuta un singolo bit.