Gestione delle eccezioni dalle attività di ExecutorService di Java

Sto cercando di utilizzare la class ThreadPoolExecutor di Java per eseguire un numero elevato di attività pesanti con un numero fisso di thread. Ognuna delle attività ha molte posizioni durante le quali potrebbe non riuscire a causa di eccezioni.

Ho sottoclassato ThreadPoolExecutor e ho sovrascritto il metodo afterExecute che dovrebbe fornire tutte le eccezioni non afterExecute durante l’esecuzione di un’attività. Tuttavia, non riesco a farlo funzionare.

Per esempio:

 public class ThreadPoolErrors extends ThreadPoolExecutor { public ThreadPoolErrors() { super( 1, // core threads 1, // max threads 1, // timeout TimeUnit.MINUTES, // timeout units new LinkedBlockingQueue() // work queue ); } protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if(t != null) { System.out.println("Got an error: " + t); } else { System.out.println("Everything's fine--situation normal!"); } } public static void main( String [] args) { ThreadPoolErrors threadPool = new ThreadPoolErrors(); threadPool.submit( new Runnable() { public void run() { throw new RuntimeException("Ouch! Got an error."); } } ); threadPool.shutdown(); } } 

L’output di questo programma è “Tutto a posto – situazione normale!” anche se l’unico Runnable inviato al pool di thread genera un’eccezione. Qualche indizio su cosa sta succedendo qui?

Grazie!

    Dai documenti :

    Nota: quando le azioni sono racchiuse in attività (come FutureTask) in modo esplicito o tramite metodi come submit, questi oggetti task acquisiscono e gestiscono eccezioni di calcolo e pertanto non causano interruzioni improvvise e le eccezioni interne non vengono passate a questo metodo .

    Quando invii un Runnable, verrà avvolto in un Futuro.

    Il tuo afterExecute dovrebbe essere qualcosa del genere:

     public final class ExtendedExecutor extends ThreadPoolExecutor { // ... protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future) { try { Future future = (Future) r; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } if (t != null) { System.out.println(t); } } } 

    ATTENZIONE : Va notato che questa soluzione bloccherà il thread chiamante.


    Se si desidera elaborare le eccezioni generate dall’attività, in genere è preferibile utilizzare Callable anziché Runnable .

    Callable.call() è autorizzato a generare eccezioni controllate e queste vengono propagate al thread chiamante:

     Callable task = ... Future future = executor.submit(task); try { future.get(); } catch (ExecutionException ex) { ex.getCause().printStackTrace(); } 

    Se Callable.call() genera un’eccezione, questa verrà Future.get() in ExecutionException e generata da Future.get() .

    Questo è probabilmente preferibile alla sottoclass di ThreadPoolExecutor . Ti dà anche la possibilità di inviare nuovamente l’attività se l’eccezione è recuperabile.

    La spiegazione di questo comportamento è corretta in javadoc per afterExecute :

    Nota: quando le azioni sono racchiuse in attività (come FutureTask) in modo esplicito o tramite metodi come submit, questi oggetti task acquisiscono e gestiscono eccezioni di calcolo e pertanto non causano interruzioni improvvise e le eccezioni interne non vengono passate a questo metodo .

    L’ho aggirato avvolgendo il runnable fornito sottoposto all’esecutore.

     CompletableFuture.runAsync( () -> { try { runnable.run(); } catch (Throwable e) { Log.info(Concurrency.class, "runAsync", e); } }, executorService ); 

    Sto usando la class VerboseRunnable da jcabi-log , che ingoia tutte le eccezioni e le registra. Molto comodo, ad esempio:

     import com.jcabi.log.VerboseRunnable; scheduler.scheduleWithFixedDelay( new VerboseRunnable( Runnable() { public void run() { // the code, which may throw } }, true // it means that all exceptions will be swallowed and logged ), 1, 1, TimeUnit.MILLISECONDS ); 

    Un’altra soluzione potrebbe essere l’uso di ManagedTask e ManagedTaskListener .

    È necessario un Callable o Runnable che implementa l’interfaccia ManagedTask .

    Il metodo getManagedTaskListener restituisce l’istanza desiderata.

     public ManagedTaskListener getManagedTaskListener() { 

    E implementa in ManagedTaskListener il metodo taskDone :

     @Override public void taskDone(Future future, ManagedExecutorService executor, Object task, Throwable exception) { if (exception != null) { LOGGER.log(Level.SEVERE, exception.getMessage()); } } 

    Ulteriori dettagli sul ciclo di vita delle attività gestite e listener .

    Se si desidera monitorare l’esecuzione dell’attività, è ansible ruotare 1 o 2 thread (forse più a seconda del carico) e utilizzarli per eseguire attività da un wrapper ExecutionCompletionService.

    Se ExecutorService proviene da un’origine esterna (ovvero non è ansible eseguire la sottoclass di ThreadPoolExecutor e sovrascrivere afterExecute() ), è ansible utilizzare un proxy dinamico per ottenere il comportamento desiderato:

     public static ExecutorService errorAware(final ExecutorService executor) { return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] {ExecutorService.class}, (proxy, method, args) -> { if (method.getName().equals("submit")) { final Object arg0 = args[0]; if (arg0 instanceof Runnable) { args[0] = new Runnable() { @Override public void run() { final Runnable task = (Runnable) arg0; try { task.run(); if (task instanceof Future) { final Future future = (Future) task; if (future.isDone()) { try { future.get(); } catch (final CancellationException ce) { // Your error-handling code here ce.printStackTrace(); } catch (final ExecutionException ee) { // Your error-handling code here ee.getCause().printStackTrace(); } catch (final InterruptedException ie) { Thread.currentThread().interrupt(); } } } } catch (final RuntimeException re) { // Your error-handling code here re.printStackTrace(); throw re; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } else if (arg0 instanceof Callable) { args[0] = new Callable() { @Override public Object call() throws Exception { final Callable task = (Callable) arg0; try { return task.call(); } catch (final Exception e) { // Your error-handling code here e.printStackTrace(); throw e; } catch (final Error e) { // Your error-handling code here e.printStackTrace(); throw e; } } }; } } return method.invoke(executor, args); }); } 

    Questo a causa di AbstractExecutorService :: submit sta FutureTask in RunnableFuture (nient’altro che FutureTask ) come sotto

     AbstractExecutorService.java public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); /////////HERE//////// execute(ftask); return ftask; } 

    Quindi execute lo passerà a Worker e Worker.run() chiamerà il seguente.

     ThreadPoolExecutor.java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); /////////HERE//////// } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 

    Finalmente task.run(); nel codice sopra la chiamata chiamerà FutureTask.run() . Ecco il codice del gestore delle eccezioni, perché NON ottieni l’eccezione prevista.

     class FutureTask implements RunnableFuture public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { /////////HERE//////// result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 

    Questo funziona

    • È derivato da SingleThreadExecutor, ma è ansible adattarlo facilmente
    • Codice Java 8 lamdas, ma facile da risolvere

    Creerà un Executor con un singolo thread, che può ottenere molte attività; e attenderà che l’attuale termini l’esecuzione per iniziare con il prossimo

    In caso di errore o eccezione di uncaugth, uncaughtExceptionHandler lo prenderà

     public final class SingleThreadExecutorWithExceptions {
    
         public static ExecutorService newSingleThreadExecutorWithExceptions (final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
    
             ThreadFactory factory = (Runnable runnable) -> {
                 thread finale newThread = new Thread (eseguibile, "SingleThreadExecutorWithExceptions");
                 newThread.setUncaughtExceptionHandler ((finale Thread caugthThread, finale Throwable throwable) -> {
                     uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
                 });
                 return newThread;
             };
             ritorna nuovo FinalizableDelegatedExecutorService
                     (nuovo ThreadPoolExecutor (1, 1,
                             0L, TimeUnit.MILLISECONDS,
                             nuovo LinkedBlockingQueue (),
                             fabbrica){
    
    
                         protected void afterExecute (Runnable eseguibile, Throwable throwable) {
                             super.afterExecute (runnable, throwable);
                             if (throwable == null && runnable instanceof Future) {
                                 provare {
                                     Futuro futuro = (Futuro) percorribile;
                                     if (future.isDone ()) {
                                         future.get ();
                                     }
                                 } catch (CancellationException ce) {
                                     throwable = ce;
                                 } catch (ExecutionException ee) {
                                     throwable = ee.getCause ();
                                 } catch (InterruptedException ie) {
                                     Thread.currentThread () interrupt ().;  // ignora / ripristina
                                 }
                             }
                             if (throwable! = null) {
                                 uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                             }
                         }
                     });
         }
    
    
    
         class statica privata FinalizableDelegatedExecutorService
                 estende DelegatedExecutorService {
             FinalizableDelegatedExecutorService (ExecutorService executor) {
                 super (esecutore);
             }
             protected void finalize () {
                 super.shutdown ();
             }
         }
    
         / **
          * Una class wrapper che espone solo i metodi ExecutorService
          * di un'implementazione di ExecutorService.
          * /
         class statica privata DelegatedExecutorService estende AbstractExecutorService {
             private final ExecutorService e;
             DelegatedExecutorService (ExecutorService executor) {e = executor;  }
             public void execute (comando Runnable) {e.execute (comando);  }
             public void shutdown () {e.shutdown ();  }
             public List shutdownNow () {return e.shutdownNow ();  }
             public boolean isShutdown () {return e.isShutdown ();  }
             public boolean isTerminated () {return e.isTerminated ();  }
             public boolean awaitTermination (long timeout, unità TimeUnit)
                     lancia InterruptedException {
                 restituire e.awaitTermination (timeout, unità);
             }
             invio pubblico futuro (attività eseguibile) {
                 return e.submit (task);
             }
             invio pubblico futuro (operazione richiamabile) {
                 return e.submit (task);
             }
             invio pubblico futuro (attività eseguibile, risultato T) {
                 return e.submit (task, result);
             }
             elenco pubblico> invokeAll (Collection> tasks)
                     lancia InterruptedException {
                 return e.invokeAll (tasks);
             }
             elenco pubblico> invokeAll (Collection> tasks,
                                                  timeout lungo, unità TimeUnit)
                     lancia InterruptedException {
                 return e.invokeAll (tasks, timeout, unit);
             }
             public T invokeAny (Collection> tasks)
                     lancia InterruptedException, ExecutionException {
                 return e.invokeAny (tasks);
             }
             public T invokeAny (Collection> tasks,
                                    timeout lungo, unità TimeUnit)
                     lancia InterruptedException, ExecutionException, TimeoutException {
                 return e.invokeAny (tasks, timeout, unit);
             }
         }
    
    
    
         private SingleThreadExecutorWithExceptions () {}
     }
    

    Invece di sottoclass ThreadPoolExecutor, lo fornirei con un’istanza ThreadFactory che crea nuovi thread e fornisce loro un UncaughtExceptionHandler