spring cloud stream環(huán)境搭建

1 簡介

SpringCloud Stream 是基于消息驅(qū)動(dòng)微服務(wù)應(yīng)用框架乍丈。

Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications, and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

1.1 spring cloud stream結(jié)構(gòu)

術(shù)語:

bindding:連接channel和binder的組件

binder:粘合劑寓娩,對外連接brokers

group:對消費(fèi)者分組箕别,實(shí)現(xiàn)類似于active mq中topic的發(fā)布訂閱

out: 輸出channel

input: 輸入channel

框架結(jié)構(gòu)如下圖所示:

1.2 搭建工程

1.2.1 ?idea創(chuàng)建: File -> new -> spring initializr -> next -> 填寫工程名纯衍,maven路徑碘梢,next -> 選擇spring boot 版本想括,選擇spring組件(本工程選擇spring-cloud鉴腻,spring-web)

1.2.2 ?spring boot cli創(chuàng)建:spring init -dweb,cloud(該命令需要安裝spring boot cli,默認(rèn)創(chuàng)建為maven結(jié)構(gòu)的工程-d為maven依賴)

1.2.3 jdk版本忠寻,官方建議為1.6以上惧浴。推薦使用1.7+。

1.3 版本選擇

本文選擇的spring cloud stream的版本為Chelsea.SR1奕剃,對應(yīng)的kafka版本為0.10.1.1,對接的kafka集群最好是0.10版本以上的衷旅。如果低于0.10,會出現(xiàn)工程啟動(dòng)失敗的錯(cuò)誤纵朋。有2種解決方案:

1)柿顶、加驗(yàn)證,具體操作請參考操软,連接:驗(yàn)證

2)嘁锯、降低spring cloud stream版本至1.0.2.RELEASE,該版本對應(yīng)的kafka-client版本為0.8.2.2

1.4 配置

spring cloud stream默認(rèn)提供了一套基礎(chǔ)配置。broker為localhost:9092,zkNode為localhost:2181寺鸥,其他選項(xiàng)也均為默認(rèn)值(所以猪钮,如果你為本地部署,原則上不需要添加任何配置項(xiàng))胆建。具體可參考spring-scloud-stream jar中的org.springframework.cloud.stream.config包中的javaconfig

spring 1.x xml配置烤低,spring 2.x 注解配置,spring 3.x java 配置笆载,spring boot習(xí)慣性配置扑馁。官方推薦為java config+習(xí)慣性配置

1.5 注解

SpringBootApplication:繼承了EnableAutoConfiguration、ComponentScan凉驻。主要完成自動(dòng)化配置和路徑掃描腻要。

EnableBinding:開啟stream功能。參數(shù)可以source涝登、sink雄家、process。也可以自己實(shí)現(xiàn)(主要為聲明channel)胀滚。

RestController:開啟web功能趟济。與Controller類似乱投。官方解釋:@RestController is a stereotype annotation that combines @ResponseBody and @Controller.

RequestMapping:開啟綁定url功能,url可通過參數(shù)指定顷编。

StreamListener:開啟監(jiān)聽channel功能戚炫,參數(shù)為默認(rèn)或?qū)崿F(xiàn)的channel接口。

EnableAutoConfiguration:開啟自動(dòng)配置功能媳纬。繼承自

ComponentScan:開啟掃描功能双肤,主要針對bean和配置文件。

1.6 代碼示例

1.6.1 生產(chǎn)者代碼示例:

//啟動(dòng)類

@SpringBootApplication//自動(dòng)化配置

@EnableBinding(Source.class)//開啟與消息應(yīng)用代理的連接

public class SenderApp {

public static void main(String[] args) {

SpringApplication.run(SenderApp.class, args);//啟動(dòng)spring應(yīng)用

}

}

//控制類

@RestController//web 注解

public class ProducerController {

@Autowired// output channel注入

private Source source;

//寫入channel

@RequestMapping(value = "/push/{msg}", method = RequestMethod.GET)

public String send(@PathVariable("msg") String msg){

source.output().send(MutableMessageBuilder.withPayload(msg).build());

return "success";

}

}

