Java ExecutorService: attendi la determinazione di tutte le attività create in modo ricorsivo

Io uso un ExecutorService per eseguire un’attività. Questa attività può creare in modo ricorsivo altre attività che vengono inviate allo stesso ExecutorService e anche quelle attività secondarie possono farlo.

Ora ho il problema che voglio aspettare fino a quando tutte le attività sono terminate (cioè, tutte le attività sono finite e non ne hanno presentate di nuove) prima di continuare.

Non riesco a chiamare ExecutorService.shutdown() nel thread principale perché questo impedisce l’accettazione di nuove attività da parte di ExecutorService .

E Calling ExecutorService.awaitTermination() sembra non fare nulla se l’ shutdown non è stato chiamato.

Quindi sono un po ‘bloccato qui. Non può essere così difficile per ExecutorService vedere che tutti i lavoratori sono inattivi, vero? L’unica soluzione inelegante che potrei ThreadPoolExecutor è di usare direttamente un ThreadPoolExecutor e interrogare il suo getPoolSize() ogni tanto. Non c’è davvero modo migliore di farlo?

Se il numero di attività nell’albero delle attività ricorsive è inizialmente sconosciuto, forse il modo più semplice sarebbe implementare la propria primitiva di sincronizzazione, una sorta di “semaforo inverso” e condividerla tra i propri compiti. Prima di inviare ogni attività si incrementa un valore, quando l’attività è completata, diminuisce quel valore e si attende che il valore sia 0.

L’implementazione come primitiva separata chiamata esplicitamente dai task separa questa logica dall’implementazione del pool di thread e consente di inviare diversi alberi indipendenti di attività ricorsive nello stesso pool.

Qualcosa come questo:

 public class InverseSemaphore { private int value = 0; private Object lock = new Object(); public void beforeSubmit() { synchronized(lock) { value++; } } public void taskCompleted() { synchronized(lock) { value--; if (value == 0) lock.notifyAll(); } } public void awaitCompletion() throws InterruptedException { synchronized(lock) { while (value > 0) lock.wait(); } } } 

Nota che taskCompleted() dovrebbe essere chiamato all’interno di un blocco finally , per renderlo immune alle possibili eccezioni.

Si noti inoltre che beforeSubmit() deve essere richiamato dal thread di invio prima che l’attività venga inoltrata, non dall’attività stessa, per evitare possibili “falsi completamenti” quando le attività precedenti vengono completate e quelle nuove non ancora avviate.

EDIT: problema importante con il modello di utilizzo corretto.

Questo è davvero un candidato ideale per un phaser. Java 7 sta uscendo con questa nuova class. È un CountdonwLatch / CyclicBarrier flessibile. È ansible ottenere una versione stabile presso il sito di interesse JSR 166 .

Il modo in cui è più flessibile CountdownLatch / CyclicBarrier è perché è in grado non solo di supportare un numero sconosciuto di parti (thread) ma è anche riutilizzabile (è qui che entra in gioco la parte di fase)

Per ogni attività che invii verrai registrato, quando l’attività sarà completata arriverà. Questo può essere fatto in modo ricorsivo.

 Phaser phaser = new Phaser(); ExecutorService e = // Runnable recursiveRunnable = new Runnable(){ public void run(){ //do work recursively if you have to if(shouldBeRecursive){ phaser.register(); e.submit(recursiveRunnable); } phaser.arrive(); } } public void doWork(){ int phase = phaser.getPhase(); phaser.register(); e.submit(recursiveRunnable); phaser.awaitAdvance(phase); } 

Modifica: Grazie a @depthofreality per indicare la condizione della gara nel mio esempio precedente. Lo sto aggiornando in modo che l’esecuzione del thread attenda solo l’avanzamento della fase corrente mentre blocca il completamento della funzione ricorsiva.

Il numero di fase non scatterà fino a quando il numero di arrive s == register s. Poiché prima di ogni richiamo di chiamata ricorsiva si verifica un incremento di fase quando tutte le invocazioni sono complete.

Wow, voi ragazzi siete veloci 🙂

Grazie per tutti i suggerimenti. I futures non si integrano facilmente con il mio modello perché non so quanti runnables sono programmati in anticipo. Quindi, se tengo vivo il compito di un genitore solo per aspettare che le attività ricorsive dei bambini finiscano, ho un mucchio di spazzatura in giro.

Ho risolto il mio problema usando il suggerimento di AtomicInteger. Essenzialmente, ho sottoclass ThreadPoolExecutor e incremento il contatore delle chiamate da eseguire () e decrementa le chiamate a afterExecute (). Quando il contatore ottiene 0, chiamo shutdown (). Questo sembra funzionare per i miei problemi, non sono sicuro che sia un modo generalmente buono per farlo. Soprattutto, presumo che tu usi solo execute () per aggiungere Runnables.

