ExecutorService, come aspettare che tutte le attività finiscano

Qual è il modo più semplice per attendere il completamento di tutte le attività di ExecutorService ? Il mio compito è principalmente computazionale, quindi voglio solo eseguire un gran numero di lavori, uno per ogni core. In questo momento il mio setup assomiglia a questo:

 ExecutorService es = Executors.newFixedThreadPool(2); for (DataTable singleTable : uniquePhrases) { es.execute(new ComputeDTask(singleTable)); } try{ es.wait(); } catch (InterruptedException e){ e.printStackTrace(); } 

ComputeDTask implementa eseguibile. Sembra che esegua correttamente le attività, ma il codice si arresta IllegalMonitorStateException in wait() con IllegalMonitorStateException . Questo è strano, perché ho giocato con alcuni esempi di giocattoli e sembrava funzionare.

uniquePhrases contiene diverse decine di migliaia di elementi. Dovrei usare un altro metodo? Sto cercando qualcosa di più semplice ansible

L’approccio più semplice è l’utilizzo di ExecutorService.invokeAll() che fa ciò che si desidera in una sola riga. Nel tuo linguaggio, dovrai modificare o avvolgere ComputeDTask per implementare Callable<> , che può darti un po ‘più di flessibilità. Probabilmente nella tua app c’è un’implementazione significativa di Callable.call() , ma ecco un modo per completarlo se non si utilizza Executors.callable() .

 ExecutorService es = Executors.newFixedThreadPool(2); List> todo = new ArrayList>(singleTable.size()); for (DataTable singleTable: uniquePhrases) { todo.add(Executors.callable(new ComputeDTask(singleTable))); } List> answers = es.invokeAll(todo); 

