二、MSMQ在开塬项目PetShop中的应用分析。 在PetShop 4.0中,利用消息队列临时存放要插入的数据,来避免因为频繁访问数据库的操作。而队列中的消息,则等待系统的专用的应用程序来处理,最后将数据插入到数据库中。
PetShop 4.0中的消息处理,主要分为下面几大部分:订单策略接口IOrderStategy、消息接口IMessageing、消息工厂MessageFactory、MSMQ实现MSMQMessaging、后台处理应用程序OrderProessor。如下图:

附件:
您所在的用户组无法下载或查看附件 1、订单策略接口IOrderStategy
PetShop 4.0的体系结构是非常庞大,在订单处理上有两种处理策略,这里也是策略模式的一个应用,IOrderStrategy接口作为订单策略的高层抽象,实现不同订单处理的具体策略去实现它,UML如下:

附件:
您所在的用户组无法下载或查看附件 示意性代码:

Code
1namespace PetShop.IBLLStrategy
2{
3 public interface IOrderStrategy
4 {
5 void Insert(OrderInfo order);
6 }
7}
8
9namespace PetShop.BLL
10{
11 public class OrderSynchronous:IOrderStrategy
12 {
13 private static readonly IOrder asynchOrder = QueueAccess.CreateOrder();
14
15 public void Insert(OrderInfo order)
16 {
17 asynchOrder.Send(order);
18 }
19 }
20}
21
22//
从上面UML和代码就可以看出,订单策略接口下有两种实现,使用了抽象工厂模式来完成相应的订单策略对象的创建 。关于这点在后面消息工厂部分去介绍,这里不作讲解。
2、消息接口IMessageing
在PetShop 4.0中,由于对订单处理使用了异步处理方式,在消息接口中仅定义了一个IOrder接口。IOrder接口的定义与MSMQ的实现是一致的,需要提供发送和接收操作。在Send方法中,参数为数据访问层的数据实体对象(OrderInfo),具体的实现则是用MSMQ的实现类(PetShop.MSMQMessaging.Order)去完成的。

附件:
您所在的用户组无法下载或查看附件 
附件:
您所在的用户组无法下载或查看附件 MS的开发人员真的是什么都能想到,在消息接口的实现上考虑得很全面,为了避免将来的扩展会有其他的数据对象也使用到MSMQ;因此,在PetShop 4.0中的消息接口实现中,定义了一个队列的基类(PetShopQueue),实现了消息的发送(Send)和接收(Receive)方法的基本操作。代码如下:

Code
1namespace PetShop.MSMQMessaging
2{
3 public class PetShopQueue:IDisposable
4 {
5 //指定消息队列事务的类型
6 protected MessageQueueTransactionType transactionType = MessageQueueTransactionType.Automatic;
7 protected MessageQueue queue; //消息队列
8 protected TimeSpan timeout; //时间间隔
9
10 public PetShopQueue(string queuePath, int timeoutSeconds)
11 {
12 queue = new MessageQueue(queuePath); //根据传入quueuPath创建队列
13 timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));
14
15 queue.DefaultPropertiesToSend.AttachSenderId = false;
16 queue.DefaultPropertiesToSend.UseAuthentication = false;
17 queue.DefaultPropertiesToSend.UseEncryption = false;
18 queue.DefaultPropertiesToSend.AcknowledgeType = AcknowledgeTypes.None;
19 queue.DefaultPropertiesToSend.UseJournalQueue = false;
20 }
21
22 /**//// <summary>
23 /// 接收消息方法
24 /// </summary>
25 public virtual object Receive()
26 {
27 try
28 {
29 using (Message message = queue.Receive(timeout, transactionType))
30 return message;
31 }
32 catch (MessageQueueException mqex)
33 {
34 if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
35 throw new TimeoutException();
36 throw;
37 }
38 }
39
40 /**//// <summary>
41 /// 发送消息
42 /// </summary>
43 public virtual void Send(object msg)
44 {
45 queue.Send(msg, transactionType);
46 }
47
48 IDisposable 成员#region IDisposable 成员
49 public void Dispose()
50 {
51 queue.Dispose(); //解放资源
52 }
53 #endregion
54 }
55}
MSMQ队列是一个可持久的队列,不会因用户不间断的下订单导致数据丢失。queue作为存放数据的队列,为消息队列(MessageQueue)类型,同时还为PetShopQueue设置了timeout值,后台处理应用程序(OrderProessor)会根据timeout的值定期扫描队列中的订单数量。
3、消息工厂MessageFactory
可能是考虑到IOrder的实现会改变不同的策略吧,在PetShop里利用了抽象工厂模式,将IOrder对象的创建用了专门的工厂模块(MessageFactory)进行封装,定义如下:

