Elabora la coda con multithreading o attività

Ho un’applicazione per messaggi telefonici in cui ci sono molti messaggi da elaborare. Perché le porte telefoniche sono limitate, quindi il messaggio verrà elaborato prima nella prima uscita. Ogni messaggio ha un contrassegno “Riconoscimento” che indica che viene elaborato. È stato inizializzato come falso, naturalmente.

Voglio mettere tutti i messaggi in una coda, quindi elaborarli con più thread o attività.

public class MessageQueue { public Queue MessageWorkItem { get; set; } public Messages Message { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); Message = new Messages(); } public void GetMessageMetaData() { try { // It is just a test, add only one item into the queue Message.MessageID = Guid.NewGuid(); Message.NumberToCall = "1111111111"; Message.FacilityID = "3333"; Message.NumberToDial = "2222222222"; Message.CountryCode = "1"; Message.Acknowledge = false; } catch (Exception ex) { } } public void AddingItemToQueue() { GetMessageMetaData(); if (!Message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(Message); } } } } public class Messages { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } 

Ora la mia domanda è come disconnettere l’elemento dalla coda con il multithreading. Per ogni articolo dalla coda, voglio eseguire uno script.

  public void RunScript(Message item) { try { PlayMessage(item); return; } catch (HangupException hex) { Log.WriteWithId("Caller Hungup!", hex.Message); } catch (Exception ex) { Log.WriteException(ex, "Unexpected exception: {0}"); } } 

Quello che pensavo era vedere se

if (MessageWorkItem.Count> = 1) Quindi fare qualcosa ma ho bisogno di aiuto per il codice.

Se è ansible utilizzare. Net 4.5, suggerirei di guardare Dataflow dalla Task Parallel Library (TPL) .

Quella pagina porta a molti esempi di procedure dettagliate come: Come: Implementare un modello e una procedura per il stream di dati di Producer-Consumer : Utilizzo del stream di dati in un’applicazione Windows Form .

Dai un’occhiata a quella documentazione per vedere se ti può aiutare. È molto da prendere, ma penso che sarebbe probabilmente il tuo approccio migliore.

In alternativa, è ansible esaminare l’utilizzo di un BlockingCollection insieme al relativo metodo GetConsumingEnumerable() per accedere agli elementi nella coda.

Quello che fai è dividere il lavoro in oggetti che vuoi elaborare in qualche modo, e usare un BlockingCollection per gestire la coda.

Alcuni esempi di codice che utilizzano gli ints anziché gli oggetti come elementi di lavoro aiuteranno a dimostrarlo:

Quando un thread di lavoro ha terminato con l’elemento corrente, rimuoverà un nuovo elemento dalla coda di lavoro, elaborerà quell’elemento, quindi lo aggiungerà alla coda di output.

Un thread utente separato rimuove gli elementi completati dalla coda di output e fa qualcosa con loro.

Alla fine dobbiamo aspettare che tutti i lavoratori finiscano (Task.WaitAll (worker)) prima di poter contrassegnare la coda di output come completata (outputQueue.CompleteAdding ()).

 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace Demo { class Program { static void Main(string[] args) { new Program().run(); } void run() { int threadCount = 4; Task[] workers = new Task[threadCount]; Task.Factory.StartNew(consumer); for (int i = 0; i < threadCount; ++i) { int workerId = i; Task task = new Task(() => worker(workerId)); workers[i] = task; task.Start(); } for (int i = 0; i < 100; ++i) { Console.WriteLine("Queueing work item {0}", i); inputQueue.Add(i); Thread.Sleep(50); } Console.WriteLine("Stopping adding."); inputQueue.CompleteAdding(); Task.WaitAll(workers); outputQueue.CompleteAdding(); Console.WriteLine("Done."); Console.ReadLine(); } void worker(int workerId) { Console.WriteLine("Worker {0} is starting.", workerId); foreach (var workItem in inputQueue.GetConsumingEnumerable()) { Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem); Thread.Sleep(100); // Simulate work. outputQueue.Add(workItem); // Output completed item. } Console.WriteLine("Worker {0} is stopping.", workerId); } void consumer() { Console.WriteLine("Consumer is starting."); foreach (var workItem in outputQueue.GetConsumingEnumerable()) { Console.WriteLine("Consumer is using item {0}", workItem); Thread.Sleep(25); } Console.WriteLine("Consumer is finished."); } BlockingCollection inputQueue = new BlockingCollection(); BlockingCollection outputQueue = new BlockingCollection(); } } 

Parallel.ForOgni da TPL . È parallelo per-ciascuno.

Esempio (modificato MessageWorkItem in coda generica):

  public class MessageQueue { public Queue MessageWorkItem { get; set; } public MessageQueue() { MessageWorkItem = new Queue(); } public Message GetMessageMetaData() { try { // It is just a test, add only one item into the queue return new Message() { MessageID = Guid.NewGuid(), NumberToCall = "1111111111", FacilityID = "3333", NumberToDial = "2222222222", CountryCode = "1", Acknowledge = false }; } catch (Exception ex) { return null; } } public void AddingItemToQueue() { var message = GetMessageMetaData(); if (!message.Acknowledge) { lock (MessageWorkItem) { MessageWorkItem.Enqueue(message); } } } } public class Message { public Guid MessageID { get; set; } public string NumberToCall { get; set; } public string FacilityID { get; set; } public string NumberToDial { get; set; } public string CountryCode { get; set; } public bool Acknowledge { get; set; } } class Program { static void Main(string[] args) { MessageQueue me = new MessageQueue(); for (int i = 0; i < 10000; i++) me.AddingItemToQueue(); Console.WriteLine(me.MessageWorkItem.Count); Parallel.ForEach(me.MessageWorkItem, RunScript); } static void RunScript(Message item) { // todo: ... Console.WriteLine(item.MessageID); Thread.Sleep(300); } }