消息總線: Spring Cloud Bus
1.什么是BUS?
spring cloud是按照spring的配置對一系列微服務框架的集成,spring cloud bus是其中一個微服務框架,用于實現(xiàn)微服務之間的通信桥狡。spring cloud bus
整合 java的事件處理機制和消息中間件消息的發(fā)送和接受凛忿,主要由發(fā)送端肖方、接收端和事件組成冰悠。針對不同的業(yè)務需求节槐,可以設(shè)置不同的事件操灿,發(fā)送端發(fā)送事件锯仪,接收端接受相應的事件,并進行相應的處理趾盐。
Spring cloud Bus
將分布式系統(tǒng)的節(jié)點與輕量級消息代理鏈接庶喜。這可以用于廣播狀態(tài)更改(例如配置更改)或其他管理指令。一個關(guān)鍵的想法是救鲤,Bus就像一個擴展的Spring Boot應用程序的分布式執(zhí)行器久窟,但也可以用作應用程序之間的通信渠道。
如下架構(gòu)圖所示:
2.原理
spring cloud bus
整合了java的事件處理機制和消息中間件本缠,所以下面就從這兩個方面來說明spring cloud bus的原理斥扛。
如圖所示,作如下解釋:
(1)完整流程:發(fā)送端(endpoint)構(gòu)造事件event丹锹,將其publish到context上下文中(
spring cloud bus有一個父上下文稀颁,bootstrap
),然后將事件發(fā)送到channel中(json串message)楣黍,接收端從channel中獲取到message匾灶,將message轉(zhuǎn)為事件event(轉(zhuǎn)換過程這一塊沒有深究),然后將event事件publish到context上下文中租漂,最后接收端(Listener)收到event阶女,調(diào)用服務進行處理颊糜。整個流程中,只有發(fā)送/接收端從context上下文中取事件和發(fā)送事件是需要我們在代碼中明確寫出來的张肾,其它部分都由框架封裝完成芭析。
(2)先大致描述了一下流程,關(guān)于封裝的部分流程吞瞪,我們基本上可以在BusAutoConfiguration.class中找到,下面的代碼都是這個類中的代碼:
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)) {
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}
}
這是封裝了java事件處理機制驾孔,當收到RemoteApplicationEvent時芍秆,如果這個event是從這個服務發(fā)出的,而且不是ack事件翠勉,那么就會把這個事件發(fā)送到channel中妖啥。
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
if (event instanceof AckRemoteApplicationEvent) {
if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
// If it's an ACK we are finished processing at this point
return;
}
if (this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher != null) {
if (!this.serviceMatcher.isFromSelf(event)) {
this.applicationEventPublisher.publishEvent(event);
}
if (this.bus.getAck().isEnabled()) {
AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
this.serviceMatcher.getServiceId(),
this.bus.getAck().getDestinationService(),
event.getDestinationService(), event.getId(), event.getClass());
this.cloudBusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventPublisher.publishEvent(ack);
}
}
if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
event.getOriginService(), event.getDestinationService(),
event.getId(), event.getClass()));
}
}
@StreamListener
標簽,這個方法就是從channel中取出事件進行處理的過程(message轉(zhuǎn)事件部分需要自行了解对碌,我沒有深入研究)荆虱,根據(jù)事件的類型、發(fā)送方和接收方來處理這個事件:如果是ack事件朽们,發(fā)送到context上下文中怀读;如果自己是接收端且不是發(fā)送端,就會將事件發(fā)送到context上下文骑脱。
3.如何整合BUS菜枷?
消息總線可支持的有:
ActiveMQ是比較老牌的消息系統(tǒng),當然了不一定是大家第一個熟知的消息系統(tǒng)叁丧,因為現(xiàn)在電商啤誊、互聯(lián)網(wǎng)規(guī)模越來越大,不斷進入程序員眼簾的大多是Kafka和RocketMQ拥娄。ActiveMQ出現(xiàn)的要比他們早蚊锹,而且涵蓋的功能也特別全,路由稚瘾、備份牡昆、查詢、事務孟抗、集群等等迁杨。他的美中不足是不能支撐超大規(guī)模、超高并發(fā)的互聯(lián)網(wǎng)應用凄硼,ActiveMQ的并發(fā)承受能力在百萬級別铅协,大概500次/s的消息頻率。
Kafka是新一代的消息系統(tǒng)摊沉,相對于ActiveMQ來說增加了分片功能狐史,類似于數(shù)據(jù)庫分庫分表,一臺Broker僅負責一部分數(shù)據(jù)收發(fā),從而使得他的伸縮性特別好骏全,通過增加Broker就可以不斷增加處理能力苍柏。一般來說,Kafka被用來處理日志流姜贡,作為流計算的接入點试吁。在電商的訂單、庫存等系統(tǒng)里邊一般不用楼咳,主要顧慮的是Kafka異步刷盤機制可能導致數(shù)據(jù)丟失熄捍。當然,對于數(shù)據(jù)丟失這一點不同的工程師也有不同的看法母怜,認為Kafka的Master-Slave的多寫機制尘应,完全能夠避免數(shù)據(jù)丟失纬朝。
RocketMQ是阿里開源的一款消息系統(tǒng)凑术,開發(fā)的初衷就是要支撐阿里龐大的電商系統(tǒng)移盆。RocketMQ和Kafka有很多相似之處,由于RocketMQ開發(fā)中很大程度上參考了Kafka的實現(xiàn)轨域。RocketMQ同樣提供了優(yōu)秀的分片機制袱耽,RocketMQ的分片比Kafka的分片有所增強,區(qū)分了絕對有序和非絕對有序兩種選項疙挺。另外RocketMQ采用的是同步刷盤扛邑,一般認為不會造成數(shù)據(jù)丟失。
RabbitMQ類似于ActiveMQ也是一個相對小型的消息系統(tǒng)铐然,他的優(yōu)勢在于靈活的路由機制蔬崩,可以進行自由配置。
Redis的pub/sub功能搀暑,由于Redis是內(nèi)存級的系統(tǒng)沥阳,所以速度和單機的并發(fā)能力是上述四個消息系統(tǒng)不能比擬的,但是也是由于內(nèi)存存儲的緣故自点,在消息的保障上就更弱一些桐罕。
Kafka為例:
Kafak架構(gòu)圖如下:
Kafka是基于消息發(fā)布/訂閱模式實現(xiàn)的消息系統(tǒng),其主要設(shè)計目標如下:
1.消息持久化:以時間復雜度為O(1)的方式提供消息持久化能力桂敛,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間復雜度的訪問性能功炮。
2.高吞吐:在廉價的商用機器上也能支持單機每秒100K條以上的吞吐量
3.分布式:支持消息分區(qū)以及分布式消費,并保證分區(qū)內(nèi)的消息順序
4.跨平臺:支持不同技術(shù)平臺的客戶端(如:Java术唬、PHP薪伏、Python等)
5.實時性:支持實時數(shù)據(jù)處理和離線數(shù)據(jù)處理
6.伸縮性:支持水平擴展
Kafka中涉及的一些基本概念:
1.Broker:Kafka集群包含一個或多個服務器,這些服務器被稱為Broker粗仓。
2.Topic:邏輯上同Rabbit的Queue隊列相似嫁怀,每條發(fā)布到Kafka集群的消息都必須有一個Topic设捐。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個Broker上塘淑,但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
3.Partition:Partition是物理概念上的分區(qū)萝招,為了提供系統(tǒng)吞吐率,在物理上每個Topic會分成一個或多個Partition存捺,每個Partition對應一個文件夾(存儲對應分區(qū)的消息內(nèi)容和索引文件)槐沼。
4.Producer:消息生產(chǎn)者,負責生產(chǎn)消息并發(fā)送到Kafka Broker召噩。
5.Consumer:消息消費者母赵,向Kafka Broker讀取消息并處理的客戶端。
6.Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組具滴,若不指定則屬于默認組),組可以用來實現(xiàn)一條消息被組內(nèi)多個成員消費等功能师倔。
可以從kafka的架構(gòu)圖看到Kafka是需要Zookeeper支持的构韵,你需要在你的Kafka配置里面指定Zookeeper在哪里,它是通過Zookeeper做一些可靠性的保證趋艘,做broker的主從疲恢,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發(fā)送topic形式的消息瓷胧,
Consumer是按照組來分的显拳,所以,一組Consumers都會都要同樣的topic形式的消息搓萧。在服務端杂数,它還做了一些分片,那么一個Topic可能分布在不同的分片上面瘸洛,方便我們拓展部署多個機器揍移,Kafka是天生分布式的。
4.什么時候用cloud bus
spring cloud bus
在整個后端服務中起到聯(lián)通的作用反肋,聯(lián)通后端的多臺服務器那伐。我們?yōu)槭裁葱枰雎?lián)通呢?
后端服務器一般都做了集群化石蔗,很多臺服務器罕邀,而且在大促活動期經(jīng)常發(fā)生服務的擴容、縮容养距、上線诉探、下線。這樣铃在,后端服務器的數(shù)量阵具、IP就會變來變?nèi)グ椋绻覀兿脒M行一些線上的管理和維護工作,就需要維護服務器的IP阳液。
比如我們需要更新配置怕敬、比如我們需要同時失效所有服務器上的某個緩存,都需要向所有的相關(guān)服務器發(fā)送命令帘皿,也就是調(diào)用一個接口东跪。
你可能會說,我們一般會采用zookeeper的方式鹰溜,統(tǒng)一存儲服務器的ip地址虽填,需要的時候,向?qū)掌靼l(fā)送命令曹动。這是一個方案斋日,但是他的解耦性、靈活性墓陈、實時性相比消息總線都差那么一點恶守。
總的來說,就是在我們需要把一個操作散發(fā)到所有后端相關(guān)服務器的時候贡必,就可以選擇使用cloud bus了兔港。
spring cloud config 配合spring cloud bus實現(xiàn)配置信息更新
spring cloud config 配置更新有兩種方式:1.配置git倉庫的web hook,當git倉庫有更新時自動調(diào)用bus提供的刷新接口仔拟,刷新緩存衫樊;2.手工調(diào)用bus提供的刷新接口。
不論一方案還是二方案區(qū)別僅在于是不同的人觸發(fā)了刷新接口利花。實際上科侈,線上服務器一般很少采用自動刷新的機制,都會在修改后晋被,確認無誤后再執(zhí)行刷新兑徘。
關(guān)鍵的修改點是把所有的后端服務器連接到同一個消息系統(tǒng)上,然后監(jiān)聽配置更新消息羡洛。
安裝RabbitMQ
安裝方法很簡單挂脑,直接在官網(wǎng)下載對應的安裝文件就可以了。
因為RabbitMQ是Erlang語言寫的欲侮,所以如果你的機器上沒有安裝Erlang崭闲,那么需要先安裝Erlang。
增加bus包的引用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
增加RabbitMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
增加@RefreshScope注解
@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication {
public static void main(String[] args) {
SpringApplication.run(ConfigClientApplication.class, args);
}
@Value("${app-name}")
private String app_name;
@RequestMapping("hi")
public String hi(){
return "hello "+ app_name;
}
}
spring cloud擴展消息總線方法
5.Spring Cloud 集成Kafka
pom.xml加入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
加入以上依賴后威蕉,Spring Cloud 消息總線 Kafka 已經(jīng)集成完成刁俭,使用的配置則是默認啟動 Kafka 和 ZooKeeper 時的配置。確定 ZooKeeper 和 Kafka 已經(jīng)啟動韧涨,然后再啟動有上面依賴的 Spring Boot 應用 牍戚,這時 Kafka 會新增一個名為 springCloudBus 的 Topic侮繁,可以使用命令 kafka-topics --list --zookeeper localhost:2181 來查看當前 Kafka 中的 Topic。
集成后Kafka 配置
以上的例子中 Kafka如孝、ZooKeeper 均運行于本地宪哩,但實際應用中,Kafka 和 ZooKeeper 一般會獨立部署第晰,所以需要為Kafka 和 ZooKeeper 配置一些連接信息锁孟,Spring Boot 1.3.7 沒有為 Kafka 直接提供 Starter 模塊,而是使用 Spring Cloud Stream 的 Kafka 模塊茁瘦,配置的時候則采用 spring.cloud.stream.kafka 前綴
spring.cloud.stream.kafka 配置
spring:
cloud:
stream:
binders:
#binderName
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
#kafka地址
brokers: localhost:9092
#zookeeper節(jié)點地址
zk-nodes: localhost:2181
bindings:
#channelName
channelKafka:
#binderName
binder: kafka1
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1
spring.kafka 配置
在啟動Kafka的時候有這樣一個配置 config/server.properties 的 zookeeper.connect 指定 ZooKeeper連接地址品抽,但是在 spring.kafka 中并沒有看到可以配置 ZooKeeper 連接地址 的地方
spring:
kafka:
consumer:
#消費者服務器地址
bootstrap-servers: localhost:9092
producer:
#生產(chǎn)者服務器地址
bootstrap-servers: localhost:9092
cloud:
stream:
bindings:
channel1:
destination: event-demo
content-type: text/plain
producer:
partitionCount: 1
Less is more.