Search This Blog

Saturday, October 23, 2010

Multiple Publishers - Multiple Subscribers Communication

Summary: Simple example showing how to implement the communication scenario where subscribing applications can receive notification messages from more publishing applications.



Introduction
The article is a free continuation of Public-Subscribe Communication where one application (publisher) sends notifications to more applications (subscribers).
Now I would like to describe the scenario where subscribing applications need to receive notifications from more publishing applications.

The source code for the example can be downloaded here.

The main question in this scenario is how to connect publishing applications with subscribing applications. If subscribers are connected directly to individual publishers, then we can get quickly many connections that can be difficult to manage.



To reduce the complexity and number of connections, we can implement one application as Broker that will maintain registered subscribers and forward them messages from publishers. Subscribers are so connected only to the broker and do not have to handle connections with publishers. Publishers send messages only to the broker and do not have to register/unregister subscribers.



The following example shows how to implement this scenario with using Eneter Messaging Framework.
(Full, not limited and for non-commercial usage free version of the framework can be downloaded from www.eneter.net. The online help for developers can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html)


Broker
The Broker is responsible for receiving notification messages from publishing applications and for forwarding them to all subscribed receivers.
The implementation of broker application is really trivial with Eneter Messaging Framework.
The whole implementation is here:

using System;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace BrokerApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create the broker.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory(new XmlStringSerializer());
            IDuplexBroker aBroker = aBrokerFactory.CreateBroker();

            // Create the Input channel receiving messages via Tcp.
            // Note: You can also choose NamedPipes or Http.
            //       (if you choose http, do not forget to execute it with sufficient user rights)
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            IDuplexInputChannel aBrokerInputChannel = aMessaging.CreateDuplexInputChannel("127.0.0.1:7091");

            // Attach the input channel to the broker and start listening.
            Console.WriteLine("The broker application is running.");
            aBroker.AttachDuplexInputChannel(aBrokerInputChannel);
        }
    }
}



Publisher
The Publisher is responsible for sending messages to the broker application. The broker application then forwards messages to subscribed receivers.
The whole implementation is here:

using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create broker client responsible for sending messages to the broker.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create output channel to send messages via Tcp.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
            myOutputChannel = aMessaging.CreateDuplexOutputChannel("127.0.0.1:7091");

            // Attach the output channel to the broker client to be able to send messages.
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        // Correctly close the output channel.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            // Note: The duplex output channel can receive response messages too.
            //       Therefore we must close it to stop the thread receiving response messages.
            //       If the thred is not closed then the application could not be correctly closed.
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        // Send NotifyMsg1
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        // Send NotifyMsg2
        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        // Send NotifyMsg3
        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }


        // Broker client is used to send messages to the broker,
        // that forwards messages to subscribers.
        private IDuplexBrokerClient myBrokerClient;

        // The output channel used by the broker client to send messages to the broker.
        private IDuplexOutputChannel myOutputChannel;

        // Serializer used to serialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by subscribers too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}


Subscriber
The Subscriber is responsible for its registering in the broker application to receive desired notification messages.
The whole implementation is here:

