升級原因
- 根據(jù)業(yè)務(wù)需求襟衰,需要依賴kafka-0.10。
<dependencies>
<!-- 官方kafka java 客戶端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<!-- 消息云客戶端擴展 -->
<dependency>
<groupId>com.xx.xx.xxxx</groupId>
<artifactId>kafka-client-ext</artifactId>
<version>1.0.2</version>
</dependency>
</dependencies>
- 客戶端升級后粪摘,新版sdk訪問較舊版的kafka, 發(fā)送kafka不支持的request瀑晒。當(dāng)前用的kafka版本為0.9.0.1, 支持的request最大id為16, 這個18是新版 kafka中的ApiVersion Request, 因此會拋這個異常出來。
[2018-07-06 17:59:51,835] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:724)
- 事實上kafka服務(wù)端版本可以大于客戶端版本(服務(wù)端kafka-0.10徘意,客戶端還是0.9的kafka版本)苔悦,但會有個別的配置可能客戶端不支持,比如sasl.jaas.conf這種椎咧。
升級步驟
- 下載
kafka_2.10-0.10.0.1.tgz
下載鏈接 - 解壓
tar -xzf kafka_2.10-0.10.0.1.tgz
- 修改配置玖详,參考0.9的配置文件進行修改
- kafka_2.10-0.10.0.1/config/zookeeper.properties
dataDir=/export/data
- kafka_2.10-0.10.0.1/config/server.properties
broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://<ip>:9092 num.network.threads=6 num.io.threads=16 log.dirs=/export/kafka-logs log.flush.interval.messages=10000 log.flush.interval.ms=3000 zookeeper.connect=<ip>:<port> delete.topic.enable=true inter.broker.protocol.version=0.10.0.1 log.message.format.version=0.10.0.1
其中inter.broker.protocol.version必須配置,否則重啟服務(wù)后勤讽,還是原來的0.9版本
在0.9的配置中有以下幾項蟋座,可以不進行配置
host.name 如果設(shè)置了它,會僅綁定這個地址脚牍。如果沒有設(shè)置向臀,則會綁定所有的網(wǎng)絡(luò)接口,并提交一個給ZK诸狭。不推薦使用 只有當(dāng)listeners沒有設(shè)置時才有必要使用券膀。
port server用來接受client連接的端口。不推薦使用,使用listeners配置項代替驯遇;只有在listeners沒有配置時才使用芹彬。
advertised.host.name 會將hostname通知給生產(chǎn)者和消費者,在多網(wǎng)卡時需要設(shè)置該值為另一個ip地址叉庐。如果沒有設(shè)置該值雀监,則返回 配置項host.name設(shè)置的值,如果host.name沒有設(shè)置則返回java.net.InetAddress.getCanonicalHostName()不推薦使用 只有當(dāng)advertised.listeners或listeners沒有設(shè)置時才有必要使用眨唬。
advertised.port 分發(fā)這個端口給所有的producer会前,consumer和其他broker來建立連接。如果此端口跟server綁定的端口不同匾竿,則才有必要設(shè)置瓦宜。不推薦使用 只有當(dāng)advertised.listeners或listeners沒有設(shè)置時才有必要使用。
具體配置項含義岭妖,參見鏈接
- 停止kafka-0.9的服務(wù)
/bin/kafka-server-stop.sh
- 開啟kafka-0.10的服務(wù)
/bin/kafka-server-start.sh -daemon config/server.properties
命令中-daemon临庇,防止關(guān)閉遠(yuǎn)程連接(Xshell)時停止kafka服務(wù)
檢查
- 查一下當(dāng)前運行時是否是期望的版本
ps -ef | grep java | grep kafka
- 若不是正確的版本重新運行上述第4反璃、5步
- 出現(xiàn)無法停止服務(wù)的情況,那就
kill -9 PID
吧 - kill之前可以先
cd /proc/PID
確認(rèn)是否是想要kill的進程 - 關(guān)閉Xshell假夺,檢查kafka是否正常運行
更多信息淮蜈,請移步個人blog