ARTICLE

Handling the Queuing of Messages in a Multithreaded Program

Posted by Mike Gold Articles | Visual Basic 2010 November 10, 2005
This article gives brief description about how to handle the queue of messages in a multithreaded program.
 
Reader Level:

QueueI1.jpg

Figure 1 - Sending Messages to the Queue

In the financial world you have to deal with messages being spewed at you in large quantities at a rapid rate. For example stock quotes, market data, and orders come flying at you through some sort of wire and you as a programmer have to handle them in a way that doesn't overwhelm you or the machine.  I suppose a real-world analogy of this message traffic might be as if someone came by your cubicle every second and dumped a wastepaper basket on your desk, and then your boss shouts across the room, "Arrange those papers in some way that I can read them!"  In this situation we turn to a method of queuing up the messages and handling them when the program is good and ready.  Also, since message handling is only a small portion of our program requirements, we shove the queue into a separate thread, so it doesn't hog the processing of the rest of the program (such as processing of user input). 

To handle the queuing confusion, I've designed a singleton class called MessageOrderWriter that allows the programmer to write their message to the class and forget about it.  The MessageOrderWriter processes all messages in the queue in a separate thread from the thread asking for the processing.  (Of course, the user should keep in mind that if they pass messages faster than the queue can process them, the queue will grow rather large.  It is problems like these that the financial industry is tackling everyday.)   

Below is the singleton class of the MessageOrderWriter shown in a UML Diagram:

QueueI2.jpg

Figure 2 - UML Diagram of the MessageOrderWriter reverse engineered using WithClass  for .NET

There are three steps to processing the message in the queue 

1.Spawn a thread that sits around in an infinite while loop and waits for the queue to populate
2.Write a Message to the Queue
3.Dequeue the message in the while loop, and process the message

Although this process seems pretty simple upon first glance, it actually becomes a little bit more complicated when you introduce multithreading.  First of all we want a queue that is thread-safe.  For this, we turn to the Synchronized queue.  Microsoft allows you to create a wrapped synchronized queue from your existing queue using the following calls:  
 

Imports System
Imports
System.IO
Imports
System.Threading
Imports
System.Runtime.Remoting.Messaging
Imports System.Runtime.Remoting.Channels

Private
_orderMessageQueue = New Queue
Private
_syncOrderMessageQueue = Queue.Synchronized(_orderMessageQueue)
Dim _streamWriter As
BinaryWriter
Dim _queueWritingThread As
Thread
Dim _waitForObject As
AutoResetEvent

Internally, Microsoft locks the queue when you enqueue and dequeue, however, you can actually do your own manual locking, which we will do in this article. Now that we have a synchronized queue, we need to process it.  First we will spawn the separate thread that will process the queue:  

Public Sub Start()
' Open a file for writing
Dim fs As FileStream = New FileStream("myfile.data", FileMode.Create, FileAccess.Write)
' create a binary file to write the messages to
_streamWriter = New
BinaryWriter(fs)
' start a new thread for writing messages
_queueWritingThread = New Thread(AddressOf
ProcessQueue)
' make it a background thread and start it
_queueWritingThread.IsBackground =
True
_queueWritingThread.Start()
End
Sub

Next we will process the queue by looping and dequeuing the messages as we loop.  When we have exhausted the queue, we will sit and wait until we have more items in the queue.  It is important, that as we are dequeuing the queue, we lock the queue so the queue count doesn't change when we check it.  It's also important that we process the slow operation of writing the message (SerializeMessage) outside the lock, because locking down the file writing would hinder performance.  When we do not have any messages to process in the queue, we simply block the thread until a new message appears in the queue with a ManualResetEvent. 

