Blog Archives

Broadcast-Style Publish-Subscribe Messaging with CCR

The Publish-Subscribe pattern is an incredibly robust, generally useful, and well understood  messaging pattern. The PubSub pattern, has about a million advantages and is the basis for other super popular patterns such as the observer pattern made famous by data binding and the like. As such, it should be no surprise that tools like Decentralized Software Services (DSS) have built-in mechanisms to support such a pattern.

It is, however, surprising (at least to me) that the Concurrency and Coordination Runtime (CCR), with its’ messaging-type API style and targeted use cases would not include at least some kind of rudimentary support for the Publish-Subscribe pattern. Don’t get me wrong, the CCR is great and I love working with it – I am just surprised at the lack of support provided directly in the library.

I hope that the CCR community as a whole will benefit from this blog post where I provide a re-useable CCR component that provides all the plumbing needed to support the Publish-Subscribe patter using CCR ports. I will also share some of the insight I have gained while creating this component as well as some high level remarks about how to use it and potential gotchas to look out for as you adopt this component in your code.

The Current State of Things and the Problem There-in

Let’s say we have something like the following lines of code somewhere in an application using CCR:

var tsunamiPort = new Port<TsunamiWarning>();
               

Arbiter
.Activate(taskQueue,
   
Arbiter.Receive<TsunamiWarning>(true, tsunamiPort, w => Console.WriteLine("Handler 1: Running!!!")));

Arbiter.Activate(taskQueue,
   
Arbiter.Receive<TsunamiWarning>(true, tsunamiPort, w => Console.WriteLine("Handler 2: Running!!!")));

for (int i = 0; i < 10; i++)
{
    tsunamiPort.Post(
new TsunamiWarning());
}

You may, or may not, be surprised at the resulting output of this code. Each tsunami warning posted to the tsunamiPort will be received by *either* Handler 1 OR Handler 2 but never both. Someone is in for a huge surprise.

The CCR model is such that each message posted to any IPort object creates, at most, one task. So, in this case, when a TsunamiWarning is posted to the tsunamiPort , it will be evaluated by the Arbiter hierarchy to determine the first receiver that can handle the message. If no handler is found, then no task is generated. On the other hand, once a receiver is found, the delegate inside of that receiver is used to create a task and the search for receivers stops. So, even if there are other activated receivers on that port, they will not be sent the message.

What’s more, when multiple receivers are activated on the same port for the same message type, the receiver that actually gets a message posted to that port can change throughout the course of an application running based on whether other receivers are available to accept messages or not (this case is most usually caused when involving interleaves on the receiver activation).

So what can we do? A tsunami is coming and if we can’t get that message out to a large number of receivers, this is going to get ugly. You might try re-posting the message, or using the overload of the Arbiter.Active method that takes a predicate but you’ll quickly find that these approaches don’t work in most cases and require a lot of custom code that is not generally re-useable.

The Approach

At first, I wanted to go into the CCR itself and create a custom CCR Port or PortSet that would allow multiple receivers to be served every message posted to it. I quickly found out that though there are IPort and IPortSet interfaces exposed by the CCR API, the CCR does not seem to be very ‘sub-classable’. At least not as easily as I might have wished/thought.

So, my second thought was to ask the CCR community. As it turned out, this was a great idea. The CCR and DSS communities are great. I asked my question and got a great answer back.The suggestion I got from the forum was to create a CCR service whose main port accepts three messages: Publish, Subscribe, and UnSubscribe.

As it turns out this is a great approach. While the actual implementation is not as simple as stated in the forum, I still think the idea is a very good one and is what I have based my solution on for this post.

How it Works: Subscribing and UnSubscribing

After creating a new Pub-Sub CCR service, or getting a reference to an existing instance  Subscribing is simple. All you have to do is send the Pub-Sub service a reference to IPort object you want to receive published messages on then register receivers on that port exactly as you always do. Here’s an example:

var pubSubPort = CreateOrGetPubSubPort();

var subscriberOne = new Port<TsunamiWarning>();
pubSubPort.Post(
new Subscribe(subscriberOne));

Arbiter.Activate(taskQueue,
   
Arbiter.Receive<TsunamiWarning>(true
, subscriberOne,
    (msg) =>
    {
       
Console.WriteLine("Subscriber One got: {0}", msg);
    }));

 
That’s it! The Subscribe message takes any IPort object, this includes both Port<>s and any PortSet. In-fact,there is also a generic version of the subscribe message that provides a little syntactic sugar for passing a reference to a single port inside of a PortSet to the Pub-Sub service.

When the Subscribe message is posted to the Pub-Sub service, the following code is triggered

protected override void AddSubscriber(Subscribe subscribeMessage)
{
   
var
port = subscribeMessage.NotificationPort;
   
var portSet = port as IPortSet
;
   
if (portSet != null
)
    {
        AddSubScribers(portSet);
    }
   
else
    {
       
this._recipients.Add(port);
    }
               
}

If the subscribe message contains a plain-old port item, we add it to the list of our message recipients. If, however, the Subscribe message contains a PortSet, then we will individually add each port in the port set to the list of recipients (this is an important detail that we will discuss more later).