using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Subscriber
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client that will receive notification messages.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();
            myBrokerClient.BrokerMessageReceived += OnNotificationMessageReceived;

            // Create the Tcp messaging for the communication with the publisher.
            // Note: For the interprocess communication you can use: Tcp, NamedPipes and Http.
            IMessagingSystemFactory aMessagingFactory = new TcpMessagingSystemFactory();

            // Create duplex output channel for the communication with the publisher.
            // Note: The duplex output channel can send requests and receive responses.
            //       In our case, the broker client will send requests to subscribe/unsubscribe
            //       and receive notifications as response messages.
            myOutputChannel = aMessagingFactory.CreateDuplexOutputChannel("127.0.0.1:7091");

            // Attach the output channel to the broker client
            myBrokerClient.AttachDuplexOutputChannel(myOutputChannel);
        }

        // Correctly close the communication.
        // Note: If the communication is not correctly closed, the thread listening to
        //       response messages will not be closed.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myBrokerClient.DetachDuplexOutputChannel();
            myOutputChannel.CloseConnection();
        }

        // Method processing notification messages from the publisher.
        private void OnNotificationMessageReceived(object sender, BrokerMessageReceivedEventArgs e)
        {
            // The notification event does not come in UI thread.
            // Therefore, if we want to work with UI controls we must execute it in the UI thread.
            InvokeInUIThread(() =>
                {
                    if (e.ReceivingError == null)
                    {
                        if (e.MessageTypeId == "MyNotifyMsg1")
                        {
                            NotifyMsg1 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg1>(e.Message);
                            Received1TextBox.Text = aDeserializedMsg.CurrentTime.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg2")
                        {
                            NotifyMsg2 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg2>(e.Message);
                            Received2TextBox.Text = aDeserializedMsg.Number.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg3")
                        {
                            NotifyMsg3 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg3>(e.Message);
                            Received3TextBox.Text = aDeserializedMsg.TextMessage;
                        }
                    }
                });
        }

        // Subscribe to notification message 1
        private void Subscribe1Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg1");
        }

        // Unsubscribe from notification message 1
        private void Unsubscribe1Btn_Click(object sender, EventArgs e)
        {
            Received1TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg1");
        }

        // Subscribe to notification message 2
        private void Subscribe2Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg2");
        }

        // Unsubscribe from notification message 2
        private void Unsubscribe2Btn_Click(object sender, EventArgs e)
        {
            Received2TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg2");
        }

        // Subscribe to notification message 3
        private void Subscribe3Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg3");
        }

        // Unsubscribe from notification message 3
        private void Unsubscribe3Btn_Click(object sender, EventArgs e)
        {
            Received3TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg3");
        }

        // Helper method to invoke some functionality in UI thread.
        private void InvokeInUIThread(Action uiMethod)
        {
            // If we are not in the UI thread then we must synchronize via the invoke mechanism.
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }

        // BrokerClient provides the communication with the broker.
        private IDuplexBrokerClient myBrokerClient;

        // The output channel used by the broker client to send messages to the broker.
        private IDuplexOutputChannel myOutputChannel;

        // Serializer used to sdeerialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by publisher too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}

And for the completeness, here are publishers and subscribers running together with the broker.



I hope, you found the article useful. If you have any comments or questions, please let me know.

3 comments:

  1. Dear Ondrej,

    Many thanks for the great work. We've been using Eneter 6.5 as the communication platform among our individual projects and it works like a charm. We will purchase Eneter soon.

    There is one small bug I would like to report to you. When a server sends a notification message to multiple clients (using IDuplexBroker.SendMessage method) and for some reason, one of them was abnormally terminated (that is, without unsubscribing itself from the server), the server will also be dead. I think the SendMessage throws an unhandled exception in such a case. Any way to fix this problem?

    Regards,

    Minh To

    ReplyDelete
    Replies
    1. Hi Minh,

      Thank you for your comment. I am really glad you found Eneter useful in your projects.

      The problem you describe happens because the socket was not closed and so Broker thinks the connection still exists. Therefore when it sends the message it waits a default timeout - which is infinite time.
      Therefore to resolve this issue I would recommend to setup the SendTimeout property in TcpMessagingSystemFactory to some acceptable value e.g. 5 seconds. Then in case the client terminates abnormally and the socket is not closed the broker will be blocked only 5 seconds.

      In case it is not acceptable for you then other alternative is to use MonitoredMessaging. Monitored messaging can internally use e.g. TcpMessaging and it performs pinging (in frequency you specify) and so continuously monitoring the connection availability. If the connection is broken the Broker service will get notified and will remove the client.

      Here is the link for details about monitored messaging:
      http://www.eneter.net/OnlineHelp/EneterMessagingFramework/html/T_Eneter_Messaging_MessagingSystems_Composites_MonitoredMessagingComposit_MonitoredMessagingFactory.htm

      Delete
    2. Very quick support. The issue is resolved thanks to your advice.

      Minh

      Delete