阿里開源Canal--⑤投遞到Kerberos認證的Kafka

在前一章節(jié)中觉增,Billow介紹了如何通過1.1.1以上的canal配置將binlog數(shù)據(jù)投遞到kafka彻亲。在實際的生產(chǎn)環(huán)境中孕锄,我們的kafka很多都會集成Kerberos作為安全認證。那么在本節(jié)苞尝,Billow將介紹如何通過修改源碼使Canal可配置為投遞數(shù)據(jù)到Kerberos認證的Kafka集群畸肆。

1.導(dǎo)入Canal源碼

canal已開源到github。下載地址為:https://github.com/alibaba/canal.git

1.1 在idea中導(dǎo)入git項目野来。

導(dǎo)入后的項目目錄為:


1.2 修改canal啟動類

canal獨立版本的入口類為:com.alibaba.otter.canal.deployer.CanalLauncher

在該類的main方法中恼除,做了以下幾件事情:
1、加載配置曼氛。
2豁辉、根據(jù)配置啟動CanalStater

...
...
logger.info("## load canal configurations");
            String conf = System.getProperty("canal.conf", "classpath:canal.properties");
            Properties properties = new Properties();
            RemoteConfigLoader remoteConfigLoader = null;
            if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
                conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
                properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
            } else {
                properties.load(new FileInputStream(conf));
            }
...
...
final CanalStater canalStater = new CanalStater();
            canalStater.start(properties);

在CanalStater.start方法中,通過配置項初始化MQ的生產(chǎn)者舀患。此處Billow配置為Kafka徽级,因此我們只關(guān)注kafka。


在初始化CanalKafkaProducer之后聊浅,會讀取配置文件中的mq配置餐抢。


在canal.properties中的mq配置如下:

##################################################
#########            MQ              #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false


canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "/usr/keytab/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "/usr/keytab/jaas.conf"

其中canal.mq.kafka.kerberos為前綴的配置是Billow的自定義kerberos配置項。說明:

  • canal.mq.kafka.kerberos.enable
    此配置項為true跟false低匙,為true時表示kafka集群開啟了kerberos認證旷痕,那么會讀取接下來的兩個配置項內(nèi)容。

  • canal.mq.kafka.kerberos.krb5FilePath
    此配置項當(dāng)canal.mq.kafka.kerberos.enable為true時才會讀取顽冶,配置為kerberos集群中的krb5.conf文件欺抗。示例:

[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = HADOOP.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 BETA.COM = {
  kdc = hadoop1.com
  admin_server = hadoop1.com
 }

[domain_realm]
 .hadoop1.com = HADOOP.COM
 hadoop1.com = HADOOP.COM

  • canal.mq.kafka.kerberos.jaasFilePath
    此配置項當(dāng)canal.mq.kafka.kerberos.enable為true時才會讀取,配置為連接kafka時的jaas配置項强重。示例:
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="E:/resources/billow.keytab"
   principal="billow@HADOOP.COM"
   client=true;
};

此處Billow在配置文件中配置了自定義的配置項绞呈,那么在代碼中,需要添加這幾項配置項的讀取间景。
CanalStater的buildMQProperties方法中添加配置項的讀取佃声。

/**
     * 構(gòu)造MQ對應(yīng)的配置
     * 
     * @param properties canal.properties 配置
     * @return
     */
    private static MQProperties buildMQProperties(Properties properties) {
        MQProperties mqProperties = new MQProperties();
        ......
        ......
        String kafkaKerberosEnable = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_ENABLE);
        if (!StringUtils.isEmpty(kafkaKerberosEnable)) {
            mqProperties.setKerberosEnable(Boolean.valueOf(kafkaKerberosEnable));
        }
        String kafkaKerberosKrb5Filepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH);
        if (!StringUtils.isEmpty(kafkaKerberosKrb5Filepath)) {
            mqProperties.setKerberosKrb5FilePath(kafkaKerberosKrb5Filepath);
        }
        String kafkaKerberosJaasFilepath = CanalController.getProperty(properties, CanalConstants.CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH);
        if (!StringUtils.isEmpty(kafkaKerberosJaasFilepath)) {
            mqProperties.setKerberosJaasFilePath(kafkaKerberosJaasFilepath);
        }
        return mqProperties;
    }

對應(yīng)的CanalConstants類中,添加常量信息配置:

/**
 * 啟動常用變量
 *
 * @author jianghang 2012-11-8 下午03:15:55
 * @version 1.0.0
 */
