Buffer circolare senza blocco

Sono in procinto di progettare un sistema che si connette a uno o più flussi di feed di dati e di eseguire analisi sui dati piuttosto che eventi trigger basati sul risultato. In una tipica installazione di produttori / consumatori multi-thread, avrò più thread di produzione che mettono i dati in una coda e più thread di consumo che leggono i dati, e ai consumatori interessa solo l’ultimo punto di dati più il numero di punti. Le discussioni del produttore dovranno bloccare se il consumatore lento non riesce a stare al passo, e ovviamente i thread dei consumatori si bloccheranno quando non ci saranno aggiornamenti non elaborati. L’utilizzo di una tipica coda concorrente con lock reader / writer funzionerà bene ma la velocità dei dati in arrivo potrebbe essere enorme, quindi ho voluto ridurre il sovraccarico del mio blocco soprattutto i lock dei produttori per i produttori. Penso che un buffer circolare senza blocco sia ciò di cui avevo bisogno.

Ora due domande:

  1. La risposta è un buffer circolare senza blocco?

  2. Se è così, prima di pubblicare il mio, conosci qualche implementazione pubblica adatta alle mie esigenze?

Qualsiasi suggerimento nell’implementazione di un buffer circolare senza blocco è sempre il benvenuto.

BTW, facendo questo in C ++ su Linux.

Alcune informazioni aggiuntive:

Il tempo di risposta è fondamentale per il mio sistema. Idealmente, i thread dei consumatori vorranno vedere gli aggiornamenti in arrivo il prima ansible perché un ulteriore ritardo di 1 millisecondo potrebbe rendere il sistema inutile, o vale molto meno.

L’idea di design a cui mi sto avvicinando è un buffer circolare semi-lock-free in cui il thread del produttore inserisce i dati nel buffer il più velocemente ansible, chiamiamo la testina del buffer A, senza bloccare a meno che il buffer non sia pieno, quando A incontra la fine del buffer Z. I thread consumer avranno ciascuno due puntatori al buffer circolare, P e P n , dove P è la testa del buffer locale del thread, e P n è l’ultimo elemento dopo P. Ogni thread del consumatore avanzerà la sua P e P n una volta che finisce l’elaborazione della corrente P e la fine del puntatore del buffer Z è avanzata con il P n più lento. Quando P raggiunge fino a A, il che significa che non c’è più un nuovo aggiornamento da elaborare, il consumatore gira e fa attenzione ad aspettare che A ritorni di nuovo. Se il thread del consumer gira troppo a lungo, può essere messo in sleep e attendere una variabile di condizione, ma sto bene con il consumo del ciclo della CPU in attesa di aggiornamento perché non aumenta la latenza (avrò più core CPU di fili). Immagina di avere una traccia circolare, e il produttore è in esecuzione di fronte a un gruppo di consumatori, la chiave è di mettere a punto il sistema in modo che il produttore stia di solito solo pochi passi avanti rispetto ai consumatori, e la maggior parte di queste operazioni può essere fatto usando tecniche lock-free. Capisco che ottenere i dettagli dell’implementazione non sia facile … okay, molto difficile, ecco perché voglio imparare dagli errori degli altri prima di crearne alcuni.

