Buffer IObserver Rx per appianare esplosioni di eventi

Ho una sequenza osservabile che produce eventi a raffiche rapide (ad esempio: cinque eventi uno dopo l’altro, quindi un lungo ritardo, quindi un altro rapido burst di eventi, ecc.). Voglio appianare queste raffiche inserendo un breve ritardo tra gli eventi. Immagina il seguente diagramma come esempio:

 Raw: --oooo -------------- ooooo ----- oo ---------------- ooo |
 Buffered: --o - o - o - o -------- o - o - o - o - o - o - o --------- O - O - O |

Il mio attuale approccio è quello di generare un timer simile al metronomo tramite Observable.Interval() che segnala quando è ok per estrarre un altro evento dal stream raw. Il problema è che non riesco a capire come combinare quel timer con la mia sequenza osservabile non bloccata.

IObservable.Zip() è vicino a fare quello che voglio, ma funziona solo fino a quando il stream grezzo sta producendo eventi più velocemente del timer. Non appena si verifica una pausa significativa nel stream non elaborato, il timer crea una serie di eventi indesiderati che immediatamente si accoppiano con il successivo burst di eventi dal stream non elaborato.

Idealmente, voglio un metodo di estensione IObservable con la seguente firma di funzione che produce il bevaior che ho delineato sopra. Ora, vieni in mio soccorso StackOverflow 🙂

 public static IObservable Buffered(this IObservable src, TimeSpan minDelay) 

PS. Sono nuovo di zecca per Rx, quindi le mie scuse se questa è una domanda banalmente semplice …


1. Approccio semplice ma imperfetto

Ecco la mia iniziale soluzione ingenua e semplicistica che ha alcuni problemi:

 public static IObservable Buffered(this IObservable source, TimeSpan minDelay) { Queue q = new Queue(); source.Subscribe(x => q.Enqueue(x)); return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue()); } 

Il primo ovvio problema con questo è che l’IDisposable restituito dall’abbonamento interno alla sorgente grezza viene perso e quindi l’abbonamento non può essere terminato. Chiamare Dispose sull’IDisposable restituito da questo metodo uccide il timer, ma non il feed di eventi raw sottostante che ora sta inutilmente riempiendo la coda senza che nessuno sia rimasto per estrarre gli eventi dalla coda.

Il secondo problema è che non è ansible inoltrare le eccezioni o le notifiche di fine stream dal stream di eventi non elaborati al stream memorizzato nel buffer: vengono semplicemente ignorate quando si sottoscrive la fonte originale.

E, ultimo ma non meno importante, ora ho il codice che si sveglia periodicamente indipendentemente dal fatto che ci sia effettivamente un lavoro da fare, che preferirei evitare in questo meraviglioso nuovo mondo reattivo.


2. Approcci eccessivamente complessi

Per risolvere i problemi incontrati nel mio approccio semplicistico iniziale, ho scritto una funzione molto più complicata che si comporta molto come IObservable.Delay() (ho usato .NET Reflector per leggere quel codice e usarlo come base della mia funzione). Sfortunatamente, molte delle logiche standard come AnonymousObservable non sono accessibili pubblicamente al di fuori del codice system.reactive, quindi ho dovuto copiare e incollare un sacco di codice. Questa soluzione sembra funzionare, ma data la sua complessità, sono meno sicuro che sia privo di errori.

Non riesco a credere che non ci sia un modo per farlo usando una combinazione delle estensioni Reactive standard. Odio sentirmi reinventare inutilmente la ruota, e lo schema che sto cercando di build sembra piuttosto standard.

Questo è in realtà un duplicato di Un modo per spingere gli eventi con buffer in intervalli regolari , ma includerò un riassunto qui (l’originale sembra abbastanza confuso perché guarda ad alcune alternative).

 public static IObservable Buffered(this IObservable source, TimeSpan minDelay) { return source.Drain(x => Observable.Empty() .Delay(minDelay) .StartWith(x) ); } 

La mia implementazione di Drain funziona come SelectMany , ma aspetta che l’output precedente finisca per primo (si potrebbe pensare a ConactMany , mentre SelectMany è più simile a MergeMany ). Lo Drain integrato non funziona in questo modo, quindi dovrai includere l’implementazione di seguito:

 public static class ObservableDrainExtensions { public static IObservable Drain( this IObservable source, Func> selector) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } }