Spinlock inline-assembly più veloce

Sto scrivendo un’applicazione multithread in c ++, dove le prestazioni sono critiche. Ho bisogno di usare un sacco di blocchi durante la copia di piccole strutture tra i thread, per questo ho scelto di usare gli spinlock.

Ho fatto alcune ricerche e test di velocità su questo e ho scoperto che la maggior parte delle implementazioni sono più o meno ugualmente veloci:

  • Microsofts CRITICAL_SECTION, con SpinCount impostato su 1000, segna circa 140 unità di tempo
  • Implementando questo algoritmo con Microsofts InterlockedCompareExchange si ottengono circa 95 unità di tempo
  • Ive ha anche provato a utilizzare alcuni assembly inline con __asm {} usando qualcosa come questo codice e ha ottenuto circa 70 unità di tempo, ma non sono sicuro che sia stata creata una barriera di memoria adeguata.

Modifica: I tempi indicati qui sono il tempo impiegato da 2 thread per bloccare e sbloccare lo spinlock 1.000.000 volte.

So che questa non è una grande differenza, ma visto che uno spinlock è un object molto usato, si potrebbe pensare che i programmatori avrebbero concordato sul modo più veloce di creare uno spinlock. Googling porta comunque a molti approcci diversi. CMPXCHG8B che questo metodo sopra menzionato sarebbe il più veloce se implementato usando l’assembly inline e usando l’istruzione CMPXCHG8B invece di confrontare i registri a 32 bit. Inoltre, le barriere di memoria devono essere prese in considerazione, questo potrebbe essere fatto da LOCK CMPXHG8B (credo?) , Che garantisce “diritti esclusivi” alla memoria condivisa tra i core. Infine [alcuni suggerisce] che per le attese occupate dovrebbe essere accompagnato da NOP: REP che abilita i processori Hyper-threading a passare a un altro thread, ma non sono sicuro che sia vero o no?

Dal mio test delle prestazioni di diversi spinlock, si è visto che non c’è molta differenza, ma per scopi puramente accademici mi piacerebbe sapere qual è il più veloce. Tuttavia, poiché ho un’esperienza estremamente limitata nel linguaggio assembly e con le barriere della memoria, sarei felice se qualcuno potesse scrivere il codice assembly per l’ultimo esempio fornito con LOCK CMPXCHG8B e le corrette barriere di memoria nel seguente modello:

 __asm { spin_lock: ;locking code. spin_unlock: ;unlocking code. } 

Basta guardare qui: x86 spinlock usando cmpxchg

E grazie a Cory Nelson

 __asm{ spin_lock: xorl %ecx, %ecx incl %ecx spin_lock_retry: xorl %eax, %eax lock; cmpxchgl %ecx, (lock_addr) jnz spin_lock_retry ret spin_unlock: movl $0 (lock_addr) ret } 

E un’altra fonte dice: http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

  lock cmpxchg8b qword ptr [esi] is replaceable with the following sequence try: lock bts dword ptr [edi],0 jnb acquired wait: test dword ptr [edi],1 je try pause ; if available jmp wait acquired: cmp eax,[esi] jne fail cmp edx,[esi+4] je exchange fail: mov eax,[esi] mov edx,[esi+4] jmp done exchange: mov [esi],ebx mov [esi+4],ecx done: mov byte ptr [edi],0 

Ed ecco una discussione sulle implementazioni lock-free vs lock: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

Sebbene ci sia già una risposta accettata, ci sono alcune cose che potrebbero essere perse per migliorare tutte le risposte, tratte da questo articolo di Intel, tutte sopra l’implementazione del blocco rapido :

  1. Girare su una lettura volatile, non su un’istruzione atomica, questo evita il blocco del bus non necessario, specialmente su serrature altamente contese.
  2. Usa il back-off per le serrature altamente contestate
  3. In linea il blocco, preferibilmente con intrinseco per i compilatori in cui asm inline è dannoso (in pratica MSVC).