1.6.2 消費(fèi)者代碼示例:

//啟動(dòng)類

@SpringBootApplication

@EnableBinding(Sink.class)

public class ReceiverApp{

public static void main(String[] args) {

SpringApplication.run(ReceiverApp.class, args);

}

}

//消費(fèi)者類

@RestController

public class KafkaStreamConsumer {

//display msg for html

private Map msgCache = new TreeMap();

@RequestMapping("/pull")

public Message display(){

return msgCache.get(0);

}

@StreamListener(Sink.INPUT)

public void process(Message message){

//add msg handler------------------>

{

//to do

msgCache.put(0,message);

System.out.println(message.getPayload().toString());

}

//handle msg end-------------------->

Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

if (acknowledgment != null) {

acknowledgment.acknowledge();

}

}

}

1.6.3 maven示例:

org.springframework.cloud

spring-cloud-starter-stream-kafka

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-actuator

1.7 問題

1.7.1 配置問題

spring cloud stream官網(wǎng)介紹太過簡單钮惠,如按官方例子配置茅糜,通常工程無法啟動(dòng)或拋出各種運(yùn)行時(shí)錯(cuò)誤。建議從github上搜索項(xiàng)目sample作參考萌腿。本文給出經(jīng)過驗(yàn)證使用的連接kafka的配置樣例限匣。

生產(chǎn)者yml配置:

server:

port: 8081 #web應(yīng)用端口號

spring:

application:

name: kafka-output

cloud:

stream:

binders:

kafka_output:#為binding output綁定的名字

type: kafka

environment:

spring:

cloud:

stream:

kafka:

binder:

brokers: localhost:9092? #kafka集群host,逗號分隔毁菱。host:port或host均可以米死,如果不設(shè)置端口號,spring boot會選用默認(rèn)的9092端口號

zknodes: localhost:2181? #zookeeper集群host贮庞,逗號分隔峦筒。host:port或host均可以,如果不設(shè)置端口號窗慎,spring boot會選用默認(rèn)的2181端口號

instanceCount: 1

instanceIndex: 0

bindings:

output: #channel名字物喷,項(xiàng)目中指定

auto-add-partitions: false

auto-create-topics: true

min-partition-count: 1

destination: test

producer:

partitionCount: 1

binder: kafka_output #output綁定名

group: s1? ? ? ? ? ? #分組名

#actuator j監(jiān)控運(yùn)行jar信息,關(guān)閉驗(yàn)證

management:

health:

mail:

enabled:false

management:

security:

enabled:false

消費(fèi)者 properties配置:

server.port = 8082

spring.application.name = input

#actuator j監(jiān)控運(yùn)行jar信息遮斥,關(guān)閉驗(yàn)證

management.health.mail.enabled=false

management.security.enabled=false

spring.cloud.stream.instancecount = 1

spring.cloud.stream.instanceIndex = 0

#binding config

spring.cloud.stream.bindings.input.destination= test

spring.cloud.stream.bindings.input.group = s1

spring.cloud.stream.bindings.input.autoCommitOffset = false

spring.cloud.stream.bindings.input.concurrency = 1

spring.cloud.stream.bindings.input.partitioned = false

spring.cloud.stream.bindings.input.binder = kafka_input

spring.cloud.stream.bindings.input.consumer.headerMode = raw

spring.cloud.stream.bindings.input.content-type = text/plain;

#binder config

spring.cloud.stream.binders.kafka_input.type = kafka

spring.cloud.stream.binders.kafka_input.environment.spring.cloud.stream.kafka.binder.brokers = localhost:9092

spring.cloud.stream.binders.kafka_input.environment.spring.cloud.stream.kafka.binder.zkNodes = localhost:2181

1.7.2 私服問題

公司私服缺少spring cloud stream應(yīng)用中大部分的jar包峦失,maven中央倉庫因國內(nèi)環(huán)境下載速度過慢。建議倉庫配置添加國內(nèi)地址术吗。