Come altri hanno sottolineato, è ansible utilizzare la versione di timeout di invokeAll() se appropriato. In questo esempio, le answers contengono una serie di Future s che restituiscono valori null (vedere la definizione di Executors.callable() . Probabilmente ciò che si vuole fare è un leggero refactoring in modo da poter ottenere una risposta utile o un riferimento al ComputeDTask sottostante, ma non posso dire dal tuo esempio.

Se non è chiaro, si noti che invokeAll() non ritornerà fino a quando tutte le attività non saranno completate. (cioè, tutti i Future nella tua collezione di answers riporteranno .isDone() se richiesto). Questo evita tutte le interruzioni manuali, WaitTermination, ecc … e ti consente di riutilizzare questo ExecutorService modo ordinato per più cicli, se lo desideri.

Ci sono alcune domande correlate su SO:

  • Come aspettare che tutti i thread finiscano

  • Restituisce valori dai thread java

  • invokeAll () non disposto ad accettare una Collection >

  • Devo sincronizzare?

Nessuno di questi è rigorosamente in linea per la tua domanda, ma forniscono un po ‘di colore su come la gente dovrebbe pensare che Executor / ExecutorService debba essere usato.

Se si desidera attendere il completamento di tutte le attività, utilizzare il metodo di shutdown anziché wait . Quindi seguilo con awaitTermination .

Inoltre, è ansible utilizzare Runtime.availableProcessors per ottenere il numero di thread hardware in modo da poter inizializzare correttamente il threadpool.

Se attendere che tutte le attività in ExecutorService finiscano non è precisamente il tuo objective, ma piuttosto aspettare fino al completamento di un gruppo specifico di attività , puoi utilizzare un CompletionService , in particolare un ExecutorCompletionService .

L’idea è di creare un ExecutorCompletionService avvolgendo il tuo Executor , inviare un numero noto di attività attraverso il CompletionService , quindi tracciare lo stesso numero di risultati dalla coda di completamento usando take() (che blocca) o poll() (che non lo fa) . Una volta che hai disegnato tutti i risultati attesi corrispondenti alle attività che hai inviato, sai che sono tutti fatti.

Permettetemi di dirlo ancora una volta, perché non è ovvio dall’interfaccia: dovete sapere quante cose inserite nel CompletionService per sapere quante cose cercare di tracciare. Ciò è particolarmente importante con il metodo take() : chiamalo una volta troppe e bloccherà il tuo thread chiamante fino a quando qualche altro thread invierà un altro job allo stesso CompletionService .

Ci sono alcuni esempi che mostrano come usare CompletionService nel libro Java Concurrency in Practice .

Se si desidera attendere il completamento dell’esecuzione del servizio executor, richiamare shutdown() e quindi attendere WaitTermination (unità, unitType) , ad esempio awaitTermination(1, MINUTE) . ExecutorService non blocca il proprio monitor, quindi non è ansible utilizzare l’ wait ecc.

Potresti aspettare che i lavori finiscano in un determinato intervallo:

 int maxSecondsPerComputeDTask = 20; try { while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) { // consider giving up with a 'break' statement under certain conditions } } catch (InterruptedException e) { throw new RuntimeException(e); } 

O potresti usare ExecutorService . invia ( Runnable ) e raccogli gli oggetti Future che restituisce e chiama get () a turno per aspettare che finiscano.

 ExecutorService es = Executors.newFixedThreadPool(2); Collection> futures = new LinkedList<>(); for (DataTable singleTable : uniquePhrases) { futures.add(es.submit(new ComputeDTask(singleTable))); } for (Future future : futures) { try { future.get(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } } 

InterruptedException è estremamente importante da gestire correttamente. È ciò che consente a te o agli utenti della tua biblioteca di terminare in sicurezza un lungo processo.

Basta usare

 latch = new CountDownLatch(noThreads) 

In ogni thread

 latch.countDown(); 

e come barriera

 latch.await(); 

Causa principale di IllegalMonitorStateException :

Viene generato per indicare che un thread ha tentato di attendere sul monitor di un object o di notificare altri thread in attesa sul monitor di un object senza possedere il monitor specificato.

Dal tuo codice, hai appena chiamato wait () su ExecutorService senza possedere il lock.

Sotto il codice verrà risolto IllegalMonitorStateException

 try { synchronized(es){ es.wait(); // Add some condition before you call wait() } } 

Seguire uno dei seguenti approcci per attendere il completamento di tutte le attività, che sono state inviate a ExecutorService .

  1. Esegui tutte le attività Future da submit su ExecutorService e controlla lo stato con la chiamata di blocco get() sull’object Future

  2. Utilizzo di invokeAll su ExecutorService

  3. Utilizzando CountDownLatch

  4. Uso di ForkJoinPool o newWorkStealingPool di Executors (da java 8)

  5. Arresta il pool come consigliato nella pagina della documentazione di Oracle

     void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } 

    Se si desidera attendere con grazia tutte le attività per il completamento quando si utilizza l’opzione 5 invece delle opzioni da 1 a 4, modificare

     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { 

    a

    un while(condition) che controlla ogni 1 minuto.

Ho anche la situazione in cui ho una serie di documenti da sottoporre a scansione. Comincio con un documento iniziale “seme” che dovrebbe essere elaborato, quel documento contiene collegamenti ad altri documenti che dovrebbero essere elaborati, e così via.

Nel mio programma principale, voglio solo scrivere qualcosa come il seguente, in cui Crawler controlla un gruppo di thread.

 Crawler c = new Crawler(); c.schedule(seedDocument); c.waitUntilCompletion() 

La stessa situazione accadrebbe se volessi navigare su un albero; inserirò il nodo root, il processore per ogni nodo aggiungerebbe i figli alla coda, se necessario, e un gruppo di thread elaborerebbe tutti i nodes dell’albero, finché non ce ne fossero altri.

Non ho trovato nulla nella JVM che pensavo fosse un po ‘sorprendente. Così ho scritto una class AutoStopThreadPool che si può usare direttamente o in sottoclass per aggiungere metodi adatti al dominio, ad es. schedule(Document) . Spero che sia d’aiuto!

AutoStopThreadPool Javadoc | Scaricare

Aggiungi tutti i thread nella raccolta e invokeAll utilizzando invokeAll . Se è ansible utilizzare il metodo invokeAll di ExecutorService , JVM non procederà alla riga successiva finché tutti i thread non saranno completi.

Ecco un buon esempio: invokeAll tramite ExecutorService

Invia i tuoi compiti nel Runner e poi attendi chiamando il metodo waitTillDone () in questo modo:

 Runner runner = Runner.runner(2); for (DataTable singleTable : uniquePhrases) { runner.run(new ComputeDTask(singleTable)); } // blocks until all tasks are finished (or failed) runner.waitTillDone(); runner.shutdown(); 

Per usarlo aggiungi questa dipendenza gradle / maven: 'com.github.matejtymes:javafixes:1.0'

Per maggiori dettagli guarda qui: https://github.com/MatejTymes/JavaFixes o qui: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

È ansible utilizzare il metodo ExecutorService.invokeAll , eseguirà tutte le attività e attenderà fino a quando tutti i thread hanno completato il loro compito.

Qui è completo javadoc

È anche ansible utilizzare la versione sovraccaricata di questo metodo per specificare il timeout.

Ecco un codice di esempio con ExecutorService.invokeAll

 public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newFixedThreadPool(3); List> taskList = new ArrayList<>(); taskList.add(new Task1()); taskList.add(new Task2()); List> results = service.invokeAll(taskList); for (Future f : results) { System.out.println(f.get()); } } } class Task1 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(2000); return "Task 1 done"; } catch (Exception e) { e.printStackTrace(); return " error in task1"; } } } class Task2 implements Callable { @Override public String call() throws Exception { try { Thread.sleep(3000); return "Task 2 done"; } catch (Exception e) { e.printStackTrace(); return " error in task2"; } } } 

