Nesting ti aspetta in Parallel.ForEach

In un’app della metropolitana, ho bisogno di eseguire un numero di chiamate WCF. C’è un numero significativo di chiamate da effettuare, quindi ho bisogno di farlo in un ciclo parallelo. Il problema è che il ciclo parallelo termina prima che le chiamate WCF siano complete.

Come si dovrebbe fare il refactoring per funzionare come previsto?

var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customers = new System.Collections.Concurrent.BlockingCollection(); Parallel.ForEach(ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); foreach ( var customer in customers ) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

L’idea alla base di Parallel.ForEach() è che hai un set di thread e ogni thread elabora parte della collezione. Come hai notato, questo non funziona con asyncawait , dove vuoi rilasciare il thread per la durata della chiamata asincrona.

Si può “aggiustarlo” bloccando i thread ForEach() , ma ciò sconfigge l’intero punto di async : await .

Quello che potresti fare è usare TPL Dataflow invece di Parallel.ForEach() , che supporta bene le Task asincrone.

Nello specifico, il tuo codice potrebbe essere scritto utilizzando un TransformBlock che trasforma ciascun ID in un Customer utilizzando il lambda async . Questo blocco può essere configurato per l’esecuzione in parallelo. Dovresti colbind quel blocco a un ActionBlock che scrive ciascun Customer sulla console. Dopo aver configurato la rete a blocchi, puoi Post() ogni id su TransformBlock .