Come nodo laterale: prima ho provato a registrare afterExecute () il numero di Runnables nella coda e il numero di worker attivi e di arresto quando quelli sono 0; ma questo non ha funzionato perché non tutti i Runnables si sono presentati in coda e il getActiveCount () non ha fatto quello che mi aspettavo.

Comunque, ecco la mia soluzione: (se qualcuno trova seri problemi con questo, per favore fatemelo sapere 🙂

 public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger executing = new AtomicInteger(0); public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime, TimeUnit seconds, BlockingQueue queue) { super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue); } @Override public void execute(Runnable command) { //intercepting beforeExecute is too late! //execute() is called in the parent thread before it terminates executing.incrementAndGet(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); int count = executing.decrementAndGet(); if(count == 0) { this.shutdown(); } } } 

È ansible creare il proprio pool di thread che estende ThreadPoolExecutor . Vuoi sapere quando un compito è stato inviato e quando viene completato.

 public class MyThreadPoolExecutor extends ThreadPoolExecutor { private int counter = 0; public MyThreadPoolExecutor() { super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue()); } @Override public synchronized void execute(Runnable command) { counter++; super.execute(command); } @Override protected synchronized void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); counter--; notifyAll(); } public synchronized void waitForExecuted() throws InterruptedException { while (counter == 0) wait(); } } 

Usa un futuro per le tue attività (invece di inviare Runnable ), un callback aggiorna il suo stato quando è completato, quindi puoi utilizzare Future.isDone per tracciare lo stato di tutte le tue attività.

(mea culpa: è un po ‘passato la mia ora di andare a dormire;) ma ecco un primo tentativo di chiusura dynamic):

 package oss.alphazero.sto4958330; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class DynamicCountDownLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { private final CountDownLatch toplatch; public Sync() { setState(0); this.toplatch = new CountDownLatch(1); } @Override protected int tryAcquireShared(int acquires){ try { toplatch.await(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } return getState() == 0 ? 1 : -1; } public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } public boolean tryExtendState(int acquires) { for (;;) { int s = getState(); int exts = s+1; if (compareAndSetState(s, exts)) { toplatch.countDown(); return exts > 0; } } } } private final Sync sync; public DynamicCountDownLatch(){ this.sync = new Sync(); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public void join() { sync.tryExtendState(1); } } 

Questo latch introduce un nuovo metodo join () all’API CountDownLatch (clonata) esistente, utilizzata dalle attività per segnalare la loro immissione nel gruppo di attività più ampio.

Il latch viene passato dall’attività padre all’attività figlio. Ogni attività dovrebbe, per modello di Suraj, prima ‘join ()’ il latch, eseguire il suo task () e quindi countDown ().

Per affrontare situazioni in cui il thread principale avvia il gruppo di task e quindi attende immediatamente () – prima che uno qualsiasi dei thread di attività abbia avuto la possibilità di unirsi a join () – il topLatch viene utilizzato nella class di Sync interna int. Questo è un latch che verrà contato su ogni join (); solo il primo conto alla rovescia è ovviamente significativo, poiché tutti i successivi sono nops.

L’implementazione iniziale di cui sopra introduce una sorta di ruga semantica dal momento che tryAcquiredShared (int) non deve generare un InterruptedException, ma poi dobbiamo occuparci dell’interrupt sull’attesa sul topLatch.

Si tratta di un miglioramento rispetto alla soluzione propria dell’OP che utilizza i contatori atomici? Direi probabilmente che non è IFF che è insistente nell’usare gli Executor, ma è, credo, un approccio alternativo altrettanto valido che usa l’AQS in quel caso, ed è utilizzabile anche con thread generici.

Crit lontano colleghi hacker.

Se si desidera utilizzare le classi JSR166y – ad esempio Phaser o Fork / Join – che potrebbero funzionare per voi, è sempre ansible scaricare il backport Java 6 da: http://gee.cs.oswego.edu/dl/concurrency -interessare / e usarlo come base piuttosto che scrivere una soluzione completamente homebrew. Poi, quando esce 7, puoi semplicemente eliminare la dipendenza dal backport e modificare alcuni nomi di pacchetti.

(Full disclosure: abbiamo utilizzato LinkedTransferQueue in prodotto per un po ‘di tempo. Nessun problema)

Devo dire che le soluzioni descritte sopra del problema con l’attività di chiamata ricorsiva e l’attesa per le attività di sottordine di fine non mi soddisfano. C’è la mia soluzione ispirata alla documentazione originale di Oracle: CountDownLatch ed esempio: Risorse umane CountDownLatch .

Il primo thread comune in corso, ad esempio, nella class HRManagerCompact ha un latch in attesa per i thread di due figlia, che ha latch in attesa per i successivi thread della figlia 2 … ecc.

