事件總線(event-bus)使用指南

概覽

  • 案例
  • 事件發(fā)布
    • 前置條件
    • IDL定義
    • 關(guān)鍵性配置
    • 發(fā)布任務服務實現(xiàn)
    • 事件發(fā)布
  • 事件訂閱
    • 依賴
    • 作為訂閱者
  • 既是事件發(fā)送者撇吞,也是訂閱者燥筷?
  • 示例項目

案例

  • 假設一個 A 服務為事件發(fā)送方舵盈,B 服務為事件訂閱方
  • 假設 A 服務中的 register 接口入庫操作后,會發(fā)送 RegisteredEvent
  • 假設 B 服務訂閱了該事件消息,由訂閱者自行處理訂閱到的消息

事件發(fā)布(生產(chǎn)者,Producer)

前置條件

1.需要發(fā)送消息的項目依賴jar包

  • sbt項目在build.sbt里加入如下依賴
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT"
  • maven項目在pom.xml中加入如下依賴:
<dependency>
    <groupId>com.today</groupId>
    <artifactId>event-bus_2.12</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

2.數(shù)據(jù)庫存儲支持结笨,需在業(yè)務數(shù)據(jù)庫中加入此表

SET FOREIGN_KEY_CHECKS = 0;

DROP TABLE IF EXISTS `dp_common_event`;
CREATE TABLE `dp_common_event` (
  `id` bigint(20) NOT NULL COMMENT '事件id,全局唯一, 可用于冪等操作',
  `event_type` varchar(255) DEFAULT NULL COMMENT '事件類型',
  `event_binary` blob DEFAULT NULL COMMENT '事件內(nèi)容',
  `updated_at` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
--  Table structure for `event_lock`
-- ----------------------------
DROP TABLE IF EXISTS `dp_event_lock`;
CREATE TABLE `dp_event_lock` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
--  Records of `event_lock`
-- ----------------------------
BEGIN;
INSERT INTO `dp_event_lock` VALUES ('1', 'event_lock');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

3.IDL定義

  • 以事件雙方約定的消息內(nèi)容定義IDL結(jié)構(gòu)體
  • 規(guī)定必須為每個事件定義事件ID湿镀,以便消費者做消息冪等

==> events.thrift

namespace java com.github.dapeng.user.events

/**
* 注冊成功事件, 由于需要消費者做冪等,故加上事件Id
**/
struct RegisteredEvent {
    /**
    * 事件Id
    **/
    1: i64 id,
    /**
    * 用戶id
    **/
    2: i64 userId
}

...more

4.IDL服務接口事件聲明

  • 接口可能會觸發(fā)一個或多個事件

== >user_service.thrift

namespace java com.github.dapeng.user.service

include "user_domain.thrift"
include "events.thrift"

/**
* 事件發(fā)送端業(yè)務服務
**/
service UserService{
/**
# 用戶注冊
## 事件
    注冊成功事件炕吸,激活事件
**/
    string register(user_domain.User user)
    (events="events.RegisteredEvent,events.ActivedEvent")
    
    ...more
    
}(group="EventTest")

關(guān)鍵性配置(定時任務)

在spring的配置文件spring/services.xml進行定義,注意init-method指定startScheduled

這里采用的是同步模式勉痴,當然eventbus也支持異步模式

<!--messageScheduled 定時發(fā)送消息bean-->
<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
    <constructor-arg name="topic" value="${kafka_topic}"/>
    <constructor-arg name="kafkaHost" value="${kafka_producer_host}"/>
    <constructor-arg name="tidPrefix" value="${kafka_tid_prefix}"/>
    <constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>
  • topic kafka消息topic赫模,領(lǐng)域區(qū)分(建議:領(lǐng)域_版本號_event)
  • kafkaHost kafka集群地址(如:127.0.0.1:9091,127.0.0.1:9092)
  • tidPrefix kafka事務id前綴,領(lǐng)域區(qū)分
  • dataSource 使用業(yè)務的 dataSource

==>config_user_service.properties

# event config
kafka_topic=user_1.0.0_event
kafka_producer_host=127.0.0.1:9092
kafka_tid_prefix=user_1.0.0

在dapeng.properties中配置:



soa.eventbus.publish.period=500 //代表輪詢數(shù)據(jù)庫消息庫時間蒸矛,如果對消息及時性很高瀑罗,請將此配置調(diào)低,建議最低為100ms莉钙,默認配置是1000ms


事件觸發(fā)

  • 在做事件觸發(fā)前,你需要實現(xiàn) AbstractEventBus ,并將其交由spring托管,來做自定義的本地監(jiān)聽分發(fā)

==>commons/EventBus.scala

object EventBus extends AbstractEventBus {

  /**
    * 事件在觸發(fā)后筛谚,可能存在本地的監(jiān)聽者磁玉,以及跨領(lǐng)域的訂閱者
    * 本地監(jiān)聽者可以通過實現(xiàn)該方法進行分發(fā)
    * 同時,也會將事件發(fā)送到其他領(lǐng)域的事件消息訂閱者
    * @param event
    */
  override def dispatchEvent(event: Any): Unit = {
    event match {
      case e:RegisteredEvent =>
        // do somthing 
      case _ =>
        LOGGER.info(" nothing ")
    }
  }
  override def getInstance: EventBus.this.type = this
}
  • 當本地無任何監(jiān)聽時==>
override def dispatchEvent(event: Any): Unit = {}

==> spring/services.xml

<bean id="eventBus" class="com.github.dapeng.service.commons.EventBus" factory-method="getInstance">
    <property name="dataSource" ref="tx_demo_dataSource"/>
</bean>
  • 事件發(fā)布
EventBus.fireEvent(RegisteredEvent(event_id,user.id))

事件定時發(fā)布修改:

在dapeng.properties加入環(huán)境變量配置

//每次輪詢間隔事件為100ms
soa.eventbus.publish.period=100


在業(yè)務系統(tǒng)的services.xml中配置,指定初始化方法,即定時輪詢?nèi)蝿盏姆椒ǎ?/p>

