跳到导航
BEA Dev2Dev Oracle and BEA
首页 资源中心 dev2dev学堂 在线技术论坛 User Group CodeShare
dev2dev 首页 > 资源中心 > 技术文章
基于ESB的MQ连接方案

时间:2007-12-25
作者:周警伟
浏览次数:
本文关键字:AquaLogic Service Bus ESB MQ
文章工具
推荐给朋友 推荐给朋友
打印文章 打印文章

前言

  最近碰到很多这样的案例,即需要实现系统和MQ的互联。以前已经有一些文章论述了如何使用WebLogic Server的JMS Bridge来连接MQ,也有使用WebLogic Server内置的Foreign JMS Server来实现的,但是这需要编写很多代码如MDB来侦听消息,JMS客户端程序来发送消息等。其实BEA的SOA基础架构软件企业服务总线AquaLogic ESB能方便实现这样的功能,且无需编程。利用ESB还有一个好处是可以直接使用XML接口,而前者得直接编程面对消息解析。

  文章还对如何实现传递中文XML做了详细的解释,同时也加入了如何利用XQuery技术快速实现数据格式的转换。

基于ESB的MQ连接方案

概述

  BEA ESB 2.6RP1加入了Native MQ Transport的新特性,使得BEA ESB可以直接与IBM MQ进行消息交换。可以通过Business 服务向MQ发送消息,支持单向发送和发送等待返回两种模式;可以通过Proxy服务从MQ取消息,支持单向取消息和取消息消费以后再放入MQ返回队列两种模式。

  使用ESB Native MQ Transport有几个好处:

  1. 可以读取和产生MQ Message,如果直接使用MQ JMS接口无法直接设置某些header属性
  2. 支持发送和收取MQ receipt message
  3. 内置在ESB,无需额外配置如JMS Bridge或Foreign JMS Server等。

  关键特性:

  1. 支持Inbound和Outbound连接
  2. 支持IBM MQ5.3和6.0
  3. 可以处理所有MQMD(MQ message descriptor)头信息
  4. 支持Biding模式(ESB与MQ装在相同机器上)和TCP模式(ESB可与MQ装在不同机器上)
  5. 支持单向和双向SSL(用于TCP模式)

  Native MQ Transport目前支持消息服务(Message service)和任意XML服务(Any xml serivce),根据不同业务场景可以选择合适的服务模式。任意XML服务支持将XML作为服务的调用和返回接口,对于传递数据非常方便,下面的例子即是讲解如何使用任意XML服务把IBM MQ集成到BEA AquaLogic ESB上。

场景描述

  基于ESB的MQ连接方案

  1. 代理服务Send2MQ_withResponse发起请求,将xml request消息路由到业务服务Send2MQ_withReponse_Business
  2. Business服务Send2MQ_withReponse_Business使用Native MQ Transport将消息路由到MQ接收队列esb.mqReceiveQueue,随后即刻侦听esb.mqSendQueue队列。
  3. 代理服务MQMessageResponder,从esb.mqReceiveQueue取消息,进行业务处理和数据格式转换,将结果xml发送到esb.mqSendQueue
  4. 业务服务Send2MQ_withReponse_Business侦听的esb.mqSendQueue队列有了先前消息的返回结果,该服务接收到返回消息
  5. 代理服务Send2MQ_withResponse得到经过处理的返回结果

具体步骤

1、设置 MQ Server

  (1)创建队列管理器QM_YourIP(如QM_jizhou01),创建Listner监听1414端口, 创建服务器连接通道(如BRIDGE.CHANNEL传输协议TCP)

  (2)创建本地队列esb.mqReceiveQueue, esb.mqSendQueue, 选择“持久”。如图:

  基于ESB的MQ连接方案

  (3)编辑JMSAdmin.config,如下:

#INITIAL_CONTEXT_FACTORY=com.sun.jndi.ldap.LdapCtxFactory
INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.ejs.ns.jndi.CNInitialContextFactory
#INITIAL_CONTEXT_FACTORY=com.ibm.websphere.naming.WsnInitialContextFactory

#PROVIDER_URL=ldap://polaris/o=ibm,c=us
PROVIDER_URL=file:/C:/bea/IBM/JNDI
#PROVIDER_URL=iiop://localhost/

SECURITY_AUTHENTICATION=none

  运行JMSAdmin.bat

创建binding的JNDI定义:

    DEFINE XAQCF(esb.mqQCFXA) 
