Scala Futures – costruito nel timeout?

c’è un aspetto del futuro che non capisco esattamente dal tutorial ufficiale ref. http://docs.scala-lang.org/overviews/core/futures.html

I futuri in scala hanno un meccanismo di time-out di qualche tipo? Supponiamo che l’esempio sotto sia un file di testo di 5 gigabyte … l’ambito implicito di “Implicits.global” alla fine causa l’triggerszione di onFailure in modo non bloccante o può essere definita? E senza un timeout predefinito di qualche tipo, non implicherebbe che né il successo né il fallimento avrebbero mai potuto sparare?

import scala.concurrent._ import ExecutionContext.Implicits.global val firstOccurence: Future[Int] = future { val source = scala.io.Source.fromFile("myText.txt") source.toSeq.indexOfSlice("myKeyword") } firstOccurence onSuccess { case idx => println("The keyword first appears at position: " + idx) } firstOccurence onFailure { case t => println("Could not process file: " + t.getMessage) } 

Si ottiene un comportamento di timeout solo quando si utilizza il blocco per ottenere i risultati del Future . Se si desidera utilizzare i callback non bloccanti onComplete , onSuccess o onFailure , è necessario eseguire la gestione del timeout. Akka ha incorporato la gestione del timeout per la richiesta / risposta ( ? ) Di messaggistica tra attori, ma non è sicuro se si desidera iniziare ad usare Akka. FWIW, in Akka, per la gestione del timeout, compongono due Futures insieme tramite Future.firstCompletedOf , uno che rappresenta l’attuale compito asincrono e uno che rappresenta il timeout. Se il timer di timeout (tramite un HashedWheelTimer ) appare per primo, si ottiene un errore sul callback asincrono.

