System.Reactive - Reactive Extensions GroupBy: unique Buffer/Window until a time span elapses, with cancellation
I have a hot stream of events of the following type:
    Event
    {
        string Name;
        int State; // 1 or 2, i.e. active or inactive
    }
There is a function that provides the parent name of a given name: string GetParent(string name)
I need to buffer events per parent for 2 minutes. If, during those 2 minutes, I receive an event for any child of a given parent with state = 2, the buffer should be cancelled and should output 0; otherwise it should output the count of events received.
I know I have to use GroupBy to partition, then Buffer, then Count, but I am unable to think of a way to create a buffer that is unique per parent. I thought of using Distinct, but that doesn't solve the problem: I only want to avoid creating a second buffer while the parent's buffer is still active (once the parent's buffer is cancelled or its 2 minutes are over, a buffer for that parent can be created again). I understand I need a custom buffer that checks this condition before creating the buffer, but how do I do that with Reactive Extensions? Any help is appreciated. Regards
Thanks, Brandon, for your help. Below is the main program I am using for testing. It is not working. As I am new to Reactive Extensions, the problem could be in the way I am testing.
    namespace TestReactive
    {
        class Program
        {
            static int abc = 1;

            static void Main(string[] args)
            {
                Subject<AEvent> memberAdded = new Subject<AEvent>();
                // ISubject<AEvent, AEvent> syncedSubject = new ISubject<AEvent, AEvent>();
                var timer = new Timer { Interval = 5 };
                timer.Enabled = true;
                timer.Elapsed += (sender, e) => MyElapsedMethod(sender, e, memberAdded);
                var bc = memberAdded.Subscribe();
                var cdc = memberAdded.GroupBy(e => e.Parent)
                    .SelectMany(parentGroup =>
                    {
                        var children = parentGroup.Publish().RefCount();
                        var inactiveChild = children.SkipWhile(c => c.State != 2).Take(1).Select(c => 0);
                        var timer1 = Observable.Timer(TimeSpan.FromSeconds(1));
                        var activeCount = children.TakeUntil(timer1).Count();
                        return Observable.Amb(activeCount, inactiveChild)
                            .Select(count => new { ParentName = parentGroup.Key, Count = count });
                    });
                Observable.ForEachAsync(cdc, x => WriteMe("dum dum " + x.ParentName + x.Count));
                // group.Dump("dum");
                Console.ReadKey();
            }

            static void WriteMe(string sb)
            {
                Console.WriteLine(sb);
            }

            static void MyElapsedMethod(object sender, ElapsedEventArgs e, Subject<AEvent> s)
            {
                AEvent ab = HelperMethods.GetAlarm();
                Console.WriteLine(abc + " p =" + ab.Parent + ", c = " + ab.Name + " ,s = " + ab.State);
                s.OnNext(ab);
            }
        }
    }
    public static AEvent GetAlarm()
    {
        if (gp > 4) gp = 1;
        if (p > 4) p = 1;
        if (c > 4) c = 1;
        AEvent a = new AEvent();
        a.Parent = "p" + gp + p;
        a.Name = "c" + gp + p + c;
        if (containedKeys.ContainsKey(a.Name))
        {
            // Known child: report the stored state, then flip it for the next tick.
            a.State = containedKeys[a.Name];
            if (a.State == 1) containedKeys[a.Name] = 2;
            else containedKeys[a.Name] = 1;
        }
        else
        {
            containedKeys.TryAdd(a.Name, 1);
        }
        gp++; p++; c++;
        return a;
    }
So this method generates an event for one parent at each tick. It generates events for parents p11, p22, p33, p44 with state = 1, followed by events for parents p11, p22, p33, p44 with state = 2. I am using Observable.ForEachAsync to print the result. I see it being called 4 times and after that nothing, as if the cancellation of the group is not happening.
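Assuming that the 2 minute buffer for each group should open as soon as the first event for that group is seen, and close either after 2 minutes or when an event with state 2 is seen (in which case the count is reported as 0), I think the following works: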
    public static IObservable<EventCount> EventCountByParent(
        this IObservable<Event> source,
        IScheduler scheduler)
    {
        return Observable.Create<EventCount>(observer =>
            source.GroupByUntil(
                    evt => GetParent(evt.Name),
                    evt => evt,
                    // Close the group on the first state-2 event or after 2 minutes,
                    // whichever comes first.
                    group => @group.Where(evt => evt.State == 2)
                        .Merge(Observable.Timer(TimeSpan.FromMinutes(2), scheduler)
                            .Select(_ => Event.Null)))
                .SelectMany(go =>
                    // A state-2 event resets the running count to 0.
                    go.Aggregate(0, (acc, evt) => (evt.State == 2 ? 0 : acc + 1))
                        .Select(count => new EventCount(go.Key, count)))
                .Subscribe(observer));
    }
With EventCount (implementing equality overrides for testing) as:
    public class EventCount
    {
        private readonly string _name;
        private readonly int _count;

        public EventCount(string name, int count)
        {
            _name = name;
            _count = count;
        }

        public string Name { get { return _name; } }
        public int Count { get { return _count; } }

        public override string ToString()
        {
            return string.Format("Name: {0}, Count: {1}", _name, _count);
        }

        protected bool Equals(EventCount other)
        {
            return string.Equals(_name, other._name) && _count == other._count;
        }

        public override bool Equals(object obj)
        {
            if (ReferenceEquals(null, obj)) return false;
            if (ReferenceEquals(this, obj)) return true;
            if (obj.GetType() != this.GetType()) return false;
            return Equals((EventCount) obj);
        }

        public override int GetHashCode()
        {
            unchecked
            {
                return ((_name != null ? _name.GetHashCode() : 0) * 397) ^ _count;
            }
        }
    }
And Event as:
    public class Event
    {
        // Placeholder value emitted by the timer branch of the duration selector.
        public static Event Null = new Event(string.Empty, 0);

        private readonly string _name;
        private readonly int _state;

        public Event(string name, int state)
        {
            _name = name;
            _state = state;
        }

        public string Name { get { return _name; } }
        public int State { get { return _state; } }
    }
I did a quick (i.e. not exhaustive) test with Rx-Testing:
    public class EventCountByParentTests : ReactiveTest
    {
        private readonly TestScheduler _testScheduler;

        public EventCountByParentTests()
        {
            _testScheduler = new TestScheduler();
        }

        [Fact]
        public void IsCorrect()
        {
            var source = _testScheduler.CreateHotObservable(
                OnNext(TimeSpan.FromSeconds(10).Ticks, new Event("a", 1)),
                OnNext(TimeSpan.FromSeconds(20).Ticks, new Event("b", 1)),
                OnNext(TimeSpan.FromSeconds(30).Ticks, new Event("a", 1)),
                OnNext(TimeSpan.FromSeconds(40).Ticks, new Event("b", 1)),
                OnNext(TimeSpan.FromSeconds(50).Ticks, new Event("a", 1)),
                OnNext(TimeSpan.FromSeconds(60).Ticks, new Event("b", 2)),
                OnNext(TimeSpan.FromSeconds(70).Ticks, new Event("a", 1)),
                OnNext(TimeSpan.FromSeconds(140).Ticks, new Event("a", 1)),
                OnNext(TimeSpan.FromSeconds(150).Ticks, new Event("a", 1)));

            var results = _testScheduler.CreateObserver<EventCount>();

            var sut = source.EventCountByParent(_testScheduler).Subscribe(results);

            _testScheduler.Start();

            results.Messages.AssertEqual(
                OnNext(TimeSpan.FromSeconds(60).Ticks, new EventCount("b", 0)),
                OnNext(TimeSpan.FromSeconds(130).Ticks, new EventCount("a", 4)),
                OnNext(TimeSpan.FromSeconds(260).Ticks, new EventCount("a", 2)));
        }
    }
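To try the same GroupByUntil idea by hand, without a test project, a minimal console sketch along these lines might help. It assumes the System.Reactive package is referenced. The tuple event type, the GetParent rule (everything before the first '.') and the use of HistoricalScheduler to fast-forward virtual time are illustrative assumptions, not part of the question or the answer's code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class Demo
{
    // Hypothetical stand-in for the question's GetParent(name):
    // the parent is assumed to be the text before the first '.'.
    static string GetParent(string name) => name.Split('.')[0];

    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced manually
        var source = new Subject<(string Name, int State)>();
        var output = new List<string>();

        var counts = source
            .GroupByUntil(
                evt => GetParent(evt.Name),
                // The group closes on its first state-2 event or after 2 minutes.
                group => group.Where(evt => evt.State == 2).Select(_ => 0L)
                    .Merge(Observable.Timer(TimeSpan.FromMinutes(2), scheduler)))
            .SelectMany(group => group
                // A state-2 event resets the running count to 0.
                .Aggregate(0, (acc, evt) => evt.State == 2 ? 0 : acc + 1)
                .Select(count => $"{group.Key}: {count}"));

        using (counts.Subscribe(output.Add))
        {
            source.OnNext(("p1.c1", 1));
            source.OnNext(("p1.c2", 1));
            scheduler.AdvanceBy(TimeSpan.FromMinutes(2)); // first p1 window times out
            source.OnNext(("p1.c1", 1));                  // opens a fresh p1 window
            source.OnNext(("p1.c2", 2));                  // cancels that window
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

With these inputs the sketch prints `p1: 2` (the window timed out after two active events) and then `p1: 0` (a fresh window for the same parent was cancelled by the state-2 event). A plain GroupBy never ends a group, so it cannot reopen a window for the same parent in this way.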