DEFINE Q(esb.mqSendQueue) QUEUE(esb.mqSendQueue) QMGR(QM_jizhou01)
DEFINE Q(esb.mqReceiveQueue) QUEUE(esb. mqReceiveQueue) QMGR(QM_jizhou01)
DIS CTX
END …

2、设置 BEA AquaLogic ESB:

  (1) 登录ESB 控制台hhttp://localhost:7021/sbconsole

  创建Project和文件夹,如图:

  基于ESB的MQ连接方案

  (2) 在resources文件夹下创建 MQ Connection

  基于ESB的MQ连接方案

  (注意,需要先取得编辑锁,左上角点击Edit)

  然后定义MQ Connection:名称为MQResource, 如图:

  基于ESB的MQ连接方案

  我们看到,这里没有更多的属性设置,所以都是使用MQ默认设置的,如CCSID为1381。

  (3) 在Business Services目录下创建Send2MQ_withResponse_Business,

  选择Any XML Service, 选择mq作为连接协议,如图:

  基于ESB的MQ连接方案

  End Point URL为

  mq://esb.mqReceiveQueue?conn=MQTransport/resources/MQResource

  具体配置如图:

  基于ESB的MQ连接方案

  注意返回选择Bytes作为消息类型,MQCorrelationID为返回模式,返回等待的结果队列为esb.mqSendQueue。

  (4) 在Proxy Services文件夹下创建Proxy Servcie,名称为:Send2MQwithResponseProxy,创建该代理的目的是为封装发送消息的业务服务,这样本来是直接调用MQ协议的服务变为基于http的服务。具体配置如图:

  基于ESB的MQ连接方案

  为Proxy Service 添加Route节点:

  基于ESB的MQ连接方案

  编辑该节点,添加Action

  基于ESB的MQ连接方案

  路由服务选择刚才创建的Business服务Send2MQ_withResponse_Business,

  基于ESB的MQ连接方案

  到这里,本来应该可以了;但是我们在测试中可以发现,在传输中文XML时发现到MQ队列里变成了乱码。经过分析,原来MQ服务器端的字符集设置一般为默认设置1381,而使用同样的设置发送中文就出现乱码了。UTF-8在MQ的字符集中的编号为1208,因此我们在把中文XML消息发送到MQ队列之前,要进行CCSID设置。方法就是在路由节点的Request动作里,即发送前设置。

  基于ESB的MQ连接方案

  然后选择Outbound Request, 记住调用Business服务是outbound对象。

  设置header信息,如图:

  基于ESB的MQ连接方案

  我们可以看到BEA ESB在这方面是非常强大的,可以设置每一个header信息而无需编码。

  测试:

  先确认能否发送XML消息到MQ队列。点击测试Proxy服务的按钮,使用测试消息:

      <?xml version="1.0"  encoding="UTF-8"?>
<ns0:Request xmlns:ns0="hhttp://www.example.org/wlsRequestSchema">
<Item>
<companyID>001</companyID>
<companyName>公司</companyName>
<EmployeeCount>10</EmployeeCount>
<Total>199</Total>
</Item>
</ns0:Request>

  基于ESB的MQ连接方案

  点击执行,该测试程序会一直等待结果(因为你已经设置等待返回结果了)。我们转到MQ服务器端,看看消息是否已经到达。

  基于ESB的MQ连接方案

  可以看到,消息队列esb.mqReceiveQueue已经有了消息,且消息体正确反应出了中文信息。

  至此,从ESB发送消息到MQ已经完成。下面,我们要设计如何从MQ队列中取消息,并如何消费消息。

  (5) 在Proxy Services文件夹下创建Proxy 服务MQMessageResponder,创建该服务的目的是从MQ队列中取消息,然后进行处理,再把经过处理的消息放回指定队列中。

  创建Any XML Service,mq为传输协议,具体如图:

  基于ESB的MQ连接方案

  此时,该服务只是从esb.mqReceiveQueue中取出消息,没有做如何处理又放入了esb.mqSendQueue队列。

  

  我们可以做一个简单的格式转换处理,即将源xml映射到结果xml中,并增添一个字段MQProcessResult,值为“MQ处理完毕”

  基于ESB的MQ连接方案

  实际上,这个代理服务是模拟了在MQ服务器端对消息的处理,也可以理解为BEA ESB可以直接消费MQ队列的消息。

  用BEA提供的Workshp工具,新建一个XQueryProject

  基于ESB的MQ连接方案

  创建两个xml schema。一个是源schema:

  <?xml version="1.0" encoding="UTF-8"?> 