Hopefully, the code used to respond to a Subscribe is so simple you can imagine what UnSubscribing looks like. In-case you can’t though, here it is:

protected override void RemoveSubscriber(UnSubscribe unSubscribeMessage)
{
   
var
portToRemove = unSubscribeMessage.NotificationPort;
   
var portSet = portToRemove as IPortSet
;
   
if (portSet != null
)
    {
       
this
.RemoveAllPortsInPortSet(portSet);
    }
   
else
    {
       
this.RemovePort(portToRemove);
    }
}

 

How it Works: Publishing

Publishing a message is almost as easy as a normal message post. See:

var pubSubPort = CreateOrGetPubSubPort();
pubSubPort.Post(
new Publish(new TsunamiWarning()));

 
Instead of sending the message itself, we wrap the message we want published in the Publish message. Beyond the message wrapping though, this is absolutely no different than a normal CCR port post.
 
When a Publish message is received by the Pub-Sub service, the following code responds:
protected override void PublishMessage(Publish publishMessage)
{
   
foreach (var port in this._recipients.Where(p => p != null))
    {
        port.TryPostUnknownType(publishMessage.Message);
    }
}
 
Basically, the code does the simplest, most obvious thing: it gives every receiver a chance to handle the message. This is not the most computationally efficient thing to do but it is definitely makes the code super easy to write, use, and understand.

Saving Lives and Averting a Catastrophe with Our New Pub-Sub Service

static void Main(string[] args)
{
   
using (var taskQueue = new DispatcherQueue
())
    {
                        
       
var pubSubPort = BroadcastPubSubService
.Create(taskQueue);
       
var beachBumPort = new Port<TsunamiWarning
>();
       
var surferPort = new Port<SharkWarning
>();
       
var surferBumPort = new PortSet<TsunamiWarning, SharkWarning
>();

        pubSubPort.Post(new Subscribe(beachBumPort));
        pubSubPort.Post(
new Subscribe
(surferPort));
        pubSubPort.Post(
new Subscribe
(surferBumPort));
                        
        pubSubPort.Post(
new Publish(new TsunamiWarning
()));
        pubSubPort.Post(
new Publish(new SharkWarning
()));

        Arbiter.Activate(taskQueue,
       
Arbiter
.Interleave(
           
new TeardownReceiverGroup
(),
           
new ExclusiveReceiverGroup
                (
                   
Arbiter.Receive<TsunamiWarning>(true, beachBumPort, (msg) => { Console.WriteLine("BeachBum got: {0}"
, msg); }),
                   
Arbiter.Receive<SharkWarning>(true, surferPort, (msg) => { Console.WriteLine("Surfer got: {0}"
, msg); }),
                   
Arbiter.Receive<TsunamiWarning>(true, surferBumPort, (msg) => { Console.WriteLine("SurferBum got: {0}"
, msg); }),
                   
Arbiter.Receive<SharkWarning>(true, surferBumPort, (msg) => { Console.WriteLine("SurferBum got: {0}"
, msg.ToString()); })
                ),
           
new ConcurrentReceiverGroup
()));

        Console.ReadLine();

    }
}

Now, instead of only giving the message to *either* the beach bum, the surfer, or the surfer bum – we get the following output showing that all three were given the hazard warnings and potentially saved from sure death and all we had to do was post a single message to a single CCR port:

PubSubServiceOutput

Conclusion

With great power comes great responsibility. In working with this PubSub CCR service I have discovered a few reasons why it may not be included as part of the CCR library itself. First of all, you must be very careful to make sure that all messages passed to the PubSub service are read-only or thread safe otherwise you can run into all kinds of sometimes hard to reproduce resource contention issues. These are exactly the type of issues CCR was designed to help us avoid and may be a major factor in the decision to exclude a service like this one from the library.

Secondly, before using the PubSub service discussed here I highly recommend reviewing the overload of the Arbiter.Activate method that takes a predicate delegate. Since every message Published to a PubSub service port is given to every valid receiver it is easy for components to get message that they really don’t care about and should not respond to. Such cases are usually dealt with by using a token pattern but I decided to leave message tokens and predicate objects out of this implementation since predicates are already built into the CCR API and provide such an elegant mechanism for accomplishing the same thing.

My final word of advice is to be mindful of how PubSub service port instances are shared across your application. This PubSub CCR service is NOT a single. It is therefore possible to have different components in your code hold references to different PubSub port instances. This can be extremely powerful in-terms of defining who can talk to who in your system. At the same time though, it can be extremely confusing if one component is not getting a message that another component is Publishing simply because the Subscribe is happening on a different service instance than Publishing. For this, and about a million other reasons, I’d like to recommend the use of a DI mechanism or an IoC container for managing who gets which instance of the PubSub operations port.

Source Code

If you’d like to use this PubSub service in your code and don’t want to build it yourself or want to extend/customize what I have already done – please feel free to download the source code (right click the link and select download target) and do whatever you’d like with it. I provide no guarantees nor warranty of this code. that said, if you find a bug or issue, I’d like to know so I can fix it. Also, if you come up with a great idea for making the code better, please leave a comment. I look forward to any and all discussions on this topic.