Private Sub ProcessQueue()
Dim msg As
IMessage
' infinitely loop and dequeue messages
Do While
True
msg =
Nothing
' Lock the queue so that other threads cannot
' enter while we are checking the queue count and
' dequeuing the message
SyncLock
_syncOrderMessageQueue.SyncRoot
' if we have messages in the queue
' lock the thread, and dequeue the next message
If _syncOrderMessageQueue.Count > 0
Then
msg = CType
(_syncOrderMessageQueue.Dequeue(), IMessage)
Else
' Tell the ManualResetEvent to block the thread
' When the WaitOne method is called on the
' ManualResetEvent since there are no messages
' in the queue.
' The thread will become unblocked, when the
' ManualResetEvent is signalled to Set.
_waitForObject.Reset()
End
If
End
SyncLock
If Not msg Is Nothing
Then
' serialize the message to the file outside the lock
'SerializeMessage(msg)
WriteMessage(msg, "Hello")
Else
' queue is empty, so block thread until Set event is called
_waitForObject.WaitOne()
End
If
Loop
' end while
End
Sub

Adding messages to the queue is performed outside the thread since it is not a CPU intensive operation.  We lock when we put messages on the queue so it doesn't interfere with the other queue threads. The message is enqueued and the manual reset event calls Set to release it from its blocking behavior because there is now at least one message on the queue that we can dequeue. 

Public Sub WriteMessage(ByVal msg As IMessage, ByVal subject As String)SyncLock _syncOrderMessageQueue.SyncRoot
_syncOrderMessageQueue.Enqueue(msg)
' We now have a message in the queue.
' signal the manual reset event to unblock the thread
_waitForObject.Set()
End
SyncLock
End Sub

When we are finished using our queue thread, we can call a StopWriting to abort the thread.  Also in this method, we can flush and close the file we are writing.  Of course a better technique would be to wait until the queue is empty and then abort the thread.  However, in the case when it will never be empty (because messages keep coming anyway), and you just want to finish writing, this is an acceptable solution. 

Public Sub StopWriting()
_streamWriter.Close()
Try
' stop the queueing thread
_queueWritingThread.Abort()
Catch ex As
Exception
' thread aborted;
Console.WriteLine("Finished Recording")
End
Try
End
Sub

Conclusion 
 
Although thread management is a complex topic,  threading issues are much alleviated with the help of .NET constructs such as locks, semaphores and mutexes.  Remember to lock collection operations that requiring adding, removing, or checking if the collection is empty to avoid race conditions.  Avoid locks when you can if they surround code that is CPU intensive.  Anyway I hope you manage to thread your way through multiprocess programming and unlock the secrets of coding with C# and .NET.

NOTE: THIS ARTICLE IS CONVERTED FROM C# TO VB.NET USING A CONVERSION TOOL. ORIGINAL ARTICLE CAN BE FOUND ON C# CORNER (WWW.C-SHARPCORNER.COM).

Login to add your contents and source code to this article
share this article :
post comment
 
Become a Sponsor
PREMIUM SPONSORS
  • Finally – a virtual platform that delivers next-generation Windows Server 2008 Hyper-V virtualization technology from a managed hosting partner you can truly depend on. Visit www.maximumasp.com/max for a FREE 30 day trial. Hurry offer ends soon. Climb aboard the MaxV platform and take advantage of High Availability, Intelligent Monitoring, Recurrent Backups, and Scalability – with no hassle or hidden fees. As a managed hosting partner focused solely on Microsoft technologies since 2000, MaximumASP is uniquely qualified to provide the superior support that our business is built on. Unparalleled expertise with Microsoft technologies lead to working directly with Microsoft as first to offer IIS 7 and SQL 2008 betas in a hosted environment; partnering in the Go Live Program for Hyper-V; and product co-launches built on WS 2008 with Hyper-V technology.
    The leading .NET charting control now features PDF, Flash and Silverlight export, visualization of large datasets and more. Deliver true charting functionality to your BI, Scorecard, Presentation or Scientific apps. Download evaluation now.
6 Months Free & No Setup Fees ASP.NET Hosting!
Become a Sponsor