Naturalmente, latch può essere impostato su un valore diverso da 2 (nel costruttore di CountDownLatch), così come il numero di oggetti eseguibili può essere stabilito in iteration cioè ArrayList, ma deve corrispondere (il numero di count down deve essere uguale al parametro nel costruttore CountDownLatch).

Attenzione, il numero di fermi aumenta in modo esponenziale in base alle condizioni di restrizione: ‘level.get () <2', nonché il numero di oggetti. 1, 2, 4, 8, 16 ... e latch 0, 1, 2, 4 ... Come potete vedere, per quattro livelli (level.get () <4) ci saranno 15 thread in attesa e 7 latch nel tempo, quando il picco 16 thread sono in esecuzione.

 package processes.countdownlatch.hr; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** Recursively latching running classs to wait for the peak threads * * @author hariprasad */ public class HRManagerCompact extends Thread { final int N = 2; // number of daughter's tasks for latch CountDownLatch countDownLatch; CountDownLatch originCountDownLatch; AtomicInteger level = new AtomicInteger(0); AtomicLong order = new AtomicLong(0); // id latched thread waiting for HRManagerCompact techLead1 = null; HRManagerCompact techLead2 = null; HRManagerCompact techLead3 = null; // constructor public HRManagerCompact(CountDownLatch countDownLatch, String name, AtomicInteger level, AtomicLong order){ super(name); this.originCountDownLatch=countDownLatch; this.level = level; this.order = order; } private void doIt() { countDownLatch = new CountDownLatch(N); AtomicInteger leveli = new AtomicInteger(level.get() + 1); AtomicLong orderi = new AtomicLong(Thread.currentThread().getId()); techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi); techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi); //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli); techLead1.start(); techLead2.start(); //techLead3.start(); try { synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi); countDownLatch.await(); // wait actual thread } System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId()); Thread.sleep(10*level.intValue()); if (level.get() < 2) doIt(); Thread.yield(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } /*catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ // TODO Auto-generated method stub System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId()); originCountDownLatch.countDown(); // count down } public static void main(String args[]){ AtomicInteger levelzero = new AtomicInteger(0); HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue())); hr.doIt(); } } 

Possibile output commentato (con qualche probabilità):

 first: working... 1, 1, 10 // thread 1, first daughter's task (10) second: working... 1, 1, 11 // thread 1, second daughter's task (11) first: working... 2, 10, 12 // thread 10, first daughter's task (12) first: working... 2, 11, 14 // thread 11, first daughter's task (14) second: working... 2, 11, 15 // thread 11, second daughter's task (15) second: working... 2, 10, 13 // thread 10, second daughter's task (13) --- first: recruted 2, 10, 12 // finished 12 --- first: recruted 2, 11, 14 // finished 14 --- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10) --- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11) *** HR Manager waiting for recruitment to complete... 0, 0, 1 *** HR Manager waiting for recruitment to complete... 1, 1, 10 *** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened --- first: recruted 1, 1, 10 // finished 10 *** HR Manager waiting for recruitment to complete... 1, 1, 11 *** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened --- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1) *** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened 

Usa CountDownLatch . Passa l’object CountDownLatch a ciascuna delle tue attività e codifica le tue attività come di seguito.

 public void doTask() { // do your task latch.countDown(); } 

Mentre il thread che deve attendere dovrebbe eseguire il seguente codice:

 public void doWait() { latch.await(); } 

Ovviamente, questo presuppone che tu sappia già il numero di compiti figli in modo da poter inizializzare il conteggio del latch.

L’unica soluzione inelegante che potrei inventare è di usare direttamente un ThreadPoolExecutor e interrogare il suo getPoolSize () ogni tanto. Non c’è davvero modo migliore di farlo?

Devi usare i metodi shutdown() , awaitTermination () and shutdownNow() in una sequenza corretta.

shutdown() : avvia uno spegnimento ordinato in cui vengono eseguite le attività precedentemente inviate, ma non verranno accettate nuove attività.

awaitTermination() : Blocca fino a quando tutte le attività hanno completato l’esecuzione dopo una richiesta di arresto, oppure si verifica il timeout o il thread corrente viene interrotto, a seconda di quale evento si verifica per primo.

shutdownNow() : tenta di interrompere tutte le attività che eseguono triggersmente, interrompe l’elaborazione delle attività in attesa e restituisce un elenco delle attività che erano in attesa di esecuzione.

Modo consigliato dalla pagina di documentazione di Oracle di ExecutorService :

  void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } 

È ansible sostituire se condizione con condizione di tempo in caso di lunga durata nel completamento delle attività come di seguito:

Modificare

 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) 

A

  while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000); } 

Puoi fare riferimento ad altre alternative (eccetto join() , che può essere usato con thread autonomo) in:

attendi che tutti i thread finiscano il loro lavoro in java