添加方法:修改maven根目錄conf 文件夾中的 server.xml尉辑;或復(fù)制該文件到本地.m2文件夾中;或者項(xiàng)目pom中添加较屿。

http://maven.aliyun.com/nexus/content/groups/public/

http://uk.maven.org/maven2/

http://maven.oschina.net/content/groups/public/

http://maven.aliyun.com/nexus/content/repositories/snapshots/

1.7.3 actuator監(jiān)控問題

問題:type=Unauthorized, status=401

添加配置參數(shù):

#actuator j監(jiān)控運(yùn)行jar信息隧魄,關(guān)閉驗(yàn)證

management.health.mail.enabled=false

management.security.enabled=false

1.8 actuator 使用說明

/beans:查看當(dāng)前應(yīng)用Ioc容器bean狀態(tài)

/mappings:查看web url和方法映射

/trace:查看最近http請求

/env:查看環(huán)境變量

/configprops:查看配置變量

/info:查看基礎(chǔ)信息

/dump:查看線程堆棧生成的信息

/shutdown:優(yōu)雅關(guān)閉

/autoconfig:自動(dòng)裝配信息

/health:當(dāng)前健康狀況(可自定義配置)

排查問題神器,建議搭建spring boot應(yīng)用整合actuator隘蝎。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末购啄,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嘱么,更是在濱河造成了極大的恐慌狮含,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,430評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異几迄,居然都是意外死亡表蝙,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評論 3 398
  • 文/潘曉璐 我一進(jìn)店門乓旗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人集索,你說我怎么就攤上這事屿愚。” “怎么了务荆?”我有些...
    開封第一講書人閱讀 167,834評論 0 360
  • 文/不壞的土叔 我叫張陵妆距,是天一觀的道長。 經(jīng)常有香客問我函匕,道長娱据,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,543評論 1 296
  • 正文 為了忘掉前任盅惜,我火速辦了婚禮中剩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抒寂。我一直安慰自己结啼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評論 6 397
  • 文/花漫 我一把揭開白布屈芜。 她就那樣靜靜地躺著郊愧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪井佑。 梳的紋絲不亂的頭發(fā)上属铁,一...
    開封第一講書人閱讀 52,196評論 1 308
  • 那天,我揣著相機(jī)與錄音躬翁,去河邊找鬼焦蘑。 笑死,一個(gè)胖子當(dāng)著我的面吹牛姆另,可吹牛的內(nèi)容都是我干的喇肋。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼迹辐,長吁一口氣:“原來是場噩夢啊……” “哼蝶防!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起明吩,我...
    開封第一講書人閱讀 39,671評論 0 276
  • 序言:老撾萬榮一對情侶失蹤间学,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體低葫,經(jīng)...
    沈念sama閱讀 46,221評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡详羡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嘿悬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片实柠。...
    茶點(diǎn)故事閱讀 40,444評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖善涨,靈堂內(nèi)的尸體忽然破棺而出窒盐,到底是詐尸還是另有隱情,我是刑警寧澤钢拧,帶...
    沈念sama閱讀 36,134評論 5 350
  • 正文 年R本政府宣布蟹漓,位于F島的核電站,受9級特大地震影響源内,放射性物質(zhì)發(fā)生泄漏葡粒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評論 3 333
  • 文/蒙蒙 一膜钓、第九天 我趴在偏房一處隱蔽的房頂上張望嗽交。 院中可真熱鬧,春花似錦呻此、人聲如沸轮纫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽掌唾。三九已至,卻和暖如春忿磅,著一層夾襖步出監(jiān)牢的瞬間糯彬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評論 1 272
  • 我被黑心中介騙來泰國打工葱她, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留撩扒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,837評論 3 376
  • 正文 我出身青樓吨些,卻偏偏與公主長得像搓谆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子豪墅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評論 2 359

推薦閱讀更多精彩內(nèi)容