How to count the number of event invocations using Rx

A while ago I needed to count events arriving from an external system and perform a task once a specific threshold was reached. The events came in groups, and each group had its own unique key. I might have been able to solve this with a dictionary, but the events were raised from multiple threads, which made the problem more complex. I did not want to use synchronization (a.k.a. locks) or a concurrent dictionary, so I chose to use Rx instead. Reactive Extensions seemed like a good fit, with its (most of the time) easy-to-understand declarative API.

Using Observable.Count

The first solution I tried was to use the Count extension method:

var input = new MyEventInput();

input.NotificationSubject
    .GroupBy(notification => notification.GroupId)
    .SelectMany(group => group.Count()
        .Select(count =>
            new Counter<string> { Id = group.Key, Count = count }))
    .Subscribe(counter =>
    {
        Console.WriteLine($"{counter.Id} Final count: {counter.Count}");
    });
 
input.Start();

For my tests/demo I created a simple class (MyEventInput) that sends randomly generated notifications using System.Reactive.Subjects.Subject<T>.

Next I used GroupBy to group the notifications by their GroupId. Each group was then transformed using Count, and the result was put into a custom class I created called Counter, which only stores the results.
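The supporting types are not shown in this post, so here is a minimal sketch of what they might look like. Everything in it is an assumption inferred from how the query uses them: the Notification class name, the parameters of Start, and the single-threaded loop are all hypothetical (the real events came from multiple threads).

```csharp
using System;
using System.Reactive.Subjects;

// Hypothetical notification type; only GroupId is used by the queries in this post.
public class Notification
{
    public string GroupId { get; set; } = "";
}

// Simple result holder: a group key and the count observed for that group.
public class Counter<T>
{
    public T Id { get; set; }
    public int Count { get; set; }
}

// Assumed shape of the demo input: Start() pushes random notifications into a Subject.
public class MyEventInput
{
    private readonly Random _random = new Random();

    public Subject<Notification> NotificationSubject { get; } = new Subject<Notification>();

    // The parameters are hypothetical; the post only calls Start() with no arguments.
    public void Start(int notifications = 10, int groups = 3)
    {
        for (var i = 0; i < notifications; i++)
        {
            var groupId = $"event-{_random.Next(groups)}";
            NotificationSubject.OnNext(new Notification { GroupId = groupId });
        }

        // Completing the subject is what allows operators such as Count to emit a result.
        NotificationSubject.OnCompleted();
    }
}
```

The subscription has to be set up before Start() is called, because a Subject does not replay earlier notifications to late subscribers.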

Now all that was left was to write out the count and there you go…

Close, but no cigar

If only it were that simple. The problem with using Count is that it only returns a value once the sequence has completed. In other words, the following code (from Intro to Rx, where Dump is a small helper extension method that prints each notification):

var numbers = Observable.Range(0,3);
numbers.Dump("numbers");
numbers.Count().Dump("count");

Will output the following:

numbers-->0
numbers-->1
numbers-->2
numbers Completed
count-->3
count Completed

This was not good enough. I needed the number of events that had occurred so far, so that I could perform an action when that number reached a specific threshold. And so I had to look for some other way to count my notifications.

Back to the drawing board

In my frustration I decided to look for a solution elsewhere, and so I called a friend. You might have heard of a little book he wrote called “Rx.NET in Action”. Tamir gave me several great ideas and, in particular, suggested I look into using Scan for aggregation.

Armed with this new knowledge I came up with the following code:

input.NotificationSubject
    .GroupBy(notification => notification.GroupId)
    .SelectMany(group => group.Scan(new Counter<string> { Id = group.Key, Count = 0 },
        (previous, current) => new Counter<string> { Id = previous.Id, Count = previous.Count + 1 }))
    .Subscribe(counter =>
    {
        Console.WriteLine($"{counter.Id} count: {counter.Count}");
    });

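The code starts with GroupBy, but then I use Scan to count the number of invocations. For each group I start with a new counter whose count is zero, and with each notification I produce a new counter with the count increased by one. Unlike Count, Scan emits every intermediate value, so the subscriber sees the running count as it changes.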
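To try the GroupBy and Scan combination without the event source, here is a small self-contained sketch. It assumes a finite list of group keys instead of a live Subject, and the RunningCountDemo class and RunningCounts method are hypothetical names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RunningCountDemo
{
    // Returns one "key:count" entry per incoming key, in arrival order.
    public static IList<string> RunningCounts(IEnumerable<string> groupIds)
    {
        return groupIds.ToObservable()
            .GroupBy(id => id)
            .SelectMany(group => group
                // Scan emits the accumulator after every element: 1, 2, 3, ... per group.
                .Scan(0, (count, _) => count + 1)
                .Select(count => $"{group.Key}:{count}"))
            .ToList()   // collect everything once the finite source completes
            .Wait();    // block until the list is available (fine for a demo)
    }
}
```

Calling RunningCountDemo.RunningCounts(new[] { "a", "b", "a", "a", "b" }) should give a:1, b:1, a:2, a:3, b:2, which is one value per notification rather than a single total per group.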

Running this code with 3 groups I got the following output:

event-1 count: 1
event-2 count: 1
event-0 count: 1
event-1 count: 2
event-1 count: 3
event-0 count: 2
event-0 count: 3
event-1 count: 4
event-1 count: 5
event-1 count: 6

As you can see, event-0 was raised 3 times, event-1 was raised 6 times and event-2 was raised only once.
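Since the original goal was to act once a group reaches a threshold, one possible next step is to filter the running count. This is only a sketch under my own assumptions, not the code from the project: ThresholdDemo and GroupsReaching are hypothetical names, and it uses a finite list of keys rather than the live Subject.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ThresholdDemo
{
    // Returns the keys of the groups whose running count reached the threshold,
    // in the order in which they reached it.
    public static IList<string> GroupsReaching(IEnumerable<string> groupIds, int threshold)
    {
        return groupIds.ToObservable()
            .GroupBy(id => id)
            .SelectMany(group => group
                .Scan(0, (count, _) => count + 1)
                // The count only grows, so equality matches at most once per group.
                .Where(count => count == threshold)
                .Select(_ => group.Key))
            .ToList()
            .Wait();
    }
}
```

Feeding it the ten events from the output above with a threshold of 3 should return event-1 followed by event-0, while event-2 never reaches the threshold.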

The end?

So I was finally able to count new notifications as they arrived. There was still quite a lot of work left to do, but that’s a tale for a different blog post.

In the meantime you can check out the code on GitHub and marvel at the wonder that is Rx.

Happy coding…
