由于最近的項(xiàng)目中使用到kafka葛虐,項(xiàng)目使用nodejs語(yǔ)言家夺,遇到一些問(wèn)題
nvm版本: v6.17.1
kafka-node版本:5.0.0
在項(xiàng)目中遇到報(bào)錯(cuò):kafka.KafkaClient is not a constructor
解決辦法:由于我之前安裝的kafka-node版本較低温眉,在安裝kafka-node的高版本時(shí)沒(méi)有覆蓋掉導(dǎo)致碑定,將kafka-node低版本完全卸載后重新安裝5.0.0的kafka-node版本后該問(wèn)題解決慎皱。
最后裸扶,記錄一下我使用kafka-node生產(chǎn)者的代碼铡买,該代碼還在測(cè)試中更鲁,后面修改會(huì)同步:
var kafka = require("kafka-node");
var config = require('appConfig');//我的配置文件
var Client = kafka.KafkaClient;
var Producer = kafka.Producer;
var client = null;
var payloads = null;
exports.sendKafkaMessage = function (topic,message,operation,callback) {
try {
console.log("sendKafkaMessage,host:"+config.kafka_host);
if (this.client == null ) {
this.client = new Client({kafkaHost: config.kafka_host});
log.app.info("連接kafka中");
}
var producer = new Producer(this.client,{ requireAcks: 1 });
var payloads = [{
topic: topic,
messages: JSON.stringify(message),
partition: 0, // default 0
// attributes: 2, // default: 0
}];
console.log("send message:["+JSON.stringify(message)+"],to topic:"+topic);
producer.on('ready', function () {
try {
producer.send(payloads, function (err, data) {
if (err){
console.log("send message:["+JSON.stringify(message)+"] error,topic:["+topic+"],data:"+JSON.stringify(data));
callback(err);
} else {
console.log("send message:["+JSON.stringify(message)+"] success,topic:"+topic+"],data:"+JSON.stringify(data));
// callback(null);
}
producer.close();
})
}catch (e) {
console.log(e);
}
});
producer.on('error', function (err) {
console.log('error:', err);
callback(err);
});
}catch (e) {
console.log(e);
}
callback(null);
}