Limita il numero di thread paralleli in C #

Sto scrivendo un programma C # per generare e caricare mezzo milione di file via FTP. Voglio elaborare 4 file in parallelo poiché la macchina ha 4 core e la generazione del file richiede molto più tempo. È ansible convertire il seguente esempio di PowerShell in C #? O c’è qualche framework migliore come Actor framework in C # (come F # MailboxProcessor)?

Esempio di Powershell

$maxConcurrentJobs = 3; # Read the input and queue it up $jobInput = get-content .\input.txt $queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) ) foreach($item in $jobInput) { $queue.Enqueue($item) } # Function that pops input off the queue and starts a job with it function RunJobFromQueue { if( $queue.Count -gt 0) { $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null } } # Start up to the max number of concurrent jobs # Each job will take care of running the rest for( $i = 0; $i -lt $maxConcurrentJobs; $i++ ) { RunJobFromQueue } 

Aggiornare:
La connessione al server FTP remoto può essere lenta, quindi voglio limitare l’elaborazione dell’upload FTP.

Supponendo che tu lo stia creando con la TPL, puoi impostare ParallelOptions.MaxDegreesOfParallelism su qualsiasi cosa tu voglia che sia.

Parallelo. Per un esempio di codice.

Task Parallel Library è tuo amico qui. Vedi questo link che descrive ciò che è a tua disposizione. Fondamentalmente viene fornito il framework 4 che ottimizza questi thread in pool essenzialmente in background sul numero di processori sulla macchina in esecuzione.

Forse qualcosa sulla falsariga di:

 ParallelOptions options = new ParallelOptions(); options.MaxDegreeOfParallelism = 4; 

Quindi nel tuo ciclo qualcosa come:

 Parallel.Invoke(options, () => new WebClient().Upload("http://www.linqpad.net", "lp.html"), () => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 

Se stai usando .Net 4.0 puoi usare la libreria Parallel

Supponendo che tu stia iterando il mezzo milione di file puoi “parallelare” l’iterazione usando un Parallel Foreach per esempio oppure puoi dare un’occhiata a PLinq Ecco un confronto tra i due

In sostanza, si vorrà creare un’azione o un’attività per ogni file da caricare, inserirli in un elenco e quindi elaborare tale elenco, limitando il numero che può essere elaborato in parallelo.

Il mio post sul blog mostra come farlo sia con Attività che con Azioni e fornisce un progetto di esempio che è ansible scaricare ed eseguire per vedere entrambi in azione.

Con azioni

Se si utilizza Azioni, è ansible utilizzare la funzione .Net Parallel.Invoke integrata. Qui lo limitiamo a eseguire al massimo 4 thread in parallelo.

 var listOfActions = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => UploadFile(localFile))); } var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; Parallel.Invoke(options, listOfActions.ToArray()); 

Questa opzione non supporta async, e suppongo che la funzione di FileUpload sia, quindi potresti voler usare l’esempio di Task qui sotto.

Con compiti

Con Tasks non esiste una funzione integrata. Tuttavia, puoi utilizzare quello che fornisco sul mio blog.

  ///  /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } ///  /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. /// NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed. /// NOTE: If one of the given tasks has already been started, an exception will be thrown. ///  /// The tasks to run. /// The maximum number of tasks to run in parallel. /// The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely. /// The cancellation token. public static async Task StartAndWaitAllThrottledAsync(IEnumerable tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. await Task.WhenAll(postTaskTasks.ToArray()); } } 

E poi creando il tuo elenco di attività e chiamando la funzione per farle funzionare, con un massimo di 4 simultaneamente alla volta, potresti fare questo:

 var listOfTasks = new List(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(async () => await UploadFile(localFile))); } await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

Inoltre, poiché questo metodo supporta async, non bloccherà il thread dell’interfaccia utente come se utilizzasse Parallel.Invoke o Parallel.ForEach.

Ho codificato sotto la tecnica dove uso BlockingCollection come gestore di thread count. È abbastanza semplice implementare e gestire il lavoro. Accetta semplicemente oggetti Task e aggiunge un valore intero all’elenco di blocchi, aumentando il numero di thread in esecuzione per 1. Al termine del thread, esso deseleziona l’object e rilascia il blocco all’operazione di aggiunta per le attività imminenti.

  public class BlockingTaskQueue { private BlockingCollection threadManager { get; set; } = null; public bool IsWorking { get { return threadManager.Count > 0 ? true : false; } } public BlockingTaskQueue(int maxThread) { threadManager = new BlockingCollection(maxThread); } public async Task AddTask(Task task) { Task.Run(() => { Run(task); }); } private bool Run(Task task) { try { threadManager.Add(1); task.Start(); task.Wait(); return true; } catch (Exception ex) { return false; } finally { threadManager.Take(); } } }