<xs:schema  targetNamespace="hhttp://www.example.org/wlsRequestSchema" xmlns:xs="hhttp://www.w3.org/2001/XMLSchema">
<xs:element name="Request">
<xs:complexType >
<xs:sequence>
<xs:element name="Item" minOccurs="0" maxOccurs="unbounded">
<xs:complexType >
<xs:sequence>
<xs:element name="companyID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="companyName" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="EmployeeCount" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="Total" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element> 
</xs:schema>

  另外是目标schema:

  <?xml version="1.0" encoding="UTF-8"?> 
<xs:schema  targetNamespace="hhttp://www.example.org/wlsResponseSchema" xmlns:xs="hhttp://www.w3.org/2001/XMLSchema">
  <xs:element name="Response">
<xs:complexType >
<xs:sequence>
<xs:element name="Item"  minOccurs="0" maxOccurs="unbounded">
<xs:complexType >
<xs:sequence>
<xs:element name="companyID" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="companyName" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="EmployeeCount" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="Total" type="xs:string" minOccurs="0" maxOccurs="1"/>
<xs:element name="MQProcessResult" type="xs:string" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element> 
</xs:schema>

  再新建一个XQuery转换myXQuery.xq,具体如图

  基于ESB的MQ连接方案

  设置MQProcessResult为字符串常量:“MQ处理完毕”。

  myXQuery.xq的源码为:

declare namespace xf = "hhttp://tempuri.org/MyXQueryProject/xquery/myXQuery/"; 
declare namespace ns-1 = "hhttp://www.example.org/wlsRequestSchema";
declare namespace ns-2 = "hhttp://www.example.org/wlsResponseSchema";
declare function xf:myXQuery($request1 as element(ns-1:Request))
as element(ns-2:Response) {
<ns-2:Response>
{
for $Item in $request1/Item
return
<Item>
<companyID>{ data($Item/companyID) }</companyID>
<companyName>{ data($Item/companyName) }</companyName>
<EmployeeCount>{ data($Item/EmployeeCount) }</EmployeeCount>
{
for $Total in $Item/Total
return
<Total>{ data($Total) }</Total>
}
<MQProcessResult>MQ处理完毕</MQProcessResult>
</Item>
}
</ns-2:Response>
};
declare variable $request1 as element(ns-1:Request) external;
xf:myXQuery($request1)

  在ESB控制台中,引入这两个schema和xquery。如图导入到文件夹 Transformation ,如图:

  基于ESB的MQ连接方案

  现在我们可以在MQMessageResponder中进行数据格式转换了。

  如图,添加pipeline和请求节点

  基于ESB的MQ连接方案

  编辑state1,将$body/wls:Request内容赋值到变量request中,然后对request为参数调用myXQuery, 将返回的转换结果赋值到response变量中,再用response的内容代替$body/wls:Request。这样即起到了转换效果。如图:

  基于ESB的MQ连接方案

  其中第二个Assign 的XQuery调用的参数设置为:

  基于ESB的MQ连接方案

  测试:

  直接测试MQMessageResponder,如果能得到正确结果,则再测试Send2MQwithResponseProxy。

  基于ESB的MQ连接方案

总结

  到这里,我们已经验证BEA ESB在处理与MQ的交互时是非常方便好用的,无需任何编码,且能方便设置消息的各种属性。BEA ESB的特性其实还远不止如此,一但服务封装完毕,就可以很多其他的传输接口来封装,比如我们可以设置File为传输协议的Proxy服务,然后在其内部配置路由,讲消息转发到往MQ发送消息的Business服务,这样我们就可以很容易地定制出这样的逻辑,即侦听某个文件夹,只要符合格式的xml文件到达,即可自动调用发送逻辑,将消息处理后发送到MQ,同时可以指定返回队列并异步侦听。一但MQ服务器端消息处理完毕,将消息转到指定队列后,该侦听逻辑即可获得处理过的消息。

 作者简介
周警伟是(dev2dev ID: zhoujw) BEA系统(中国)有限公司 渠道部技术顾问
dot dot dot

dot
  作者其它文章
您对本文的评价
您对这篇文章的看法如何?
太棒了!5分 不错啊 4分 一般般 3分 有待提高 2分 不好 1分

   
相关技术