Perché Parallel.ForEach è molto più veloce di AsParallel (). ForAll () anche se MSDN suggerisce diversamente?

Ho fatto alcune ricerche per vedere come possiamo creare un’applicazione multithread che passa attraverso un albero.

Per scoprire come questo può essere implementato nel modo migliore ho creato un’applicazione di test che attraversa il mio C: \ disk e apre tutte le directory.

class Program { static void Main(string[] args) { //var startDirectory = @"C:\The folder\RecursiveFolder"; var startDirectory = @"C:\"; var w = Stopwatch.StartNew(); ThisIsARecursiveFunction(startDirectory); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunction(String currentDirectory) { var lastBit = Path.GetFileName(currentDirectory); var depth = currentDirectory.Count(t => t == '\\'); //Console.WriteLine(depth + ": " + currentDirectory); try { var children = Directory.GetDirectories(currentDirectory); //Edit this mode to switch what way of parallelization it should use int mode = 3; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunction(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunction(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunction(t); }); break; default: break; } } catch (Exception eee) { //Exception might occur for directories that can't be accessed. } } } 

Tuttavia, ciò che ho riscontrato è che quando si esegue questo in modalità 3 (Parallel.ForEach) il codice viene completato in circa 2,5 secondi (sì, ho un SSD;)). Eseguendo il codice senza parallelizzazione, esso termina in circa 8 secondi. E eseguendo il codice in modalità 2 (AsParalle.ForAll ()) richiede un tempo quasi infinito.

Durante il processo di esplorazione di processo, incontro anche alcuni strani fatti:

 Mode1 (No Parallelization): Cpu: ~25% Threads: 3 Time to complete: ~8 seconds Mode2 (AsParallel().ForAll()): Cpu: ~0% Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.) Time to complete: 1 second per node so about 3 days??? Mode3 (Parallel.ForEach()): Cpu: 100% Threads: At most 29-30 Time to complete: ~2.5 seconds 

Quello che trovo particolarmente strano è che Parallel.ForEach sembra ignorare qualsiasi thread / attività padre che sono ancora in esecuzione mentre AsParallel (). ForAll () sembra attendere che l’attività precedente sia completata (che non sarà presto da tutte le attività padre stanno ancora aspettando di completare i compiti dei figli).

Inoltre, ciò che ho letto su MSDN è stato: “Preferisci che sia tutto per tutti quando è ansible”

Fonte: http://msdn.microsoft.com/en-us/library/dd997403(v=vs.110).aspx

Qualcuno ha idea del perché questo potrebbe essere?

Modifica 1:

Come richiesto da Matthew Watson, ho prima caricato l’albero in memoria prima di collegarlo. Ora il caricamento dell’albero è fatto in sequenza.

I risultati comunque sono gli stessi. Unparallelized e Parallel.ForEach completare ora l’intero albero in circa 0,05 secondi mentre AsParallel (). ForAll continua ancora solo a circa 1 step al secondo.

