Qual è il modo più semplice per attendere il completamento di tutte le attività di ExecutorService
? Il mio compito è principalmente computazionale, quindi voglio solo eseguire un gran numero di lavori, uno per ogni core. In questo momento il mio setup assomiglia a questo:
ExecutorService es = Executors.newFixedThreadPool(2); for (DataTable singleTable : uniquePhrases) { es.execute(new ComputeDTask(singleTable)); } try{ es.wait(); } catch (InterruptedException e){ e.printStackTrace(); }
ComputeDTask
implementa eseguibile. Sembra che esegua correttamente le attività, ma il codice si arresta IllegalMonitorStateException
in wait()
con IllegalMonitorStateException
. Questo è strano, perché ho giocato con alcuni esempi di giocattoli e sembrava funzionare.
uniquePhrases
contiene diverse decine di migliaia di elementi. Dovrei usare un altro metodo? Sto cercando qualcosa di più semplice ansible
L’approccio più semplice è l’utilizzo di ExecutorService.invokeAll()
che fa ciò che si desidera in una sola riga. Nel tuo linguaggio, dovrai modificare o avvolgere ComputeDTask
per implementare Callable<>
, che può darti un po ‘più di flessibilità. Probabilmente nella tua app c’è un’implementazione significativa di Callable.call()
, ma ecco un modo per completarlo se non si utilizza Executors.callable()
.
ExecutorService es = Executors.newFixedThreadPool(2); List> todo = new ArrayList>(singleTable.size()); for (DataTable singleTable: uniquePhrases) { todo.add(Executors.callable(new ComputeDTask(singleTable))); } List> answers = es.invokeAll(todo);
Come altri hanno sottolineato, è ansible utilizzare la versione di timeout di invokeAll()
se appropriato. In questo esempio, le answers
contengono una serie di Future
s che restituiscono valori null (vedere la definizione di Executors.callable()
. Probabilmente ciò che si vuole fare è un leggero refactoring in modo da poter ottenere una risposta utile o un riferimento al ComputeDTask
sottostante, ma non posso dire dal tuo esempio.
Se non è chiaro, si noti che invokeAll()
non ritornerà fino a quando tutte le attività non saranno completate. (cioè, tutti i Future
nella tua collezione di answers
riporteranno .isDone()
se richiesto). Questo evita tutte le interruzioni manuali, WaitTermination, ecc … e ti consente di riutilizzare questo ExecutorService
modo ordinato per più cicli, se lo desideri.
Ci sono alcune domande correlate su SO:
Come aspettare che tutti i thread finiscano
Restituisce valori dai thread java
invokeAll () non disposto ad accettare una Collection
Devo sincronizzare?
Nessuno di questi è rigorosamente in linea per la tua domanda, ma forniscono un po ‘di colore su come la gente dovrebbe pensare che Executor
/ ExecutorService
debba essere usato.
Se si desidera attendere il completamento di tutte le attività, utilizzare il metodo di shutdown
anziché wait
. Quindi seguilo con awaitTermination
.
Inoltre, è ansible utilizzare Runtime.availableProcessors
per ottenere il numero di thread hardware in modo da poter inizializzare correttamente il threadpool.
Se attendere che tutte le attività in ExecutorService
finiscano non è precisamente il tuo objective, ma piuttosto aspettare fino al completamento di un gruppo specifico di attività , puoi utilizzare un CompletionService
, in particolare un ExecutorCompletionService
.
L’idea è di creare un ExecutorCompletionService
avvolgendo il tuo Executor
, inviare un numero noto di attività attraverso il CompletionService
, quindi tracciare lo stesso numero di risultati dalla coda di completamento usando take()
(che blocca) o poll()
(che non lo fa) . Una volta che hai disegnato tutti i risultati attesi corrispondenti alle attività che hai inviato, sai che sono tutti fatti.
Permettetemi di dirlo ancora una volta, perché non è ovvio dall’interfaccia: dovete sapere quante cose inserite nel CompletionService
per sapere quante cose cercare di tracciare. Ciò è particolarmente importante con il metodo take()
: chiamalo una volta troppe e bloccherà il tuo thread chiamante fino a quando qualche altro thread invierà un altro job allo stesso CompletionService
.
Ci sono alcuni esempi che mostrano come usare CompletionService
nel libro Java Concurrency in Practice .
Se si desidera attendere il completamento dell’esecuzione del servizio executor, richiamare shutdown()
e quindi attendere WaitTermination (unità, unitType) , ad esempio awaitTermination(1, MINUTE)
. ExecutorService non blocca il proprio monitor, quindi non è ansible utilizzare l’ wait
ecc.
Potresti aspettare che i lavori finiscano in un determinato intervallo:
int maxSecondsPerComputeDTask = 20; try { while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { // consider giving up with a 'break' statement under certain conditions } } catch (InterruptedException e) { throw new RuntimeException(e); }
O potresti usare ExecutorService . invia ( Runnable ) e raccogli gli oggetti Future che restituisce e chiama get () a turno per aspettare che finiscano.
ExecutorService es = Executors.newFixedThreadPool(2); Collection> futures = new LinkedList<>(); for (DataTable singleTable : uniquePhrases) { futures.add(es.submit(new ComputeDTask(singleTable))); } for (Future> future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }
InterruptedException è estremamente importante da gestire correttamente. È ciò che consente a te o agli utenti della tua biblioteca di terminare in sicurezza un lungo processo.
Basta usare
latch = new CountDownLatch(noThreads)
In ogni thread
latch.countDown();
e come barriera
latch.await();
Causa principale di IllegalMonitorStateException :
Viene generato per indicare che un thread ha tentato di attendere sul monitor di un object o di notificare altri thread in attesa sul monitor di un object senza possedere il monitor specificato.
Dal tuo codice, hai appena chiamato wait () su ExecutorService senza possedere il lock.
Sotto il codice verrà risolto IllegalMonitorStateException
try { synchronized(es){ es.wait(); // Add some condition before you call wait() } }
Seguire uno dei seguenti approcci per attendere il completamento di tutte le attività, che sono state inviate a ExecutorService
.
Esegui tutte le attività Future
da submit
su ExecutorService
e controlla lo stato con la chiamata di blocco get()
sull’object Future
Utilizzo di invokeAll su ExecutorService
Utilizzando CountDownLatch
Uso di ForkJoinPool o newWorkStealingPool di Executors
(da java 8)
Arresta il pool come consigliato nella pagina della documentazione di Oracle
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
Se si desidera attendere con grazia tutte le attività per il completamento quando si utilizza l’opzione 5 invece delle opzioni da 1 a 4, modificare
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
a
un while(condition)
che controlla ogni 1 minuto.
Ho anche la situazione in cui ho una serie di documenti da sottoporre a scansione. Comincio con un documento iniziale “seme” che dovrebbe essere elaborato, quel documento contiene collegamenti ad altri documenti che dovrebbero essere elaborati, e così via.
Nel mio programma principale, voglio solo scrivere qualcosa come il seguente, in cui Crawler
controlla un gruppo di thread.
Crawler c = new Crawler(); c.schedule(seedDocument); c.waitUntilCompletion()
La stessa situazione accadrebbe se volessi navigare su un albero; inserirò il nodo root, il processore per ogni nodo aggiungerebbe i figli alla coda, se necessario, e un gruppo di thread elaborerebbe tutti i nodes dell’albero, finché non ce ne fossero altri.
Non ho trovato nulla nella JVM che pensavo fosse un po ‘sorprendente. Così ho scritto una class AutoStopThreadPool
che si può usare direttamente o in sottoclass per aggiungere metodi adatti al dominio, ad es. schedule(Document)
. Spero che sia d’aiuto!
AutoStopThreadPool Javadoc | Scaricare
Aggiungi tutti i thread nella raccolta e invokeAll
utilizzando invokeAll
. Se è ansible utilizzare il metodo invokeAll
di ExecutorService
, JVM non procederà alla riga successiva finché tutti i thread non saranno completi.
Ecco un buon esempio: invokeAll tramite ExecutorService
Invia i tuoi compiti nel Runner e poi attendi chiamando il metodo waitTillDone () in questo modo:
Runner runner = Runner.runner(2); for (DataTable singleTable : uniquePhrases) { runner.run(new ComputeDTask(singleTable)); } // blocks until all tasks are finished (or failed) runner.waitTillDone(); runner.shutdown();
Per usarlo aggiungi questa dipendenza gradle / maven: 'com.github.matejtymes:javafixes:1.0'
Per maggiori dettagli guarda qui: https://github.com/MatejTymes/JavaFixes o qui: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
È ansible utilizzare il metodo ExecutorService.invokeAll
, eseguirà tutte le attività e attenderà fino a quando tutti i thread hanno completato il loro compito.
Qui è completo javadoc
È anche ansible utilizzare la versione sovraccaricata di questo metodo per specificare il timeout.
Ecco un codice di esempio con ExecutorService.invokeAll
public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(3); List> taskList = new ArrayList<>(); taskList.add(new Task1()); taskList.add(new Task2()); List> results = service.invokeAll(taskList); for (Future f : results) { System.out.println(f.get()); } } } class Task1 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(2000); return "Task 1 done"; } catch (Exception e) { e.printStackTrace(); return " error in task1"; } } } class Task2 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(3000); return "Task 2 done"; } catch (Exception e) { e.printStackTrace(); return " error in task2"; } } }
Una semplice alternativa a questo è usare i thread insieme a join. Fare riferimento a: Unione di fili
Aspetterò solo che l’esecutore termini con un timeout specificato che pensi sia adatto per completare le attività.
try { //do stuff here exe.execute(thread); } finally { exe.shutdown(); } boolean result = exe.awaitTermination(4, TimeUnit.HOURS); if (!result) { LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); }
Sembra che sia necessario ForkJoinPool
e utilizzare il pool globale per eseguire attività.
public static void main(String[] args) { // the default `commonPool` should be sufficient for many cases. ForkJoinPool pool = ForkJoinPool.commonPool(); // The root of your task that may spawn other tasks. // Make sure it submits the additional tasks to the same executor that it is in. Runnable rootTask = new YourTask(pool); pool.execute(rootTask); pool.awaitQuiescence(...); // that's it. }
Il bello è in pool.awaitQuiescence
dove il metodo bloccherà utilizzare il thread del chiamante per eseguire i suoi compiti e poi tornare quando è veramente vuoto.
È ansible utilizzare un ThreadPoolExecutor con una dimensione del pool impostata su Runtime.getRuntime (). AvailableProcessors () e .execute()
tutte le attività che si desidera eseguire, quindi chiama tpe.shutdown()
e quindi attende un while(!tpe.terminated()) { /* waiting for all tasks to complete */}
quali blocchi per tutte le attività inviate da completare. dove tpe
è un riferimento all’istanza ThreadPoolExecutor
.
O se è più appropriato utilizzare un ExecutorCompletionService A CompletionService che utilizza un Executor fornito per eseguire le attività. Questa class dispone che le attività inoltrate siano, una volta completate, posizionate su una coda accessibile utilizzando Take. La class è abbastanza leggera da essere adatta per l’uso transitorio durante l’elaborazione di gruppi di attività.
NOTA: questa soluzione non si blocca come ExecutorService.invokeAll()
durante l’attesa di tutti i lavori. Quindi, mentre il più semplice è anche il più limitato in quanto bloccherà il thread principale dell’applicazione. Non va bene se si desidera che lo stato di ciò che sta accadendo, o per fare altre cose mentre questi lavori sono in esecuzione, come post-elaborazione dei risultati o produzione e risultato aggregato incrementale.