? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Kafka生產(chǎn)者
架構圖:?
必選屬性:
? ? bootstrap.servers: broker的地址清單(host:port)
? ? key.serializer: 鍵的序列化器(ByteArraySerializer[這個只做很少的事情], StringSerializer, IntegerSerializer, 自定義序列化器)
? ? value.serializer: 值的序列化器(同上)
創(chuàng)建Kafka生產(chǎn)者:
? ? 1. 新建一個Properties對象;
? ? 2. 因為我們打算把鍵和值定義成字符串類型, 所以使用內(nèi)置的StringSerializer;
? ? 3. 在這里我們創(chuàng)建了一個新的生產(chǎn)者對象, 并為鍵和值設置了恰當?shù)念愋? 然后把Properties對象傳給它禾蚕。
? ? private Properties kafkaProps = new Properties();
? ? kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");
? ? kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ??kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
? ? KafkaProducer<String, String> producer = new kafkaProducer<String, String>(kafkaProps);
發(fā)送消息:
? ? 1.同步發(fā)送消息
? ? ? ? ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
? ? ? ? try {
? ? ? ? ? ? producer.send(record).get();
????????} catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
????????}
? ? 2.異步發(fā)送
? ? ? ? private class DemoProducerCallback implements Callback {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onCompletion(RecordMetadata recordMetadata, Exception e) {
? ? ? ? ? ? ? ? if (e != null) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
????????????????}
????????????}
????????}
? ??????ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
? ? ? ? producer.send(record, new DemoProducerCallback());
可配置參數(shù):
? ? 1.acks: 有多少個分區(qū)副本收到消息生產(chǎn)者才會認為消息寫入是成功的;
? ? 2.buffer.memory: 設置生產(chǎn)者內(nèi)存緩沖區(qū)的大小;
? ? 3.compression.type: 指定消息發(fā)送時使用哪一種壓縮算法進行壓縮(snappy, gzip, lz4);
? ? 4.retries: 生產(chǎn)者可以重發(fā)消息的次數(shù);
? ? 5.batch.size: 同一批次發(fā)送到同一分區(qū)使用的內(nèi)存大小;
? ? 6.linger.ms: 同批次等待時間;
? ? 7.client.id: 任意字符串, 識別消息的來源;
? ? 8.max.in.flight.requests.per.connection: 生產(chǎn)者在收到服務器的響應之前可以發(fā)送多少個消息;
? ? 9.timeout.ms, request.timeout.ms 和 metadata.fetch.timeout.ms:?
? ? ? ? timeout.ms: 等待同步副本返回消息確認的時間;
? ? ? ? request.timeout.ms: 生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間;
? ? ? ? metadata.fetch.timeout.ms: 生產(chǎn)者在獲取元數(shù)據(jù)時等待服務器返回響應的時間;
? ? 10.max.block.ms: 獲取元數(shù)據(jù)時的阻塞時間;
? ? 11.max.request.size: 生產(chǎn)者發(fā)送請求的大小;
? ? 12.receive.buffer.bytes 和 send.buffer.bytes: TCP socket 接收和發(fā)送數(shù)據(jù)寶的緩沖區(qū)大小;
序列化器:
? ? 主要實現(xiàn) org.apache.kafka.common.serialization.Serializer 的?byte[] serialize(String topic, Customer data) 方法
分區(qū)器:
? ??主要實現(xiàn) org.apache.kafka.clients.producer.Partitioner 的?int partition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) 方法