The Reactive Extensions library is amazing. It has turned what was sure to be a mess of crap code into a few lines that are easy to understand and very easy to test.
I had a requirement that was very easy to state but hard to implement on my latest project. The idea was that the app would stop processing if some sensor stopped going off either consistently for n seconds or bounced between states some configurable amount of times for the same n seconds. Easy right? Well, you’d have to cache the data, start some threads or at least some timers to check the history. Then start and stop those async things, clean up, test, etc. I didn’t want to do that. There had to be a better way. Enter Reactive Extensions.
Reactive Extensions allow a developer to query streams of events, timers, whatever async things you might want. Imagine turning a bunch of event handlers or whatever into something you could do Linq queries against?
So I met my requirements by using two main Rx tools, Buffer and DistinctUntilChanged. First I turned my already-written events providing the sensor data into Observables. Before you had to listen to the event and procedurally do what you wanted with the data. Now the data just streams in and you can query against it. Buffer will cache the output of the stream after something is observed until another thing is observed. DistinctUntilChanged is pretty simple, it just filters out values from the stream that aren’t successively unique.
Prep: make sure whatever events and properties required are observable
//make the sensor values observable
_sensorValuesSequence = Observable.FromEvent<EventHandler<SensorDataEventArgs>, bool>(
handler =>
{
EventHandler<SensorDataEventArgs> dataHandler = (sender, e) => handler(e.Data.BooleanValue);
return dataHandler;
},
asdfHandler => NewData += fsdfHandler,
asdfHandler => NewData -= asdfHandler);
//make the simple boolean sensor Connected property an observable
_connectedSequence = this.ObservePropertyChanged(x => x.Connected);
First requirement: stop processing if some sensor stops going off for n seconds consistently.
/*
* Here's how the below code works...
* time: n sec 2n sec 3n sec 4n sec //time vs timeout period
* sensor value stream: F F F T|F F F F|F F F F|F F F T F|F F F F| //take the sensor values
* distinctUntilChanged: F T|F | | T F| | //observe only the changes
* buffer created #: 1 2|3 | | 4 5| | //Buffer the value changes until a timeout
* buffer expires #: |1 2|3 |3 | 4 5| //each buffer 'expires' at timeout and is eval'ed
* evals to: buffer 1: no abort //1's first value is false, gets a true later
* buffer 2: no abort //2 starts with true, doesn't count
* buffer 3: abort //3 gets a false, no trues later, abort!
* buffer 4: no abort //4 starts with true, doesn't count
* buffer 5: abort //5 starts with false, no trues later, abort!
*/
//get some timespan, the n seconds above
var timeout = TimeSpan.FromSeconds(Config.SomeSecondsValue);
//create a buffer of unique sensor values that is n seconds long whenever the sensor starts or stops going off
var bufferedNotGoingOff =
//get the sequence of sensor values, we only care about when they change though so use distinct
_sensorValuesSequence.DistinctUntilChanged()
//first param: create a buffer of same values whenever the sensor value changes
//second param: stop buffering values when we hit the timeout, this could be any other observable
.Buffer(_sensorValuesSequence.DistinctUntilChanged(), o => Observable.Interval(timeout))
//log some more for debugging
//see if every value in each buffer is false, then the sensor stopped going off
var bufferedNotGoingOffSequence = bufferedNotGoingOff.Select(o => o.All(p => !p));
//create an observable that only streams when a n second window of not going off occurs
return from asdf in bufferedNotGoingOffSequence where asdf select new Unit();
Second requirement: only start processing when the sensor has started to go off consistently for n seconds and hasn’t gone offline in the same amount of time.
/* Here's how the code below works... * time: n sec 2n sec 3n sec //time vs timeout period * sensor value stream: T T T T|T T T T|T F T T|T T T T| //stream of sensor values * disconnect value stream: F | | | | //stream of when the sensor disconnects (just falses) * merged: T T TFT|T T T T|T F T T|T T T T| //merge the two streams, sensor values & false disconnects * distinct: T FT| | F T | | //only care about the changes * buffer created #: 1 23| | 4 5 | | //Buffer the changes until timeout * buffer expires #: |1 23| | 4 5 | //each buffer 'expires' and is eval'ed * evals to: buffer 1: don't process //1 sees the first sensor true but also the disconnect * buffer 2: don't process //2 is started on the disconnect, doesn't count * buffer 3: process //3 starts on a sensor true value, doesn't see anything bad * buffer 4: don't process //4 is started on a sensor false, doesn't count * buffer 5: process //5 sees just the sensor start */ //create a merged stream of distinct sensor values (bools) and disconnects (as false) var distinctValuesAndDisconnects = _sensorValuesSequence.Merge(_connectedSequence.Where(o => !o)).DistinctUntilChanged(); //get some configurable time required var timeout = TimeSpan.FromSeconds(Config.SomeSecondsValue); //buffer the merged sensor values and disconnects for n seconds whenever there is a change var bufferedDistinctValuesAndDisconnects = distinctValuesAndDisconnects .Buffer(distinctAlarmsAndDisconnects, o => Observable.Interval(timeout)) //see if every value in each buffer is true, then nothing disconnected and the sensor kept going off, yay! var windowedSequence = bufferedDistinctValuesAndDisconnects.Select(o => o.All(p => p)); //create an observable that only streams when a n second window of sensor going off occurs with no disconnects return from asdf in windowedSequence where asdf select new Unit();
Leave a comment