Ho fatto uno studio particolare sulle strutture di dati senza blocchi negli ultimi due anni. Ho letto la maggior parte dei documenti sul campo (ce ne sono solo una quarantina circa – anche se solo una decina o quindici sono di reale utilità 🙂

AFAIK, un buffer circolare senza blocco non è stato inventato. Il problema riguarderà la condizione complessa in cui un lettore sorpassa uno scrittore o viceversa.

Se non hai trascorso almeno sei mesi a studiare strutture di dati senza blocchi, non tentare di scriverne uno da solo. Lo avrai sbagliato e potrebbe non essere ovvio per te che esistono degli errori, fino a quando il tuo codice non funzionerà, dopo l’implementazione, su nuove piattaforms.

Credo tuttavia che ci sia una soluzione al tuo fabbisogno.

Dovresti accoppiare una coda bloccata con una lista libera priva di blocchi.

La lista libera ti darà una pre-allocazione e quindi ovvierà al requisito (costoso in termini di costi) per un allocatore senza lock; quando la lista libera è vuota, si replica il comportamento di un buffer circolare eliminando immediatamente dalla coda un elemento dalla coda e usando quello.

(Ovviamente, in un buffer circolare basato su lock, una volta ottenuto il lock, ottenere un elemento è molto veloce – fondamentalmente solo una dereferenziazione puntatore – ma non lo si otterrà in alcun algoritmo lock-free, ma spesso devono andare ben fuori dal loro modo di fare le cose, l’overhead di fallire un pop a lista libera seguito da un dequio è alla pari con la quantità di lavoro che qualsiasi algoritmo lock-free dovrà fare).

Nel 1996 Michael e Scott hanno sviluppato una coda davvero esente da blocchi. Un link qui sotto ti darà abbastanza dettagli per rintracciare il PDF del loro articolo; Michael e Scott, FIFO

Una free-list priva di lock è l’algoritmo lock-free più semplice e in effetti non penso di aver visto un vero e proprio documento per questo.

Il termine dell’arte per quello che vuoi è una coda senza blocco . C’è un eccellente set di note con collegamenti a codice e documenti di Ross Bencina. Il ragazzo di cui mi fido di più è Maurice Herlihy (per gli americani, pronuncia il suo nome come “Morris”).

Il requisito che i produttori o i consumatori bloccano se il buffer è vuoto o pieno suggerisce che si dovrebbe utilizzare una normale struttura di dati di blocco, con semafori o variabili condizionali per impedire a produttori e consumatori di bloccare i dati fino a quando non saranno disponibili. Generalmente, il codice di blocco non blocca tali condizioni: fa girare o abbandonare operazioni che non possono essere eseguite invece di bloccare utilizzando il sistema operativo. (Se puoi permetterti di aspettare fino a quando un altro thread produce o consuma dati, allora perché è in attesa di un blocco per un altro thread per completare l’aggiornamento della struttura dei dati?

Acceso (x86 / x64) Linux, la sincronizzazione intra-thread usando mutex è ragionevolmente economica se non c’è contesa. Concentrati sulla riduzione al minimo del tempo necessario ai produttori e ai consumatori per trattenere le serrature. Dato che hai detto che ti interessano solo gli ultimi N punti di dati registrati, penso che un buffer circolare lo farebbe ragionevolmente bene. Tuttavia, non capisco in che modo questo si adatta ai requisiti di blocco e all’idea che i consumatori in realtà consumino (rimuovendo) i dati che leggono. (Vuoi che i consumatori guardino solo gli ultimi N punti di dati e non li rimuovano? Vuoi che i produttori non si preoccupino se i consumatori non riescono a tenere il passo e semplicemente sovrascrivono i vecchi dati?)

Inoltre, come ha commentato Zan Lynx, puoi aggregare / bufferizzare i tuoi dati in blocchi più grandi quando ce n’è in abbondanza. Puoi memorizzare un numero fisso di punti o tutti i dati ricevuti entro un certo periodo di tempo . Ciò significa che ci saranno meno operazioni di sincronizzazione. Tuttavia, introduce la latenza, ma se non stai usando Linux in tempo reale, dovrai comunque occupartene comunque.

C’è una buona serie di articoli su questo su DDJ . Come segno di quanto difficile possa essere questa roba, è una correzione su un articolo precedente che ha sbagliato. Assicurati di aver compreso gli errori prima di tirare il tuo) -;

L’implementazione nella libreria boost merita di essere presa in considerazione. È facile da usare e ha prestazioni piuttosto elevate. Ho scritto un test e l’ho eseguito su un laptop i7 quad core (8 thread) e ho ottenuto ~ 4M operazioni di accodamento / dequeue al secondo. Un’altra implementazione non menzionata finora è la coda MPMC su http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue . Ho fatto alcuni test semplici con questa implementazione sullo stesso portatile con 32 produttori e 32 consumatori. È, come pubblicizzato, più veloce della coda lockless boost.

Come la maggior parte delle altre risposte, la programmazione lockless è difficile. La maggior parte delle implementazioni avrà difficoltà a rilevare casi d’angolo che richiedono molti test e debug per risolvere il problema. Questi sono in genere risolti con un’attenta disposizione delle barriere di memoria nel codice. Troverai anche prove di correttezza pubblicate in molti articoli accademici. Preferisco testare queste implementazioni con uno strumento a forza bruta. Qualsiasi algoritmo senza chiave che si pianifica di utilizzare in produzione dovrebbe essere controllato per la correttezza utilizzando uno strumento come http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html .

Una tecnica utile per ridurre la contesa è quella di hash gli elementi in più code e ogni consumatore dedicato a un “argomento”.

Per il numero più recente di articoli a cui i tuoi consumatori sono interessati: non vuoi bloccare l’intera coda e iterare su di essa per trovare un elemento da sovrascrivere: basta pubblicare elementi in N-tuple, ovvero tutti gli elementi N recenti. Punti bonus per l’implementazione in cui il produttore bloccherebbe l’intera coda (quando i consumatori non possono tenere il passo) con un timeout, aggiornando la propria cache di tuple locale – in questo modo non si esercita pressione sull’origine dati.

Sarei d’accordo con questo articolo e mi raccomando di non utilizzare strutture di dati prive di lock. Un documento relativamente recente sulle code fifo prive di lock è questo , cercare ulteriori documenti dallo stesso autore / i; c’è anche una tesi di dottorato su Chalmers riguardante strutture di dati prive di lock (ho perso il link). Tuttavia, non hai detto quanto sono grandi i tuoi elementi: le strutture di dati senza blocchi funzionano in modo efficiente solo con elementi a dimensione di parola, quindi dovrai allocare dynamicmente i tuoi elementi se sono più grandi di una parola macchina (32 o 64 bit). Se allocate dynamicmente gli elementi, spostate (supponete, dal momento che non avete profilato il vostro programma e fondamentalmente state facendo un’ottimizzazione prematura) un collo di bottiglia all’allocatore di memoria, quindi avete bisogno di un allocatore di memoria senza lock, ad es. Streamflow e integrazione con la tua applicazione.

La coda di Sutter è sub-ottimale e lui lo sa. La programmazione Art of Multicore è un ottimo riferimento, ma non fidatevi dei ragazzi Java sui modelli di memoria, punto. I link di Ross non ti daranno una risposta definitiva perché hanno le loro librerie in tali problemi e così via.

Fare una programmazione senza blocchi è un problema, a meno che tu non voglia dedicare molto tempo a qualcosa di cui sei chiaramente sopra-ingegnerizzato prima di risolvere il problema (a giudicare dalla descrizione di esso, è una follia comune di “cercare la perfezione” ‘nella coerenza della cache). Ci vogliono anni e porta a non risolvere prima i problemi e ad ottimizzarli più tardi, una malattia comune.

Non sono esperto di modelli di memoria hardware e blocco strutture dati libere e io tendo ad evitare di utilizzare quelli nei miei progetti e vado con strutture dati bloccate tradizionali.

Tuttavia, ho notato di recente quel video: coda SPSC Lockless basata su ring buffer

Questo è basato su una libreria Java ad alte prestazioni open source chiamata distruptor LMAX usata da un sistema di trading: LMAX Distruptor

In base alla presentazione sopra riportata, fai puntatori atomici e atomici per controllare la condizione in cui la testa cattura la coda da dietro o viceversa.

Qui sotto puoi vedere un’implementazione C ++ 11 molto semplice per questo:

// USING SEQUENTIAL MEMORY #include #include #include  using namespace std; #define RING_BUFFER_SIZE 1024 // power of 2 for efficient % class lockless_ring_buffer_spsc { public : lockless_ring_buffer_spsc() { write.store(0); read.store(0); } bool try_push(int64_t val) { const auto current_tail = write.load(); const auto next_tail = increment(current_tail); if (next_tail != read.load()) { buffer[current_tail] = val; write.store(next_tail); return true; } return false; } void push(int64_t val) { while( ! try_push(val) ); // TODO: exponential backoff / sleep } bool try_pop(int64_t* pval) { auto currentHead = read.load(); if (currentHead == write.load()) { return false; } *pval = buffer[currentHead]; read.store(increment(currentHead)); return true; } int64_t pop() { int64_t ret; while( ! try_pop(&ret) ); // TODO: exponential backoff / sleep return ret; } private : std::atomic write; std::atomic read; static const int64_t size = RING_BUFFER_SIZE; int64_t buffer[RING_BUFFER_SIZE]; int64_t increment(int n) { return (n + 1) % size; } }; int main (int argc, char** argv) { lockless_ring_buffer_spsc queue; std::thread write_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.push(i); } } // End of lambda expression ); std::thread read_thread( [&] () { for(int i = 0; i<1000000; i++) { queue.pop(); } } // End of lambda expression ); write_thread.join(); read_thread.join(); return 0; } 

Questo è un thread vecchio, ma poiché non è stato ancora menzionato, esiste un FIFO senza blocchi, circolare, 1 produttore -> 1 consumatore, disponibile nel framework JUCE C ++.

https://www.juce.com/doc/classAbstractFifo#details

Scopri Disruptor ( Come usarlo ) che è un ring-buffer a cui più thread possono iscriversi:

Sebbene si tratti di una vecchia domanda, nessuno ha menzionato il buffer ad anello senza serratura di DPDK. È un buffer ad anello ad alto rendimento che supporta più produttori e più utenti. Fornisce anche modalità singolo consumatore e produttore singolo e il buffer circolare è senza attesa in modalità SPSC. È scritto in C e supporta più architetture.

Inoltre, supporta le modalità Bulk e Burst in cui gli articoli possono essere accodati / dequanditi alla rinfusa. Il design consente a più utenti o più produttori di scrivere contemporaneamente in coda semplicemente riservando lo spazio tramite lo spostamento di un puntatore atomico.

Ecco come lo farei:

  • mappare la coda in un array
  • mantenere lo stato con una prossima lettura e successivi indici di scrittura successivi
  • mantieni un vettore a vuoto pieno di bit

L’inserimento consiste nell’usare un CAS con un incremento e passare alla successiva scrittura. Una volta che hai uno slot, aggiungi il tuo valore e poi imposta il bit vuoto / completo che lo corrisponde.

Le rimozioni richiedono un controllo del bit prima per eseguire il test sui sottobussi ma, a parte questo, sono uguali a quelle della scrittura ma utilizzando l’indice di lettura e la cancellazione del bit vuoto / completo.

Stai attento,

  1. Non sono esperto in queste cose
  2. le operazioni atomiche ASM sembrano essere molto lente quando le ho usate, quindi se ne finisci con più di una, potresti essere più veloce nell’usare i lucchetti incorporati nelle funzioni di inserimento / rimozione. La teoria è che un singolo op atomico per afferrare il lock seguito da (molto) pochi ops ASM non atomici potrebbe essere più veloce della stessa cosa fatta da diverse operazioni atomiche. Ma per fare questo lavoro sarebbe necessario un allineamento manuale o automatico, quindi è tutto un blocco corto di ASM.

Solo per completezza: c’è un buffer circolare lock-free ben testato in OtlContainers , ma è scritto in Delphi (TOmniBaseBoundedQueue è un buffer circolare e TOmniBaseBoundedStack è uno stack limitato). C’è anche una coda illimitata nella stessa unità (TOmniBaseQueue). La coda illimitata è descritta in Coda senza blocco dinamico: funziona correttamente . L’implementazione iniziale della coda limitata (buffer circolare) è stata descritta in Una coda senza blocco, finalmente! ma il codice è stato aggiornato da allora.

Puoi provare lfqueue

È semplice da usare, è senza serratura di design circolare

 int *ret; lfqueue_t results; lfqueue_init(&results); /** Wrap This scope in multithread testing **/ int_data = (int*) malloc(sizeof(int)); assert(int_data != NULL); *int_data = i++; /*Enqueue*/ while (lfqueue_enq(&results, int_data) != 1) ; /*Dequeue*/ while ( (ret = lfqueue_deq(&results)) == NULL); // printf("%d\n", *(int*) ret ); free(ret); /** End **/ lfqueue_clear(&results);