Una semplice alternativa a questo è usare i thread insieme a join. Fare riferimento a: Unione di fili

Aspetterò solo che l’esecutore termini con un timeout specificato che pensi sia adatto per completare le attività.

  try { //do stuff here exe.execute(thread); } finally { exe.shutdown(); } boolean result = exe.awaitTermination(4, TimeUnit.HOURS); if (!result) { LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour."); } 

Sembra che sia necessario ForkJoinPool e utilizzare il pool globale per eseguire attività.

 public static void main(String[] args) { // the default `commonPool` should be sufficient for many cases. ForkJoinPool pool = ForkJoinPool.commonPool(); // The root of your task that may spawn other tasks. // Make sure it submits the additional tasks to the same executor that it is in. Runnable rootTask = new YourTask(pool); pool.execute(rootTask); pool.awaitQuiescence(...); // that's it. } 

Il bello è in pool.awaitQuiescence dove il metodo bloccherà utilizzare il thread del chiamante per eseguire i suoi compiti e poi tornare quando è veramente vuoto.

È ansible utilizzare un ThreadPoolExecutor con una dimensione del pool impostata su Runtime.getRuntime (). AvailableProcessors () e .execute() tutte le attività che si desidera eseguire, quindi chiama tpe.shutdown() e quindi attende un while(!tpe.terminated()) { /* waiting for all tasks to complete */} quali blocchi per tutte le attività inviate da completare. dove tpe è un riferimento all’istanza ThreadPoolExecutor .

O se è più appropriato utilizzare un ExecutorCompletionService A CompletionService che utilizza un Executor fornito per eseguire le attività. Questa class dispone che le attività inoltrate siano, una volta completate, posizionate su una coda accessibile utilizzando Take. La class è abbastanza leggera da essere adatta per l’uso transitorio durante l’elaborazione di gruppi di attività.

NOTA: questa soluzione non si blocca come ExecutorService.invokeAll() durante l’attesa di tutti i lavori. Quindi, mentre il più semplice è anche il più limitato in quanto bloccherà il thread principale dell’applicazione. Non va bene se si desidera che lo stato di ciò che sta accadendo, o per fare altre cose mentre questi lavori sono in esecuzione, come post-elaborazione dei risultati o produzione e risultato aggregato incrementale.