配置文件:
pom文件:
<spring-rabbit.version>1.7.2.RELEASE</spring-rabbit.version>
<amqp-client.version>3.6.1</amqp-client.version>
<rabbit.mq.address></rabbit.mq.address>
<rabbit.mq.userName>guest</rabbit.mq.userName>
<rabbit.mq.password>guest</rabbit.mq.password>
Properties文件幻锁,properties引用pom中的配置:
wfpt.properties:
wfpt.rabbitmq.address=${rabbit.mq.address}
wfpt.rabbitmq.userName=${rabbit.mq.userName}
wfpt.rabbitmq.password=${rabbit.mq.password}
wfpt.rabbitmq.virtualHost=${rabbit.mq.virtualHost}
RabbiitMQ使用的模式:
工作流中使用的是Routingkey 模式,但是沒有指定Exchange的類型和名稱辽幌,當(dāng)Exchange沒有指定類型和名稱時(shí)巡社,RabbitMQ將會(huì)為每一個(gè)隊(duì)列設(shè)定一個(gè)Default Exchange,它的Routing key為消息隊(duì)列的名稱奋构。見下圖:
RabbitMQ生產(chǎn)者:ActRabbitMQProducer
RabbitMQ消費(fèi)者:ActRabbitMQConsumer
控制是否推送代辦:(發(fā)起流程時(shí)壳影,是否推送待辦)
IActPushTODOService中的isPushTODO(DelegateTask delegateTask)方法,該方法根據(jù)流程變量中是否存在isPushTODO變量來決定是否推送待辦弥臼。
RabbitMQ與KafKa的切換:(狀態(tài)模式的使用)
1. 控制使用的消息中間件的類型:ActTodoTaskMQManager宴咧,根據(jù)配置文件中wfpt.todoTask.sendType的配置的類型和數(shù)據(jù)庫中act_ex_system表的pushType的值來判斷使用的消息中間件是哪個(gè)。
2. 狀態(tài)接口ActTodoTaskMQState径缅,RabbitMQ和KafKa發(fā)送方法共同實(shí)現(xiàn)該接口掺栅。
3. ActRabbitMQTodoTaskService和ActKafkaTodoTaskService方法時(shí)發(fā)送代辦的實(shí)現(xiàn)類
4. 實(shí)際調(diào)用形式:
1. ActTodoTaskMQManager mqManager = new ActTodoTaskMQManager();
2. ActTodoTaskMQState state = mqManager.getMQState();
3. state.sendTodoMsgAndUpdateTodoTask(distributeTaskDTO,todoTaskList);
系統(tǒng)說明:
(1)pom文件依賴:
<!-- RabbitMQ -->
<spring-rabbit.version>1.7.2.RELEASE</spring-rabbit.version>
<amqp-client.version>3.6.1</amqp-client.version>
<!-- 工作流服務(wù)(開發(fā)環(huán)境) -->
<wfpt.version>0.0.5-SNAPSHOT</wfpt.version>
<!-- RabbitMQ start -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${amqp-client.version}</version>
</dependency>
<!-- RabbitMQ end -->
<!-- 工作流服務(wù)wfpt start -->
<dependency>
<groupId>com.yixin</groupId>
<artifactId>wfpt-rpc</artifactId>
<version>${wfpt.version}</version>
</dependency>
<!-- 工作流服務(wù)wfpt end -->
(2)xml配置文件:rpc-impl層箱吕,spring-context.xml
1. <!-- RabbitMQ接收待辦:請(qǐng)注意默認(rèn)是不啟用狀態(tài),否則有可能產(chǎn)生消費(fèi)了別人待辦的消息 -->
<bean id="ActRabbitReceiveTodoTaskMessage"
class="com.yixin.wfpt.api.todotask.rabbitmq.ActRabbitReceiveTodoTaskMessage" init-method="init">
<property name="queueName" value="sample"/>
<property name="initFlag" value="true"/>
<property name="receiveClass" value="com.yixin.demo.todotask.ActExDemoTodoTaskServiceImpl"/>
</bean>
說明:ActRabbitReceiveTodoTaskMessage類是上面所講的接收待辦的客戶端柿冲,工作流deploy的時(shí)候會(huì)把該類deploy到nexus茬高,sample會(huì)依賴到該類
ActExDemoTodoTaskServiceImpl類是業(yè)務(wù)系統(tǒng)接收待辦的實(shí)現(xiàn)類,該類由業(yè)務(wù)系統(tǒng)實(shí)現(xiàn)假抄,該類實(shí)現(xiàn)了接口IActExDemoTodoTaskService怎栽,該接口是工作流提供的接口
- 接收待辦的客戶端
ActRabbitReceiveTodoTaskMessage類
public void init(){
if(initFlag){
logger.info("接收待辦,連接RabbitMQ開始宿饱!");
if(!StringUtils.hasText(queueName)){
logger.info("RabbitMQ接收待辦消息,傳入?yún)?shù)queueName為空熏瞄!");
return ;
}
logger.info("RabbitMQ接收待辦消息,queueName為:{}",queueName);
if (queueName.endsWith(",")) {
logger.info("RabbitMQ接收待辦消息,queueName以逗號(hào)結(jié)尾谬以,不符合規(guī)范强饮,queueName={}",queueName);
return;
}
if(queueName.indexOf(',') > -1){
String[] queues = queueName.split(",");
for(String queue : queues){
consumeMsg(queue);
}
}else{
consumeMsg(queueName);
}
}
}
說明:
(1) init方法是在業(yè)務(wù)系統(tǒng)spring-context.xml文件中配置的init-method="init",在業(yè)務(wù)系統(tǒng)啟動(dòng)的時(shí)候而且是initFlag為true的時(shí)候執(zhí)行該方法。
(2) 支持多個(gè)隊(duì)列为黎,以逗號(hào)分隔邮丰。queueName指的是接入系統(tǒng)的名稱,該出配置的queueName需要與工作流頁面中接入系統(tǒng)的編碼一樣铭乾。
private void consumeMsg(String queue){
try {
/**
* 創(chuàng)建channel
*/
final Channel channel= ActRabbitMQConsumer.createChannel();
/**
* 聲明隊(duì)列
*/
channel.queueDeclare(queue,true,false,false,null);
channel.basicConsume(queue,false,newQueueingConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] bodys) throws IOException{
String message = new String(bodys);
List<TodoTaskDTO> todoTaskList = parseMessage(message);
execute(todoTaskList);
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}catch(Exception e) {
logger.error("RabbitMQ消費(fèi)者接收消息失敿袅!", e);
throw new BzException("RabbitMQ消費(fèi)者接收消息失斂婚荨斗蒋!");
}
}
說明:該方法用于接收消息,basicConsume(queue,false,new QueueingConsumer())中第二個(gè)參數(shù)表示是否自動(dòng)確認(rèn)笛质,此處為false表示手工確認(rèn)泉沾。手工確認(rèn)的方法為:channel.basicAck(envelope.getDeliveryTag(),false);
handleDelivery方法是監(jiān)聽器監(jiān)聽消息,如果RabbitMQ中存在消息則接收消息妇押。
execute方法用applicationContext.getBean的方式跷究,實(shí)現(xiàn)IActExDemoTodoTaskService接口的方法,receiveClass就是現(xiàn)實(shí)IActExDemoTodoTaskService接口的方法舆吮。
IPushTodoTaskAPI接口:
該接口時(shí)工作流提供的接口揭朝,但是由業(yè)務(wù)系統(tǒng)去實(shí)現(xiàn)該接口。