概覽
- 案例
- 事件發(fā)布
- 前置條件
- IDL定義
- 關(guān)鍵性配置
- 發(fā)布任務服務實現(xiàn)
- 事件發(fā)布
- 事件訂閱
- 依賴
- 作為訂閱者
- 既是事件發(fā)送者撇吞,也是訂閱者燥筷?
- 示例項目
案例
- 假設一個 A 服務為事件發(fā)送方舵盈,B 服務為事件訂閱方
- 假設 A 服務中的 register 接口入庫操作后,會發(fā)送 RegisteredEvent
- 假設 B 服務訂閱了該事件消息,由訂閱者自行處理訂閱到的消息
事件發(fā)布(生產(chǎn)者,Producer)
前置條件
1.需要發(fā)送消息的項目依賴jar包
- sbt項目在
build.sbt
里加入如下依賴
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT"
- maven項目在
pom.xml
中加入如下依賴:
<dependency>
<groupId>com.today</groupId>
<artifactId>event-bus_2.12</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
2.數(shù)據(jù)庫存儲支持结笨,需在業(yè)務數(shù)據(jù)庫中加入此表
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `dp_common_event`;
CREATE TABLE `dp_common_event` (
`id` bigint(20) NOT NULL COMMENT '事件id,全局唯一, 可用于冪等操作',
`event_type` varchar(255) DEFAULT NULL COMMENT '事件類型',
`event_binary` blob DEFAULT NULL COMMENT '事件內(nèi)容',
`updated_at` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新時間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for `event_lock`
-- ----------------------------
DROP TABLE IF EXISTS `dp_event_lock`;
CREATE TABLE `dp_event_lock` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of `event_lock`
-- ----------------------------
BEGIN;
INSERT INTO `dp_event_lock` VALUES ('1', 'event_lock');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
3.IDL定義
- 以事件雙方約定的消息內(nèi)容定義IDL結(jié)構(gòu)體
- 規(guī)定必須為每個事件定義事件ID湿镀,以便消費者做消息冪等
==> events.thrift
namespace java com.github.dapeng.user.events
/**
* 注冊成功事件, 由于需要消費者做冪等,故加上事件Id
**/
struct RegisteredEvent {
/**
* 事件Id
**/
1: i64 id,
/**
* 用戶id
**/
2: i64 userId
}
...more
4.IDL服務接口事件聲明
- 接口可能會觸發(fā)一個或多個事件
== >user_service.thrift
namespace java com.github.dapeng.user.service
include "user_domain.thrift"
include "events.thrift"
/**
* 事件發(fā)送端業(yè)務服務
**/
service UserService{
/**
# 用戶注冊
## 事件
注冊成功事件炕吸,激活事件
**/
string register(user_domain.User user)
(events="events.RegisteredEvent,events.ActivedEvent")
...more
}(group="EventTest")
關(guān)鍵性配置(定時任務)
在spring的配置文件spring/services.xml
進行定義,注意init-method
指定startScheduled
這里采用的是同步模式勉痴,當然
eventbus
也支持異步模式
<!--messageScheduled 定時發(fā)送消息bean-->
<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
<constructor-arg name="topic" value="${kafka_topic}"/>
<constructor-arg name="kafkaHost" value="${kafka_producer_host}"/>
<constructor-arg name="tidPrefix" value="${kafka_tid_prefix}"/>
<constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>
- topic kafka消息topic赫模,領(lǐng)域區(qū)分(建議:領(lǐng)域_版本號_event)
- kafkaHost kafka集群地址(如:127.0.0.1:9091,127.0.0.1:9092)
- tidPrefix kafka事務id前綴,領(lǐng)域區(qū)分
- dataSource 使用業(yè)務的 dataSource
==>config_user_service.properties
# event config
kafka_topic=user_1.0.0_event
kafka_producer_host=127.0.0.1:9092
kafka_tid_prefix=user_1.0.0
在dapeng.properties中配置:
soa.eventbus.publish.period=500 //代表輪詢數(shù)據(jù)庫消息庫時間蒸矛,如果對消息及時性很高瀑罗,請將此配置調(diào)低,建議最低為100ms莉钙,默認配置是1000ms
事件觸發(fā)
- 在做事件觸發(fā)前,你需要實現(xiàn)
AbstractEventBus
,并將其交由spring托管,來做自定義的本地監(jiān)聽分發(fā)
==>commons/EventBus.scala
object EventBus extends AbstractEventBus {
/**
* 事件在觸發(fā)后筛谚,可能存在本地的監(jiān)聽者磁玉,以及跨領(lǐng)域的訂閱者
* 本地監(jiān)聽者可以通過實現(xiàn)該方法進行分發(fā)
* 同時,也會將事件發(fā)送到其他領(lǐng)域的事件消息訂閱者
* @param event
*/
override def dispatchEvent(event: Any): Unit = {
event match {
case e:RegisteredEvent =>
// do somthing
case _ =>
LOGGER.info(" nothing ")
}
}
override def getInstance: EventBus.this.type = this
}
- 當本地無任何監(jiān)聽時==>
override def dispatchEvent(event: Any): Unit = {}
==> spring/services.xml
<bean id="eventBus" class="com.github.dapeng.service.commons.EventBus" factory-method="getInstance">
<property name="dataSource" ref="tx_demo_dataSource"/>
</bean>
- 事件發(fā)布
EventBus.fireEvent(RegisteredEvent(event_id,user.id))
事件定時發(fā)布修改:
在dapeng.properties加入環(huán)境變量配置
//每次輪詢間隔事件為100ms
soa.eventbus.publish.period=100
在業(yè)務系統(tǒng)的services.xml
中配置,指定初始化方法,即定時輪詢?nèi)蝿盏姆椒ǎ?/p>
<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
<constructor-arg name="topic" value="${KAFKA_TOPIC}"/>
<constructor-arg name="kafkaHost" value="${KAFKA_PRODUCER_HOST}"/>
<constructor-arg name="tidPrefix" value="${KAFKA_TID_PREFIX}"/>
<constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>
重點: 配置輪詢發(fā)布消息的時間間隔驾讲,以ms為單位蚊伞,在dapeng.properties中配置
soa.eventbus.publish.period=500 //代表500ms
生產(chǎn)方因為輪詢數(shù)據(jù)庫發(fā)布消息,如果間隔很短吮铭,會產(chǎn)生大量的日志时迫,需要修改級別,在logback下進行如下配置:
<!--將eventbus包下面的日志都放入單獨的日志文件里 dapeng-eventbus.%d{yyyy-MM-dd}.log-->
<appender name="eventbus" class="ch.qos.logback.core.rolling.RollingFileAppender">
<prudent>true</prudent>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
注意: 這里detail- 后面 加自己系統(tǒng)的名字谓晌。 例如這里的 goods
<fileNamePattern>${soa.base}/logs/detail-goods-eventbus.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%d{MM-dd HH:mm:ss SSS} %t %p - %m%n</pattern>
</encoder>
</appender>
<!-- additivity:是否向上級(root)傳遞日志信息, -->
<!--com.today.eventbus包下的日志都放在上面配置的單獨的日志文件里-->
<logger name="com.today.eventbus" level="DEBUG" additivity="false">
<appender-ref ref="eventbus"/>
</logger>
<!--sql 日志顯示級別-->
<logger name="druid.sql" level="OFF"/>
<logger name="wangzx.scala_commons.sql" level="DEBUG"/>
<logger name="org.apache.kafka.clients.consumer.KafkaConsumer" level="INFO"/>
<logger name="org.springframework.jdbc.datasource.DataSourceUtils" level="INFO"/>
事件訂閱 (消費者 Consumer)
依賴
除需要向上面生產(chǎn)者一樣依賴eventbus的jar包外,還需要依賴生產(chǎn)者端的api jar包
<!--事件發(fā)送方api-->
<dependency>
<groupId>com.today</groupId>
<artifactId>user-api_2.12</artifactId>
<version>0.1-SNAPSHOT</version>
</dependency>
<!--if => sbt project-->
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT",
"com.today" % "user-api_2.12" % "0.1-SNAPSHOT"
注解支持配置:
<bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>
附(kafka日志級別調(diào)整):
==>logback.xml
<logger name="org.apache.kafka.clients.consumer" level="INFO"/>
作為一個訂閱者
// java
@KafkaConsumer(groupId = "eventConsumer1", topic = "user_1.0.0_event",kafkaHostKey = "kafka.consumer.host"))
public class EventConsumer {
@KafkaListener(serializer = RegisteredEventSerializer.class)
public void subscribeRegisteredEvent(RegisteredEvent event){
LOGGER.info("Subscribed RegisteredEvent ==> {}",event.toString());
}
...
}
注意: 訂閱方在消費消息時掠拳,如果處理消息可能會拋出業(yè)務異常(就是業(yè)務有關(guān)的異常,如前置檢查不通過纸肉,等等),在消費消息時溺欧,需要捕獲業(yè)務系統(tǒng)。
@KafkaListener(serializer = classOf[ModifySkuBuyingPriceEventSerializer])
def modifySkuBuyingPriceEvent(event: ModifySkuBuyingPriceEvent): Unit = {
// 重點
try {
logger.info(s"=====> ModifySkuBuyingPriceEvent")
val ModifySkuBuyingPriceItemList = event.modifySkuBuyingPriceEventItems.map(
x => build[ModifySkuBuyingPriceConsumer](x)()
)
val result = consumer.modifySkuBuyingPrice(ModifySkuBuyingPriceItemList) // 收到事件后調(diào)用業(yè)務接口示例
logger.info(s"收到消息$event =>成功修改sku進價柏肪, ${result} ")
} catch {
//logger的寫法自己定義
case e: SoaException => logger.error("業(yè)務拋出的異常姐刁,消息不會重試", e)
}
}
//scala
serializer = classOf[RegisteredEventSerializer]
@KafkaConsumer
- groupId 訂閱者領(lǐng)域區(qū)分
- topic 訂閱的 kafka 消息 topic
- kafkaHostKey 可自行配置的kafka地址,默認值為
dapeng.kafka.consumer.host
烦味∧羰梗可以自定義以覆蓋默認值- 用戶只要負責把這些配置放到env或者properties里面
- 如:
System.setProperty("kafka.consumer.host","127.0.0.1:9092");
@KafkaListener
- serializer 事件消息解碼器,由事件發(fā)送方提供.
既是消費者也是訂閱者?
如果服務既有接口會觸發(fā)事件柏靶,也存在訂閱其他領(lǐng)域的事件情況弃理。只要增加缺少的配置即可
重點可以看如下發(fā)布者demo
https://github.com/leihuazhe/publish-demo