<bean id="messageTask" class="com.today.eventbus.scheduler.MsgPublishTask" init-method="startScheduled">
    <constructor-arg name="topic" value="${KAFKA_TOPIC}"/>
    <constructor-arg name="kafkaHost" value="${KAFKA_PRODUCER_HOST}"/>
    <constructor-arg name="tidPrefix" value="${KAFKA_TID_PREFIX}"/>
    <constructor-arg name="dataSource" ref="tx_demo_dataSource"/>
</bean>


重點: 配置輪詢發(fā)布消息的時間間隔驾讲,以ms為單位蚊伞,在dapeng.properties中配置

soa.eventbus.publish.period=500 //代表500ms


生產(chǎn)方因為輪詢數(shù)據(jù)庫發(fā)布消息,如果間隔很短吮铭,會產(chǎn)生大量的日志时迫,需要修改級別,在logback下進行如下配置:

<!--將eventbus包下面的日志都放入單獨的日志文件里 dapeng-eventbus.%d{yyyy-MM-dd}.log-->
<appender name="eventbus" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <prudent>true</prudent>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                    注意: 這里detail-  后面 加自己系統(tǒng)的名字谓晌。 例如這里的 goods
        <fileNamePattern>${soa.base}/logs/detail-goods-eventbus.%d{yyyy-MM-dd}.log</fileNamePattern>
        <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
        <pattern>%d{MM-dd HH:mm:ss SSS} %t %p - %m%n</pattern>
    </encoder>
</appender>

<!-- additivity:是否向上級(root)傳遞日志信息, -->
<!--com.today.eventbus包下的日志都放在上面配置的單獨的日志文件里-->
<logger name="com.today.eventbus" level="DEBUG" additivity="false">
    <appender-ref ref="eventbus"/>
</logger>

<!--sql 日志顯示級別-->
<logger name="druid.sql" level="OFF"/>
<logger name="wangzx.scala_commons.sql" level="DEBUG"/>
<logger name="org.apache.kafka.clients.consumer.KafkaConsumer" level="INFO"/>
<logger name="org.springframework.jdbc.datasource.DataSourceUtils" level="INFO"/>



事件訂閱 (消費者 Consumer)

依賴

除需要向上面生產(chǎn)者一樣依賴eventbus的jar包外,還需要依賴生產(chǎn)者端的api jar包

<!--事件發(fā)送方api-->
<dependency>
    <groupId>com.today</groupId>
    <artifactId>user-api_2.12</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

<!--if => sbt project--> 
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT",
"com.today" % "user-api_2.12" % "0.1-SNAPSHOT"

注解支持配置:

<bean id="postProcessor" class="com.today.eventbus.spring.MsgAnnotationBeanPostProcessor"/>

附(kafka日志級別調(diào)整):
==>logback.xml

