Implementazione del pattern Producer / Consumer in C #

Come posso implementare i pattern Producer / Consumer in C # usando Eventi e Delegati ? Di cosa ho bisogno per tenere d’occhio quando si tratta di risorse quando si utilizzano questi schemi di progettazione? Ci sono dei casi limite di cui ho bisogno di essere a conoscenza?

So che questo thread è un po ‘vecchio, ma visto che a volte mi sono imbattuto nelle mie ricerche, ho deciso di condividere questo codice produttore-consumatore per le persone che si chiedevano come implementare una semplice coda di lavoro generico produttore-consumatore.

La class Job viene utilizzata per “archiviare” la chiamata di un metodo di un object sotto forma di delegato. Il delegato viene quindi chiamato quando il lavoro viene elaborato. Tutti gli argomenti rilevanti sono anche memorizzati in questa class di lavoro.

Con questo semplice schema è ansible ottenere multi-threading nei processi di accodamento e dequeue. In realtà questa è solo la parte più semplice: il multi-threading porta nuove sfide al tuo codice, te ne accorgi dopo 😉

Ho originariamente pubblicato questo codice in questa discussione .

using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; // Compiled and tested in: Visual Studio 2017, DotNET 4.6.1 namespace MyNamespace { public class Program { public static void Main(string[] args) { MyApplication app = new MyApplication(); app.Run(); } } public class MyApplication { private BlockingCollection JobQueue = new BlockingCollection(); private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource(); private CancellationToken JobCancellationToken; private Timer Timer; private Thread UserInputThread; public void Run() { // Give a name to the main thread: Thread.CurrentThread.Name = "Main"; // Fires a Timer thread: Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000); // Fires a thread to read user inputs: UserInputThread = new Thread(new ThreadStart(ReadUserInputs)) { Name = "UserInputs", IsBackground = true }; UserInputThread.Start(); // Prepares a token to cancel the job queue: JobCancellationToken = JobCancellationTokenSource.Token; // Start processing jobs: ProcessJobs(); // Clean up: JobQueue.Dispose(); Timer.Dispose(); UserInputThread.Abort(); Console.WriteLine("Done."); } private void ProcessJobs() { try { // Checks if the blocking collection is still up for dequeueing: while (!JobQueue.IsCompleted) { // The following line blocks the thread until a job is available or throws an exception in case the token is cancelled: JobQueue.Take(JobCancellationToken).Run(); } } catch { } } private void ReadUserInputs() { // User input thread is running here. ConsoleKey key = ConsoleKey.Enter; // Reads user inputs and queue them for processing until the escape key is pressed: while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape) { Job userInputJob = new Job("UserInput", this, new Action(ProcessUserInputs), key); JobQueue.Add(userInputJob); } // Stops processing the JobQueue: JobCancellationTokenSource.Cancel(); } private void ProcessUserInputs(ConsoleKey key) { // Main thread is running here. Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})"); } private void TimerCallback(object param) { // Timer thread is running here. Job job = new Job("TimerJob", this, new Action(ProcessTimer), "A job from timer callback was processed."); JobQueue.TryAdd(job); // Just enqueues the job for later processing } private void ProcessTimer(string message) { // Main thread is running here. Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})"); } } ///  /// The Job class wraps an object's method call, with or without arguments. This method is called later, during the Job execution. ///  public class Job { public string Name { get; } private object TargetObject; private Delegate TargetMethod; private object[] Arguments; public Job(string name, object obj, Delegate method, params object[] args) { Name = name; TargetObject = obj; TargetMethod = method; Arguments = args; } public void Run() { try { TargetMethod.Method.Invoke(TargetObject, Arguments); } catch(Exception ex) { Debug.WriteLine($"Unexpected error running job '{Name}': {ex}"); } } } }