Wikipedia ha un buon articolo sugli spinlock, ecco l’implementazione x86

http://en.wikipedia.org/wiki/Spinlock#Example_implementation

Si noti che la loro implementazione non usa il prefisso “lock”, perché è ridondante su x86 per l’istruzione “xchg” – ha implicitamente una semantica di blocco, come discusso in questa discussione StackOverflow:

Su un multicore x86, è necessario un BLOCCO come prefisso per XCHG?

The REP: NOP è un alias per l’istruzione PAUSE, qui puoi saperne di più

In che modo le istruzioni di pausa x86 funzionano in spinlock * e * possono essere utilizzate in altri scenari?

Sulla questione delle barriere della memoria, ecco tutto quello che potresti voler sapere

Memory Barriers: una visione hardware per gli hacker del software di Paul E. McKenney

http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf

Di solito non sono uno di quelli che cercano di ottenere un codice veloce: di solito è un ottimo esercizio che si traduce in una migliore comprensione della programmazione e di un codice più veloce.

Nemmeno qui rimarrò, ma posso affermare inequivocabilmente che la domanda di un veloce spinlock 3 istruzioni lunghe o poche è – almeno sull’architettura x86 – un inseguimento inutile.

Ecco perché:

Richiamo di uno spinlock con una sequenza di codice tipica

 lock_variable DW 0 ; 0 <=> free mov ebx,offset lock_variable mov eax,1 xchg eax,[ebx] ; if eax contains 0 (no one owned it) you own the lock, ; if eax contains 1 (someone already does) you don't 

Liberare uno spinlock è banale

 mov ebx,offset lock_variable mov dword ptr [ebx],0 

L’istruzione xchg solleva il pin di blocco sul processore che in effetti significa che voglio il bus durante i cicli di clock successivi. Questo segnale si fa strada attraverso le cache e verso il basso fino al dispositivo di masterizzazione bus più lento che di solito è il bus PCI. Quando ogni dispositivo busmastering ha completato il segnale locka (lock acknowledge) viene inviato indietro. Quindi avviene lo scambio vero e proprio. Il problema è che la sequenza di lock / locka richiede molto tempo. Il bus PCI può funzionare a 33 MHz con diversi cicli di latenza. Su una CPU da 3,3 GHz significa che ogni ciclo del bus PCI richiede cento cicli di CPU.

Come regola generale presumo che un blocco richieda tra 300 e 3000 cicli di CPU per completare e alla fine non so se posseggo anche il blocco. Quindi i pochi cicli che puoi risparmiare con uno spinlock “veloce” sarà un miraggio perché nessun blocco è come il prossimo, dipenderà dalla tua situazione di autobus in quel breve lasso di tempo.

________________MODIFICARE________________

Ho appena letto che lo spinlock è un “object pesantemente usato”. Beh, ovviamente non capisci che uno spinlock consuma un’enorme quantità di cicli della CPU ogni volta che viene invocato. Oppure, per dirla in altro modo, ogni volta che la invochi perdi una quantità significativa della tua capacità di elaborazione.

Il trucco quando si usano gli spinlock (o il loro fratello più grande, la sezione critica) è di usarli il meno ansible mentre si sta ancora ottenendo la funzione di programma desiderata. Usarli dappertutto è facile e ti ritroverai con prestazioni poco brillanti.

Non si tratta solo di scrivere codice veloce, ma anche di organizzare i tuoi dati. Quando scrivi “copia di piccole strutture tra i fili” dovresti capire che il blocco potrebbe richiedere centinaia di volte più tempo per essere completato rispetto alla copia effettiva.

________________MODIFICARE________________

Quando si calcola un tempo medio di blocco, probabilmente dirà molto poco in quanto viene misurato sulla macchina che potrebbe non essere il bersaglio desiderato (che potrebbe avere caratteristiche di utilizzo del bus completamente diverse). Per la vostra macchina, la media sarà composta da singoli tempi molto veloci (quando le attività di masterizzazione del bus non interferiscono) fino a tempi molto lenti (quando l’interferenza sul mastering del bus era significativa).