<logger name="org.apache.kafka.clients.consumer" level="INFO"/>

作為一個訂閱者

// java

@KafkaConsumer(groupId = "eventConsumer1", topic = "user_1.0.0_event",kafkaHostKey = "kafka.consumer.host"))
public class EventConsumer {
    @KafkaListener(serializer = RegisteredEventSerializer.class)
    public void subscribeRegisteredEvent(RegisteredEvent event){
        LOGGER.info("Subscribed RegisteredEvent ==> {}",event.toString());
    }
    ...
}

注意: 訂閱方在消費消息時掠拳,如果處理消息可能會拋出業(yè)務異常(就是業(yè)務有關(guān)的異常,如前置檢查不通過纸肉,等等),在消費消息時溺欧,需要捕獲業(yè)務系統(tǒng)。

@KafkaListener(serializer = classOf[ModifySkuBuyingPriceEventSerializer])
def modifySkuBuyingPriceEvent(event: ModifySkuBuyingPriceEvent): Unit = {
  // 重點
 try {
    logger.info(s"=====> ModifySkuBuyingPriceEvent")
    val ModifySkuBuyingPriceItemList = event.modifySkuBuyingPriceEventItems.map(
      x => build[ModifySkuBuyingPriceConsumer](x)()
    )
    val result = consumer.modifySkuBuyingPrice(ModifySkuBuyingPriceItemList) // 收到事件后調(diào)用業(yè)務接口示例
    logger.info(s"收到消息$event =>成功修改sku進價柏肪, ${result} ")
  } catch {
  //logger的寫法自己定義
    case e: SoaException => logger.error("業(yè)務拋出的異常姐刁,消息不會重試", e)
  }

}

//scala

serializer = classOf[RegisteredEventSerializer]

@KafkaConsumer

  • groupId 訂閱者領(lǐng)域區(qū)分
  • topic 訂閱的 kafka 消息 topic
  • kafkaHostKey 可自行配置的kafka地址,默認值為dapeng.kafka.consumer.host烦味∧羰梗可以自定義以覆蓋默認值
    • 用戶只要負責把這些配置放到env或者properties里面
    • 如:System.setProperty("kafka.consumer.host","127.0.0.1:9092");

@KafkaListener

  • serializer 事件消息解碼器,由事件發(fā)送方提供.

既是消費者也是訂閱者?

如果服務既有接口會觸發(fā)事件柏靶,也存在訂閱其他領(lǐng)域的事件情況弃理。只要增加缺少的配置即可

重點可以看如下發(fā)布者demo

https://github.com/leihuazhe/publish-demo

示例項目

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市宿礁,隨后出現(xiàn)的幾起案子案铺,更是在濱河造成了極大的恐慌,老刑警劉巖梆靖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件控汉,死亡現(xiàn)場離奇詭異,居然都是意外死亡返吻,警方通過查閱死者的電腦和手機姑子,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來测僵,“玉大人街佑,你說我怎么就攤上這事『纯浚” “怎么了沐旨?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長榨婆。 經(jīng)常有香客問我磁携,道長,這世上最難降的妖魔是什么良风? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任谊迄,我火速辦了婚禮,結(jié)果婚禮上烟央,老公的妹妹穿的比我還像新娘统诺。我一直安慰自己,他們只是感情好疑俭,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布粮呢。 她就那樣靜靜地躺著,像睡著了一般钞艇。 火紅的嫁衣襯著肌膚如雪鬼贱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天香璃,我揣著相機與錄音这难,去河邊找鬼。 笑死葡秒,一個胖子當著我的面吹牛姻乓,可吹牛的內(nèi)容都是我干的嵌溢。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼蹋岩,長吁一口氣:“原來是場噩夢啊……” “哼赖草!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起剪个,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤秧骑,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后扣囊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乎折,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年侵歇,在試婚紗的時候發(fā)現(xiàn)自己被綠了骂澄。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡惕虑,死狀恐怖坟冲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情溃蔫,我是刑警寧澤健提,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站伟叛,受9級特大地震影響私痹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜痪伦,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一侄榴、第九天 我趴在偏房一處隱蔽的房頂上張望雹锣。 院中可真熱鬧网沾,春花似錦、人聲如沸蕊爵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽攒射。三九已至醋旦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間会放,已是汗流浹背饲齐。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留咧最,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓逝段,卻偏偏與公主長得像灌具,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子酸纲,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容