True CCR Concurrency - Using CCR without Iterators

5 minutes read

Originally posted on: http://geekswithblogs.net/jolson/archive/2008/06/21/true-ccr-concurrency---using-ccr-without-iterators.aspx

As developers get up to speed with the CCR, they quickly come across the (ab)use of iterators to achieve "concurrency." Okay, "(ab)use" may have been a bit of a low-blow. I call it an abuse as it is a totally unorthodox use of what iterators are usually used for, and I've came across my fair share of developers who find this unorthodox usage highly confusing. Not only is it confusing, but it's not what I would consider true concurrent code.

In a concurrent environment, if I create twenty different tasks that need to be executed, I expect several of those tasks to be executed at the exact same time (depending on the number of processors I have on my system). As a side effect of this concurrent execution of tasks, the order in which those tasks are executed is not guaranteed. If you don't have several tasks executing concurrently, and order of processing is indeed guaranteed, that's about as close you can get to being normal sequential code.

Look at this sample (example 18) from MSDN. If you execute and step through this code, you will find that the method actually executes sequentially. The code posts a number, processes the number, posts another number, processes that number, etc. This is highly sequential. One could argue this is simply not the best sample for CCR iterators. However, after using CCR when developing robots using Robotics Studio, you will find that this "sequential but not" pattern is a very frequent pattern that is used when using CCR iterators.

There's also the impact of having a method that essentially never exits due to the "while (true)" nature of the iterator-based method that is determining what tasks need to execute (leading to a highly coupled, monolithic application). This doesn't mean that the CCR itself isn't useful though. You just need to know how to leverage the framework while avoiding some of these "rough spots".

So let's take a look at an example of how to use CCR to achieve "true" concurrency with the CCR without the use of iterators (for you VB fans, the great part of this approach is that it is entirely possible with VB like I show here).

Dim myPort As New Port(Of Integer)

At the heart of the message-based infrastructure in the CCR is a Port. A Port is where messages are posted to, and pulled from when tasks are actually executed. As seen above, Ports are strongly typed through the use of generics, and so they support posting/pulling of largely any POCO (Plain Ole CLR Object) you wish. In the case of this very simple demo we just use integers, but in a real system this might be self-defined types like StockQuoteRequestMessage, FoodOrderMessage, etc.

    Sub Main()
        Dim taskDispatcher As New Dispatcher(4, "Sample Dispatcher")
        Dim taskQueue As New DispatcherQueue("Sample Tasks", taskDispatcher)

In the CCR, a DispatcherQueue contains a list of tasks to be executed, while a Dispatcher controls the execution of those tasks. Here we force our dispatcher to run four tasks concurrently even if we are on a single processor machine just so we can see the concurrency in action. Then we simply pass our dispatcher to the queue that contains the tasks that will be managed.

        ' Create the task that will handle our work
        Dim workReceiver = Arbiter.Receive(Of Integer)(True, _
                                myPort, _
                                Function(i) ProcessValue(i))

        ' Register the task with the CCR task queue
        Arbiter.Activate(taskQueue, workReceiver)

        ' Post some work to do
        For i = 0 To 9
            myPort.Post(i)
        Next

When we create our workReceiver, we are essentially telling the CCR that we want a persistent task (one that will last across the processing lifetime of many messages, possibly until the application exits), and that when a message comes in on our Port, we will process that message with our function ProcessValue (defined below). Once the receiver is created, we simply hook it up to the task queue, and we're off to the races.

Finally, all we need to do is wait until all values have been processed and exit the application.

        ' Wait until all tasks are finished
        While myPort.ItemCount > 0
            Thread.Sleep(TimeSpan.FromMilliseconds(200))
        End While

        taskQueue.Dispose()
        taskDispatcher.Dispose()

        Console.Write("Press any key to exit...")
        Console.ReadKey(True)
    End Sub

So when a message is posted to our port, CCR will call our function ProcessValue in order to process the message (as specified when we created the task to process messages). In our process function, we are simply going to write to the console at the beginning and at the end as well as sleeping our thread in the middle to simulate some work being done in the function.

    Function ProcessValue(ByVal value As Integer)

        Console.WriteLine("Begin " + value.ToString())
        Thread.Sleep(TimeSpan.FromMilliseconds(200))
        Console.WriteLine("End " + value.ToString())

        Return Nothing
    End Function

When I actually execute this sample, you can see the parallel nature of the execution by viewing the output (your output may differ as it can differ upon every execution thanks to the non-deterministic nature of concurrency):

Begin 0
Begin 1
Begin 3
Begin 2
End 2
Begin 4
End 1
Begin 5
End 0
Begin 6
End 3
Begin 7
End 4
Begin 8
End 5
End 6
Begin 9
End 7
End 8
End 9
Press any key to exit...

 

And that's all we need to do in order to use CCR to process incoming messages in parallel (and in a "true" concurrent fashion). Just as a recap, here's the entire sample in one place:

    Dim myPort As New Port(Of Integer)

    Sub Main()
        Dim taskDispatcher As New Dispatcher(4, "Sample Dispatcher")
        Dim taskQueue As New DispatcherQueue("Sample Tasks", taskDispatcher)

        ' Create the task that will handle our work
        Dim workReceiver = Arbiter.Receive(Of Integer)(True, _
                                myPort, _
                                Function(i) ProcessValue(i))

        ' Register the task with the CCR task queue
        Arbiter.Activate(taskQueue, workReceiver)

        ' Post some work to do
        For i = 0 To 9
            myPort.Post(i)
        Next

        ' Wait until all tasks are finished
        While myPort.ItemCount > 0
            Thread.Sleep(TimeSpan.FromMilliseconds(200))
        End While

        taskQueue.Dispose()
        taskDispatcher.Dispose()

        Console.Write("Press any key to exit...")
        Console.ReadKey(True)
    End Sub

    Function ProcessValue(ByVal value As Integer)

        Console.WriteLine("Begin " + value.ToString())
        Thread.Sleep(TimeSpan.FromMilliseconds(200))
        Console.WriteLine("End " + value.ToString())

        Return Nothing
    End Function

As a "hello world" sample for using the CCR, this is a very simple demo. But it shows that it is very easy to actually achieve concurrent applications with the CCR without having to use CCR iterators. At the same time, avoiding the use of CCR iterators enable us to write a message-oriented application that can be properly modularized by "separating concerns" into independent message processors through the use of Ports.

Of course, an added benefit of taking a "non-iterator" approach when using the CCR is that languages that don't have direct support for iterators (like VB.NET) can also develop CCR-based applications to achieve highly-parallel solutions.

Updated:

Leave a Comment