Chapter Nine《SpringCloud微服務實戰(zhàn)》

消息總線: Spring Cloud Bus

1.什么是BUS?

spring cloud是按照spring的配置對一系列微服務框架的集成,spring cloud bus是其中一個微服務框架,用于實現(xiàn)微服務之間的通信桥狡。spring cloud bus整合 java的事件處理機制和消息中間件消息的發(fā)送和接受凛忿,主要由發(fā)送端肖方、接收端和事件組成冰悠。針對不同的業(yè)務需求节槐,可以設(shè)置不同的事件操灿,發(fā)送端發(fā)送事件锯仪,接收端接受相應的事件,并進行相應的處理趾盐。

Spring cloud Bus將分布式系統(tǒng)的節(jié)點與輕量級消息代理鏈接庶喜。這可以用于廣播狀態(tài)更改(例如配置更改)或其他管理指令。一個關(guān)鍵的想法是救鲤,Bus就像一個擴展的Spring Boot應用程序的分布式執(zhí)行器久窟,但也可以用作應用程序之間的通信渠道。

如下架構(gòu)圖所示:

image.png

2.原理

spring cloud bus整合了java的事件處理機制和消息中間件本缠,所以下面就從這兩個方面來說明spring cloud bus的原理斥扛。

image.png

如圖所示,作如下解釋:

(1)完整流程:發(fā)送端(endpoint)構(gòu)造事件event丹锹,將其publish到context上下文中(spring cloud bus有一個父上下文稀颁,bootstrap),然后將事件發(fā)送到channel中(json串message)楣黍,接收端從channel中獲取到message匾灶,將message轉(zhuǎn)為事件event(轉(zhuǎn)換過程這一塊沒有深究),然后將event事件publish到context上下文中租漂,最后接收端(Listener)收到event阶女,調(diào)用服務進行處理颊糜。整個流程中,只有發(fā)送/接收端從context上下文中取事件和發(fā)送事件是需要我們在代碼中明確寫出來的张肾,其它部分都由框架封裝完成芭析。
(2)先大致描述了一下流程,關(guān)于封裝的部分流程吞瞪,我們基本上可以在BusAutoConfiguration.class中找到,下面的代碼都是這個類中的代碼:

@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) {
        this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
    }
}

這是封裝了java事件處理機制驾孔,當收到RemoteApplicationEvent時芍秆,如果這個event是從這個服務發(fā)出的,而且不是ack事件翠勉,那么就會把這個事件發(fā)送到channel中妖啥。

