Creazione di una coda di blocco in .NET?

Ho uno scenario in cui ho più thread che aggiungono a una coda e più thread che leggono dalla stessa coda. Se la coda raggiunge una dimensione specifica, tutti i thread che stanno riempiendo la coda verranno bloccati in aggiunta fino a quando un articolo non viene rimosso dalla coda.

La soluzione qui sotto è ciò che sto usando in questo momento e la mia domanda è: come può essere migliorato? C’è un object che già abilita questo comportamento nel BCL che dovrei usare?

internal class BlockingCollection : CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } } 

Sembra molto pericoloso (pochissima sincronizzazione); che ne dici di qualcosa come:

 class SizeQueue { private readonly Queue queue = new Queue(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } } 

(modificare)

In realtà, vorrai un modo per chiudere la coda in modo che i lettori inizino a uscire in modo pulito – forse qualcosa come un bool flag – se impostato, una coda vuota restituisce (anziché bloccare):

 bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } } 

Usa .net 4 BlockingCollection, per accodare l’uso Add (), per rimuovere la coda usa Take (). Utilizza internamente ConcurrentQueue non bloccante. Maggiori informazioni qui Veloce e Best Producer / consumer queue tecnica BlockingCollection vs Queue concurrent

“Come può essere migliorato?”

Bene, è necessario esaminare tutti i metodi della class e considerare cosa succederebbe se un altro thread stesse chiamando contemporaneamente quel metodo o qualsiasi altro metodo. Ad esempio, si inserisce un blocco nel metodo Rimuovi, ma non nel metodo Aggiungi. Cosa succede se un thread aggiunge contemporaneamente a un altro thread Rimuove? Cose brutte.

Inoltre, considera che un metodo può restituire un secondo object che fornisce l’accesso ai dati interni del primo object, ad esempio GetEnumerator. Immagina che un thread stia attraversando quell’enumeratore, un altro thread sta modificando l’elenco contemporaneamente. Non bene.

Una buona regola empirica è semplificare la procedura, riducendo al minimo il numero di metodi nella class.

In particolare, non ereditare un’altra class contenitore, perché esporrai tutti i metodi di quella class, fornendo un modo per il chiamante di danneggiare i dati interni, o di vedere le modifiche parzialmente completate ai dati (altrettanto male, perché i dati sembra corrotto in quel momento). Nascondi tutti i dettagli e sii completamente spietato su come autorizzi l’accesso a loro.

Ti consiglio vivamente di utilizzare soluzioni off-the-shelf – ottenere un libro su threading o utilizzare la libreria di terze parti. Altrimenti, dato quello che stai tentando, eseguirai il debug del tuo codice per molto tempo.

Inoltre, non avrebbe più senso per Remove restituire un elemento (ad esempio, quello che è stato aggiunto per primo, in quanto è una coda), piuttosto che il chiamante che sceglie un elemento specifico? E quando la coda è vuota, forse anche Rimuovere dovrebbe bloccarsi.

Aggiornamento: la risposta di Marc implementa tutti questi suggerimenti! 🙂 Ma lascerò questo qui perché potrebbe essere utile capire perché la sua versione è un tale miglioramento.

È ansible utilizzare BlockingCollection e ConcurrentQueue nello spazio dei nomi System.Collections.Concurrent

  public class ProducerConsumerQueue : BlockingCollection { ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  public ProducerConsumerQueue() : base(new ConcurrentQueue()) { } ///  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality ///  ///  public ProducerConsumerQueue(int maxSize) : base(new ConcurrentQueue(), maxSize) { } } 

L’ho appena fatto usando le Reattive Extensions e ho ricordato questa domanda:

 public class BlockingQueue { private readonly Subject _queue; private readonly IEnumerator _enumerator; private readonly object _sync = new object(); public BlockingQueue() { _queue = new Subject(); _enumerator = _queue.GetEnumerator(); } public void Enqueue(T item) { lock (_sync) { _queue.OnNext(item); } } public T Dequeue() { _enumerator.MoveNext(); return _enumerator.Current; } } 

Non necessariamente del tutto sicuro, ma molto semplice.

Questo è quello che mi è venuto in mente per una coda di blocco delimitata da thread.

 using System; using System.Collections.Generic; using System.Text; using System.Threading; public class BlockingBuffer { private Object t_lock; private Semaphore sema_NotEmpty; private Semaphore sema_NotFull; private T[] buf; private int getFromIndex; private int putToIndex; private int size; private int numItems; public BlockingBuffer(int Capacity) { if (Capacity <= 0) throw new ArgumentOutOfRangeException("Capacity must be larger than 0"); t_lock = new Object(); buf = new T[Capacity]; sema_NotEmpty = new Semaphore(0, Capacity); sema_NotFull = new Semaphore(Capacity, Capacity); getFromIndex = 0; putToIndex = 0; size = Capacity; numItems = 0; } public void put(T item) { sema_NotFull.WaitOne(); lock (t_lock) { while (numItems == size) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } buf[putToIndex++] = item; if (putToIndex == size) putToIndex = 0; numItems++; Monitor.Pulse(t_lock); } sema_NotEmpty.Release(); } public T take() { T item; sema_NotEmpty.WaitOne(); lock (t_lock) { while (numItems == 0) { Monitor.Pulse(t_lock); Monitor.Wait(t_lock); } item = buf[getFromIndex++]; if (getFromIndex == size) getFromIndex = 0; numItems--; Monitor.Pulse(t_lock); } sema_NotFull.Release(); return item; } } 

Non ho esplorato completamente il TPL, ma potrebbero avere qualcosa che si adatta alle tue esigenze o, per lo meno, alcuni foraggi Reflector da cui trarre ispirazione.

Spero possa aiutare.

Bene, potresti guardare la class System.Threading.Semaphore . A parte questo, no, devi farlo da solo. AFAIK non esiste una collezione così integrata.

Se si desidera il massimo rendimento, consentendo a più lettori di leggere e solo uno scrittore di scrivere, BCL ha qualcosa chiamato ReaderWriterLockSlim che dovrebbe aiutare a snellire il codice …