一. 不同版本的讀兼容性結果
服務端版本 |
是否開snappy壓縮 |
客戶端0.8.2.2 |
客戶端0.10.2.2 |
0.8.2.2 |
true |
成功 |
失敗 |
0.8.2.2 |
false |
成功 |
失敗 |
0.10.2.2 |
true |
成功 |
成功 |
0.10.2.2 |
false |
成功 |
成功 |
二. 0.8server 命令
2.1 0.8 啟動
bin/kafka-server-start.sh -daemon config/server.properties
2.2 0.8 發(fā)送消息,不開壓縮
bin/kafka-topics.sh --zookeeper localhost:2181/ --create --topic t1 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 --compression-codec none
2.2.1 0.10 消費不開壓縮的消息 失敗
afly@afly-desktop:~/local/kafka_2.11-0.10.2.2$ bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2019-07-06 22:43:47,725] WARN [ConsumerFetcherThread-console-consumer-66613_afly-desktop-1562424227400-8033055c-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@5185dcf4 (kafka.consumer.ConsumerFetcherThread)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:111)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:31)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
afly@afly-desktop:~/local/kafka_2.11-0.10.2.2$ bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2019-07-06 22:43:47,725] WARN [ConsumerFetcherThread-console-consumer-66613_afly-desktop-1562424227400-8033055c-0-0], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@5185dcf4 (kafka.consumer.ConsumerFetcherThread)
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:111)
at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:31)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
2.2.2 0.8 消費不開壓縮的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
2.3 0.8 發(fā)送消息芜飘,開壓縮
bin/kafka-topics.sh --zookeeper localhost:2181/ --create --topic t2 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic t1 --compression-codec snappy
2.3.1 0.10消費開壓縮的消息 無反應恍风,消費失敗
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1
==> logs/server.log <==
[2019-07-06 22:46:08,957] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18
at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:748)
2.3.2 0.8消費開壓縮的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/ --topic t1
三. 0.10 啟動
bin/kafka-server-start.sh -daemon config/server.properties
0.10 發(fā)送消息蹦狂,不開壓縮
bin/kafka-topics.sh --zookeeper localhost:2181/kafka010 --create --topic t1 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --compression-codec none --topic t1
0.10 消費不開壓縮的消息 Okay
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t1
0.8 消費不開壓縮的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka010 --topic t1
0.10 發(fā)送消息誓篱,開壓縮
bin/kafka-topics.sh --zookeeper localhost:2181/kafka010 --create --topic t2 --partitions 3 --replication-factor 1
bin/kafka-console-producer.sh --broker-list localhost:9092 --compression-codec snappy --topic t2
0.10消費開壓縮的消息 Okay
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic t2 --from-beginning
0.8消費開壓縮的消息 Okay
bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka010 --topic t2 --from-beginning