博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执
阅读量:6904 次
发布时间:2019-06-27

本文共 5098 字,大约阅读时间需要 16 分钟。

上篇博文中我们介绍了Azure Messaging的重复消息机制、At most once 和At least once.

本文中我们主要研究并介绍Azure Messaging的消息回执机制:实际应用场景:

同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址。还

有,生成订单编号场景,发送一个生成订单编号的消息,消息消费者接收生成订单编号的消息,并通过消息回执返回。

Azure Messaging的消息回执机制主要通过:基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现

在代码实现中,我们需要:

1. 两个工作线程,一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

2. 一个会话标识:ReceiptSession  

3. 两个队列Queue:RequestQueue:发送消息、接收消息,ResponseQueue:发送回执消息,接收回执消息。

直接Show Code:

首先,我们在ServiceBusMQManager增加一个线程安全的创建带回话的QueueClient方法:

private static object syncObj = new object();        ///         /// 获取要求会话带Session的QueueClient        ///         /// 队列名称        /// 
QueueClient
public QueueClient GetSessionQueueClient(string queueName) { var namespaceClient = NamespaceManager.Create(); if (!namespaceClient.QueueExists(queueName)) { lock (syncObj) { if (!namespaceClient.QueueExists(queueName)) { var queue = new QueueDescription(queueName) { RequiresSession = true }; namespaceClient.CreateQueue(queue); } } } return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete); }

然后我们定义一些常量:

private static readonly string ReplyToSessionId = "ReceiptSession";        const double ResponseMessageTimeout = 20.0;        private static readonly string requestQueueName = "RequestQueue";        private static readonly string responseQueueName = "ResponseQueue";

实现发送并接收回执消息的方法:

///         /// 发送并接收回执消息        ///         ///         public static void SendMessage()        {            var manager = new ServiceBusUtils();            var responseClient = manager.GetSessionQueueClient(responseQueueName);            var requestClient = manager.GetSessionQueueClient(requestQueueName);            var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);            var order = CreateSalesOrder(1);            //发送消息            var message = new BrokeredMessage(order);            message.Properties.Add("Type", order.GetType().ToString());            message.SessionId = ReplyToSessionId;            message.MessageId = "OrderMessage001";            message.ReplyTo = responseQueueName;            requestClient.Send(message);            Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);            //接收消息回执            var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));            var receivedOrder = receivedMessage.GetBody
(); Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID); messsageReceiver.Close(); }

实现接收消息并发送回执方法:

1         ///  2         /// 接收消息并回执 3         ///  4         public static void ReceiveMessage() 5         { 6             var manager = new ServiceBusUtils(); 7  8             var requestClient = manager.GetSessionQueueClient(requestQueueName); 9             var session = requestClient.AcceptMessageSession();10             var requestMessage = session.Receive();11            12             if (requestMessage != null)13             {14                 var receivedOrder = requestMessage.GetBody
();15 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);16 17 var responseMessage = new BrokeredMessage(receivedOrder);18 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());19 responseMessage.ReplyToSessionId = ReplyToSessionId;20 responseMessage.MessageId = "ResponseOrderMessage001";21 responseMessage.SessionId = requestMessage.SessionId;22 23 //发送回执消息24 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);25 responseClient.Send(responseMessage);26 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID); 27 }28 }

Main方法中,启动两个工作线程:一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

因为涉及到Azure Messaging中队列的第一次创建,Azure Messaging是不支持多个请求同时创建同一个队列的,因此,我们两个线程间做一个简单的Task.Delay(3000).Wait();

1         static void Main(string[] args) 2         { 3             var sendTask = Task.Factory.StartNew(() => { SendMessage(); }); 4             Task.Delay(3000).Wait(); 5             var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); }); 6  7             Task.WaitAll(sendTask, receiveTask); 8  9             Console.ReadKey();           10         }

我们看看程序输出:

Azure 服务总线中的队列:

可以看出:Azure Messaging-ServiceBus Messaging 基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现消息回执机制。

 

周国庆

2017/3/23

 

posted on
2017-05-12 10:53 阅读(
...) 评论(
...)

转载于:https://www.cnblogs.com/yezuhui/p/6844497.html

你可能感兴趣的文章
Java的JNDI使用
查看>>
Android-布局管理-帧布局
查看>>
基于JAVA的反射机制
查看>>
Nginx 四种分配方式——session处理
查看>>
webdav java libraay
查看>>
冒泡排序
查看>>
win8.1 cygwin编译java轻量虚拟机avian
查看>>
PHP入门
查看>>
phpexcel导入导出excel文件常用操作
查看>>
利用HttpClient 4.1 下载文件
查看>>
LNMP环境搭建-php
查看>>
Hadoop云计算的初步认识
查看>>
windows下创建控制台窗口
查看>>
JVM配置参数
查看>>
jBPM5与Activiti5比较
查看>>
iOS App 的逆向
查看>>
Spring如何扫描class和配置文件
查看>>
Java压缩技术(一) ZLib
查看>>
【VMware虚拟化解决方案】VMware Horizon View Client 各平台配置文档
查看>>
java线程池
查看>>