Code
1namespace PetShop.MessagingFactory
2{
3 /**//// <summary>
4 /// This class is implemented following the Abstract Factory pattern to create the Order
5 /// Messaging implementation specified from the configuration file
6 /// </summary>
7 public sealed class QueueAccess
8 {
9 //<add key="OrderMessaging" value="PetShop.MSMQMessaging"/>
10 private static readonly string path = "PetShop.MSMQMessaging";
11
12 /**//// <summary>
13 /// 私有构造器,防止使用new创建对象实例
14 /// </summary>
15 private QueueAccess()
16 { }
17
18 public static IOrder CreateOrder()
19 {
20 string className = path + ".Order";
21 return (IOrder)Assembly.Load(path).CreateInstance(className);
22 }
23 }
24}
在QueueAccess类中,通过CreateOrder方法利用反射技术创建正确IOrder类型对象(实际也就是创建了一个接口的具体实现类的对象,应用了多态的原理和反射技术)。UML图下:

附件:
您所在的用户组无法下载或查看附件 在PetShop4.0中,消息接口的具体实现是通过配置文件定义在web.config里:

Code
<add key="OrderMessaging" value="PetShop.MSMQMessaging"/>
这里我为了能够更直观的演示和介绍就把path固化定义了,如下:

Code
private static readonly string path = "PetShop.MSMQMessaging";
这里利用工厂模式来负责对象的创建,主要是方便业务逻辑层对定单处理策略的调用,如在PetShop.BLL模块中的OrderSynchronous类:

Code
1namespace PetShop.BLL
2{
3 public class OrderSynchronous:IOrderStrategy
4 {
5 private static readonly IOrder asynchOrder = QueueAccess.CreateOrder();
6
7 public void Insert(OrderInfo order)
8 {
9 asynchOrder.Send(order);
10 }
11 }
12}
这样一但IOrder接口的实现发生了变化,此时就只需要修改配置文件就OK,整个系统就显得很灵活,稳定。
4、MSMQ实现MSMQMessaging
在PetShop.MSMQMessaging模块中,订单对象实现了消息接口(IMessaging)模块中的IOrder,同时还继承了基类PetShopQueue。定义如下:

Code
1namespace PetShop.MSMQMessaging
2{
3 public class Order:PetShopQueue,IOrder
4 {
5 private static readonly string queuePath = ConfigurationManager.AppSettings["OrderQueuePath"];
6 private static int queueTimeout = 20; //20秒为超时
7
8 public Order()
9 : base(queuePath, queueTimeout)
10 {
11 queue.Formatter = new BinaryMessageFormatter();
12 Console.WriteLine(queuePath);
13 }
14
15 public new OrderInfo Receive()
16 {
17 //该方法会应用在分布式事务中,故而设置为Automatic的事务类型。
18 base.transactionType = MessageQueueTransactionType.Automatic;
19 return (OrderInfo)((Message)base.Receive()).Body;
20 }
21
22 public OrderInfo Receive(int timeout)
23 {
24 base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout));
25 return Receive();
26 }
27
28 /**//// <summary>
29 /// 异步发送定单到消息队列
30 /// </summary>
31 /// <param name="orderMessage"></param>
32 public void Send(OrderInfo orderMessage)
33 {
34 //该方法不会用于分布式事务中,故而设置为Single的事务类型。
35 base.transactionType = MessageQueueTransactionType.Single;
36 base.Send(orderMessage);
37 }
38 }
39}
UML草图:

附件:
您所在的用户组无法下载或查看附件 这里需要注意的是,Order类既继承了基类PetShopQueue,同时还实现了接口IOrder,而在消息接口和基类中所定义的接收消息(Receive)方法在方法的签名上是相同的。所以在Order的Receive方法实现中,必须使用new而非override关键字来重写父类PetShopQueue的Receive虚方法。此时Order类的Receive方法代表两个含义,一是实现了消息接口IOrder中的Receive方法;二则是利用了new关键字重写了父类PetShopQueue的Receive虚方法。
在PetShop4.0中,将面向对象的知识点应用得非常精妙,如上分析,此时我门可以怎么来调用Order呢?是这样吗?

Code
1//1、使用基类PetShopQueue
2PetShopQueue order = new Order();
3order.Receive();
4
5//2、使用消息接口IOrder
6IOrder order = new Order();
7order.Receive();
根据多态原理,上面这两种实现都是正确的,那我们那取谁舍谁呢?在PetShop4.0中正确的调用是第2种方法,这种调用方法也更符合“面向接口设计”的原则。----详细请查看消息工厂MessageFactory部分的介绍。
5、后台处理应用程序OrderProessor
前面一系列的操作,最终都会走向到这里,这里实现了最终的插入数据库的操作。在PetShop 4.0中OrderProessor是一个控制台应用程序,根据需求也可以将其设计为Windows Service。他完成的操作就是接收消息队列里的订单数据,将其插入到数据库。在OrderProessor里使用了多线程技术,监视队列里的订单信息,定期的将其处理。在主方法Main方法中用于控制线程序,核心的执行任何则委托给ProcessOrders方法去实现。

