Camel筆記(從Mysql到本地文件與Kafka隊(duì)列)

基礎(chǔ)代碼來自:Spring Boot + Apache Camel SQL component + MySQL - Hello World Example | JavaInUse

在文未有代碼下載鏈接 https://www.javainuse.com/zip/camel/boot-camel-sql.rar

  • 準(zhǔn)備工作:
    1)修改application.properties文件中Mysql數(shù)據(jù)庫的相關(guān)配置
    2)啟動(dòng)主程序纽帖,添加一條記錄 {"empId":"002","empName":"keven"}


    27.png

    3)查一下結(jié)果:(剛才多添加了一條同樣的記錄)


    28.png

    4)再將application.properties中spring.datasource.initialization-mode=always這行注釋掉镰吵,否則每次重啟時(shí)它都會(huì)重建數(shù)據(jù)庫,又要重新添加記錄

從上圖可以看出:本程序提供了兩個(gè)功能,從接收瀏覽器Get/Post兩個(gè)方法(端點(diǎn))凤跑,分別路由到“插入/查詢所有記錄”兩個(gè)路徑豺瘤,執(zhí)行對應(yīng)功能。

以下做一點(diǎn)擴(kuò)展:

  • 發(fā)送到本地文件
  1. 在EmployeeServiceImpl類中添加如下路由:
//write,Mysql--->File
        from("direct:write").to("sql:select * from employee").process(new Processor() {
            public void process(Exchange xchg) throws Exception {
                ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                List<Employee> employees = new ArrayList<Employee>();
                System.out.println(dataList);
                StringBuilder sb=new StringBuilder();
                for (Map<String, String> data : dataList) {
                    sb.append("empId:"+data.get("empId")+",");
                    sb.append("empName:"+data.get("empName"));
                }
                xchg.getIn().setBody(sb.toString());
            }
        }).to("file:data/outbox");
  1. 到控制類EmployeeController中加一條
//write
    @RequestMapping(value = "/write", method = RequestMethod.GET)
    public boolean write() {
        producerTemplate.requestBody("direct:write", null, List.class);
        return true;
    }

這樣,當(dāng)頁面中接收到write的請求時(shí)埠啃,程序會(huì)先查找記錄,再把結(jié)果輸出到程序的data/outbox目錄下
3)重啟一下伟恶,訪問http://localhost:8080/write

29.png

再到程序目錄下檢查一下
30.png

可以看到碴开,已經(jīng)輸出到指定目錄了

  • 發(fā)送到kafka隊(duì)列
    1)準(zhǔn)備工作
    在poem.xml文件中添加kafka依賴
              <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.16.3</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.0</version>
        </dependency>

到服務(wù)實(shí)體類EmployeeServiceImpl中添加kafka定義(也可放到屬性文件中去)

    String topicName = "topic=camel-topic";
    String kafkaServer = "kafka:CDH-04:9092";
    String zooKeeperHost = "zookeeperHost=CDH-05&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";

    String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
            .append(zooKeeperHost).append("&").append(serializerClass).toString();

2)修改代碼
添加到kafka的路由

//Kafka,Mysql--->Kafka
        from("direct:kafka").to("sql:select * from employee").process(new Processor() {
            public void process(Exchange xchg) throws Exception {
                ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                List<Employee> employees = new ArrayList<Employee>();
                System.out.println(dataList);
                for (Map<String, String> data : dataList) {
                    Employee employee = new Employee();
                    employee.setEmpId(data.get("empId"));
                    employee.setEmpName(data.get("empName"));
                    employees.add(employee);
                }
                xchg.getIn().setBody(employees.toString());
            }
        }).to(toKafka).process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println("it is :"+toKafka);
            }
        });

添加觸發(fā)控制(EmployeeController類)

//kafka
    @RequestMapping(value = "/kafka", method = RequestMethod.GET)
    public boolean kafka() {
        producerTemplate.requestBody("direct:kafka", null, List.class);
        return true;
    }

3)訪問一下 http://localhost:8080/kafka

31.png

4)查看一下隊(duì)列
32.png

可以看到,已經(jīng)發(fā)送到隊(duì)列了

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末博秫,一起剝皮案震驚了整個(gè)濱河市潦牛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌挡育,老刑警劉巖巴碗,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異即寒,居然都是意外死亡橡淆,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門母赵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逸爵,“玉大人,你說我怎么就攤上這事市咽∪” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵施绎,是天一觀的道長溯革。 經(jīng)常有香客問我,道長谷醉,這世上最難降的妖魔是什么致稀? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮俱尼,結(jié)果婚禮上抖单,老公的妹妹穿的比我還像新娘。我一直安慰自己遇八,他們只是感情好矛绘,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著刃永,像睡著了一般货矮。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上斯够,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天骂远,我揣著相機(jī)與錄音,去河邊找鬼惶洲。 笑死,一個(gè)胖子當(dāng)著我的面吹牛燃少,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播铃在,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼阵具,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了涌穆?” 一聲冷哼從身側(cè)響起怔昨,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宿稀,沒想到半個(gè)月后趁舀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡祝沸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年矮烹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片罩锐。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡奉狈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出涩惑,到底是詐尸還是另有隱情仁期,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布竭恬,位于F島的核電站跛蛋,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏痊硕。R本人自食惡果不足惜赊级,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望岔绸。 院中可真熱鬧理逊,春花似錦、人聲如沸盒揉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽刚盈。三九已至墨微,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間扁掸,已是汗流浹背翘县。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谴分,地道東北人锈麸。 一個(gè)月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像牺蹄,于是被迫代替她去往敵國和親忘伞。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容