Un esempio molto semplice di rolling your own potrebbe essere qualcosa di simile. Innanzitutto, un object per la pianificazione dei timeout:

 import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout} import java.util.concurrent.TimeUnit import scala.concurrent.duration.Duration import scala.concurrent.Promise import java.util.concurrent.TimeoutException object TimeoutScheduler{ val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS) def scheduleTimeout(promise:Promise[_], after:Duration) = { timer.newTimeout(new TimerTask{ def run(timeout:Timeout){ promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis")) } }, after.toNanos, TimeUnit.NANOSECONDS) } } 

Quindi una funzione per prendere un futuro e aggiungere un comportamento timeout ad esso:

 import scala.concurrent.{Future, ExecutionContext, Promise} import scala.concurrent.duration.Duration def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = { val prom = Promise[T]() val timeout = TimeoutScheduler.scheduleTimeout(prom, after) val combinedFut = Future.firstCompletedOf(List(fut, prom.future)) fut onComplete{case result => timeout.cancel()} combinedFut } 

Si noti che l’ HashedWheelTimer che sto usando qui proviene da Netty.

Ho appena creato una lezione TimeoutFuture per un collega:

TimeoutFuture

 package model import scala.concurrent._ import scala.concurrent.duration._ import play.libs.Akka import play.api.libs.concurrent.Execution.Implicits._ object TimeoutFuture { def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = { val prom = promise[A] // timeout logic Akka.system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { prom success block } prom.future } } 

uso

 val future = TimeoutFuture(10 seconds) { // do stuff here } future onComplete { case Success(stuff) => // use "stuff" case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block) } 

Gli appunti:

  • Assume il gioco! quadro (ma è abbastanza facile adattarsi)
  • Ogni pezzo di codice viene eseguito nello stesso ExecutionContext che potrebbe non essere l’ideale.

Tutte queste risposte richiedono dipendenze aggiuntive. Ho deciso di scrivere una versione utilizzando java.util.Timer che è un modo efficiente per eseguire una funzione in futuro, in questo caso per triggersre un timeout.

Post del blog con maggiori dettagli qui

Usando questo con Scala’s Promise, possiamo creare un futuro con timeout come segue:

 package justinhj.concurrency import java.util.concurrent.TimeoutException import java.util.{Timer, TimerTask} import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.postfixOps object FutureUtil { // All Future's that use futureWithTimeout will use the same Timer object // it is thread safe and scales to thousands of active timers // The true parameter ensures that timeout timers are daemon threads and do not stop // the program from shutting down val timer: Timer = new Timer(true) /** * Returns the result of the provided future within the given time or a timeout exception, whichever is first * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a * Thread.sleep would * @param future Caller passes a future to execute * @param timeout Time before we return a Timeout exception instead of future's outcome * @return Future[T] */ def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = { // Promise will be fulfilled with either the callers Future or the timer task if it times out val p = Promise[T] // and a Timer task to handle timing out val timerTask = new TimerTask() { def run() : Unit = { p.tryFailure(new TimeoutException()) } } // Set the timeout to check in the future timer.schedule(timerTask, timeout.toMillis) future.map { a => if(p.trySuccess(a)) { timerTask.cancel() } } .recover { case e: Exception => if(p.tryFailure(e)) { timerTask.cancel() } } p.future } } 

Play framework contiene Promise.timeout in modo da poter scrivere codice come segue

 private def get(): Future[Option[Boolean]] = { val timeoutFuture = Promise.timeout(None, Duration("1s")) val mayBeHaveData = Future{ // do something Some(true) } // if timeout occurred then None will be result of method Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture)) } 

Puoi specificare il timeout quando aspetti nel futuro:

Per scala.concurrent.Future , il metodo result ti consente di specificare un timeout.

Per scala.actors.Future , Futures.awaitAll ti consente di specificare un timeout.

Non penso che ci sia un timeout incorporato nell’esecuzione di un futuro.

Nessuno ha ancora menzionato gli akka-streams . I flussi hanno un metodo di completionTimeout Timeout, e applicando che su un stream single-source funziona come un futuro.

Ma, akka-streams esegue anche la cancellazione in modo che possa effettivamente terminare l’esecuzione della sorgente, cioè segnala il timeout alla sorgente.

Se vuoi che lo scrittore (titolare di promise) sia colui che controlla la logica di timeout, usa akka.pattern.after , nel modo seguente:

 val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during..."))) Future.firstCompletedOf(Seq(promiseRef.future, timeout)) 

In questo modo, se la tua logica di completamento della promise non ha mai luogo, il futuro del tuo chiamante sarà comunque completato ad un certo punto con un errore.

Sono abbastanza sorpreso che questo non sia standard in Scala. Le mie versioni sono brevi e non hanno dipendenze

 import scala.concurrent.Future sealed class TimeoutException extends RuntimeException object FutureTimeout { import scala.concurrent.ExecutionContext.Implicits.global implicit class FutureTimeoutLike[T](f: Future[T]) { def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future { Thread.sleep(ms) throw new TimeoutException })) lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout } } 

Esempio di utilizzo

 import FutureTimeout._ Future { /* do smth */ } withTimeout 

Monix Task ha il supporto per il timeout

 import monix.execution.Scheduler.Implicits.global import monix.eval._ import scala.concurrent.duration._ import scala.concurrent.TimeoutException val source = Task("Hello!").delayExecution(10.seconds) // Triggers error if the source does not complete in 3 seconds after runOnComplete val timedOut = source.timeout(3.seconds) timedOut.runOnComplete(r => println(r)) //=> Failure(TimeoutException) 

Sto usando questa versione (basata su Play nell’esempio sopra) che utilizza il dispatcher di sistema Akka:

 object TimeoutFuture { def apply[A](system: ActorSystem, timeout: FiniteDuration)(block: => A): Future[A] = { implicit val executionContext = system.dispatcher val prom = Promise[A] // timeout logic system.scheduler.scheduleOnce(timeout) { prom tryFailure new java.util.concurrent.TimeoutException } // business logic Future { try { prom success block } catch { case t: Throwable => prom tryFailure t } } prom.future } }