Code
1private static void ProcessOrders()
2{
3 TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));
4 Order order = new Order(); //逻辑层的PetShop.BLL.Order
5 while (true)
6 {
7 TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks);
8 double elapsedTime = 0;
9
10 int processedItems = 0;
11 ArrayList queueOrders = new ArrayList();
12
13 //首先验证事务
14 using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
15 {
16 //从队列中检索订单
17 for (int i = 0; i < batchSize; i++)
18 {
19 try
20 {
21 //在一定时间 类接收队列的订单
22 if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
23 {
24 queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
25 }
26 else
27 {
28 i = batchSize; // 结束循环
29 }
30 elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds;
31 }
32 catch (TimeoutException)
33 {
34 //没有可以等待的消息也结束循环
35 i = batchSize;
36 }
37 }
38
39 //处理队列的订单
40 for (int k = 0; k < queueOrders.Count; k++)
41 {
42 order.Insert((OrderInfo)queueOrders[k]);
43 processedItems++;
44 totalOrdersProcessed++;
45 }
46 //处理完毕或者是超时
47 ts.Complete();
48 }
49 Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");
50 }
51}
ProcessOrders方法首先通过业务逻辑层PetShop.BLL.Order类的方法ReceiveFromQueue去获取消息队列中的订单数据,并将其放入一个ArrayList对象,然后将其插入到数据库。 OrderProcessor的完整代码定义如下: Code

Code
1using System;
2using System.Collections.Generic;
3using System.Text;
4using System.Configuration;
5using System.Threading;
6using PetShop.BLL;
7using System.Collections;
8using System.Transactions;
9using PetShop.Model;
10
11namespace PetShop.OrderProcessor
12{
13 class Program
14 {
15 private static int transactionTimeout = int.Parse(ConfigurationManager.AppSettings["TransactionTimeout"]);
16 private static int queueTimeout = int.Parse(ConfigurationManager.AppSettings["QueueTimeout"]);
17 private static int batchSize = int.Parse(ConfigurationManager.AppSettings["BatchSize"]);
18 private static int threadCount = int.Parse(ConfigurationManager.AppSettings["ThreadCount"]);
19 private static int totalOrdersProcessed = 0;
20
21 static void Main(string[] args)
22 {
23 Thread workTicketThread;
24 Thread[] workerThreads = new Thread[threadCount];
25
26 for (int i = 0; i < threadCount; i++)
27 {
28 workTicketThread = new Thread(new ThreadStart(ProcessOrders));
29 //指示线呈是否为一后台线程
30 workTicketThread.IsBackground = true;
31 workTicketThread.SetApartmentState(ApartmentState.STA);
32
33 workTicketThread.Start();
34 workerThreads = workTicketThread;
35 }
36
37 Console.WriteLine("开始处理,按任意键停止.");
38 Console.ReadLine();
39 Console.WriteLine("正在终止线程,请等待");
40
41 //终止所以线程
42 for (int i = 0; i < workerThreads.Length; i++)
43 {
44 workerThreads.Abort();
45 }
46
47 Console.WriteLine();
48 Console.WriteLine(totalOrdersProcessed + " 张订单已经处理.");
49 Console.WriteLine("已终止处理.按任意键退出");
50 Console.ReadLine();
51 }
52
53 private static void ProcessOrders()
54 {
55 TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));
56 Order order = new Order(); //逻辑层的PetShop.BLL.Order
57 while (true)
58 {
59 TimeSpan datatimeStarting = new TimeSpan(DateTime.Now.Ticks);
60 double elapsedTime = 0;
61
62 int processedItems = 0;
63 ArrayList queueOrders = new ArrayList();
64
65 //首先验证事务
66 using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))
67 {
68 //从队列中检索订单
69 for (int i = 0; i < batchSize; i++)
70 {
71 try
72 {
73 //在一定时间 类接收队列的订单
74 if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)
75 {
76 queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
77 }
78 else
79 {
80 i = batchSize; // 结束循环
81 }
82 elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datatimeStarting.TotalSeconds;
83 }
84 catch (TimeoutException)
85 {
86 //没有可以等待的消息也结束循环
87 i = batchSize;
88 }
89 }
90
91 //处理队列的订单
92 for (int k = 0; k < queueOrders.Count; k++)
93 {
94 order.Insert((OrderInfo)queueOrders[k]);
95 processedItems++;
96 totalOrdersProcessed++;
97 }
98 //处理完毕或者是超时
99 ts.Complete();
100 }
101 Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");
102 }
103 }
104 }
105}
106
MSMQ技术除了用于异步处理之外,还可作为一种分布式处理技术应用。关于使用MSMQ进行分布式处理相关的内容,本人能力有限,还请大家查看相关的资料进行了解。
本文介绍了MSMQ和多线程以及对PetShop 4.0中对MSMQ的应用进行了分析,结合之前我写的两篇关于MSMQ的相关知识点的介绍文章,对MSMQ算是建立了一个全面的认识,希望这三篇文章对大家学习MSMQ有所帮助。