@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) {
            this.applicationEventPublisher.publishEvent(event);
        }
        // If it's an ACK we are finished processing at this point
        return;
    }
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) {
        if (!this.serviceMatcher.isFromSelf(event)) {
            this.applicationEventPublisher.publishEvent(event);
        }
        if (this.bus.getAck().isEnabled()) {
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}

@StreamListener標簽,這個方法就是從channel中取出事件進行處理的過程(message轉(zhuǎn)事件部分需要自行了解对碌,我沒有深入研究)荆虱,根據(jù)事件的類型、發(fā)送方和接收方來處理這個事件:如果是ack事件朽们,發(fā)送到context上下文中怀读;如果自己是接收端且不是發(fā)送端,就會將事件發(fā)送到context上下文骑脱。

3.如何整合BUS菜枷?

消息總線可支持的有:

image.png

ActiveMQ是比較老牌的消息系統(tǒng),當然了不一定是大家第一個熟知的消息系統(tǒng)叁丧,因為現(xiàn)在電商啤誊、互聯(lián)網(wǎng)規(guī)模越來越大,不斷進入程序員眼簾的大多是Kafka和RocketMQ拥娄。ActiveMQ出現(xiàn)的要比他們早蚊锹,而且涵蓋的功能也特別全,路由稚瘾、備份牡昆、查詢、事務孟抗、集群等等迁杨。他的美中不足是不能支撐超大規(guī)模、超高并發(fā)的互聯(lián)網(wǎng)應用凄硼,ActiveMQ的并發(fā)承受能力在百萬級別铅协,大概500次/s的消息頻率。

image.png

Kafka是新一代的消息系統(tǒng)摊沉,相對于ActiveMQ來說增加了分片功能狐史,類似于數(shù)據(jù)庫分庫分表,一臺Broker僅負責一部分數(shù)據(jù)收發(fā),從而使得他的伸縮性特別好骏全,通過增加Broker就可以不斷增加處理能力苍柏。一般來說,Kafka被用來處理日志流姜贡,作為流計算的接入點试吁。在電商的訂單、庫存等系統(tǒng)里邊一般不用楼咳,主要顧慮的是Kafka異步刷盤機制可能導致數(shù)據(jù)丟失熄捍。當然,對于數(shù)據(jù)丟失這一點不同的工程師也有不同的看法母怜,認為Kafka的Master-Slave的多寫機制尘应,完全能夠避免數(shù)據(jù)丟失纬朝。

image.png

RocketMQ是阿里開源的一款消息系統(tǒng)凑术,開發(fā)的初衷就是要支撐阿里龐大的電商系統(tǒng)移盆。RocketMQ和Kafka有很多相似之處,由于RocketMQ開發(fā)中很大程度上參考了Kafka的實現(xiàn)轨域。RocketMQ同樣提供了優(yōu)秀的分片機制袱耽,RocketMQ的分片比Kafka的分片有所增強,區(qū)分了絕對有序和非絕對有序兩種選項疙挺。另外RocketMQ采用的是同步刷盤扛邑,一般認為不會造成數(shù)據(jù)丟失。

image.png

RabbitMQ類似于ActiveMQ也是一個相對小型的消息系統(tǒng)铐然,他的優(yōu)勢在于靈活的路由機制蔬崩,可以進行自由配置。

image.png

Redis的pub/sub功能搀暑,由于Redis是內(nèi)存級的系統(tǒng)沥阳,所以速度和單機的并發(fā)能力是上述四個消息系統(tǒng)不能比擬的,但是也是由于內(nèi)存存儲的緣故自点,在消息的保障上就更弱一些桐罕。

Kafka為例:

Kafak架構(gòu)圖如下:

image.png

Kafka是基于消息發(fā)布/訂閱模式實現(xiàn)的消息系統(tǒng),其主要設(shè)計目標如下:

1.消息持久化:以時間復雜度為O(1)的方式提供消息持久化能力桂敛,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間復雜度的訪問性能功炮。

2.高吞吐:在廉價的商用機器上也能支持單機每秒100K條以上的吞吐量

3.分布式:支持消息分區(qū)以及分布式消費,并保證分區(qū)內(nèi)的消息順序

4.跨平臺:支持不同技術(shù)平臺的客戶端(如:Java术唬、PHP薪伏、Python等)

5.實時性:支持實時數(shù)據(jù)處理和離線數(shù)據(jù)處理

6.伸縮性:支持水平擴展

Kafka中涉及的一些基本概念:

1.Broker:Kafka集群包含一個或多個服務器,這些服務器被稱為Broker粗仓。

2.Topic:邏輯上同Rabbit的Queue隊列相似嫁怀,每條發(fā)布到Kafka集群的消息都必須有一個Topic设捐。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個Broker上塘淑,但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

3.Partition:Partition是物理概念上的分區(qū)萝招,為了提供系統(tǒng)吞吐率,在物理上每個Topic會分成一個或多個Partition存捺,每個Partition對應一個文件夾(存儲對應分區(qū)的消息內(nèi)容和索引文件)槐沼。

4.Producer:消息生產(chǎn)者,負責生產(chǎn)消息并發(fā)送到Kafka Broker召噩。

5.Consumer:消息消費者母赵,向Kafka Broker讀取消息并處理的客戶端。

6.Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組具滴,若不指定則屬于默認組),組可以用來實現(xiàn)一條消息被組內(nèi)多個成員消費等功能师倔。

