TThreadedQueue non è capace di più utenti?

Tentativo di utilizzare TThreadedQueue (Generics.Collections) in un unico schema consumer multiplo. (Delphi-XE). L’idea è di spingere gli oggetti in una coda e lasciare che diversi thread di lavoro scarichino la coda.

Tuttavia, non funziona come previsto. Quando due o più thread di lavoro chiamano PopItem, le violazioni di accesso vengono generate da TThreadedQueue.

Se la chiamata a PopItem è serializzata con una sezione critica, tutto va bene.

Sicuramente TThreadedQueue dovrebbe essere in grado di gestire più utenti, quindi mi manca qualcosa o si tratta di un bug puro in TThreadedQueue?

Ecco un semplice esempio per produrre l’errore.

    program TestThreadedQueue; {$APPTYPE CONSOLE} uses // FastMM4 in '..\..\..\FastMM4\FastMM4.pas', Windows, Messages, Classes, SysUtils, SyncObjs, Generics.Collections; type TThreadTaskMsg = class(TObject) private threadID : integer; threadMsg : string; public Constructor Create( ID : integer; const msg : string); end; type TThreadReader = class(TThread) private fPopQueue : TThreadedQueue; fSync : TCriticalSection; fMsg : TThreadTaskMsg; fException : Exception; procedure DoSync; procedure DoHandleException; public Constructor Create( popQueue : TThreadedQueue; sync : TCriticalSection); procedure Execute; override; end; Constructor TThreadReader.Create( popQueue : TThreadedQueue; sync : TCriticalSection); begin fPopQueue:= popQueue; fMsg:= nil; fSync:= sync; Self.FreeOnTerminate:= FALSE; fException:= nil; Inherited Create( FALSE); end; procedure TThreadReader.DoSync ; begin WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); end; procedure TThreadReader.DoHandleException; begin WriteLn('Exception ->' + fException.Message); end; procedure TThreadReader.Execute; var signal : TWaitResult; begin NameThreadForDebugging('QueuePop worker'); while not Terminated do begin try {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } Sleep(20); {- Serializing calls to PopItem works } if Assigned(fSync) then fSync.Enter; try signal:= fPopQueue.PopItem( TObject(fMsg)); finally if Assigned(fSync) then fSync.Release; end; if (signal = wrSignaled) then begin try if Assigned(fMsg) then begin fMsg.threadMsg:= ''; fMsg.Free; // We are just dumping the message in this test //Synchronize( Self.DoSync); //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); end; except on E:Exception do begin end; end; end; except FException:= Exception(ExceptObject); try if not (FException is EAbort) then begin {Synchronize(} DoHandleException; //); end; finally FException:= nil; end; end; end; end; Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string); begin Inherited Create; threadID:= ID; threadMsg:= msg; end; var fSync : TCriticalSection; fThreadQueue : TThreadedQueue; fReaderArr : array[1..4] of TThreadReader; i : integer; begin try IsMultiThread:= TRUE; fSync:= TCriticalSection.Create; fThreadQueue:= TThreadedQueue.Create(1024,1,100); try {- Calling without fSync throws exceptions when two or more threads calls PopItem at the same time } WriteLn('Creating worker threads ...'); for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil); {- Calling with fSync works ! } //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync); WriteLn('Init done. Pushing items ...'); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); ReadLn; finally for i:= 1 to 4 do fReaderArr[i].Free; fThreadQueue.Free; fSync.Free; end; except on E: Exception do begin Writeln(E.ClassName, ': ', E.Message); ReadLn; end; end; end. 

    Aggiornamento : l’errore in TMonitor che ha causato il blocco di TThreadedQueue è stato risolto in Delphi XE2.

    Aggiornamento 2 : il test precedente ha sollecitato la coda nello stato vuoto. Darian Miller ha scoperto che sottolineando la coda allo stato completo, è comunque ansible riprodurre l’errore in XE2. L’errore è ancora una volta in TMonitor. Vedi la sua risposta qui sotto per maggiori informazioni. E anche un link al QC101114.

    Aggiornamento 3 : con l’aggiornamento 4 di Delphi-XE2 c’era una soluzione annunciata per TMonitor che avrebbe TThreadedQueue i problemi in TThreadedQueue . I miei test finora non sono più in grado di riprodurre più errori in TThreadedQueue . Test singolo produttore / più thread di consumo quando la coda è vuota e piena. Testati anche più produttori / più consumatori. Ho variato i thread dei lettori e i thread degli autori da 1 a 100 senza alcun problema tecnico. Ma conoscendo la storia, sfido gli altri a rompere TMonitor .

    Beh, è ​​difficile essere sicuri senza un sacco di test, ma sicuramente sembra che questo sia un bug, sia in TThreadedQueue che in TMonitor. In entrambi i casi è nella RTL e non nel tuo codice. Si dovrebbe archiviare questo come un rapporto di controllo qualità e utilizzare il tuo esempio sopra come il codice “come riprodurre”.

    Vi consiglio di usare OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibrary quando lavorate con discussioni, parallelismo, ecc. Primoz ha fatto un ottimo lavoro, e sul sito troverete molta documentazione utile .

    Il tuo esempio sembra funzionare bene con XE2, ma se riempi la tua coda fallisce con AV su un PushItem. (Testato con XE2 Update1)

    Per riprodurre, basta aumentare la creazione dell’attività da 100 a 1100 (la profondità della coda è stata impostata su 1024)

     for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); 

    Questo muore per me ogni volta su Windows 7. Inizialmente ho provato una spinta continua per testarlo e non è riuscito al ciclo 30 … quindi al loop 16 … quindi a 65 quindi a intervalli diversi ma ha costantemente fallito in alcuni punto.

      iLoop := 0; while iLoop < 1000 do begin Inc(iLoop); WriteLn('Loop: ' + IntToStr(iLoop)); for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,'')); end; 

    Ho cercato la class TThreadedQueue ma non sembra averla nella mia D2009. Non mi ucciderò esattamente per questo: il supporto ai thread di Delphi è sempre stato err. Errm … ‘non ottimale’ e sospetto che TThreadedQueue non sia diverso 🙂

    Perché utilizzare i generici per oggetti PC (Producer / Consumer)? Un semplice discendente di TObjectQueue andrà bene – l’ha usato per decenni – funziona bene con più produttori / consumatori:

     unit MinimalSemaphorePCqueue; { Absolutely minimal PC queue based on TobjectQueue and a semaphore. The semaphore count reflects the queue count 'push' will always succeed unless memory runs out, then you're stuft anyway. 'pop' has a timeout parameter as well as the address of where any received object is to be put. 'pop' returns immediately with 'true' if there is an object on the queue available for it. 'pop' blocks the caller if the queue is empty and the timeout is not 0. 'pop' returns false if the timeout is exceeded before an object is available from the queue. 'pop' returns true if an object is available from the queue before the timeout is exceeded. If multiple threads have called 'pop' and are blocked because the queue is empty, a single 'push' will make only one of the waiting threads ready. Methods to push/pop from the queue A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. When the handle is signaled, the 'peek' method will retrieve the queued object. } interface uses Windows, Messages, SysUtils, Classes,syncObjs,contnrs; type pObject=^Tobject; TsemaphoreMailbox=class(TobjectQueue) private countSema:Thandle; protected access:TcriticalSection; public property semaHandle:Thandle read countSema; constructor create; virtual; procedure push(aObject:Tobject); virtual; function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; function peek(pResObject:pObject):boolean; virtual; destructor destroy; override; end; implementation { TsemaphoreMailbox } constructor TsemaphoreMailbox.create; begin {$IFDEF D2009} inherited Create; {$ELSE} inherited create; {$ENDIF} access:=TcriticalSection.create; countSema:=createSemaphore(nil,0,maxInt,nil); end; destructor TsemaphoreMailbox.destroy; begin access.free; closeHandle(countSema); inherited; end; function TsemaphoreMailbox.pop(pResObject: pObject; timeout: DWORD): boolean; // dequeues an object, if one is available on the queue. If the queue is empty, // the caller is blocked until either an object is pushed on or the timeout // period expires begin // wait for a unit from the semaphore result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); if result then // if a unit was supplied before the timeout, begin access.acquire; try pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end; procedure TsemaphoreMailbox.push(aObject: Tobject); // pushes an object onto the queue. If threads are waiting in a 'pop' call, // one of them is made ready. begin access.acquire; try inherited push(aObject); // shove the object onto the queue finally access.release; end; releaseSemaphore(countSema,1,nil); // release one unit to semaphore end; function TsemaphoreMailbox.peek(pResObject: pObject): boolean; begin access.acquire; try result:=(count>0); if result then pResObject^:=inherited pop; // get an object from the queue finally access.release; end; end; end. 

    Non penso che TThreadedQueue supporterà più utenti. È un FIFO, come da file di aiuto. Ho l’impressione che ci sia un thread che spinge e un altro (solo uno!) Popping.