Si tratta di un pool di thread di lavoro idiomatico in Go?

Sto tentando di scrivere un semplice pool di lavoratori con goroutine.

  • Il codice che ho scritto è idiomatico? Se no, allora cosa dovrebbe cambiare?
  • Voglio essere in grado di impostare il numero massimo di thread di lavoro su 5 e bloccare fino a quando un lavoratore diventa disponibile se tutti e 5 sono occupati. Come potrei estendere questo per avere solo un pool di 5 lavoratori max? work_channel le 5 goroutine statiche e do a ciascuna il work_channel ?

codice:

 package main import ( "fmt" "math/rand" "sync" "time" ) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) o <- work + fmt.Sprintf("-%dms", sleepMs) } func main() { var work_channel = make(chan string) var results_channel = make(chan string) // create goroutine per item in work_channel go func() { var c = 0 var wg sync.WaitGroup for work := range work_channel { wg.Add(1) go worker(fmt.Sprintf("%d", c), work, results_channel, &wg) c++ } wg.Wait() fmt.Println("closing results channel") close(results_channel) }() // add work to the work_channel go func() { for c := 'a'; c < 'z'; c++ { work_channel <- fmt.Sprintf("%c", c) } close(work_channel) fmt.Println("sent work to work_channel") }() for x := range results_channel { fmt.Printf("result: %s\n", x) } } 

La tua soluzione non è un pool di goroutine dei lavoratori in alcun senso: il tuo codice non limita le goroutine simultanee e non “riusa” le goroutine (ne inizia sempre una nuova quando viene ricevuto un nuovo lavoro).

Modello produttore-consumatore

Come ho postato su Bruteforce MD5 Password cracker , puoi utilizzare il pattern produttore-consumatore . Potresti avere una goroutine di produttori designata che genererebbe i lavori (cose da fare / calcolare) e li invierà su un canale di posti di lavoro . Potresti avere un gruppo fisso di goroutine dei consumatori (ad esempio 5 di esse) che si collegherebbero al canale su cui vengono consegnati i lavori, e ognuno eseguirà / completerà i lavori ricevuti.

La goroutine del produttore potrebbe semplicemente chiudere il canale di jobs quando tutti i lavori sono stati generati e inviati, segnalando adeguatamente ai consumatori che non arriveranno altri lavori. Il costrutto for ... range su un canale gestisce l’evento “close” e termina correttamente. Si noti che tutti i lavori inviati prima della chiusura del canale saranno comunque consegnati.

Ciò risulterebbe in un design pulito, risulterebbe in un numero fisso (ma arbitrario) di goroutine e utilizzerebbe sempre il 100% di CPU (se # di goroutine è maggiore di # di core della CPU). Ha anche il vantaggio di poter essere “strozzato” con la corretta selezione della capacità del canale (canale bufferizzato) e il numero di goroutine dei consumatori .

Si noti che questo modello per avere una goroutine di produttore designata non è obbligatorio. Potresti avere più goroutine per produrre anche lavori, ma devi anche sincronizzarli per chiudere il canale dei jobs solo quando tutte le goroutine dei produttori hanno finito di produrre lavori – altrimenti il ​​tentativo di inviare un altro lavoro sul canale dei jobs quando è già stato chiuso si traduce in un panico di runtime. Generalmente produrre lavori sono economici e possono essere prodotti a un ritmo molto più rapido di quello che possono essere eseguiti, quindi questo modello per produrli in 1 goroutine mentre molti li stanno consumando / eseguendoli è buono nella pratica.

Gestione dei risultati:

Se i lavori hanno risultati, è ansible scegliere di disporre di un canale di risultati designato sul quale i risultati possono essere consegnati (“rispediti”), oppure è ansible scegliere di gestire i risultati nel consumatore al termine del lavoro / completamento. Quest’ultimo può anche essere implementato con una funzione di “callback” che gestisce i risultati. La cosa importante è se i risultati possono essere elaborati in modo indipendente o se devono essere uniti (ad es. Quadro di riduzione della mappa) o aggregati.

Se si utilizza un canale dei results , è necessaria anche una goroutine che ne riceve i valori, impedendo ai consumatori di bloccarsi (si verificherebbe se il buffer dei results si riempisse).

Con il canale dei results

Invece di inviare valori di string semplici come lavori e risultati, creerei un tipo di wrapper che può contenere qualsiasi informazione aggiuntiva e quindi è molto più flessibile:

 type Job struct { Id int Work string Result string } 

Si noti che la struttura del Job include anche il risultato, quindi quando si restituisce il risultato, contiene anche il Job originale come contesto, spesso molto utile . Si noti inoltre che è vantaggioso inviare solo puntatori ( *Job ) sui canali anziché i valori di Job , quindi non è necessario creare copie “innumerevoli” di Job , e anche la dimensione del valore della struttura del Job diventa irrilevante.

Ecco come potrebbe apparire questo produttore-consumatore:

Vorrei utilizzare 2 valori sync.WaitGroup , il loro ruolo seguirà:

 var wg, wg2 sync.WaitGroup 

Il produttore è responsabile di generare lavori da eseguire:

 func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } 

Una volta terminato (non ci sono più lavori), il canale di jobs viene chiuso e segnala ai consumatori che non arriveranno altri lavori.

Nota che produce() vede il canale di jobs come solo invio , perché è quello che il produttore deve fare solo con questo: inviare lavori su di esso (oltre a chiuderlo , ma è permesso anche su un canale di sola mandata ). Un ricevimento accidentale nel produttore sarebbe un errore di compilazione (rilevato in anticipo, al momento della compilazione).

La responsabilità del consumatore è quella di ricevere posti di lavoro fino a quando i lavori possono essere ricevuti, ed eseguirli:

 func consume(id int, jobs <-chan *Job, results chan<- *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs) results <- job } } 

