Kafka順序消息
1.消息發送的API
/**
 * Sends {@code data} to {@code topic} without a key and without an explicit partition.
 *
 * @param topic name of the destination topic
 * @param data  record value
 * @return whatever {@code doSend} returns for the record built here
 */
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
    // Diamond instead of the raw ProducerRecord type, so the construction is type-checked.
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return this.doSend(producerRecord);
}
/**
 * Sends {@code data} to {@code topic} with the given record key and no explicit partition.
 *
 * @param topic name of the destination topic
 * @param key   record key
 * @param data  record value
 * @return whatever {@code doSend} returns for the record built here
 */
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
    // Diamond instead of the raw ProducerRecord type, so the construction is type-checked.
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return this.doSend(producerRecord);
}
/**
 * Sends {@code data} to an explicit partition of {@code topic}; the record key is {@code null}.
 *
 * <p>NOTE(review): pass {@code partition} as a primitive {@code int}. A boxed {@code Integer}
 * argument may resolve to the {@code send(String, K, V)} overload instead (for example when
 * {@code K} admits {@code Integer}), in which case the value would be used as the record key
 * rather than the partition. Confirm against the call sites.
 *
 * @param topic     name of the destination topic
 * @param partition target partition number
 * @param data      record value
 * @return whatever {@code doSend} returns for the record built here
 */
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data) {
    // Diamond instead of the raw ProducerRecord type; the third argument is the (absent) key.
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, null, data);
    return this.doSend(producerRecord);
}
/**
 * Sends {@code data} to an explicit partition of {@code topic} with the given record key.
 *
 * @param topic     name of the destination topic
 * @param partition target partition number
 * @param key       record key
 * @param data      record value
 * @return whatever {@code doSend} returns for the record built here
 */
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data) {
    // Diamond instead of the raw ProducerRecord type, so the construction is type-checked.
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
    return this.doSend(producerRecord);
}
2.KafkaProducer.class 封裝獲取partition方法，優先使用傳入的partition
/**
 * Resolves the partition a record is written to.
 *
 * <p>A partition carried by the record itself takes precedence; the configured
 * partitioner is consulted only when the record carries none.
 *
 * @param record          record being sent
 * @param serializedKey   serialized form of the record key, forwarded to the partitioner
 * @param serializedValue serialized form of the record value, forwarded to the partitioner
 * @param cluster         cluster metadata, forwarded to the partitioner
 * @return the record's own partition if it is non-null, otherwise the partitioner's choice
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer requested = record.partition();
    // A partition supplied by the caller wins over the partitioner.
    if (requested != null) {
        return requested;
    }
    return this.partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
3.DefaultPartitioner.class 封裝獲取partition方法，優先使用key的murmur2算法的hash值對partitionCount取模，其次使用本地原子類計數器自增值對partitionCount取模
/**
 * Chooses a partition for a record of {@code topic}.
 *
 * <ul>
 *   <li>With serialized key bytes: the non-negative murmur2 hash of the key modulo the
 *       topic's total partition count.</li>
 *   <li>Without key bytes: the next value of the per-topic counter, made non-negative,
 *       modulo the number of available partitions; if none are reported as available,
 *       modulo the total partition count.</li>
 * </ul>
 *
 * @param topic      topic name
 * @param key        record key (not read here)
 * @param keyBytes   serialized key, or {@code null} when the record has no key
 * @param value      record value (not read here)
 * @param valueBytes serialized value (not read here)
 * @param cluster    source of the partition lists for the topic
 * @return the chosen partition number
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // The topic's total partition count is the modulus for keyed records and the fallback.
    List<PartitionInfo> allPartitions = cluster.partitionsForTopic(topic);
    int partitionCount = allPartitions.size();

    // A key was supplied: select the partition from the key's hash.
    if (keyBytes != null) {
        int keyHash = Utils.murmur2(keyBytes);
        return Utils.toPositive(keyHash) % partitionCount;
    }

    // No key: spread records using the per-topic counter.
    int ticket = Utils.toPositive(this.nextValue(topic));
    List<PartitionInfo> usable = cluster.availablePartitionsForTopic(topic);
    if (usable.isEmpty()) {
        return ticket % partitionCount;
    }
    PartitionInfo chosen = usable.get(ticket % usable.size());
    return chosen.partition();
}
/**
 * Returns the current value of the counter kept for {@code topic} and then increments it.
 *
 * <p>The first call for a topic registers a counter seeded with a random int. If another
 * counter has been registered for the topic in the meantime, that one is used instead.
 *
 * @param topic topic whose counter is advanced
 * @return the counter value before the increment (may be negative)
 */
private int nextValue(String topic) {
    AtomicInteger existing = (AtomicInteger) this.topicCounterMap.get(topic);
    if (existing != null) {
        return existing.getAndIncrement();
    }

    AtomicInteger candidate = new AtomicInteger((new Random()).nextInt());
    AtomicInteger registered = (AtomicInteger) this.topicCounterMap.putIfAbsent(topic, candidate);
    // putIfAbsent returns the counter already present, if any; prefer it over ours.
    AtomicInteger effective = (registered != null) ? registered : candidate;
    return effective.getAndIncrement();
}
- Utils.class
|
<pre> public static int toPositive(int number) {
return number & 2147483647;
}
public static int murmur2(byte[] data) {
int length = data.length;
int seed = -1756908916;
int m = 1540483477;
int r = true;
int h = seed ^ length;
int length4 = length / 4;
for(int i = 0; i < length4; ++i) {
int i4 = i * 4;
int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
k *= 1540483477;
k ^= k >>> 24;
k *= 1540483477;
h *= 1540483477;
h ^= k;
}
switch(length % 4) {
case 3:
h ^= (data[(length & -4) + 2] & 255) << 16;
case 2:
h ^= (data[(length & -4) + 1] & 255) << 8;
case 1:
h ^= data[length & -4] & 255;
h *= 1540483477;
default:
h ^= h >>> 13;
h *= 1540483477;
h ^= h >>> 15;
return h;
}
}</pre>
|
踩坑記錄
public ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data)
使用上述方法時，partition傳參不能為Integer類型，否則生成serializedKey時會出現類型轉換異常，導致消息發送失敗
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
//此處keySerializer會做類型轉換，傳入Integer會報錯
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer");
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer");
}
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
ensureValidRecordSize(serializedSize);
tp = new TopicPartition(record.topic(), partition);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
}