È ansible introdurre il codice che determina i casi più veloci e più lenti e calcolare il quoziente per vedere quanto i tempi di spinlock possono variare.

________________MODIFICARE________________

Aggiornamento maggio 2016.

Peter Cordes ha promosso l’idea che “l’ottimizzazione di un blocco nel caso non conteso può avere senso” e che i tempi di blocco di molte centinaia di cicli di clock non si verificano nelle CPU moderne, tranne nelle situazioni in cui la variabile di blocco è disallineata. Ho iniziato a chiedermi se il mio precedente programma di test, scritto in Watcom C a 32 bit, potrebbe essere ostacolato da WOW64 poiché era in esecuzione su un sistema operativo a 64 bit: Windows 7.

Così ho scritto un programma a 64 bit e lo ho compilato con GCC 5.3 di TDM. Il programma utilizza la variante di istruzione implicitamente bloccante del bus “XCHG r, m” per il blocco e una semplice assegnazione “MOV m, r” per lo sblocco. In alcune varianti di blocco la variabile lock era pre-testata per determinare se fosse fattibile persino tentare un lock (usando un semplice confronto “CMP r, m”, probabilmente non avventurarsi fuori da L3). Ecco qui:

 // compiler flags used: // -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0 #define CLASSIC_BUS_LOCK #define WHILE_PRETEST //#define SINGLE_THREAD typedef unsigned char u1; typedef unsigned short u2; typedef unsigned long u4; typedef unsigned int ud; typedef unsigned long long u8; typedef signed char i1; typedef short i2; typedef long i4; typedef int id; typedef long long i8; typedef float f4; typedef double f8; #define usizeof(a) ((ud)sizeof(a)) #define LOOPS 25000000 #include  #include  #ifndef bool typedef signed char bool; #endif u8 CPU_rdtsc (void) { ud tickl, tickh; __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh)); return ((u8)tickh << 32)|tickl; } volatile u8 bus_lock (volatile u8 * block, u8 value) { __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory"); return value; } void bus_unlock (volatile u8 * block, u8 value) { __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory"); } void rfence (void) { __asm__ __volatile__( "lfence" : : : "memory"); } void rwfence (void) { __asm__ __volatile__( "mfence" : : : "memory"); } void wfence (void) { __asm__ __volatile__( "sfence" : : : "memory"); } volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer) { return (bool)(*lockVariablePointer == 0ull); } volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer) { return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull); } void LOCK_spinlockLeave (volatile u8 *lockVariablePointer) { *lockVariablePointer = 0ull; } static volatile u8 lockVariable = 0ull, lockCounter = 0ull; static volatile i8 threadHold = 1; static u8 tstr[4][32]; /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */ struct LOCKING_THREAD_STRUCTURE { u8 numberOfFailures, numberOfPreTests; f8 clocksPerLock, failuresPerLock, preTestsPerLock; u8 threadId; HANDLE threadHandle; ud idx; } *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]}; DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp) { ud n = LOOPS; u8 clockCycles; SetThreadAffinityMask (ltsp->threadHandle, 1ull<idx); while (threadHold) {} clockCycles = CPU_rdtsc (); while (n) { Sleep (0); #ifdef CLASSIC_BUS_LOCK while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;} #else #ifdef WHILE_PRETEST while (1) { do { ++ltsp->numberOfPreTests; } while (!LOCK_spinlockPreTestIfFree (&lockVariable)); if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } #else while (1) { ++ltsp->numberOfPreTests; if (LOCK_spinlockPreTestIfFree (&lockVariable)) { if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } } #endif #endif ++lockCounter; LOCK_spinlockLeave (&lockVariable); #ifdef CLASSIC_BUS_LOCK while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;} #else #ifdef WHILE_PRETEST while (1) { do { ++ltsp->numberOfPreTests; } while (!LOCK_spinlockPreTestIfFree (&lockVariable)); if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } #else while (1) { ++ltsp->numberOfPreTests; if (LOCK_spinlockPreTestIfFree (&lockVariable)) { if (!LOCK_spinlockFailed (&lockVariable)) break; ++ltsp->numberOfFailures; } } #endif #endif --lockCounter; LOCK_spinlockLeave (&lockVariable); n-=2; } clockCycles = CPU_rdtsc ()-clockCycles; ltsp->clocksPerLock = (f8)clockCycles/ (f8)LOOPS; ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS; ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS; //rwfence (); ltsp->idx = 4u; ExitThread (0); return 0; } int main (int argc, char *argv[]) { u8 processAffinityMask, systemAffinityMask; memset (tstr, 0u, usizeof(tstr)); lts[0]->idx = 3; lts[1]->idx = 2; lts[2]->idx = 1; lts[3]->idx = 0; GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask); SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS); SetThreadAffinityMask (GetCurrentThread (), 1ull); lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId); #ifndef SINGLE_THREAD lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId); lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId); lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId); #endif SetThreadAffinityMask (GetCurrentThread (), processAffinityMask); threadHold = 0; #ifdef SINGLE_THREAD while (lts[0]->idx<4u) {Sleep (1);} #else while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);} #endif printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock); printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock); printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock); printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock); printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+ lts[1]->clocksPerLock+ lts[2]->clocksPerLock+ lts[3]->clocksPerLock)/ 4., (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4., (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.); printf ("LC:%u\n", (ud)lockCounter); return 0; } 

