In the previous chapter, Billow showed how to deliver binlog data to Kafka using the Canal 1.1.1+ configuration. In real production environments, many Kafka clusters are integrated with Kerberos for security authentication. In this section, Billow explains how to modify the source code so that Canal can be configured to deliver data to a Kerberos-authenticated Kafka cluster.
1. Import the Canal source code
Canal has been open-sourced on GitHub. Download address: https://github.com/alibaba/canal.git
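For example, you can clone the repository locally first and then open it in IDEA as a Maven project:
git clone https://github.com/alibaba/canal.git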
1.1 Import the Git project in IDEA
After importing, the project directory looks like this:
1.2 Modify the Canal launcher class
The entry class of the standalone Canal deployment is com.alibaba.otter.canal.deployer.CanalLauncher.
The main method of this class does the following:
1. Load the configuration.
2. Start CanalStater based on the configuration.
...
...
logger.info("## load canal configurations");
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
RemoteConfigLoader remoteConfigLoader = null;
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}
...
...
final CanalStater canalStater = new CanalStater();
canalStater.start(properties);
In the CanalStater.start method, the MQ producer is initialized according to the configuration. Billow uses Kafka here, so we only focus on Kafka.
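For reference, the producer selection in CanalStater.start looks roughly like the sketch below (simplified; the exact code differs between Canal versions):
CanalMQProducer canalMQProducer = null;
String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
if (serverMode.equalsIgnoreCase("kafka")) {
    // canal.serverMode = kafka selects the Kafka producer
    canalMQProducer = new CanalKafkaProducer();
} else if (serverMode.equalsIgnoreCase("rocketmq")) {
    canalMQProducer = new CanalRocketMQProducer();
}
...
if (canalMQProducer != null) {
    // build the MQ settings from canal.properties and hand them to the producer
    MQProperties mqProperties = buildMQProperties(properties);
    canalMQProducer.init(mqProperties);
}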
After CanalKafkaProducer is initialized, the MQ settings in the configuration file are read.
The MQ configuration in canal.properties is as follows:
##################################################
######### MQ #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = /usr/keytab/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath = /usr/keytab/jaas.conf
The items prefixed with canal.mq.kafka.kerberos are Billow's custom Kerberos configuration items. Explanation:
- canal.mq.kafka.kerberos.enable
This item is either true or false. When it is true, the Kafka cluster has Kerberos authentication enabled, and the next two configuration items will be read.
- canal.mq.kafka.kerberos.krb5FilePath
This item is read only when canal.mq.kafka.kerberos.enable is true; it points to the krb5.conf file of the Kerberos cluster. Example:
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log
[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true
[realms]
 HADOOP.COM = {
  kdc = hadoop1.com
  admin_server = hadoop1.com
 }
[domain_realm]
 .hadoop1.com = HADOOP.COM
 hadoop1.com = HADOOP.COM
- canal.mq.kafka.kerberos.jaasFilePath
This item is read only when canal.mq.kafka.kerberos.enable is true; it points to the JAAS configuration file used when connecting to Kafka. Example:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="E:/resources/billow.keytab"
    principal="billow@HADOOP.COM"
    client=true;
};
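Before wiring this into Canal, it can be worth verifying on the target server that the principal and keytab referenced in jaas.conf actually work, for example with kinit/klist (the keytab path below is a placeholder; adjust it to where the file really lives):
kinit -kt /path/to/billow.keytab billow@HADOOP.COM
klist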
Since Billow has added custom items to the configuration file, the code needs to be extended to read them.
The reading of these items is added in CanalStater's buildMQProperties method.
/**
 * Build the MQ configuration
 *
 * @param properties canal.properties configuration
 * @return
 */
private static MQProperties buildMQProperties(Properties properties) {
    MQProperties mqProperties = new MQProperties();
    ......
    ......
    String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
    if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
        mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
    }
    String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
        mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
    }
    String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
    if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
        mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
    }
    return mqProperties;
}
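The setters called above (and the getters read later in CanalKafkaProducer.init) imply that the MQProperties class also needs three new fields with accessors. A minimal sketch of those additions (field names are inferred from the accessor names):
public class MQProperties {
    ...
    // custom Kerberos settings read from canal.properties
    private boolean kerberosEnable       = false;
    private String  kerberosKrb5FilePath = "";
    private String  kerberosJaasFilePath = "";

    public boolean isKerberosEnable() {
        return kerberosEnable;
    }

    public void setKerberosEnable(boolean kerberosEnable) {
        this.kerberosEnable = kerberosEnable;
    }

    public String getKerberosKrb5FilePath() {
        return kerberosKrb5FilePath;
    }

    public void setKerberosKrb5FilePath(String kerberosKrb5FilePath) {
        this.kerberosKrb5FilePath = kerberosKrb5FilePath;
    }

    public String getKerberosJaasFilePath() {
        return kerberosJaasFilePath;
    }

    public void setKerberosJaasFilePath(String kerberosJaasFilePath) {
        this.kerberosJaasFilePath = kerberosJaasFilePath;
    }
    ...
}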
In the corresponding CanalConstants class, add the constant definitions:
/**
 * Common startup constants
 *
 * @author jianghang 2012-11-8 03:15:55 PM
 * @version 1.0.0
 */
public class CanalConstants {
    ...
    ...
    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE = ROOT + "." + "mq.kafka.kerberos.enable";
    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
    ...
    ...
1.3 Configure CanalKafkaProducer
In the previous subsection, Billow showed how to add the Kerberos switch configuration. In this subsection we look at how to put the Kafka producer into secure mode.
Looking at the source code, the CanalStater.start method initializes a CanalKafkaProducer object, and this object's init method contains the KafkaProducer-related configuration.
Here Billow adds a check: if Kerberos authentication is enabled in the configuration file, the Kafka properties are switched to secure mode and the required system properties are set.
if (kafkaProperties.isKerberosEnable()) {
    // the Kafka cluster has Kerberos authentication enabled
    System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
    System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
    properties.put("security.protocol", "SASL_PLAINTEXT");
    properties.put("sasl.kerberos.service.name", "kafka");
}
The exact location is:
public class CanalKafkaProducer implements CanalMQProducer {
    ...
    ...
    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if (kafkaProperties.getTransaction()) {
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", kafkaProperties.getRetries());
        }
        if (kafkaProperties.isKerberosEnable()) {
            // the Kafka cluster has Kerberos authentication enabled
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                producer.initTransactions();
            } else {
                producer2.initTransactions();
            }
        }
    }
    ...
    ...
}
2. Testing
After modifying the source code, compile and package it:
mvn clean install -Dmaven.test.skip -Denv=release
After the command succeeds, the packaged archives are generated under the project's target directory:
Copy the deployer package to the server and put in place the krb5.conf, jaas.conf, and canal.properties files for the cluster environment. Start Canal, check the logs, and start a Kafka consumer to consume the data.
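As a concrete consumption test, a console consumer against the secured cluster could look roughly like this (the topic name, broker address, and file locations are placeholders; adjust them to your environment):
# point the Kafka CLI tools at the same krb5.conf / jaas.conf
export KAFKA_OPTS="-Djava.security.krb5.conf=/usr/keytab/krb5.conf -Djava.security.auth.login.config=/usr/keytab/jaas.conf"
# client.properties contains:
#   security.protocol=SASL_PLAINTEXT
#   sasl.kerberos.service.name=kafka
bin/kafka-console-consumer.sh --bootstrap-server hadoop1.com:6667 \
  --topic example_topic --from-beginning \
  --consumer.config client.properties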
Billow has tested this successfully. If anything is unclear, feel free to message the public account and ask~