Come creare una fonte che può ricevere elementi in seguito tramite una chiamata al metodo?

Vorrei creare una Source e successivamente inserire elementi, come in:

 val src = ... // create the Source here // and then, do something like this pushElement(x1, src) pushElement(x2, src) 

Qual è il modo consigliato per farlo?

Grazie!

Ci sono tre modi in cui questo può essere ottenuto:

1. Pubblica materializzazione con SourceQueue

È ansible utilizzare Source.queue che materializza il stream in un SourceQueue :

 case class Weather(zipCode : String, temperature : Double, raining : Boolean) val bufferSize = 100 //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of a new element. val overflowStrategy = akka.stream.OverflowStrategy.dropHead val queue = Source.queue(bufferSize, overflowStrategy) .filter(!_.raining) .to(Sink foreach println) .run() // in order to "keep" the queue Materialized value instead of the Sink's queue offer Weather("02139", 32.0, true) 

2. Post Materializzazione con attore

C’è una domanda e una risposta simili qui , l’essenza è che si materializza il stream come un ActorRef e si inviano messaggi a tale riferimento:

 val ref = Source.actorRef[Weather](Int.MaxValue, fail) .filter(!_.raining) .to(Sink foreach println ) .run() // in order to "keep" the ref Materialized value instead of the Sink's ref ! Weather("02139", 32.0, true) 

3. Pre materializzazione con attore

Allo stesso modo, è ansible creare esplicitamente un attore che contiene un buffer di messaggi, utilizzare tale attore per creare un’origine e quindi inviare i messaggi di attore come descritto nella risposta qui :

 object WeatherForwarder { def props : Props = Props[WeatherForwarder] } //see provided link for example definition class WeatherForwarder extends Actor {...} val actorRef = actorSystem actorOf WeatherForwarder.props //note the stream has not been instatiated yet actorRef ! Weather("02139", 32.0, true) //stream already has 1 Weather value to process which is sitting in the //ActorRef's internal buffer val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...} 

Dopo aver giocato intorno e cercando una buona soluzione a questo mi sono imbattuto in questa soluzione che è pulita, semplice, e funziona sia pre e post materializzazione. https://stackoverflow.com/a/32553913/6791842

  val (ref: ActorRef, publisher: Publisher[Int]) = Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail) .toMat(Sink.asPublisher(true))(Keep.both).run() ref ! 1 //before val source = Source.fromPublisher(publisher) ref ! 2 //before Thread.sleep(1000) ref ! 3 //before source.runForeach(println) ref ! 4 //after Thread.sleep(1000) ref ! 5 //after 

Produzione:

 1 2 3 4 5