Il programma è stato eseguito su un computer basato su DELL i5-4310U con DDR3-800, 2 core / 2 HT in grado di 2,7 GHz e una cache L3 comune.

Per cominciare sembra che l’impatto di WOW64 sia stato trascurabile.

Un singolo thread che esegue il blocco / sblocco non bloccato è stato in grado di farlo una volta ogni 110 cicli. L’ottimizzazione del blocco non mirato è inutile: qualsiasi codice aggiunto per migliorare la singola istruzione XCHG lo renderà più lento.

Con quattro HT che bombardano la variabile di blocco con i tentativi di blocco, la situazione cambia radicalmente. Il tempo necessario per ottenere un blocco riuscito passa a 994 cicli, di cui una parte significativa può essere attribuita ai tentativi di blocco non riusciti. Per dirla in un altro modo, in una situazione di contesa elevata in media 3,2 blocchi devono essere tentati per ottenere un blocco di successo. Ovviamente i 110 cicli non sono diventati 110 * 3.2 ma più vicini a 110 * 9. Quindi altri meccanismi sono in gioco qui come con i test sulla macchina più vecchia. Inoltre, i cicli medi 994 comprendono un intervallo compreso tra 716 e 1157

Le varianti di blocco che implementavano il pre-test richiedevano circa il 95% dei cicli richiesti dalla variante più semplice (XCHG). In media avrebbero eseguito 17 CMP per trovare la possibilità di provare 1,75 blocchi di cui 1 ha avuto successo. Consiglio di utilizzare il pre-test non solo perché è più veloce: impone meno sforzo al meccanismo di blocco del bus (3.2-1.75 = 1.45 tentativi di blocco in meno) anche se aumenta leggermente la complessità.

Solo per chiedere:

Prima di scavare in profondità nelle strutture di dati spinlock e quasi senza blocco:

Hai – nei tuoi benchmark e nella tua applicazione – assicurato che i thread concorrenti siano garantiti per l’esecuzione su core diversi?

Altrimenti potresti finire con un programma che funziona alla grande sul tuo computer di sviluppo ma che fa schifo / fallisce nel campo perché un thread deve essere sia l’armadietto che lo sblocco del tuo spinlock.

Per darti una cifra: su Windows hai un intervallo temporale standard di 10 millisecondi. Se non fai in modo che due thread fisici siano coinvolti nel blocco / sblocco, ti ritroverai con circa 500 blocchi / sblocchi al secondo, e questo risultato sarà molto