Codice:

 class Program { private static DirWithSubDirs RootDir; static void Main(string[] args) { //var startDirectory = @"C:\The folder\RecursiveFolder"; var startDirectory = @"C:\"; Console.WriteLine("Loading file system into memory..."); RootDir = new DirWithSubDirs(startDirectory); Console.WriteLine("Done"); var w = Stopwatch.StartNew(); ThisIsARecursiveFunctionInMemory(RootDir); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory) { var depth = currentDirectory.Path.Count(t => t == '\\'); Console.WriteLine(depth + ": " + currentDirectory.Path); var children = currentDirectory.SubDirs; //Edit this mode to switch what way of parallelization it should use int mode = 2; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunctionInMemory(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunctionInMemory(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunctionInMemory(t); }); break; default: break; } } } class DirWithSubDirs { public List SubDirs = new List(); public String Path { get; private set; } public DirWithSubDirs(String path) { this.Path = path; try { SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList(); } catch (Exception eee) { //Ignore directories that can't be accessed } } } 

Modifica 2:

Dopo aver letto l’aggiornamento sul commento di Matthew, ho provato ad aggiungere il seguente codice al programma:

 ThreadPool.SetMinThreads(4000, 16); ThreadPool.SetMaxThreads(4000, 16); 

Questo tuttavia non cambia il modo in cui le peform di AsParallel. Ancora i primi 8 passi vengono eseguiti in un istante prima di rallentare a 1 passo / secondo.

(Nota extra, al momento sto ignorando le eccezioni che si verificano quando non riesco ad accedere a una directory tramite il blocco Try Catch attorno a Directory.GetDirectories ())

Modifica 3:

Inoltre, quello che mi interessa maggiormente è la differenza tra Parallel.ForEach e AsParallel.ForAll perché per me è strano che per qualche ragione il secondo crea un Thread per ogni ricorsione che fa mentre il primo gestisce tutto in circa 30 thread max. (E anche perché MSDN suggerisce di utilizzare AsParallel anche se crea così tanti thread con un timeout di ~ 1 secondo)

Modifica 4:

Un’altra cosa strana che ho scoperto: quando provo a impostare i MinThreads nel pool di thread sopra 1023, sembra ignorare il valore e ridimensionare a circa 8 o 16: ThreadPool.SetMinThreads (1023, 16);

Ancora quando uso il 1023, i primi 1023 elementi sono molto veloci, seguiti da un ritmo lento che ho sperimentato in continuazione.

Nota: anche ora vengono creati più di 1000 thread (rispetto a 30 per l’intero Parallel. Per ognuno di essi).

Questo significa Parallel.ForEach è semplicemente più intelligente nella gestione delle attività?

Altre informazioni, questo codice stampa due volte 8 – 8 quando si imposta il valore superiore a 1023: (Quando si impostano i valori su 1023 o inferiore, viene stampato il valore corretto)

  int threadsMin; int completionMin; ThreadPool.GetMinThreads(out threadsMin, out completionMin); Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin); ThreadPool.SetMinThreads(1023, 16); ThreadPool.SetMaxThreads(1023, 16); ThreadPool.GetMinThreads(out threadsMin, out completionMin); Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin); 

Modifica 5:

A partire dalla richiesta di Dean, ho creato un altro caso per creare manualmente le attività:

 case 4: var taskList = new List(); foreach (var todo in children) { var itemTodo = todo; taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo))); } Task.WaitAll(taskList.ToArray()); break; 

Anche questo è veloce come il ciclo Parallel.ForEach (). Quindi non abbiamo ancora la risposta al perché AsParallel (). ForAll () è molto più lento.

Questo problema è piuttosto debuggabile, un lusso non comune quando si hanno problemi con i thread. Lo strumento di base qui è Debug> Windows> finestra Debug di thread. Mostra i thread attivi e ti dà una sbirciatina nella loro traccia dello stack. Vedrai facilmente che, una volta che rallenta, avrai dozzine di thread attivi che sono tutti bloccati. Le loro tracce di stack sembrano tutte uguali:

  mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172 C# // etc.. 

Ogni volta che vedi qualcosa del genere, dovresti immediatamente pensare al problema della manichetta antincendio . Probabilmente il terzo bug più comune con discussioni, dopo gare e deadlock.

Che puoi ragionare, ora che conosci la causa, il problema con il codice è che ogni thread che completa aggiunge N altri thread. Dove N è il numero medio di sottodirectory in una directory. In effetti, il numero di thread cresce in modo esponenziale , questo è sempre negativo. Resterà in controllo solo se N = 1, che ovviamente non si verifica mai su un disco tipico.

Fai attenzione che, come quasi ogni problema di threading, questo comportamento scorretto tende a ripetersi male. L’SSD nella tua macchina tende a nasconderlo. Così fa la RAM nella tua macchina, il programma potrebbe benissimo completare rapidamente e senza problemi la seconda volta che lo esegui. Dal momento che ora leggerete dalla cache del file system invece del disco, molto velocemente. Armeggiare con ThreadPool.SetMinThreads () lo nasconde pure, ma non può risolverlo. Non risolve mai alcun problema, li nasconde solo. Perché qualunque cosa accada, il numero esponenziale trarrà sempre il numero minimo di thread impostato. Puoi solo sperare che finisca di completare l’iterazione del disco prima che ciò accada. Idle speranza per un utente con una grande unità.

La differenza tra ParallelEnumerable.ForAll () e Parallel.ForEach () è ora forse anche facilmente spiegabile. Dalla traccia dello stack è ansible distinguere che ForAll () fa qualcosa di malizioso, il metodo RunSynchronously () blocca fino a quando tutti i thread sono completati. Il blocco è qualcosa che i thread del threadpool non devono fare, danneggia il pool di thread e non gli consente di programmare il processore per un altro lavoro. E ha l’effetto che hai osservato, il pool di thread viene rapidamente sopraffatto dai thread che sono in attesa degli altri N thread da completare. Che non sta succedendo, stanno aspettando in piscina e non vengono programmati perché ce ne sono già molti attivi.

Questo è uno scenario di deadlock, piuttosto comune, ma il gestore di threadpool ha una soluzione alternativa. Osserva i thread del threadpool attivi e interviene quando non vengono completati in modo tempestivo. Consente quindi l’avvio di un thread aggiuntivo , uno in più del minimo impostato da SetMinThreads (). Ma non più del set massimo di SetMaxThreads (), avere troppi thread attivi di tp è rischioso e probabilmente innesca OOM. Ciò risolve il deadlock, esso ottiene una delle chiamate ForAll () per completare. Ma questo accade a un ritmo molto lento, il threadpool lo fa solo due volte al secondo. Rimarrai a corto di pazienza prima che si riprenda.

Parallel.ForEach () non ha questo problema, non blocca quindi non gum up il pool.

Sembra essere la soluzione, ma tieni a mente che il tuo programma sta ancora bruciando la memoria della tua macchina, aggiungendo sempre più thread in attesa per il pool. Questo può mandare in crash il tuo programma, ma non è così probabile perché hai molta memoria e il threadpool non ne usa molto per tenere traccia di una richiesta. Alcuni programmatori, tuttavia, ottengono questo risultato .

La soluzione è molto semplice, ma non usare la filettatura. È dannoso , non c’è concorrenza quando si ha un solo disco. E non gli piace essere requisito da più thread. Soprattutto su un disco mandrino, la ricerca della testa è molto, molto lenta. Gli SSD lo fanno molto meglio, tuttavia richiede ancora 50 microsecondi, un sovraccarico che semplicemente non vuoi o di cui hai bisogno. Il numero ideale di thread per accedere a un disco che non si può altrimenti aspettare di essere memorizzato nella cache è sempre uno .

La prima cosa da notare è che si sta tentando di parallelizzare un’operazione legata all’IO, che distorce significativamente i tempi.

La seconda cosa da notare è la natura delle attività parallele: si sta ricorsivamente discendendo un albero delle directory. Se si creano più thread per fare ciò, è probabile che ogni thread acceda simultaneamente a una parte diversa del disco, il che farà sì che la testina di lettura del disco salti dappertutto e rallenti notevolmente.

Prova a cambiare test per creare un albero in memoria e accedici con più thread. Quindi sarai in grado di confrontare correttamente i tempi senza che i risultati siano distorti oltre ogni utilità.

Inoltre, potresti creare un numero elevato di thread e saranno (per impostazione predefinita) thread threadpool. Avere un numero elevato di thread rallenta le cose quando superano il numero di core del processore.

Si noti inoltre che quando si superano i thread minimi del pool di thread (definiti da ThreadPool.GetMinThreads() ), viene introdotto un ritardo dal gestore del pool di thread tra ogni nuova creazione del thread del threadpool. (Penso che questo è intorno a 0,5 secondi per nuovo thread).

Inoltre, se il numero di thread supera il valore restituito da ThreadPool.GetMaxThreads() , il thread di creazione verrà bloccato fino ThreadPool.GetMaxThreads() da uno degli altri thread. Penso che questo stia accadendo.

Puoi verificare questa ipotesi chiamando ThreadPool.SetMaxThreads() e ThreadPool.SetMinThreads() per aumentare questi valori e vedere se fa alcuna differenza.

(Infine, nota che se stai davvero provando a discendere ricorsivamente da C:\ , quasi certamente otterrai un’eccezione IO quando raggiunge una cartella OS protetta).

NOTA: imposta i thread del threadpool max / min in questo modo:

 ThreadPool.SetMinThreads(4000, 16); ThreadPool.SetMaxThreads(4000, 16); 

Azione supplementare

Ho provato il tuo codice di test con i conteggi threadpool impostati come descritto sopra, con i seguenti risultati (non eseguiti sull’intero C: \ drive, ma su un sottoinsieme più piccolo):

  • La modalità 1 ha richiesto 06,5 secondi.
  • La modalità 2 ha richiesto 15,7 secondi.
  • La modalità 3 ha impiegato 16,4 secondi.

Questo è in linea con le mie aspettative; l’aggiunta di un carico di threading in questo modo lo rende più lento rispetto a quello a thread singolo e i due approcci paralleli richiedono all’incirca lo stesso tempo.


