1 簡介
SpringCloud Stream 是基于消息驅(qū)動(dòng)微服務(wù)應(yīng)用框架乍丈。
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications, and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
1.1 spring cloud stream結(jié)構(gòu)
術(shù)語:
bindding:連接channel和binder的組件
binder:粘合劑寓娩,對外連接brokers
group:對消費(fèi)者分組箕别,實(shí)現(xiàn)類似于active mq中topic的發(fā)布訂閱
out: 輸出channel
input: 輸入channel
框架結(jié)構(gòu)如下圖所示:
1.2 搭建工程
1.2.1 ?idea創(chuàng)建: File -> new -> spring initializr -> next -> 填寫工程名纯衍,maven路徑碘梢,next -> 選擇spring boot 版本想括,選擇spring組件(本工程選擇spring-cloud鉴腻,spring-web)
1.2.2 ?spring boot cli創(chuàng)建:spring init -dweb,cloud(該命令需要安裝spring boot cli,默認(rèn)創(chuàng)建為maven結(jié)構(gòu)的工程-d為maven依賴)
1.2.3 jdk版本忠寻,官方建議為1.6以上惧浴。推薦使用1.7+。
1.3 版本選擇
本文選擇的spring cloud stream的版本為Chelsea.SR1奕剃,對應(yīng)的kafka版本為0.10.1.1,對接的kafka集群最好是0.10版本以上的衷旅。如果低于0.10,會出現(xiàn)工程啟動(dòng)失敗的錯(cuò)誤纵朋。有2種解決方案:
1)柿顶、加驗(yàn)證,具體操作請參考操软,連接:驗(yàn)證
2)嘁锯、降低spring cloud stream版本至1.0.2.RELEASE,該版本對應(yīng)的kafka-client版本為0.8.2.2
1.4 配置
spring cloud stream默認(rèn)提供了一套基礎(chǔ)配置。broker為localhost:9092,zkNode為localhost:2181寺鸥,其他選項(xiàng)也均為默認(rèn)值(所以猪钮,如果你為本地部署,原則上不需要添加任何配置項(xiàng))胆建。具體可參考spring-scloud-stream jar中的org.springframework.cloud.stream.config包中的javaconfig
spring 1.x xml配置烤低,spring 2.x 注解配置,spring 3.x java 配置笆载,spring boot習(xí)慣性配置扑馁。官方推薦為java config+習(xí)慣性配置
1.5 注解
SpringBootApplication:繼承了EnableAutoConfiguration、ComponentScan凉驻。主要完成自動(dòng)化配置和路徑掃描腻要。
EnableBinding:開啟stream功能。參數(shù)可以source涝登、sink雄家、process。也可以自己實(shí)現(xiàn)(主要為聲明channel)胀滚。
RestController:開啟web功能趟济。與Controller類似乱投。官方解釋:@RestController is a stereotype annotation that combines @ResponseBody and @Controller.
RequestMapping:開啟綁定url功能,url可通過參數(shù)指定顷编。
StreamListener:開啟監(jiān)聽channel功能戚炫,參數(shù)為默認(rèn)或?qū)崿F(xiàn)的channel接口。
EnableAutoConfiguration:開啟自動(dòng)配置功能媳纬。繼承自
ComponentScan:開啟掃描功能双肤,主要針對bean和配置文件。
1.6 代碼示例
1.6.1 生產(chǎn)者代碼示例:
//啟動(dòng)類
@SpringBootApplication//自動(dòng)化配置
@EnableBinding(Source.class)//開啟與消息應(yīng)用代理的連接
public class SenderApp {
public static void main(String[] args) {
SpringApplication.run(SenderApp.class, args);//啟動(dòng)spring應(yīng)用
}
}
//控制類
@RestController//web 注解
public class ProducerController {
@Autowired// output channel注入
private Source source;
//寫入channel
@RequestMapping(value = "/push/{msg}", method = RequestMethod.GET)
public String send(@PathVariable("msg") String msg){
source.output().send(MutableMessageBuilder.withPayload(msg).build());
return "success";
}
}
1.6.2 消費(fèi)者代碼示例:
//啟動(dòng)類
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReceiverApp{
public static void main(String[] args) {
SpringApplication.run(ReceiverApp.class, args);
}
}
//消費(fèi)者類
@RestController
public class KafkaStreamConsumer {
//display msg for html
private Map msgCache = new TreeMap();
@RequestMapping("/pull")
public Message display(){
return msgCache.get(0);
}
@StreamListener(Sink.INPUT)
public void process(Message message){
//add msg handler------------------>
{
//to do
msgCache.put(0,message);
System.out.println(message.getPayload().toString());
}
//handle msg end-------------------->
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
}
}
1.6.3 maven示例:
org.springframework.cloud
spring-cloud-starter-stream-kafka
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-actuator
1.7 問題
1.7.1 配置問題
spring cloud stream官網(wǎng)介紹太過簡單钮惠,如按官方例子配置茅糜,通常工程無法啟動(dòng)或拋出各種運(yùn)行時(shí)錯(cuò)誤。建議從github上搜索項(xiàng)目sample作參考萌腿。本文給出經(jīng)過驗(yàn)證使用的連接kafka的配置樣例限匣。
生產(chǎn)者yml配置:
server:
port: 8081 #web應(yīng)用端口號
spring:
application:
name: kafka-output
cloud:
stream:
binders:
kafka_output:#為binding output綁定的名字
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092? #kafka集群host,逗號分隔毁菱。host:port或host均可以米死,如果不設(shè)置端口號,spring boot會選用默認(rèn)的9092端口號
zknodes: localhost:2181? #zookeeper集群host贮庞,逗號分隔峦筒。host:port或host均可以,如果不設(shè)置端口號窗慎,spring boot會選用默認(rèn)的2181端口號
instanceCount: 1
instanceIndex: 0
bindings:
output: #channel名字物喷,項(xiàng)目中指定
auto-add-partitions: false
auto-create-topics: true
min-partition-count: 1
destination: test
producer:
partitionCount: 1
binder: kafka_output #output綁定名
group: s1? ? ? ? ? ? #分組名
#actuator j監(jiān)控運(yùn)行jar信息,關(guān)閉驗(yàn)證
management:
health:
mail:
enabled:false
management:
security:
enabled:false
消費(fèi)者 properties配置:
server.port = 8082
spring.application.name = input
#actuator j監(jiān)控運(yùn)行jar信息遮斥,關(guān)閉驗(yàn)證
management.health.mail.enabled=false
management.security.enabled=false
spring.cloud.stream.instancecount = 1
spring.cloud.stream.instanceIndex = 0
#binding config
spring.cloud.stream.bindings.input.destination= test
spring.cloud.stream.bindings.input.group = s1
spring.cloud.stream.bindings.input.autoCommitOffset = false
spring.cloud.stream.bindings.input.concurrency = 1
spring.cloud.stream.bindings.input.partitioned = false
spring.cloud.stream.bindings.input.binder = kafka_input
spring.cloud.stream.bindings.input.consumer.headerMode = raw
spring.cloud.stream.bindings.input.content-type = text/plain;
#binder config
spring.cloud.stream.binders.kafka_input.type = kafka
spring.cloud.stream.binders.kafka_input.environment.spring.cloud.stream.kafka.binder.brokers = localhost:9092
spring.cloud.stream.binders.kafka_input.environment.spring.cloud.stream.kafka.binder.zkNodes = localhost:2181
1.7.2 私服問題
公司私服缺少spring cloud stream應(yīng)用中大部分的jar包峦失,maven中央倉庫因國內(nèi)環(huán)境下載速度過慢。建議倉庫配置添加國內(nèi)地址术吗。
添加方法:修改maven根目錄conf 文件夾中的 server.xml尉辑;或復(fù)制該文件到本地.m2文件夾中;或者項(xiàng)目pom中添加较屿。
http://maven.aliyun.com/nexus/content/groups/public/
http://uk.maven.org/maven2/
http://maven.oschina.net/content/groups/public/
http://maven.aliyun.com/nexus/content/repositories/snapshots/
1.7.3 actuator監(jiān)控問題
問題:type=Unauthorized, status=401
添加配置參數(shù):
#actuator j監(jiān)控運(yùn)行jar信息隧魄,關(guān)閉驗(yàn)證
management.health.mail.enabled=false
management.security.enabled=false
1.8 actuator 使用說明
/beans:查看當(dāng)前應(yīng)用Ioc容器bean狀態(tài)
/mappings:查看web url和方法映射
/trace:查看最近http請求
/env:查看環(huán)境變量
/configprops:查看配置變量
/info:查看基礎(chǔ)信息
/dump:查看線程堆棧生成的信息
/shutdown:優(yōu)雅關(guān)閉
/autoconfig:自動(dòng)裝配信息
/health:當(dāng)前健康狀況(可自定義配置)
排查問題神器,建議搭建spring boot應(yīng)用整合actuator隘蝎。