ExecutorService che interrompe le attività dopo un timeout

Sto cercando un’implementazione ExecutorService che può essere fornita con un timeout. Le attività inviate a ExecutorService vengono interrotte se impiegano più tempo del timeout per l’esecuzione. Implementare una tale bestia non è un compito così difficile, ma mi chiedo se qualcuno sa di un’implementazione esistente.

Ecco cosa mi è venuto in mente in base ad alcune delle discussioni seguenti. Qualche commento?

import java.util.List; import java.util.concurrent.*; public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { private final long timeout; private final TimeUnit timeoutUnit; private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap runningTasks = new ConcurrentHashMap(); public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.timeout = timeout; this.timeoutUnit = timeoutUnit; } @Override public void shutdown() { timeoutExecutor.shutdown(); super.shutdown(); } @Override public List shutdownNow() { timeoutExecutor.shutdownNow(); return super.shutdownNow(); } @Override protected void beforeExecute(Thread t, Runnable r) { if(timeout > 0) { final ScheduledFuture scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); runningTasks.put(r, scheduled); } } @Override protected void afterExecute(Runnable r, Throwable t) { ScheduledFuture timeoutTask = runningTasks.remove(r); if(timeoutTask != null) { timeoutTask.cancel(false); } } class TimeoutTask implements Runnable { private final Thread thread; public TimeoutTask(Thread thread) { this.thread = thread; } @Override public void run() { thread.interrupt(); } } } 

È ansible utilizzare un servizio ScheduledExecutor per questo. Per prima cosa lo invii solo una volta per iniziare immediatamente e conservare il futuro che viene creato. Successivamente è ansible inviare una nuova attività che annullerebbe il futuro conservato dopo un certo periodo di tempo.

  ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); final Future handler = executor.submit(new Callable(){ ... }); executor.schedule(new Runnable(){ public void run(){ handler.cancel(); } }, 10000, TimeUnit.MILLISECONDS); 

Questo eseguirà il tuo gestore (funzionalità principale da interrompere) per 10 secondi, quindi annullerà (cioè interromperà) quell’attività specifica.

Sfortunatamente la soluzione è difettosa. Esiste una sorta di bug con ScheduledThreadPoolExecutor , riportato anche in questa domanda : l’annullamento di un’attività inoltrata non rilascia completamente le risorse di memoria associate all’attività; le risorse vengono rilasciate solo quando l’attività scade.

Se quindi si crea un TimeoutThreadPoolExecutor con una scadenza piuttosto lunga (un utilizzo tipico) e si inoltrano le attività abbastanza velocemente, si finisce col riempire la memoria, anche se le attività sono state effettivamente completate correttamente.

Puoi vedere il problema con il seguente (molto grezzo) programma di test:

 public static void main(String[] args) throws InterruptedException { ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue(), 10, TimeUnit.MINUTES); //ExecutorService service = Executors.newFixedThreadPool(1); try { final AtomicInteger counter = new AtomicInteger(); for (long i = 0; i < 10000000; i++) { service.submit(new Runnable() { @Override public void run() { counter.incrementAndGet(); } }); if (i % 10000 == 0) { System.out.println(i + "/" + counter.get()); while (i > counter.get()) { Thread.sleep(10); } } } } finally { service.shutdown(); } } 

Il programma esaurisce la memoria disponibile, anche se attende che i Runnable spawn siano completati.

Ho pensato a questo per un po ‘, ma sfortunatamente non sono riuscito a trovare una buona soluzione.

EDIT: ho scoperto che questo problema è stato segnalato come bug JDK 6602600 e sembra essere stato risolto molto recentemente.

Avvolgi l’attività in FutureTask e puoi specificare il timeout per il FutureTask. Guarda l’esempio nella mia risposta a questa domanda,

timeout del processo nativo java

Che ne dici di utilizzare il metodo ExecutorService.shutDownNow() come descritto in http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html ? Sembra essere la soluzione più semplice.

Sembra che il problema non si trovi nell’errore di JDK 6602600 (è stato risolto nel 2010-05-22), ma in una chiamata errata di sleep (10) in cerchio. Nota addizionale, che il thread principale deve dare direttamente CHANCE ad altri thread per realizzare le loro attività invocando SLEEP (0) in OGNI ramo del cerchio esterno. È meglio, penso, utilizzare Thread.yield () invece di Thread.sleep (0)

