最近的業務用到kafka，過程中發現nodejs這邊的kafka client庫現狀不是很好（相對其他語言）。
比如provider這邊用了python，就有很好用的流行庫pykafka：
from pykafka import KafkaClient
# Connect via the ZooKeeper address (zookeeper_hosts) rather than a broker list.
client = KafkaClient(zookeeper_hosts='192.168.99.100:2181')
topic = client.topics['Hello-Kafka']
# The sync producer is used as a context manager; the produce() call has to be
# indented so that it runs inside the with-block.
with topic.get_sync_producer() as producer:
    # NOTE(review): on Python 3, pykafka may require a bytes payload (b'msg') — confirm.
    producer.produce('msg')
可以看到還直接支持zookeeper，很方便。
consumer這邊需要Node.js來做，生態就沒那么好了。比較流行的庫kafka-node，API設計得不怎么好用。
看起來設計較好的kafkajs，還流行不起來（3860 weekly downloads）。
下面說下kafka-node這個庫使用的一些問題：
1. 一旦提供topic立刻開始fetch消息
// Counter-example: the topic list is handed straight to the Consumer constructor.
const kafka = require('kafka-node')
const client = new kafka.KafkaClient({ kafkaHost: '192.168.99.100:32768' });
// According to the accompanying article, constructing the Consumer with a topic
// already starts fetching messages asynchronously...
const consumer = new kafka.Consumer(client, [
{ topic: 'Hello-Kafka', partition: 0 }
]);
// ...so this 'message' handler is only registered after fetching has begun.
consumer.on('message', function (message) {
console.log('接收一條', message.value);
});
這樣做是有問題的，在consumer聲明這一行，實際就已經開始異步fetch消息了，而此時on()還沒有執行。這會造成歷史信息丟失。在兩行間加一個延時，會很明顯地看到效果。
可以改成這樣:
const kafka = require('kafka-node');
const clientOptions = { kafkaHost: '192.168.99.100:32768' };
const client = new kafka.KafkaClient(clientOptions);
// Create the consumer with an empty topic list so the handler can be attached first.
const consumer = new kafka.Consumer(client, []);
const logMessage = (message) => {
  console.log('接收一條', message.value);
};
consumer.on('message', logMessage);
// Subscribing is the last step, after the 'message' handler is in place.
const subscriptions = [{ topic: 'Hello-Kafka', partition: 0 }];
consumer.addTopics(subscriptions);
這樣在最后一行addTopics之后，fetch才開始。
2. 只能通過EventEmitter來獲取信息
consumer獲取信息只能通過on('message', cb)，不好和現在的js代碼結合。下面給一種寫法：
/**
 * Collects the next `num` messages emitted by a consumer and resolves with them.
 *
 * Once enough messages have arrived, the consumer is paused and the temporary
 * 'message' listener is removed.
 *
 * @param {object} consumer - Object exposing `on`, `removeListener` and `pause`
 *   (for example a kafka-node Consumer).
 * @param {number} num - How many messages to collect.
 * @returns {Promise<Array>} Resolves with the collected messages in arrival
 *   order; resolves with an empty array immediately when `num` is not a
 *   positive number.
 */
function getMessages(consumer, num) {
  return new Promise((resolve) => {
    // Nothing to wait for. Without this guard the promise would never settle,
    // because the message count can never match a zero or negative target.
    if (!(num > 0)) {
      resolve([]);
      return;
    }
    const messages = [];
    const listener = (message) => {
      messages.push(message);
      // `>=` instead of loose equality against a separate counter.
      if (messages.length >= num) {
        consumer.pause();
        consumer.removeListener('message', listener);
        resolve(messages);
      }
    };
    consumer.on('message', listener);
  });
}
// Start collecting first, then subscribe, and only then await. Awaiting before
// addTopics() would never return when the consumer was created without topics,
// because the subscription line would never be reached.
const pendingMessages = getMessages(consumer, 10);
consumer.addTopics([{ topic: 'Hello-Kafka', partition: 0 }]);
const tenMsgs = await pendingMessages;
3. 不支持zookeeper
只能通過其他zookeeper庫來獲取brokers地址。
最流行的是node-zookeeper-client。比較難用，下面提供關鍵代碼：
const zookeeper = require('node-zookeeper-client');
/**
 * Reads one node under /brokers/ids from ZooKeeper and parses its data as JSON.
 *
 * @param {object} client - ZooKeeper client exposing `getData(path, callback)`.
 * @param {string|number} id - Child node name under /brokers/ids.
 * @returns {Promise<object>} Resolves with the parsed node data; rejects with
 *   the ZooKeeper error, or with the parse error when the data is not valid JSON.
 */
function getNodeInfo(client, id) {
  return new Promise((resolve, reject) => {
    client.getData(`/brokers/ids/${id}`, (err, data) => {
      if (err) {
        reject(err);
        return;
      }
      // Parse inside try/catch: a throw in this callback would otherwise be
      // uncaught and leave the promise pending forever.
      try {
        resolve(JSON.parse(String(data)));
      } catch (parseErr) {
        reject(parseErr);
      }
    });
  });
}
/**
 * Connects to ZooKeeper and loads the data of every node under /brokers/ids.
 *
 * @param {string} zookeeperUrl - Connection string passed to
 *   `zookeeper.createClient`, e.g. '192.168.99.100:2181'.
 * @returns {Promise<object[]>} Resolves with one parsed entry per child node
 *   (as produced by getNodeInfo); rejects when listing the children fails,
 *   when there are no children, or when reading any node fails.
 */
function getBrokers(zookeeperUrl) {
  return new Promise((resolve, reject) => {
    const client = zookeeper.createClient(zookeeperUrl);
    client.once('connected', () => {
      client.getChildren('/brokers/ids', (err, nodeIds) => {
        if (err) {
          client.close();
          reject(err);
          return;
        }
        if (nodeIds.length === 0) {
          client.close();
          reject(new Error('GetChildren returns no node'));
          return;
        }
        // Close the client only after every read has settled, instead of
        // requesting the close while the getData calls are still in flight.
        Promise.all(nodeIds.map((id) => getNodeInfo(client, id))).then(
          (brokers) => {
            client.close();
            resolve(brokers);
          },
          (readErr) => {
            client.close();
            reject(readErr);
          },
        );
      });
    });
    client.connect();
  });
}
/**
 * Formats broker descriptors as a single address string.
 *
 * @param {Array<{host: string, port: (number|string)}>} brokers - Broker entries
 *   with `host` and `port` properties.
 * @returns {string} Comma-separated `host:port` pairs; an empty string for an
 *   empty list.
 */
function getBrokersAddr(brokers) {
  // join(',') keeps multiple brokers distinguishable; plain concatenation would
  // fuse them into one unusable address.
  return brokers.map((broker) => `${broker.host}:${broker.port}`).join(',');
}
// Look up the brokers through ZooKeeper, then format them as an address string.
// Declared with const: top-level await only exists in ES modules, which are
// strict, so assigning to undeclared names would throw a ReferenceError.
const brokers = await getBrokers('192.168.99.100:2181');
const brokerStr = getBrokersAddr(brokers);
輸出192.168.99.100:32768，可直接用于上面的kafka-node client。