Nel caso in cui qualcun altro voglia indagare su questo, ecco un codice di test determinativo (il codice dell’OP non è riproducibile perché non conosciamo la sua struttura di directory).

 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; namespace Demo { internal class Program { private static DirWithSubDirs RootDir; private static void Main() { Console.WriteLine("Loading file system into memory..."); RootDir = new DirWithSubDirs("Root", 4, 4); Console.WriteLine("Done"); //ThreadPool.SetMinThreads(4000, 16); //ThreadPool.SetMaxThreads(4000, 16); var w = Stopwatch.StartNew(); ThisIsARecursiveFunctionInMemory(RootDir); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory) { var depth = currentDirectory.Path.Count(t => t == '\\'); Console.WriteLine(depth + ": " + currentDirectory.Path); var children = currentDirectory.SubDirs; //Edit this mode to switch what way of parallelization it should use int mode = 3; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunctionInMemory(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunctionInMemory(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunctionInMemory(t); }); break; default: break; } } } internal class DirWithSubDirs { public List SubDirs = new List(); public String Path { get; private set; } public DirWithSubDirs(String path, int width, int depth) { this.Path = path; if (depth > 0) for (int i = 0; i < width; ++i) SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1)); } } } 

I metodi Parallel.For e .ForEach sono implementati internamente come equivalenti all’esecuzione di iterazioni in Attività, ad esempio un ciclo come:

 Parallel.For(0, N, i => { DoWork(i); }); 

è equivalente a:

 var tasks = new List(N); for(int i=0; i DoWork((int)state), i)); } Task.WaitAll(tasks.ToArray()); 

E dal punto di vista di ogni iterazione potenzialmente in esecuzione in parallelo con ogni altra iterazione, questo è un modello mentale ok, ma non accade nella realtà. Parallelamente, infatti, non usa necessariamente un compito per iterazione, poiché questo è significativamente più sovraccarico del necessario. Parallel.ForEach tenta di utilizzare il numero minimo di attività necessarie per completare il ciclo il più velocemente ansible. Sposta le attività come thread diventano disponibili per elaborare tali attività, e ciascuna di queste attività partecipa a uno schema di gestione (penso che sia chiamato chunking): Un’attività richiede che vengano eseguite più iterazioni, le acquisiscano e quindi i processi che funzionano, e poi torna per di più. Le dimensioni del blocco variano in base al numero di attività che partecipano, al carico sulla macchina, ecc.

PLINQ. AsParallel () ha un’implementazione diversa, ma ‘può’ comunque recuperare analogamente più iterazioni in un archivio temporaneo, eseguire i calcoli in un thread (ma non come un’attività) e inserire i risultati della query in un buffer piccolo. (Si ottiene qualcosa basato su ParallelQuery e poi su. Le funzioni Any () si collegano a un insieme alternativo di metodi di estensione che forniscono implementazioni parallele).

Quindi, ora che abbiamo una piccola idea di come funzionano questi due meccanismi, cercherò di fornire una risposta alla tua domanda iniziale:

Quindi, perché è .AsParallel () più lento di Parallel.ForEach ? La ragione deriva dal seguente. Le attività (o la loro implementazione equivalente qui) NON bloccano le chiamate di tipo I / O. Attendono e liberano la CPU per fare qualcos’altro. Ma (citando C # nutshell book): ” PLINQ non può eseguire il lavoro legato all’I / O senza bloccare i thread “. Le chiamate sono sincrone . Sono stati scritti con l’intenzione di aumentare il grado di parallelismo se (e SOLO se) si stanno facendo cose come il download di pagine Web per attività che non risparmiano tempo CPU.

E il motivo per cui le chiamate di funzione sono esattamente analoghe alle chiamate con I / O è questo: uno dei tuoi thread (chiamalo T) blocca e non fa nulla finché tutti i suoi thread figli non hanno finito, il che può essere un processo lento qui. La stessa T non ha bisogno di molta CPU mentre aspetta che i bambini si sblocchino, non fa altro che aspettare . Quindi è identico a una tipica chiamata di funzione legata all’I / O.

Basato sulla risposta accettata a Come esattamente funziona AsParallel?

.AsParallel.ForAll() restituisce a IEnumerable prima di chiamare .ForAll()

quindi crea 1 nuovo thread + N chiamate ricorsive (ognuna delle quali genera un nuovo thread).