public class CanalConstants {
...
...
    public static final String CANAL_MQ_KAFKA_KERBEROS_ENABLE    = ROOT + "." + "mq.kafka.kerberos.enable";
    public static final String CANAL_MQ_KAFKA_KERBEROS_KRB5FILEPATH  = ROOT + "." + "mq.kafka.kerberos.krb5FilePath";
    public static final String CANAL_MQ_KAFKA_KERBEROS_JAASFILEPATH  = ROOT + "." + "mq.kafka.kerberos.jaasFilePath";
...
...

1.3 配置CanalKafkaProducer

上一小節(jié)中倘要,Billow介紹了如何添加關(guān)于Kerberos的開關(guān)配置圾亏。在這節(jié)我們來看看如何配置kafkaProducer為安全模式。

觀察源碼發(fā)現(xiàn),在CanalStater的start方法中初始化了一個CanalKafkaProducer對象志鹃。在此對象的init方法里面父晶,有關(guān)于kafkaproduct的相關(guān)配置。
在此處弄跌,Billow添加了判斷甲喝,如果配置文件中開啟了kerberos認證,那么就會配置kafkaProperty為安全模式铛只。并添加了系統(tǒng)環(huán)境配置埠胖。

 if (kafkaProperties.isKerberosEnable()){
            //kafka集群開啟了kerberos認證
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");

        }

具體位置為:

public class CanalKafkaProducer implements CanalMQProducer {

...
...


    @Override
    public void init(MQProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaProperties.getServers());
        properties.put("acks", kafkaProperties.getAcks());
        properties.put("compression.type", kafkaProperties.getCompressionType());
        properties.put("batch.size", kafkaProperties.getBatchSize());
        properties.put("linger.ms", kafkaProperties.getLingerMs());
        properties.put("max.request.size", kafkaProperties.getMaxRequestSize());
        properties.put("buffer.memory", kafkaProperties.getBufferMemory());
        properties.put("key.serializer", StringSerializer.class.getName());
        if(kafkaProperties.getTransaction()){
            properties.put("transactional.id", "canal-transactional-id");
        } else {
            properties.put("retries", kafkaProperties.getRetries());
        }
        if (kafkaProperties.isKerberosEnable()){
            //kafka集群開啟了kerberos認證
            System.setProperty("java.security.krb5.conf", kafkaProperties.getKerberosKrb5FilePath());
            System.setProperty("java.security.auth.login.config", kafkaProperties.getKerberosJaasFilePath());
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");

        }
        if (!kafkaProperties.getFlatMessage()) {
            properties.put("value.serializer", MessageSerializer.class.getName());
            producer = new KafkaProducer<String, Message>(properties);
        } else {
            properties.put("value.serializer", StringSerializer.class.getName());
            producer2 = new KafkaProducer<String, String>(properties);
        }
        if (kafkaProperties.getTransaction()) {
            if (!kafkaProperties.getFlatMessage()) {
                producer.initTransactions();
            } else {
                producer2.initTransactions();
            }
        }
    }

...
...
}

2、測試

修改好源碼后淳玩,編譯打包直撤。

mvn clean install -Dmaven.test.skip -Denv=release

命令執(zhí)行成功后會在項目的target文件夾下面生成壓縮包:



將deployer包拷貝至服務(wù)器,配置好集群環(huán)境的krb5.conf蜕着、jaas.conf以及canal.properties文件谋竖。啟動canal,查看日志承匣,并啟動kafka消費者進行數(shù)據(jù)的消費蓖乘。
Billow已測試成功,有不懂的童鞋可以私信公眾號問~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末韧骗,一起剝皮案震驚了整個濱河市嘉抒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌袍暴,老刑警劉巖些侍,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異政模,居然都是意外死亡岗宣,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門淋样,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耗式,“玉大人,你說我怎么就攤上這事习蓬∨κ玻” “怎么了措嵌?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵躲叼,是天一觀的道長。 經(jīng)常有香客問我企巢,道長枫慷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮或听,結(jié)果婚禮上探孝,老公的妹妹穿的比我還像新娘。我一直安慰自己誉裆,他們只是感情好顿颅,可當(dāng)我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著足丢,像睡著了一般粱腻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上斩跌,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天绍些,我揣著相機與錄音,去河邊找鬼耀鸦。 笑死柬批,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的袖订。 我是一名探鬼主播氮帐,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洛姑!你這毒婦竟也來了揪漩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤吏口,失蹤者是張志新(化名)和其女友劉穎奄容,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體产徊,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡昂勒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了舟铜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片戈盈。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖谆刨,靈堂內(nèi)的尸體忽然破棺而出塘娶,到底是詐尸還是另有隱情,我是刑警寧澤痊夭,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布刁岸,位于F島的核電站,受9級特大地震影響她我,放射性物質(zhì)發(fā)生泄漏虹曙。R本人自食惡果不足惜迫横,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望酝碳。 院中可真熱鬧矾踱,春花似錦、人聲如沸疏哗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽返奉。三九已至圣蝎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間衡瓶,已是汗流浹背徘公。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哮针,地道東北人关面。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像十厢,于是被迫代替她去往敵國和親等太。 傳聞我的和親對象是個殘疾皇子业踏,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容