Il risultato corretto parte del precedente codice problema è simile a questo:

 ....................... ........................ Thread.yield(); if (i % 1000== 0) { System.out.println(i + "/" + counter.get()+ "/"+service.toString()); } // // while (i > counter.get()) { // Thread.sleep(10); // } 

Funziona correttamente con la quantità di contatore esterno fino a 150.000.000 di cerchi testati.

Dopo un sacco di tempo per sondare,
Infine, utilizzo il metodo invokeAll di ExecutorService per risolvere questo problema.
Ciò interromperà rigorosamente l’attività durante l’esecuzione dell’attività.
Ecco un esempio

 ExecutorService executorService = Executors.newCachedThreadPool(); try { List> callables = new ArrayList<>(); // Add your long time task (callable) callables.add(new VaryLongTimeTask()); // Assign tasks for specific execution timeout (eg 2 sec) List> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); for (Future future : futures) { // Getting result } } catch (InterruptedException e) { e.printStackTrace(); } executorService.shutdown(); 

Il professionista è anche ansible inviare ListenableFuture allo stesso ExecutorService .
Basta modificare leggermente la prima riga di codice.

 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService è la funzione di ascolto di ExecutorService nel progetto google guava ( com.google.guava ))

Usando la risposta di John W ho creato un’implementazione che inizia correttamente il timeout quando l’attività inizia la sua esecuzione. Scrivo anche un test unitario per questo 🙂

Tuttavia, non soddisfa le mie esigenze poiché alcune operazioni IO non si interrompono quando viene chiamato Future.cancel() (cioè quando viene chiamato Thread.interrupted() ).

Ad ogni modo se qualcuno è interessato, ho creato una sintesi: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

Che dire di questa idea alternativa:

  • due hanno due esecutori:
    • uno per :
      • invio del compito, senza preoccuparsi del timeout dell’attività
      • aggiungendo il risultato futuro e il momento in cui dovrebbe terminare con una struttura interna
    • uno per eseguire un lavoro interno che sta controllando la struttura interna se alcune attività sono scadute e se devono essere cancellate.

Piccolo esempio è qui:

 public class AlternativeExecutorService { private final CopyOnWriteArrayList futureQueue = new CopyOnWriteArrayList(); private final ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job private final ListeningExecutorService threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for private ScheduledFuture scheduledFuture; private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; public AlternativeExecutorService() { scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); } public void pushTask(OwnTask task) { ListenableFuture future = threadExecutor.submit(task); // -> create your Callable futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end } public void shutdownInternalScheduledExecutor() { scheduledFuture.cancel(true); scheduledExecutor.shutdownNow(); } long getCurrentMillisecondsTime() { return Calendar.getInstance().get(Calendar.MILLISECOND); } class ListenableFutureTask { private final ListenableFuture future; private final OwnTask task; private final long milliSecEndTime; private ListenableFutureTask(ListenableFuture future, OwnTask task, long milliSecStartTime) { this.future = future; this.task = task; this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); } ListenableFuture getFuture() { return future; } OwnTask getTask() { return task; } long getMilliSecEndTime() { return milliSecEndTime; } } class TimeoutManagerJob implements Runnable { CopyOnWriteArrayList getCopyOnWriteArrayList() { return futureQueue; } @Override public void run() { long currentMileSecValue = getCurrentMillisecondsTime(); for (ListenableFutureTask futureTask : futureQueue) { consumeFuture(futureTask, currentMileSecValue); } } private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) { ListenableFuture future = futureTask.getFuture(); boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; if (isTimeout) { if (!future.isDone()) { future.cancel(true); } futureQueue.remove(futureTask); } } } class OwnTask implements Callable { private long timeoutDuration; private TimeUnit timeUnit; OwnTask(long timeoutDuration, TimeUnit timeUnit) { this.timeoutDuration = timeoutDuration; this.timeUnit = timeUnit; } @Override public Void call() throws Exception { // do logic return null; } public long getTimeoutDuration() { return timeoutDuration; } public TimeUnit getTimeUnit() { return timeUnit; } } }