生產(chǎn)者(2019-02-15)

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Kafka生產(chǎn)者

架構圖:?

Kafka生產(chǎn)者組件圖

必選屬性:

? ? bootstrap.servers: broker的地址清單(host:port)

? ? key.serializer: 鍵的序列化器(ByteArraySerializer[這個只做很少的事情], StringSerializer, IntegerSerializer, 自定義序列化器)

? ? value.serializer: 值的序列化器(同上)

創(chuàng)建Kafka生產(chǎn)者:

? ? 1. 新建一個Properties對象;

? ? 2. 因為我們打算把鍵和值定義成字符串類型, 所以使用內(nèi)置的StringSerializer;

? ? 3. 在這里我們創(chuàng)建了一個新的生產(chǎn)者對象, 并為鍵和值設置了恰當?shù)念愋? 然后把Properties對象傳給它禾蚕。

? ? private Properties kafkaProps = new Properties();

? ? kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");

? ? kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

? ??kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

? ? KafkaProducer<String, String> producer = new kafkaProducer<String, String>(kafkaProps);

發(fā)送消息:

? ? 1.同步發(fā)送消息

? ? ? ? ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

? ? ? ? try {

? ? ? ? ? ? producer.send(record).get();

????????} catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

????????}

? ? 2.異步發(fā)送

? ? ? ? private class DemoProducerCallback implements Callback {

? ? ? ? ? ? @Override

? ? ? ? ? ? public void onCompletion(RecordMetadata recordMetadata, Exception e) {

? ? ? ? ? ? ? ? if (e != null) {

? ? ? ? ? ? ? ? ? ? e.printStackTrace();

????????????????}

????????????}

????????}

? ??????ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

? ? ? ? producer.send(record, new DemoProducerCallback());

可配置參數(shù):

? ? 1.acks: 有多少個分區(qū)副本收到消息生產(chǎn)者才會認為消息寫入是成功的;

? ? 2.buffer.memory: 設置生產(chǎn)者內(nèi)存緩沖區(qū)的大小;

? ? 3.compression.type: 指定消息發(fā)送時使用哪一種壓縮算法進行壓縮(snappy, gzip, lz4);

? ? 4.retries: 生產(chǎn)者可以重發(fā)消息的次數(shù);

? ? 5.batch.size: 同一批次發(fā)送到同一分區(qū)使用的內(nèi)存大小;

? ? 6.linger.ms: 同批次等待時間;

? ? 7.client.id: 任意字符串, 識別消息的來源;

? ? 8.max.in.flight.requests.per.connection: 生產(chǎn)者在收到服務器的響應之前可以發(fā)送多少個消息;

? ? 9.timeout.ms, request.timeout.ms 和 metadata.fetch.timeout.ms:?

? ? ? ? timeout.ms: 等待同步副本返回消息確認的時間;

? ? ? ? request.timeout.ms: 生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間;

? ? ? ? metadata.fetch.timeout.ms: 生產(chǎn)者在獲取元數(shù)據(jù)時等待服務器返回響應的時間;

? ? 10.max.block.ms: 獲取元數(shù)據(jù)時的阻塞時間;

? ? 11.max.request.size: 生產(chǎn)者發(fā)送請求的大小;

? ? 12.receive.buffer.bytes 和 send.buffer.bytes: TCP socket 接收和發(fā)送數(shù)據(jù)寶的緩沖區(qū)大小;

序列化器:

? ? 主要實現(xiàn) org.apache.kafka.common.serialization.Serializer 的?byte[] serialize(String topic, Customer data) 方法

分區(qū)器:

? ??主要實現(xiàn) org.apache.kafka.clients.producer.Partitioner 的?int partition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) 方法

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贷笛,一起剝皮案震驚了整個濱河市拂蝎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌沟启,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡砸彬,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門斯入,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拿霉,“玉大人,你說我怎么就攤上這事咱扣。” “怎么了涵防?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵闹伪,是天一觀的道長沪铭。 經(jīng)常有香客問我,道長偏瓤,這世上最難降的妖魔是什么杀怠? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮厅克,結果婚禮上赔退,老公的妹妹穿的比我還像新娘。我一直安慰自己证舟,他們只是感情好硕旗,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著女责,像睡著了一般漆枚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抵知,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天墙基,我揣著相機與錄音,去河邊找鬼刷喜。 笑死残制,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的掖疮。 我是一名探鬼主播初茶,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼氮墨!你這毒婦竟也來了纺蛆?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤规揪,失蹤者是張志新(化名)和其女友劉穎桥氏,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體猛铅,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡字支,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了奸忽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堕伪。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖栗菜,靈堂內(nèi)的尸體忽然破棺而出欠雌,到底是詐尸還是另有隱情,我是刑警寧澤疙筹,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布富俄,位于F島的核電站禁炒,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏霍比。R本人自食惡果不足惜幕袱,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望悠瞬。 院中可真熱鬧们豌,春花似錦、人聲如沸浅妆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狂打。三九已至擂煞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間趴乡,已是汗流浹背对省。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留晾捏,地道東北人蒿涎。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像惦辛,于是被迫代替她去往敵國和親劳秋。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 大致可以通過上述情況進行排除 1.kafka服務器問題 查看日志是否有報錯胖齐,網(wǎng)絡訪問問題等玻淑。 2. kafka p...
    生活的探路者閱讀 7,589評論 0 10
  • 學習kafka有一段時間了。關于它里面的知識還是需要總結一下呀伙,一來是能讓自己對kafka能有一個比較成型的理解补履,二...
    紹圣閱讀 1,085評論 0 3
  • Kafka的基本概念 BrokerKafka集群中包含多個服務器,其中每個服務器稱為一個broker剿另。有一點需要注...
    frmark閱讀 372評論 0 0
  • 方法比努力重要 實踐比讀書重要 前提是--先讀書 蔡康永說過這樣一段話:15歲覺得游泳難雨女,放棄游泳谚攒,到18歲遇到一...
    三月1006閱讀 449評論 1 2
  • 偶然看見窗外幾棵樹因為寒冷掉光了所有的葉,只能下光禿禿的樹干在風中搖曳氛堕。想想南方此時過于寒冷的外界馏臭,只是讓失掉假日...
    何以觥籌錯閱讀 264評論 0 0