Nel codice:

 var ids = new List { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var getCustomerBlock = new TransformBlock( async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var writeCustomerBlock = new ActionBlock(c => Console.WriteLine(c.ID)); getCustomerBlock.LinkTo( writeCustomerBlock, new DataflowLinkOptions { PropagateCompletion = true }); foreach (var id in ids) getCustomerBlock.Post(id); getCustomerBlock.Complete(); writeCustomerBlock.Completion.Wait(); 

Sebbene tu voglia probabilmente limitare il parallelismo di TransformBlock a qualche piccola costante. Inoltre, è ansible limitare la capacità di TransformBlock e aggiungere gli elementi in modo asincrono utilizzando SendAsync() , ad esempio se la raccolta è troppo grande.

Come ulteriore vantaggio rispetto al tuo codice (se ha funzionato) è che la scrittura verrà avviata non appena un singolo elemento è finito, e non aspettare fino a quando tutta l’elaborazione è finita.

la risposta di svick è (come al solito) eccellente.

Tuttavia, trovo che Dataflow sia più utile quando si dispone di grandi quantità di dati da trasferire. O quando hai bisogno di una coda compatibile async .

Nel tuo caso, una soluzione più semplice è usare il parallelismo in stile async :

 var ids = new List() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" }; var customerTasks = ids.Select(i => { ICustomerRepo repo = new CustomerRepo(); return repo.GetCustomer(i); }); var customers = await Task.WhenAll(customerTasks); foreach (var customer in customers) { Console.WriteLine(customer.ID); } Console.ReadKey(); 

L’uso di DataFlow come suggerito di svick potrebbe essere eccessivo e la risposta di Stephen non fornisce i mezzi per controllare la concorrenza dell’operazione. Tuttavia, ciò può essere ottenuto piuttosto semplicemente:

 public static async Task RunWithMaxDegreeOfConcurrency( int maxDegreeOfConcurrency, IEnumerable collection, Func taskFactory) { var activeTasks = new List(maxDegreeOfConcurrency); foreach (var task in collection.Select(taskFactory)) { activeTasks.Add(task); if (activeTasks.Count == maxDegreeOfConcurrency) { await Task.WhenAny(activeTasks.ToArray()); //observe exceptions here activeTasks.RemoveAll(t => t.IsCompleted); } } await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => { //observe exceptions in a manner consistent with the above }); } 

Le chiamate ToArray() possono essere ottimizzate utilizzando una matrice anziché un elenco e sostituendo le attività completate, ma dubito che farebbe molta differenza nella maggior parte degli scenari. Esempio di utilizzo per la domanda dell’OP:

 RunWithMaxDegreeOfConcurrency(10, ids, async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }); 

EDIT Fellow SO user e TPL wiz Eli Arbel mi ha indirizzato a un articolo correlato di Stephen Toub . Come al solito, la sua implementazione è allo stesso tempo elegante ed efficiente:

 public static Task ForEachAsync( this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current).ContinueWith(t => { //observe exceptions }); })); } 

È ansible risparmiare con il nuovo pacchetto NuGet di AsyncEnumerator , che non esisteva 4 anni fa quando la domanda era stata originariamente pubblicata. Ti permette di controllare il grado di parallelismo:

 using System.Collections.Async; ... await ids.ParallelForEachAsync(async i => { ICustomerRepo repo = new CustomerRepo(); var cust = await repo.GetCustomer(i); customers.Add(cust); }, maxDegreeOfParallelism: 10); 

Dichiarazione di non responsabilità: sono l’autore della libreria AsyncEnumerator, che è open source e con licenza MIT, e sto postando questo messaggio solo per aiutare la comunità.

Task.Run() Parallel.Foreach in un Task.Run() e invece della parola chiave await usa [yourasyncmethod].Result

(è necessario eseguire Task.Run per non bloccare il thread dell’interfaccia utente)

Qualcosa come questo:

 var yourForeachTask = Task.Run(() => { Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); var cust = repo.GetCustomer(i).Result; customers.Add(cust); }); }); await yourForeachTask; 

Questo dovrebbe essere abbastanza efficiente e più semplice che ottenere l’intero stream di dati di TPL:

 var customers = await ids.SelectAsync(async i => { ICustomerRepo repo = new CustomerRepo(); return await repo.GetCustomer(i); }); ... public static async Task> SelectAsync(this IEnumerable source, Func> selector, int maxDegreesOfParallelism = 4) { var results = new List(); var activeTasks = new HashSet>(); foreach (var item in source) { activeTasks.Add(selector(item)); if (activeTasks.Count >= maxDegreesOfParallelism) { var completed = await Task.WhenAny(activeTasks); activeTasks.Remove(completed); results.Add(completed.Result); } } results.AddRange(await Task.WhenAll(activeTasks)); return results; } 

Dopo aver introdotto una serie di metodi di supporto, potrai eseguire query parallele con questa semplice syntax:

 const int DegreeOfParallelism = 10; IEnumerable result = await Enumerable.Range(0, 1000000) .Split(DegreeOfParallelism) .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false)) .ConfigureAwait(false); 

Quello che succede qui è dividere la raccolta di origine in 10 blocchi ( .Split(DegreeOfParallelism) ), quindi eseguire 10 attività ciascuna elaborando i suoi elementi uno per uno ( .SelectManyAsync(...) ) e .SelectManyAsync(...) in un unico elenco.

Vale la pena menzionare un approccio più semplice:

 double[] result2 = await Enumerable.Range(0, 1000000) .Select(async i => await CalculateAsync(i).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); 

Ma serve una precauzione : se si dispone di una raccolta di origini troppo grande, verrà chedule un’attività per ogni elemento immediatamente, il che potrebbe causare risultati significativi in ​​termini di prestazioni.

I metodi di estensione utilizzati negli esempi sopra sono i seguenti:

 public static class CollectionExtensions { ///  /// Splits collection into number of collections of nearly equal size. ///  public static IEnumerable> Split(this IEnumerable src, int slicesCount) { if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount)); List source = src.ToList(); var sourceIndex = 0; for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++) { var list = new List(); int itemsLeft = source.Count - targetIndex; while (slicesCount * list.Count < itemsLeft) { list.Add(source[sourceIndex++]); } yield return list; } } ///  /// Takes collection of collections, projects those in parallel and merges results. ///  public static async Task> SelectManyAsync( this IEnumerable> source, Func> func) { List[] slices = await source .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false)) .WhenAll() .ConfigureAwait(false); return slices.SelectMany(s => s); } /// Runs selector and awaits results. public static async Task> SelectListAsync(this IEnumerable source, Func> selector) { List result = new List(); foreach (TSource source1 in source) { TResult result1 = await selector(source1).ConfigureAwait(false); result.Add(result1); } return result; } /// Wraps tasks with Task.WhenAll. public static Task WhenAll(this IEnumerable> source) { return Task.WhenAll(source); } } 

Sono un po ‘in ritardo per festeggiare, ma potresti prendere in considerazione l’idea di utilizzare GetAwaiter.GetResult () per eseguire il tuo codice asincrono nel contesto di sincronizzazione, ma in modo parallelo al seguente;

  Parallel.ForEach(ids, i => { ICustomerRepo repo = new CustomerRepo(); // Run this in thread which Parallel library occupied. var cust = repo.GetCustomer(i).GetAwaiter().GetResult(); customers.Add(cust); }); 

Un metodo di estensione per questo che fa uso di SemaphoreSlim e consente anche di impostare il massimo grado di parallelismo

  ///  /// Concurrently Executes async actions for each item of  ///  /// Type of IEnumerable /// instance of "/> /// an async  to execute /// Optional, An integer that represents the maximum degree of parallelism, /// Must be grater than 0 /// A Task representing an async operation /// If the maxActionsToRunInParallel is less than 1 public static async Task ForEachAsyncConcurrent( this IEnumerable enumerable, Func action, int? maxDegreeOfParallelism = null) { if (maxDegreeOfParallelism.HasValue) { using (var semaphoreSlim = new SemaphoreSlim( maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value)) { var tasksWithThrottler = new List(); foreach (var item in enumerable) { // Increment the number of currently running tasks and wait if they are more than limit. await semaphoreSlim.WaitAsync(); tasksWithThrottler.Add(Task.Run(async () => { await action(item); // action is completed, so decrement the number of currently running tasks semaphoreSlim.Release(); })); } // Wait for all tasks to complete. await Task.WhenAll(tasksWithThrottler.ToArray()); } } else { await Task.WhenAll(enumerable.Select(item => action(item))); } } 

Esempio di utilizzo:

 await enumerable.ForEachAsyncConcurrent( async item => { await SomeAsyncMethod(item); }, 5);