可以從kafka的架構(gòu)圖看到Kafka是需要Zookeeper支持的构韵,你需要在你的Kafka配置里面指定Zookeeper在哪里,它是通過Zookeeper做一些可靠性的保證趋艘,做broker的主從疲恢,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發(fā)送topic形式的消息瓷胧,
Consumer是按照組來分的显拳,所以,一組Consumers都會都要同樣的topic形式的消息搓萧。在服務端杂数,它還做了一些分片,那么一個Topic可能分布在不同的分片上面瘸洛,方便我們拓展部署多個機器揍移,Kafka是天生分布式的。

4.什么時候用cloud bus

spring cloud bus在整個后端服務中起到聯(lián)通的作用反肋,聯(lián)通后端的多臺服務器那伐。我們?yōu)槭裁葱枰雎?lián)通呢?

后端服務器一般都做了集群化石蔗,很多臺服務器罕邀,而且在大促活動期經(jīng)常發(fā)生服務的擴容、縮容养距、上線诉探、下線。這樣铃在,后端服務器的數(shù)量阵具、IP就會變來變?nèi)グ椋绻覀兿脒M行一些線上的管理和維護工作,就需要維護服務器的IP阳液。

比如我們需要更新配置怕敬、比如我們需要同時失效所有服務器上的某個緩存,都需要向所有的相關(guān)服務器發(fā)送命令帘皿,也就是調(diào)用一個接口东跪。

你可能會說,我們一般會采用zookeeper的方式鹰溜,統(tǒng)一存儲服務器的ip地址虽填,需要的時候,向?qū)掌靼l(fā)送命令曹动。這是一個方案斋日,但是他的解耦性、靈活性墓陈、實時性相比消息總線都差那么一點恶守。

總的來說,就是在我們需要把一個操作散發(fā)到所有后端相關(guān)服務器的時候贡必,就可以選擇使用cloud bus了兔港。

spring cloud config 配合spring cloud bus實現(xiàn)配置信息更新
spring cloud config 配置更新有兩種方式:1.配置git倉庫的web hook,當git倉庫有更新時自動調(diào)用bus提供的刷新接口仔拟,刷新緩存衫樊;2.手工調(diào)用bus提供的刷新接口。

不論一方案還是二方案區(qū)別僅在于是不同的人觸發(fā)了刷新接口利花。實際上科侈,線上服務器一般很少采用自動刷新的機制,都會在修改后晋被,確認無誤后再執(zhí)行刷新兑徘。

關(guān)鍵的修改點是把所有的后端服務器連接到同一個消息系統(tǒng)上,然后監(jiān)聽配置更新消息羡洛。

安裝RabbitMQ
安裝方法很簡單挂脑,直接在官網(wǎng)下載對應的安裝文件就可以了。

因為RabbitMQ是Erlang語言寫的欲侮,所以如果你的機器上沒有安裝Erlang崭闲,那么需要先安裝Erlang。

增加bus包的引用

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

增加RabbitMQ配置

  spring.rabbitmq.host=127.0.0.1
  spring.rabbitmq.port=5671
  spring.rabbitmq.username=guest
  spring.rabbitmq.password=guest

增加@RefreshScope注解

  @SpringBootApplication
  @RestController
  @RefreshScope
  public class ConfigClientApplication {

       public static void main(String[] args) {
            SpringApplication.run(ConfigClientApplication.class, args);
       }

       @Value("${app-name}")
       private String app_name;

      @RequestMapping("hi")
      public String hi(){
            return "hello "+ app_name;
      }
  }

spring cloud擴展消息總線方法

可以參考spring cloud bus 擴展消息總線方法

5.Spring Cloud 集成Kafka

pom.xml加入依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

