attendibile coda basata su attività

Mi chiedo se esista un’implementazione / wrapper per ConcurrentQueue , simile a BlockingCollection in cui il prelievo dalla raccolta non viene bloccato, ma è invece asincrono e causerà un’attesa asincrona finché un elemento non viene inserito nella coda.

Sono arrivato con la mia implementazione, ma non sembra funzionare come previsto. Mi chiedo se sto reinventando qualcosa che già esiste.

Ecco la mia implementazione:

public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue(); ConcurrentQueue<TaskCompletionSource> waitingQueue = new ConcurrentQueue<TaskCompletionSource>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task Dequeue() { TaskCompletionSource tcs = new TaskCompletionSource(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } } 

Non conosco una soluzione lock-free, ma puoi dare un’occhiata alla nuova libreria Dataflow , parte di Async CTP . Un semplice BufferBlock dovrebbe essere sufficiente, ad esempio:

 BufferBlock buffer = new BufferBlock(); 

La produzione e il consumo sono fatti più facilmente tramite i metodi di estensione sui tipi di blocchi di stream di dati.

La produzione è semplice come:

 buffer.Post(13); 

e il consumo è pronto per Async:

 int item = await buffer.ReceiveAsync(); 

Io raccomando di usare Dataflow se ansible; rendere tale buffer sia efficiente che corretto è più difficile di quanto non appaia prima.

Il mio tentativo (ha un evento sollevato quando viene creata una “promise” e può essere usata da un produttore esterno per sapere quando produrre più oggetti):

 public class AsyncQueue { private ConcurrentQueue _bufferQueue; private ConcurrentQueue> _promisesQueue; private object _syncRoot = new object(); public AsyncQueue() { _bufferQueue = new ConcurrentQueue(); _promisesQueue = new ConcurrentQueue>(); } ///  /// Enqueues the specified item. ///  /// The item. public void Enqueue(T item) { TaskCompletionSource promise; do { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } } while (promise != null); lock (_syncRoot) { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } _bufferQueue.Enqueue(item); } } ///  /// Dequeues the asynchronous. ///  /// The cancellation token. ///  public Task DequeueAsync(CancellationToken cancellationToken) { T item; if (!_bufferQueue.TryDequeue(out item)) { lock (_syncRoot) { if (!_bufferQueue.TryDequeue(out item)) { var promise = new TaskCompletionSource(); cancellationToken.Register(() => promise.TrySetCanceled()); _promisesQueue.Enqueue(promise); this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); return promise.Task; } } } return Task.FromResult(item); } ///  /// Gets a value indicating whether this instance has promises. ///  ///  /// true if this instance has promises; otherwise, false. ///  public bool HasPromises { get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } } ///  /// Occurs when a new promise /// is generated by the queue ///  public event EventHandler PromiseAdded; } 

Potrebbe essere eccessivo per il tuo caso d’uso (data la curva di apprendimento), ma Reactive Extentions fornisce tutta la colla che potresti mai desiderare per la composizione asincrona.

In sostanza, si sottoscrivono alle modifiche e vengono inviate non appena diventano disponibili e si può fare in modo che il sistema invii le modifiche su un thread separato.

Ecco l’implementazione che sto attualmente utilizzando.

 public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue(); ConcurrentQueue> waitingQueue = new ConcurrentQueue>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task DequeueAsync(CancellationToken ct) { TaskCompletionSource tcs = new TaskCompletionSource(); ct.Register(() => { lock (queueSyncLock) { tcs.TrySetCanceled(); } }); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs = null; T firstItem = default(T); lock (queueSyncLock) { while (true) { if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) { waitingQueue.TryDequeue(out tcs); if (tcs.Task.IsCanceled) { continue; } queue.TryDequeue(out firstItem); } else { break; } tcs.SetResult(firstItem); } } } } 

Funziona abbastanza bene, ma c’è un sacco di contesa su queueSyncLock , dato che sto facendo un bel po ‘di uso di CancellationToken per cancellare alcune delle attività di attesa. Certo, questo porta a un blocco notevolmente inferiore che vedrei con BlockingCollection ma …

Mi chiedo se ci sia un modo più agevole e privo di blocco per raggiungere lo stesso fine

Puoi semplicemente usare BlockingCollection (usando il ConcurrentQueue predefinito) e avvolgere la chiamata in Take in a Task modo da poterla await :

 var bc = new BlockingCollection(); T element = await Task.Run( () => bc.Take() );