Sincronizzazione IPC con memoria condivisa (senza blocco)

Si consideri il seguente scenario:

Requisiti:

  • Intel x64 Server (più socket CPU => NUMA)
  • Ubuntu 12, GCC 4.6
  • Due processi che condividono grandi quantità di dati su (denominati) memoria condivisa
  • Scenario classico produttore-consumatore
  • La memoria è sistemata in un buffer circolare (con elementi M)

Sequenza di programma (pseudo codice):

Processo A (produttore):

int bufferPos = 0; while( true ) { if( isBufferEmpty( bufferPos ) ) { writeData( bufferPos ); setBufferFull( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } } 

Processo B (consumatore):

 int bufferPos = 0; while( true ) { if( isBufferFull( bufferPos ) ) { readData( bufferPos ); setBufferEmpty( bufferPos ); bufferPos = ( bufferPos + 1 ) % M; } } 

Ora la vecchia domanda: come sincronizzarli in modo efficace !?

  1. Proteggi ogni accesso in lettura / scrittura con mutex
  2. Introdurre un “periodo di prova”, per consentire il completamento delle scritture: leggere i dati nel buffer N, quando il buffer (N + 3) è stato contrassegnato come pieno (pericoloso, ma sembra funzionare …)
  3. ?!?

Idealmente mi piacerebbe qualcosa sulla falsariga di una barriera di memoria, che garantisca che tutte le precedenti letture / scritture siano visibili su tutte le CPU, sulla falsariga di:

 writeData( i ); MemoryBarrier(); //All data written and visible, set flag setBufferFull( i ); 

In questo modo, avrei solo dovuto monitorare i buffer buffer e quindi poter leggere i grandi blocchi di dati in modo sicuro.

Generalmente sto cercando qualcosa sulla falsariga di acquisire / rilasciare recinti come descritto da Preshing qui:

http://preshing.com/20130922/acquire-and-release-fences/

(Se ho capito bene, l’atomica C ++ 11 funziona solo per i thread di un singolo processo e non per più processi.)

Tuttavia le barriere di memoria proprie del GCC (__sync_synchronize in combinazione con la barriera del compilatore asm volatile (“” ::: “memory”) per essere sicuri) non sembrano funzionare come previsto, poiché le scritture diventano visibili dopo la barriera, quando io mi aspettavo che fossero completati.

Qualsiasi aiuto sarebbe apprezzato…

BTW: Sotto Windows funziona semplicemente usando variabili volatili (un comportamento specifico di Microsoft) …

Boost Interprocess supporta la memoria condivisa.

Boost Lockfree ha un tipo di coda Single-Consumer Single-Producer ( spsc_queue ). Questo è fondamentalmente ciò a cui ti riferisci come un buffer circolare.

Ecco una dimostrazione che passa i messaggi IPC (in questo caso, di tipo string ) usando questa coda, in modo lock-free.

Definire i tipi

Per prima cosa, definiamo i nostri tipi:

 namespace bip = boost::interprocess; namespace shm { template  using alloc = bip::allocator; using char_alloc = alloc; using shared_string = bip::basic_string, char_alloc >; using string_alloc = alloc; using ring_buffer = boost::lockfree::spsc_queue< shared_string, boost::lockfree::capacity<200> // alternatively, pass // boost::lockfree::allocator >; } 

Per semplicità ho scelto di dimostrare l’implementazione spsc_queue dimensione di spsc_queue , richiedendo in modo casuale una capacità di 200 elementi.

Il typedef shared_string definisce una stringa che verrà allocata in modo trasparente dal segmento di memoria condivisa, quindi sono anche “magicamente” condivise con l’altro processo.

Il lato del consumatore

Questo è il più semplice, quindi:

 int main() { // create segment and corresponding allocator bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::string_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct("queue")(); 

Questo apre l’area di memoria condivisa, individua la coda condivisa se esiste. NOTA Questo dovrebbe essere sincronizzato nella vita reale.

Ora per la dimostrazione attuale:

 while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); shm::shared_string v(char_alloc); if (queue->pop(v)) std::cout < < "Processed: '" << v << "'\n"; } 

Il consumatore monitora all'infinito la coda per i lavori in sospeso e ne elabora uno ogni ~ 10ms.

Il lato produttore

Il lato produttore è molto simile:

 int main() { bip::managed_shared_memory segment(bip::open_or_create, "MySharedMemory", 65536); shm::char_alloc char_alloc(segment.get_segment_manager()); shm::ring_buffer *queue = segment.find_or_construct("queue")(); 

Ancora una volta, aggiungi la sincronizzazione corretta alla fase di inizializzazione. Inoltre, probabilmente il produttore si incaricherà di liberare il segmento di memoria condivisa a tempo debito. In questa dimostrazione, ho solo "lasciato che si blocchino". Questo è bello per i test, vedi sotto.

Quindi, cosa fa il produttore?

  for (const char* s : { "hello world", "the answer is 42", "where is your towel" }) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); queue->push({s, char_alloc}); } } 

Bene, il produttore produce esattamente 3 messaggi in ~ 750ms e poi esce.

Si noti che di conseguenza se lo facciamo (assumiamo una shell POSIX con controllo del lavoro):

 ./producer& ./producer& ./producer& wait ./consumer& 

Stamperà i messaggi 3x3 "immediatamente", lasciando in funzione il consumatore. fare

 ./producer& ./producer& ./producer& 

di nuovo, mostrerà i messaggi "in arrivo" in tempo reale (in raffica di 3 intervalli di ~ 250 ms) perché il consumatore è ancora in esecuzione in background

Vedi il codice completo online in questo elenco: https://gist.github.com/sehe/9376856