加入以上依賴后威蕉,Spring Cloud 消息總線 Kafka 已經(jīng)集成完成刁俭,使用的配置則是默認啟動 Kafka 和 ZooKeeper 時的配置。確定 ZooKeeper 和 Kafka 已經(jīng)啟動韧涨,然后再啟動有上面依賴的 Spring Boot 應用 牍戚,這時 Kafka 會新增一個名為 springCloudBus 的 Topic侮繁,可以使用命令 kafka-topics --list --zookeeper localhost:2181 來查看當前 Kafka 中的 Topic。

集成后Kafka 配置

以上的例子中 Kafka如孝、ZooKeeper 均運行于本地宪哩,但實際應用中,Kafka 和 ZooKeeper 一般會獨立部署第晰,所以需要為Kafka 和 ZooKeeper 配置一些連接信息锁孟,Spring Boot 1.3.7 沒有為 Kafka 直接提供 Starter 模塊,而是使用 Spring Cloud Stream 的 Kafka 模塊茁瘦,配置的時候則采用 spring.cloud.stream.kafka 前綴

spring.cloud.stream.kafka 配置

  spring:
    cloud:
      stream:
        binders:
          #binderName
          kafka1:  
            type: kafka  
              environment:
               spring:
                 cloud:
                   stream:
                     kafka: 
                       binder: 
                         #kafka地址
                         brokers: localhost:9092
                         #zookeeper節(jié)點地址
                         zk-nodes: localhost:2181
   bindings:
     #channelName
     channelKafka:
       #binderName
       binder: kafka1
       destination: event-demo
       content-type: text/plain
       producer:
         partitionCount: 1

spring.kafka 配置

在啟動Kafka的時候有這樣一個配置 config/server.properties 的 zookeeper.connect 指定 ZooKeeper連接地址品抽,但是在 spring.kafka 中并沒有看到可以配置 ZooKeeper 連接地址 的地方

    spring:
        kafka:
            consumer:
                #消費者服務器地址
                bootstrap-servers: localhost:9092
           producer:
              #生產(chǎn)者服務器地址
               bootstrap-servers: localhost:9092
        cloud:
            stream:
                bindings:
                  channel1:
                     destination: event-demo
                     content-type: text/plain
                     producer:
                        partitionCount: 1

Less is more.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市甜熔,隨后出現(xiàn)的幾起案子圆恤,更是在濱河造成了極大的恐慌,老刑警劉巖腔稀,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哑了,死亡現(xiàn)場離奇詭異,居然都是意外死亡烧颖,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門窄陡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來炕淮,“玉大人,你說我怎么就攤上這事跳夭⊥吭玻” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵币叹,是天一觀的道長润歉。 經(jīng)常有香客問我,道長颈抚,這世上最難降的妖魔是什么踩衩? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮贩汉,結(jié)果婚禮上驱富,老公的妹妹穿的比我還像新娘。我一直安慰自己匹舞,他們只是感情好褐鸥,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著赐稽,像睡著了一般叫榕。 火紅的嫁衣襯著肌膚如雪浑侥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天晰绎,我揣著相機與錄音寓落,去河邊找鬼。 笑死寒匙,一個胖子當著我的面吹牛零如,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播锄弱,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼考蕾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了会宪?” 一聲冷哼從身側(cè)響起肖卧,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掸鹅,沒想到半個月后塞帐,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡巍沙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年葵姥,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片句携。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡榔幸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出矮嫉,到底是詐尸還是另有隱情削咆,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布蠢笋,位于F島的核電站拨齐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏昨寞。R本人自食惡果不足惜瞻惋,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望编矾。 院中可真熱鬧熟史,春花似錦、人聲如沸窄俏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凹蜈。三九已至限寞,卻和暖如春忍啸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背履植。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工计雌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人玫霎。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓凿滤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親庶近。 傳聞我的和親對象是個殘疾皇子翁脆,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容