Recently a tester reported that one of our applications could not consume transaction messages. I logged on to the Kafka broker and checked the logs, which were full of this error:
java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
I asked the developers involved and learned that a recent release had started adding a LogId to the headers of each Kafka message for transaction tracing. Combined with the error message, the conclusion was that the consumer's API version was too old to support record headers, which caused the failure. After the developers removed the header, the consumer was able to consume messages normally.
Let's reproduce the problem.
Kafka version: 0.11.0
Producer code:
An interceptor that adds a LOG_ID header to every message:
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // attach a random LOG_ID header to every outgoing record
        String uuid = UUID.randomUUID().toString();
        record.headers().add("LOG_ID", uuid.getBytes());
        return record;
    }
}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSON;

public class App {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // broker address
        props.put("bootstrap.servers", "localhost:9092");
        // wait for all in-sync replicas to acknowledge each write
        props.put("acks", "all");
        // retry once on failure
        props.put("retries", 1);
        // producer buffer memory size
        props.put("buffer.memory", 33554432);
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.df.KafkaTest.KafkaProducerInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 10; i < 20; i++) {
            try {
                AccessInfo ai = new AccessInfo();
                ai.setAccessId("123456");
                ai.setAccessName("源碼婆媳" + Integer.toString(i));
                ai.setBusScope("01");
                ai.setIconUrl("http://www.baidu.com");
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("testTopic", 0, "H" + Integer.toString(i), JSON.toJSONString(ai));
                producer.send(record).get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}
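As an aside, headers do not have to come from an interceptor: since 0.11 the ProducerRecord constructors also accept them directly. A minimal sketch; the class name HeaderRecordExample and the reuse of topic testTopic are illustrative only:
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

public class HeaderRecordExample {
    // Build a record whose LOG_ID header is set at construction time
    public static ProducerRecord<String, String> recordWithLogId(String payload) {
        Header logId = new RecordHeader("LOG_ID",
                UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        return new ProducerRecord<>("testTopic", 0, "H0", payload,
                Collections.singletonList(logId));
    }
}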
Consumer code
Client using the new (0.11) consumer API:
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerAsync {
    public static void main(String[] args) throws InterruptedException {
        // 1. Prepare the configuration
        String kafkas = "127.0.0.1:9092";
        Properties props = new Properties();
        // Kafka connection info
        props.put("bootstrap.servers", kafkas);
        // consumer group id
        props.put("group.id", "testTopic-002");
        // disable auto-commit of offsets
        props.put("enable.auto.commit", "false");
        // where to start when there is no committed offset
        props.put("auto.offset.reset", "earliest");
        // auto-commit interval (unused since auto-commit is off)
        props.put("auto.commit.interval.ms", "1000");
        // cap each fetch request at 1 KB
        props.put("fetch.max.bytes", "1024");
        // key deserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        String topic = "testTopic";
        // 2. Create the KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 3. Subscribe without a rebalance listener
        consumer.subscribe(Collections.singleton(topic));
        try {
            // commit only after at least 100 records have been processed
            int minCommitSize = 100;
            // counter
            int icount = 0;
            // 4. Poll for data
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    icount++;
                }
                // commit offsets only after the business logic has succeeded
                if (icount >= minCommitSize) {
                    // at least 100 records consumed, commit asynchronously
                    consumer.commitAsync(new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (exception == null) {
                                System.out.println("commit success");
                            } else {
                                // commit failed, handle accordingly
                                System.out.println("commit failed");
                            }
                        }
                    });
                    // reset the counter
                    icount = 0;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the consumer
            consumer.close();
        }
    }
}
Output:
[28 10:20:08,923 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Discovered group coordinator xxxxx.com:9092 (id: 2147483647 rack: null)
[28 10:20:08,931 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Revoking previously assigned partitions []
[28 10:20:08,931 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] (Re-)joining group
[28 10:20:08,958 INFO ] [main] internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Successfully joined group with generation 1
[28 10:20:08,960 INFO ] [main] internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=testTopic-002] Setting newly assigned partitions [testTopic-0]
[28 10:20:08,978 INFO ] [main] internals.Fetcher - [Consumer clientId=consumer-1, groupId=testTopic-002] Resetting offset for partition testTopic-0 to offset 0.
topic = testTopic,partition = 0,offset = 0, key = H10, value = {"accessId":"123456","accessName":"源碼婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 1, key = H11, value = {"accessId":"123456","accessName":"源碼婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 2, key = H12, value = {"accessId":"123456","accessName":"源碼婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 3, key = H13, value = {"accessId":"123456","accessName":"源碼婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 4, key = H14, value = {"accessId":"123456","accessName":"源碼婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 5, key = H15, value = {"accessId":"123456","accessName":"源碼婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 6, key = H16, value = {"accessId":"123456","accessName":"源碼婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 7, key = H17, value = {"accessId":"123456","accessName":"源碼婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 8, key = H18, value = {"accessId":"123456","accessName":"源碼婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
topic = testTopic,partition = 0,offset = 9, key = H19, value = {"accessId":"123456","accessName":"源碼婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}
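Because the 0.11 client speaks the new fetch protocol, it can also read back the LOG_ID header the interceptor added. A minimal sketch that could be dropped into the poll loop above (the helper class is illustrative; header values are assumed to be UTF-8):
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderReader {
    // Extract the LOG_ID header from a consumed record, or null if absent
    public static String logId(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader("LOG_ID");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}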
Client using the old (0.8.2.2) API:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SimpleConsumerExample {
    private static kafka.javaapi.consumer.ConsumerConnector consumer;

    public static void consume() {
        Properties props = new Properties();
        // ZooKeeper address (the old client discovers brokers via ZooKeeper)
        props.put("zookeeper.connect", "127.0.0.1:2181");
        // consumer group id
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("testTopic", new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("testTopic").get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        consume();
    }
}
After starting, the consumer never receives a single message; its log just loops:
[28 09:51:41,590 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:41,591 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:41,592 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:41,798 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 65 for 1 topic(s) Set(testTopic)
[28 09:51:41,799 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:41,824 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:41,825 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:42,032 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:42,033 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 66 for 1 topic(s) Set(testTopic)
[28 09:51:42,035 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Disconnecting from xxxxxx.xx.com:9092
[28 09:51:42,041 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566957087042] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset 0 to broker id:0,host:xxxxxx.xx.com,port:9092] )
[28 09:51:42,251 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Verifying properties
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property client.id is overridden to jd-group
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx.xx.com:9092
[28 09:51:42,252 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
[28 09:51:42,253 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] client.ClientUtils$ - Fetching metadata from broker id:0,host:xxxxxx.xx.com,port:9092 with correlation id 67 for 1 topic(s) Set(testTopic)
[28 09:51:42,254 INFO ] [jd-group_xxxxxx-1566957086889-abf02be8-leader-finder-thread] producer.SyncProducer - Connected to xxxxxx.xx.com:9092 for producing
Each fetch fails on the broker side, so the consumer's fetcher keeps being torn down and re-registered (hence the repeated "Added fetcher for partitions" lines above), while the broker's server.log keeps printing the same error:
[2019-08-28 09:51:42,045] ERROR [KafkaApi-0] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=testTopic,partitions=[{partition=0,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2012)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
Reading the source against the error log, we can see what happens: after the broker reads the requested messages, it invokes the fetchResponseCallback callback to build the response. While creating the response it checks the consumer's fetch API version, and if that version is lower than the broker's message format, it down-converts the messages:
def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
  def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
    val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
    fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
      convertedData.put(tp, convertedPartitionData(tp, partitionData))
    }
    val response = new FetchResponse(convertedData, 0)
    val responseStruct = response.toStruct(versionId)
    trace(s"Sending fetch response to client $clientId of ${responseStruct.sizeOf} bytes.")
    response.responseData.asScala.foreach { case (topicPartition, data) =>
      // record the bytes out metrics only when the response is being sent
      brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
    }
    val responseSend = response.toSend(responseStruct, bandwidthThrottleTimeMs + requestThrottleTimeMs,
      request.connectionId, request.header)
    RequestChannel.Response(request, responseSend)
  }

  if (fetchRequest.isFromFollower)
    sendResponseExemptThrottle(createResponse(0))
  else
    sendResponseMaybeThrottle(request, request.header.clientId, requestThrottleMs =>
      requestChannel.sendResponse(createResponse(requestThrottleMs)))
}
def convertedPartitionData(tp: TopicPartition, data: FetchResponse.PartitionData) = {
  replicaManager.getMagic(tp).flatMap { magic =>
    val downConvertMagic = {
      if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
        Some(RecordBatch.MAGIC_VALUE_V0)
      else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
        Some(RecordBatch.MAGIC_VALUE_V1)
      else
        None
    }

    downConvertMagic.map { magic =>
      trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
      // this is where the messages are down-converted
      val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
      new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
        data.logStartOffset, data.abortedTransactions, converted)
    }
  }.getOrElse(data)
}
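In other words, a fetch request at version 0 or 1 forces down-conversion to magic v0, and versions 2 or 3 force magic v1. The 0.8.2.2 client sends fetch requests at version 0, so every fetch from it makes the broker try to rewrite the stored v2 batches, which carry our LOG_ID header, as magic v0 records.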
The conversion ultimately calls MemoryRecordsBuilder.appendWithOffset for each record. This method runs several validations and throws if any of them fails; "Magic v0 does not support record headers" is thrown right here, because the magic v0 and v1 message formats simply have no field for headers:
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                              ByteBuffer value, Header[] headers) {
    try {
        if (isControlRecord != isControlBatch)
            throw new IllegalArgumentException("Control records can only be appended to control batches");

        if (lastOffset != null && offset <= lastOffset)
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                    "(Offsets must increase monotonically).", offset, lastOffset));

        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
            throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);

        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
            throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

        if (baseTimestamp == null)
            baseTimestamp = timestamp;

        if (magic > RecordBatch.MAGIC_VALUE_V1) {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            return null;
        } else {
            return appendLegacyRecord(offset, timestamp, key, value);
        }
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}
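The guard is easy to trigger in isolation. Here is a rough sketch against the kafka-clients 0.11.x jar; SimpleRecord and MemoryRecords.withRecords are internal record classes rather than public API, so treat this as illustrative only. Building a magic v0 batch from a record that carries a header should throw the same exception:
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;

public class MagicV0HeaderDemo {
    public static void main(String[] args) {
        Header[] headers = { new RecordHeader("LOG_ID", "abc".getBytes(StandardCharsets.UTF_8)) };
        SimpleRecord record = new SimpleRecord(System.currentTimeMillis(),
                "key".getBytes(), "value".getBytes(), headers);
        try {
            // building a magic v0 batch from a record with headers hits the guard above
            MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE, record);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // "Magic v0 does not support record headers"
        }
    }
}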
Now let's upgrade the broker to Kafka 1.0.
Running the old consumer again:
[28 14:26:23,068 INFO ] [jd-group_xx-1566973572731-a5b3105a-leader-finder-thread] consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1566973572960] Added fetcher for partitions ArrayBuffer([[testTopic,0], initOffset -1 to broker id:0,host:xx.xx.com,port:9092] )
{"accessId":"123456","accessName":"源碼婆媳10","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳11","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳12","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳13","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳14","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳15","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳16","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳17","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳18","busScope":"01","iconUrl":"http://www.baidu.com"}
{"accessId":"123456","accessName":"源碼婆媳19","busScope":"01","iconUrl":"http://www.baidu.com"}
Strange: why does merely upgrading the broker make consumption work? Weren't we just told that the v0 message format does not support headers?
Looking at the 1.0 source, the down-conversion path no longer goes through MemoryRecordsBuilder's header check but through RecordsUtil's convertRecordBatch, which simply ignores the headers when converting to v0 or v1, so the consumer can consume messages normally:
private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
    RecordBatch batch = recordBatchAndRecords.batch;
    final TimestampType timestampType = batch.timestampType();
    long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;

    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(),
            timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
    for (Record record : recordBatchAndRecords.records) {
        // Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
        if (magic > RecordBatch.MAGIC_VALUE_V1)
            builder.append(record);
        else
            builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
    }

    builder.close();
    return builder;
}
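Note the trade-off: the old consumer now works, but the LOG_ID header is silently dropped during down-conversion, so any consumer still on the v0/v1 fetch protocol will never see the tracing information.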
Summary:
Based on the analysis above, there are three ways to fix this error:
1) Upgrade the consumer client; the newer API versions support headers.
2) Upgrade the broker to 1.0 or later; down-conversion will then silently drop the header instead of failing.
3) The simplest option: have the producer stop adding the header. Since LOG_ID is not yet mandatory in our project, this is what we chose; once all consumers have been upgraded, we will add the header back (see the sketch below for one way to make that switchable).
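A minimal sketch of option 3 that keeps the interceptor in place but makes the header injection switchable through producer config, so it can be turned back on once every consumer is upgraded. The config key log.id.header.enabled is purely hypothetical; unknown producer properties are passed through to interceptors via configure:
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SwitchableLogIdInterceptor<K, V> implements ProducerInterceptor<K, V> {

    private volatile boolean headerEnabled = false;

    @Override
    public void configure(Map<String, ?> configs) {
        // "log.id.header.enabled" is a made-up config key for this sketch
        Object flag = configs.get("log.id.header.enabled");
        headerEnabled = flag != null && Boolean.parseBoolean(flag.toString());
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // only attach the header once all consumers can handle the v2 format
        if (headerEnabled) {
            record.headers().add("LOG_ID", UUID.randomUUID().toString().getBytes());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}
}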