1 书斜、消息發(fā)送方法:Producer.send(new ProducerRecord())
2 苹威、Producer 生產者對象撞叨,這個對象是線程安全的实辑,所以可以在多線程下用來發(fā)送消息漱病,
3买雾、ProducerRecord 為具體的消息對象,構造方法有很多杨帽,多使用new ProducerRecord("topic","message")? 其他構造方法如下:
????public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
????public ProducerRecord(String topic, Integer partition漓穿, Long timestamp ,K key , V value)
????public ProducerRecord(String topic , Integer partition, K key, V value, Iterable<Header> headers))
????public ProducerRecord(String topic, Integer partition, K key, V value)
????public ProducerRecord(String topic, K key, V value)
???public ProducerRecord(String topic, V value)
描述:(topic:主題? ?value :消息? key: 消息主鍵??partition:分區(qū)?headers: 頭部信息? timestamp:時間)
在發(fā)送消息時ProducerRecord 會被頻繁的創(chuàng)建注盈,每條消息都會創(chuàng)建一個ProducerRecord對象晃危。
4、創(chuàng)建了生產者和消息之后就可以發(fā)送消息了老客,發(fā)送消息有三種模式僚饭,發(fā)后既忘(fire-and-forget) ,同步(sync)胧砰, 異步(async)三種鳍鸵。
KafkaProduct的send()方法,返回值并不是void尉间,而是Feature<RecordMetadata>類型权纤,send() 有兩個重載方法
同步方法 : public?Future send(ProducerRecord record)
異步方法:public Future<RecordMetadata> send(ProducerRecord<K , V> record ,Callback callback)
發(fā)后即忘:
同步:利用返回的Future實現同步?Product.send(ProducerRecord).get()
實際上,send()方法本來就是異步的乌妒,返回的Future對象可以使調用方獲得發(fā)送結果
如上述 send()方法獲取的Future<RecordMetadata> 對象汹想, 調用get()方法可以阻塞等待kafka的響應,直到發(fā)送成功或者發(fā)生異常撤蚊。
RecordMetadata 中包含了一些元數據信息 比如分區(qū)古掏、主題、分區(qū)中的偏移量侦啸、時間戳等槽唾。如果不需要這些信息 使用send().get()方法更方便。
異步:
異步使用Callback 方式返回光涂,要么成功庞萍,要么返回錯誤信息,RecordMetadata 和Exception 是互斥的??Exception == null 時忘闻,發(fā)送成功RecordMetadata 有信息钝计,? ? ? ? Exception !=null的時候??RecordMetadata就為空。
5消息順序性、
? ??producer .send(record1, callback1) ;producer .send(record2, callback2) ; 相對于同一個分區(qū)而言私恬,record1 在record2 之前發(fā)送债沮,那么callback1 一定在callback2 之前調用,也就是說相同的分區(qū)本鸣,回調函數也可以保準有序性返回疫衩。