Author Archives: krbeut

Broadcast-Style Publish-Subscribe Messaging with CCR

The Publish-Subscribe pattern is a robust, generally useful, and well-understood messaging pattern. The PubSub pattern has many advantages and is the basis for other popular patterns, such as the observer pattern made famous by data binding and the like. As such, it should be no surprise that tools like Decentralized Software Services (DSS) have built-in mechanisms to support such a pattern.

It is, however, surprising (at least to me) that the Concurrency and Coordination Runtime (CCR), with its messaging-style API and targeted use cases, does not include at least some rudimentary support for the Publish-Subscribe pattern. Don’t get me wrong, the CCR is great and I love working with it; I am just surprised at the lack of support provided directly in the library.

I hope that the CCR community as a whole will benefit from this blog post, where I provide a reusable CCR component with all the plumbing needed to support the Publish-Subscribe pattern using CCR ports. I will also share some of the insight I gained while creating this component, some high-level remarks about how to use it, and potential gotchas to look out for as you adopt it in your code.

The Current State of Things and the Problem There-in

Let’s say we have something like the following lines of code somewhere in an application using CCR:

var tsunamiPort = new Port<TsunamiWarning>();

Arbiter.Activate(taskQueue,
    Arbiter.Receive<TsunamiWarning>(true, tsunamiPort, w => Console.WriteLine("Handler 1: Running!!!")));

Arbiter.Activate(taskQueue,
    Arbiter.Receive<TsunamiWarning>(true, tsunamiPort, w => Console.WriteLine("Handler 2: Running!!!")));

for (int i = 0; i < 10; i++)
{
    tsunamiPort.Post(new TsunamiWarning());
}

You may, or may not, be surprised at the resulting output of this code. Each tsunami warning posted to the tsunamiPort will be received by *either* Handler 1 OR Handler 2 but never both. Someone is in for a huge surprise.

The CCR model is such that each message posted to any IPort object creates, at most, one task. So, in this case, when a TsunamiWarning is posted to the tsunamiPort, it will be evaluated by the Arbiter hierarchy to determine the first receiver that can handle the message. If no handler is found, then no task is generated. On the other hand, once a receiver is found, the delegate inside of that receiver is used to create a task and the search for receivers stops. So, even if there are other activated receivers on that port, they will not be sent the message.

What’s more, when multiple receivers are activated on the same port for the same message type, the receiver that actually gets a given message can change over the lifetime of the application, depending on whether other receivers are available to accept messages at that moment (this most often happens when the receivers are activated inside interleaves).

So what can we do? A tsunami is coming and if we can’t get that message out to a large number of receivers, this is going to get ugly. You might try re-posting the message, or using the overload of Arbiter.Receive that takes a predicate, but you’ll quickly find that these approaches don’t work in most cases and require a lot of custom code that is not generally reusable.

The Approach

At first, I wanted to go into the CCR itself and create a custom CCR Port or PortSet that would allow multiple receivers to be served every message posted to it. I quickly found out that though there are IPort and IPortSet interfaces exposed by the CCR API, the CCR does not seem to be very ‘sub-classable’. At least not as easily as I might have wished/thought.

So, my second thought was to ask the CCR community. As it turned out, this was a great idea. The CCR and DSS communities are very helpful; I asked my question and got a great answer back. The suggestion I got from the forum was to create a CCR service whose main port accepts three messages: Publish, Subscribe, and UnSubscribe.

As it turns out this is a great approach. While the actual implementation is not as simple as stated in the forum, I still think the idea is a very good one and is what I have based my solution on for this post.

How it Works: Subscribing and UnSubscribing

After creating a new Pub-Sub CCR service, or getting a reference to an existing instance, subscribing is simple. All you have to do is send the Pub-Sub service a reference to the IPort object you want to receive published messages on, then register receivers on that port exactly as you always do. Here’s an example:

var pubSubPort = CreateOrGetPubSubPort();

var subscriberOne = new Port<TsunamiWarning>();
pubSubPort.Post(new Subscribe(subscriberOne));

Arbiter.Activate(taskQueue,
    Arbiter.Receive<TsunamiWarning>(true, subscriberOne,
        (msg) =>
        {
            Console.WriteLine("Subscriber One got: {0}", msg);
        }));

 
That’s it! The Subscribe message takes any IPort object, which includes both Port<>s and any PortSet. In fact, there is also a generic version of the Subscribe message that provides a little syntactic sugar for passing a reference to a single port inside of a PortSet to the Pub-Sub service.

When the Subscribe message is posted to the Pub-Sub service, the following code is triggered:

protected override void AddSubscriber(Subscribe subscribeMessage)
{
    var port = subscribeMessage.NotificationPort;
    var portSet = port as IPortSet;

    if (portSet != null)
    {
        AddSubScribers(portSet);
    }
    else
    {
        this._recipients.Add(port);
    }
}

If the subscribe message contains a plain-old port item, we add it to the list of our message recipients. If, however, the Subscribe message contains a PortSet, then we will individually add each port in the port set to the list of recipients (this is an important detail that we will discuss more later).

Hopefully, the code used to respond to a Subscribe is so simple you can imagine what UnSubscribing looks like. In case you can’t though, here it is:

protected override void RemoveSubscriber(UnSubscribe unSubscribeMessage)
{
    var portToRemove = unSubscribeMessage.NotificationPort;
    var portSet = portToRemove as IPortSet;

    if (portSet != null)
    {
        this.RemoveAllPortsInPortSet(portSet);
    }
    else
    {
        this.RemovePort(portToRemove);
    }
}

 

How it Works: Publishing

Publishing a message is almost as easy as a normal message post. See:

var pubSubPort = CreateOrGetPubSubPort();
pubSubPort.Post(new Publish(new TsunamiWarning()));

 
Instead of sending the message itself, we wrap the message we want published in the Publish message. Beyond the message wrapping though, this is absolutely no different than a normal CCR port post.
 
When a Publish message is received by the Pub-Sub service, the following code responds:

protected override void PublishMessage(Publish publishMessage)
{
    foreach (var port in this._recipients.Where(p => p != null))
    {
        port.TryPostUnknownType(publishMessage.Message);
    }
}
 
Basically, the code does the simplest, most obvious thing: it gives every receiver a chance to handle the message. This is not the most computationally efficient thing to do, but it definitely makes the code super easy to write, use, and understand.
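To make the fan-out idea concrete without pulling in the CCR assemblies, here is a minimal sketch in plain C#. It is not the code from the download: BroadcastHub, ISubscriber, and Subscriber<T> are hypothetical names chosen for illustration, each subscriber is modeled as a typed callback rather than a port, and delivery is synchronous. As in the handler above, Publish simply offers the message to every recipient and lets each one decide whether the type fits.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a subscriber port: accepts a message only if its type matches.
public interface ISubscriber
{
    bool TryDeliver(object message);
}

public sealed class Subscriber<T> : ISubscriber
{
    private readonly Action<T> _handler;

    public Subscriber(Action<T> handler)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // "Try to post, ignore if the type does not fit."
    public bool TryDeliver(object message)
    {
        if (message is T typed)
        {
            _handler(typed);
            return true;
        }
        return false;
    }
}

// Hypothetical broadcast hub: every published message is offered to every recipient.
public sealed class BroadcastHub
{
    private readonly List<ISubscriber> _recipients = new List<ISubscriber>();

    public void Subscribe(ISubscriber subscriber)
    {
        if (subscriber == null) throw new ArgumentNullException(nameof(subscriber));
        _recipients.Add(subscriber);
    }

    public bool Unsubscribe(ISubscriber subscriber)
    {
        return _recipients.Remove(subscriber);
    }

    // Returns how many recipients accepted the message.
    public int Publish(object message)
    {
        int delivered = 0;
        // Iterate over a copy so a handler may subscribe or unsubscribe while we loop.
        foreach (var recipient in _recipients.ToArray())
        {
            if (recipient.TryDeliver(message))
            {
                delivered++;
            }
        }
        return delivered;
    }
}
```

Subscribing two Subscriber<string> instances and one Subscriber<int>, then publishing a string, reaches both string subscribers and skips the int one. Note that this sketch is not thread safe on its own; in the real service the CCR serializes access by routing Subscribe, UnSubscribe, and Publish through the service port.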

Saving Lives and Averting a Catastrophe with Our New Pub-Sub Service

static void Main(string[] args)
{
    using (var taskQueue = new DispatcherQueue())
    {
        var pubSubPort = BroadcastPubSubService.Create(taskQueue);
        var beachBumPort = new Port<TsunamiWarning>();
        var surferPort = new Port<SharkWarning>();
        var surferBumPort = new PortSet<TsunamiWarning, SharkWarning>();

        pubSubPort.Post(new Subscribe(beachBumPort));
        pubSubPort.Post(new Subscribe(surferPort));
        pubSubPort.Post(new Subscribe(surferBumPort));

        pubSubPort.Post(new Publish(new TsunamiWarning()));
        pubSubPort.Post(new Publish(new SharkWarning()));

        Arbiter.Activate(taskQueue,
            Arbiter.Interleave(
                new TeardownReceiverGroup(),
                new ExclusiveReceiverGroup(
                    Arbiter.Receive<TsunamiWarning>(true, beachBumPort, (msg) => { Console.WriteLine("BeachBum got: {0}", msg); }),
                    Arbiter.Receive<SharkWarning>(true, surferPort, (msg) => { Console.WriteLine("Surfer got: {0}", msg); }),
                    Arbiter.Receive<TsunamiWarning>(true, surferBumPort, (msg) => { Console.WriteLine("SurferBum got: {0}", msg); }),
                    Arbiter.Receive<SharkWarning>(true, surferBumPort, (msg) => { Console.WriteLine("SurferBum got: {0}", msg.ToString()); })
                ),
                new ConcurrentReceiverGroup()));

        Console.ReadLine();
    }
}

Now, instead of only giving the message to *either* the beach bum, the surfer, or the surfer bum – we get the following output showing that all three were given the hazard warnings and potentially saved from sure death and all we had to do was post a single message to a single CCR port:

[Image: PubSubServiceOutput]

Conclusion

With great power comes great responsibility. In working with this PubSub CCR service I have discovered a few reasons why it may not be included as part of the CCR library itself. First of all, you must be very careful to make sure that all messages passed to the PubSub service are read-only or thread safe, because the same message instance is handed to every subscriber; otherwise you can run into all kinds of hard-to-reproduce resource contention issues. These are exactly the type of issues CCR was designed to help us avoid and may be a major factor in the decision to exclude a service like this one from the library.

Secondly, before using the PubSub service discussed here I highly recommend reviewing the overload of Arbiter.Receive that takes a predicate delegate. Since every message published to a PubSub service port is given to every valid receiver, it is easy for components to get messages that they really don’t care about and should not respond to. Such cases are usually dealt with by using a token pattern, but I decided to leave message tokens and predicate objects out of this implementation since predicates are already built into the CCR API and provide such an elegant mechanism for accomplishing the same thing.
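The filtering idea can be sketched in plain C# without the CCR. Everything named here is hypothetical and only for illustration: RegionalWarning stands in for a message that carries a Region property (the TsunamiWarning class in this post is not shown with any members), and FilteredReceiver<T> plays the role of a receiver that was registered with a predicate. The handler only runs when the predicate accepts the message.

```csharp
using System;

// Hypothetical message type; the Region property is an assumption for this sketch.
public sealed class RegionalWarning
{
    public RegionalWarning(string region)
    {
        Region = region;
    }

    public string Region { get; }
}

// Hypothetical receiver that pairs a handler with a predicate, so uninteresting
// messages are rejected before the handler is ever invoked.
public sealed class FilteredReceiver<T>
{
    private readonly Predicate<T> _filter;
    private readonly Action<T> _handler;

    public FilteredReceiver(Predicate<T> filter, Action<T> handler)
    {
        _filter = filter ?? throw new ArgumentNullException(nameof(filter));
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
    }

    // Returns true only when the message passed the filter and was handled.
    public bool Offer(T message)
    {
        if (!_filter(message))
        {
            return false;
        }
        _handler(message);
        return true;
    }
}
```

A subscriber that only cares about one coastline would construct the receiver with a predicate such as w => w.Region == "Pacific"; broadcast warnings for other regions are then offered to it but never reach its handler.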

My final word of advice is to be mindful of how PubSub service port instances are shared across your application. This PubSub CCR service is NOT a singleton. It is therefore possible to have different components in your code hold references to different PubSub port instances. This can be extremely powerful in terms of defining who can talk to whom in your system. At the same time though, it can be extremely confusing if one component is not getting a message that another component is publishing simply because the Subscribe is happening on a different service instance than the Publish. For this, and many other reasons, I’d like to recommend the use of a DI mechanism or an IoC container for managing who gets which instance of the PubSub operations port.

Source Code

If you’d like to use this PubSub service in your code and don’t want to build it yourself, or want to extend or customize what I have already done, please feel free to download the source code (right click the link and select download target) and do whatever you’d like with it. I provide no guarantees or warranty for this code. That said, if you find a bug or issue, I’d like to know so I can fix it. Also, if you come up with a great idea for making the code better, please leave a comment. I look forward to any and all discussions on this topic.


Why I Think FIFO Is Not Always The Right Task Processing Strategy For CCR Ports

When a message is posted to a CCR port and there exists a receiver on that port that can receive that message then a task is created and queued for processing. This is great, and becomes a fact of life once you become accustomed to the way things are done in the CCR. But why must tasks always be queued? Should they always be queued? If they are not queued then how else would they be scheduled?

In this post we will start with an illustrative example of how FIFO task processing can go wrong. Then we will mention two other examples where FILO task processing is better suited to certain asynchronous tasks, and conclude with ways of going about FILO task processing.

An Illustrative Example

Take the following small console application for example:

using System;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Ccr.Core;

class Program
{
    static void Main(string[] args)
    {
        using (var taskQueue = new DispatcherQueue())
        {
            var hungryGf = HungryGirlfriend.Create(taskQueue);

            var responsePort = new HungryGirlfriendPort();

            hungryGf.Post(new WhatDoYouWantToEatMessage(responsePort));

            Arbiter.Activate(taskQueue,
                Arbiter.Interleave(
                    new TeardownReceiverGroup(),
                    new ExclusiveReceiverGroup(
                        Arbiter.ReceiveWithIterator<FoodRequest>(true, responsePort, FoodRequestHandler)
                    ),
                    new ConcurrentReceiverGroup()));

            Console.ReadLine();
        }
    }

    protected static IEnumerator<ITask> FoodRequestHandler(FoodRequest request)
    {
        Console.WriteLine("Me: Ok, I’m Going out to get us {0} food", request.Cuisine);

        // Really, it takes me a lot longer than 2 sec. to go get food
        Thread.Sleep(2 * 1000);

        Console.WriteLine("Me: I Got us some {0} food", request.Cuisine);

        yield break;
    }
}

 
At first glance, this appears to be a simple CCR simulation of asking your girlfriend what she wants for dinner then going out and getting it. When we look into the implementation of the girlfriend CCR service though, we see that things are not as simple when dealing with a woman as indecisive as my girlfriend.

I kid you not when I say that every time I ask my girlfriend what she wants to eat she first claims she doesn’t care, then changes her mind 5 times before deciding what she actually wants to eat. In other words, if we were to implement the HungryGirlfriend CCR service to mimic my girlfriend, it would look something like this:

public class HungryGirlfriend : CcrServiceBase
{
    public static Port<WhatDoYouWantToEatMessage> Create(DispatcherQueue taskQueue)
    {
        var girlfriend = new HungryGirlfriend(taskQueue);

        girlfriend.Initialize();
        return girlfriend._inputPort;
    }

    protected HungryGirlfriend(DispatcherQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected void Initialize()
    {
        this._inputPort = new Port<WhatDoYouWantToEatMessage>();
        Arbiter.Activate(this.TaskQueue,
            Arbiter.Receive<WhatDoYouWantToEatMessage>(true, _inputPort, WhatDoYouWanttoEatHandler));
    }

    Port<WhatDoYouWantToEatMessage> _inputPort = new Port<WhatDoYouWantToEatMessage>();

    Random _rand = new Random();

    // Number of values in the Cuisines enum; used as the exclusive upper bound for Random.Next
    int _randMax = Enum.GetNames(typeof(Cuisines)).Length;

    protected void WhatDoYouWanttoEatHandler(WhatDoYouWantToEatMessage msg)
    {
        Console.WriteLine("Girlfriend: I don’t care what we eat.");

        // Post five changes of mind to the response port, one after another
        for (int i = 0; i < 5; i++)
        {
            var cuisine = (Cuisines)_rand.Next(_randMax);
            Console.WriteLine("Girlfriend: Wait, I changed my mind. I want {0} food", cuisine);
            msg.ResponsePort.Post(new FoodRequest(cuisine));
        }
    }
}

Currently, when I run the example given above, I get the following sample output:

[Image: sample console output]

What’s wrong with this? Well, in this simulation, even though I know my girlfriend is going to change her mind about what she wants to eat, I run out and get the food she wants as soon as she says she wants it. As soon as I get back, I find out that she really wants something else, so I run out again. You can only imagine how tired and agitated I’d be upon returning the fourth time just to find another task on the response port with another change in decision.

More importantly, as you can see from this example, all of the mind-changing responses have been posted by the time I go out the very first time, so why didn’t I just take the last response posted and go get Thai food? Well, because the tasks were created by a DispatcherQueue that ensures First-In-First-Out (FIFO) processing of messages. In this case, the DispatcherQueue is preventing me from getting the most recent, and in this case the most pertinent, information when I need it. In general, the DispatcherQueue helps me keep the order of tasks straight throughout my program. In this case though, it is forcing me to process old information that I should be ignoring before I can get access to information I really care about.

If, instead of always using a DispatcherQueue, I could use a ‘DispatcherStack’, then messages would be processed in a First-In-Last-Out (FILO) order and I would have saved myself a lot of time, money and frustration. Specifically, I would have gotten her last change of mind first instead of last, and I could have ignored the other messages by either discarding them from the task stack entirely or checking a timestamp property on the message itself to ensure that only the most up-to-date data was being used.
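The "take the newest, drop the rest" behaviour can be sketched in plain C# with an ordinary Stack<T>. This is not a CCR dispatcher and does not touch CCR scheduling at all; LatestFirstMailbox<T> and its members are hypothetical names used only to illustrate the ordering described above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mailbox that hands out the most recently posted item first
// and throws away everything that was posted before it.
public sealed class LatestFirstMailbox<T>
{
    private readonly Stack<T> _items = new Stack<T>();
    private readonly object _gate = new object();

    public void Post(T item)
    {
        lock (_gate)
        {
            _items.Push(item);
        }
    }

    // Returns false when nothing is waiting. Otherwise yields the newest item,
    // reports how many older items were discarded, and empties the mailbox.
    public bool TryTakeLatest(out T latest, out int discarded)
    {
        lock (_gate)
        {
            if (_items.Count == 0)
            {
                latest = default(T);
                discarded = 0;
                return false;
            }

            latest = _items.Pop();      // last in, first out
            discarded = _items.Count;   // everything still on the stack is stale
            _items.Clear();
            return true;
        }
    }
}
```

In the dinner example, posting five food requests and then calling TryTakeLatest once would yield only the fifth request and report four discarded ones, which is exactly the single trip I wish I had made.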

So What’s Your Point?

I’m NOT writing this blog post because I’m in a fight with my girlfriend (not yet at least, we’ll see after she reads this) nor am I using my .NET blog to vent about her. My goal was to use this simple, everyday example to illustrate a potential use case for FILO task processing.

Now, I realize that this tongue-in-cheek example probably does not fall under any of the intended CCR use cases, so I’d like to explain when FILO task processing might be useful in more normal CCR scenarios.

First off, let’s consider a Robotics application. Let’s say that we have some CCR (or DSS) service that takes in sensor data and iteratively spits out successively better and better approximations of some quantity to a port. Maybe this service is implementing some kind of maximum likelihood or expectation-maximization algorithm where the series of approximations is monotonically improving. In this case then, we’d only ever be interested in the most recent approximation and would probably discard all previous estimates. FIFO task processing does not allow us to easily accomplish such a task and this can make a huge difference in the performance of our software.

There are two reasons for this. First, if you are lucky enough to have an iterative algorithm that converges exponentially, or at least sufficiently fast, then the quality of the (n+1)th estimate may be much, much better than that of the nth. Second, if estimates are created much faster than they can be processed, then several iterations may execute before one can be used. Why waste time processing bad, old estimates when a much better estimate has already been found?

What about in a business application? More and more people are recognizing the CCR and DSS platforms as generally useful for business applications. I know my company has, and we are now heavily invested in CCR and DSS for all of our real-time systems.

Well, let’s say you are creating a stock trading application in which one CCR service gets stock prices and posts them to one or more ports for other CCR services to analyze and act on. You can imagine that most of the analysis-related CCR services will only be interested in the most recent stock price. Other services that analyze larger trends, however, will be interested in the ten most recent prices and nothing beyond that. So if the price-getting service posts 30 new prices while the previous 10 are being processed, the analysis service is only going to want the 10 most recent of them and will ignore the other 20.
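The "ten most recent, ignore the rest" case can be sketched the same way in plain C#. Again, this is not CCR code; RecentItemsBuffer<T> and DrainLatest are hypothetical names, and the sketch only shows the selection logic a trend-analysis service would want.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical buffer that accumulates posted items and, when drained,
// returns only the newest ones while discarding everything older.
public sealed class RecentItemsBuffer<T>
{
    private readonly List<T> _items = new List<T>();
    private readonly object _gate = new object();

    public void Post(T item)
    {
        lock (_gate)
        {
            _items.Add(item);
        }
    }

    // Empties the buffer and returns at most maxCount of the newest items,
    // in the order they were posted (oldest of the kept items first).
    public IReadOnlyList<T> DrainLatest(int maxCount)
    {
        if (maxCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxCount));
        }

        lock (_gate)
        {
            int keep = Math.Min(maxCount, _items.Count);
            // GetRange copies the selected items, so clearing the buffer below is safe.
            List<T> newest = _items.GetRange(_items.Count - keep, keep);
            _items.Clear();
            return newest;
        }
    }
}
```

With 30 prices posted while the analysis service was busy, DrainLatest(10) returns prices 21 through 30 and silently drops the first 20.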

The overall point is that the order in which information is received and processed is extremely important. While I agree that most cases require FIFO task processing, I also think that by limiting ourselves to always processing information in the same order it is received we can put ourselves in positions where old (less pertinent or sometimes simply wrong or useless) information is processed first. This may result in cases where we use outdated information from the past to make decisions for right now, or for the future. The need for FILO task processing in the CCR is about the need for more ways to decouple the receiving of messages from the processing of the messages.

So What Are You Going To Do About It?

Well, there are a couple of ways I could approach this problem. The first would be to create a series of utility CCR and/or DSS services whose only job is to receive messages, reverse their order and send them back out to whoever asks for them. This approach sounds simple but has two shortcomings. The obvious one is performance and source complexity. There would always be a middle man between the sender of the message and the message receiver. While this can sometimes be a strong decoupling and extensibility mechanism, I fear that is not the point here. Furthermore, posting then re-posting messages may not affect performance too much, but it almost certainly would increase code complexity.

Secondly, though it sounds simple, creating such a CCR service gets tricky when you really sit down to create it. Creating a CCR service that is general enough to be reused in an arbitrary number of situations without causing memory leaks or surprises is not impossible but is not as simple as you may initially think.

The approach I’d like to take is a more CCR-centric approach. What I’d like to do is to extend the CCR itself by creating a custom Dispatcher that orders tasks for execution using a stack instead of a queue. I’ll have to spend some time digging through the CCR docs, and maybe a little quality time with Ildasm or Reflector, to see how realistic this goal really is.

Though I’d really like to create a custom DispatcherStack, I am not sure it is going to be possible. Maybe there’s a different approach out there that someone knows about to help me? Maybe there’s a simple way to do it with ports that I don’t see? Basically, I think the need and utility for FILO task processing in the CCR is tremendous, I just don’t know how to achieve it yet. I’ll keep you all up-to-date with any progress I make.