Si noti che consume() vede il canale dei jobs solo come ricezione ; il consumatore deve solo ricevere da esso. Allo stesso modo il canale dei results viene inviato solo per il consumatore.

Si noti inoltre che il canale dei results non può essere chiuso qui in quanto vi sono più goroutine dei consumatori, e solo il primo tentativo di chiuderlo avrebbe avuto esito positivo e altri avrebbero portato al panico del runtime! results canale dei results può (deve) essere chiuso dopo la conclusione di tutte le goroutine dei consumatori, perché in tal caso possiamo essere certi che nessun ulteriore valore (risultati) sarà inviato sul canale dei results .

Abbiamo risultati che devono essere analizzati:

 func analyze(results <-chan *Job) { defer wg2.Done() for job := range results { fmt.Printf("result: %s\n", job.Result) } } 

Come puoi vedere, anche questo riceve risultati finché possono arrivare (fino alla chiusura del canale dei results ). Il canale dei results per l'analizzatore è solo ricevibile .

Si prega di notare l'uso dei tipi di canale: ogni volta che è sufficiente, utilizzare solo un tipo di canale unidirezionale per rilevare e prevenire gli errori in anticipo, in fase di compilazione. Utilizza solo il tipo di canale bidirezionale se hai bisogno di entrambe le direzioni.

Ed è così che tutti questi sono incollati insieme:

 func main() { jobs := make(chan *Job, 100) // Buffered channel results := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs, results) } // Start producing go produce(jobs) // Start analyzing: wg2.Add(1) go analyze(results) wg.Wait() // Wait all consumers to finish processing jobs // All jobs are processed, no more values will be sent on results: close(results) wg2.Wait() // Wait analyzer to analyze all results } 

Esempio di output:

Ecco un esempio di output:

Come puoi vedere, i risultati arrivano e vengono analizzati prima che tutti i lavori vengano messi in coda:

 worker #4 received: 'e', sleep 81ms worker #0 received: 'a', sleep 887ms worker #1 received: 'b', sleep 847ms worker #2 received: 'c', sleep 59ms worker #3 received: 'd', sleep 81ms worker #2 received: 'f', sleep 318ms result: c-59ms worker #4 received: 'g', sleep 425ms result: e-81ms worker #3 received: 'h', sleep 540ms result: d-81ms worker #2 received: 'i', sleep 456ms result: f-318ms worker #4 received: 'j', sleep 300ms result: g-425ms worker #3 received: 'k', sleep 694ms result: h-540ms worker #4 received: 'l', sleep 511ms result: j-300ms worker #2 received: 'm', sleep 162ms result: i-456ms worker #1 received: 'n', sleep 89ms result: b-847ms worker #0 received: 'o', sleep 728ms result: a-887ms worker #1 received: 'p', sleep 274ms result: n-89ms worker #2 received: 'q', sleep 211ms result: m-162ms worker #2 received: 'r', sleep 445ms result: q-211ms worker #1 received: 's', sleep 237ms result: p-274ms worker #3 received: 't', sleep 106ms result: k-694ms worker #4 received: 'u', sleep 495ms result: l-511ms worker #3 received: 'v', sleep 466ms result: t-106ms worker #1 received: 'w', sleep 528ms result: s-237ms worker #0 received: 'x', sleep 258ms result: o-728ms worker #2 received: 'y', sleep 47ms result: r-445ms worker #2 received: 'z', sleep 947ms result: y-47ms result: u-495ms result: x-258ms result: v-466ms result: w-528ms result: z-947ms 

Prova l'applicazione completa su Go Playground .

Senza un canale di results

Il codice semplifica in modo significativo se non usiamo un canale di results , ma le goroutine dei consumatori gestiscono immediatamente il risultato (stampalo nel nostro caso). In questo caso non abbiamo bisogno di 2 valori sync.WaitGroup (il 2 ° era solo necessario attendere che l'analizzatore si completasse).

Senza un canale di results la soluzione completa è la seguente:

 var wg sync.WaitGroup type Job struct { Id int Work string } func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } func consume(id int, jobs <-chan *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs)) } } func main() { jobs := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs) } // Start producing go produce(jobs) wg.Wait() // Wait all consumers to finish processing jobs } 

L'output è "simile" a quello del canale dei results (ma ovviamente l'ordine di esecuzione / completamento è casuale).

Prova questa variante su Go Playground .

È ansible implementare un semaforo di conteggio per limitare la concorrenza di goroutine.

 var tokens = make(chan struct{}, 20) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() tokens <- struct{}{} // acquire a token before performing work sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) <-tokens // release the token o <- work + fmt.Sprintf("-%dms", sleepMs) } 

Questo è il disegno generale usato per limitare il numero di lavoratori. Ovviamente puoi cambiare la posizione di rilascio / acquisizione di token per adattarli al tuo codice.