Apache Camel 調(diào)研

什么是Camel彻亲?

Camel框架的核心是一個路由引擎蚕断,或者更確切地說是一個路由引擎構(gòu)建器。它允許您定義自己的路由規(guī)則匿垄,決定從哪個源接收消息,并確定如何處理這些消息并將其發(fā)送到其他目標(biāo)归粉。

Camel提供更高層次的抽象椿疗,使您可以使用相同的API與各種系統(tǒng)進行交互,而不管系統(tǒng)使用的協(xié)議或數(shù)據(jù)類型如何糠悼。 Camel中的組件提供了針對不同協(xié)議和數(shù)據(jù)類型的API的特定實現(xiàn)届榄。開箱即用,Camel支持80多種協(xié)議和數(shù)據(jù)類型倔喂。

Getting started

源碼地址:https://github.com/camelinaction/camelinaction.git

下面是一個拷貝文件的例子铝条,將文件從data/inbox拷貝到data/outbox

1 添加maven依賴

<dependencies>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-core</artifactId>
    <version>2.15.6</version>
  </dependency>
</dependencies>

2 代碼

public class FileCopierWithCamel {

    public static void main(String args[]) throws Exception {
        // create CamelContext
        CamelContext context = new DefaultCamelContext();

        // add our route to the CamelContext
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                /**
                  file: 表示使用文件Component
                  from 表示從哪里獲取數(shù)據(jù),進行消費
                  to  表示將數(shù)據(jù)生產(chǎn)到哪里
                 */
                from("file:data/inbox?noop=true").to("file:data/outbox");
            }
        });

        // start the route and let it do its work
        context.start();
        Thread.sleep(10000);

        // stop the CamelContext
        context.stop();
    }
}

Camel概念

CamelContext

Camel的容器席噩,通過CamelContext可以訪問內(nèi)部服務(wù):Components班缰,Endpoints,Endpoints悼枢,Registry等等

image.png

Routes

通過路由可以實現(xiàn):客戶端與服務(wù)端埠忘,生產(chǎn)者與消費者的解耦

比如:從ftp服務(wù)上獲取訂單信息,將其發(fā)送到JMS隊列馒索,可以通過如下路由表示

    //from可以理解成消費者:表示從ftp服務(wù)上獲取數(shù)據(jù)進行消費
    from("ftp://rider.com/orders?username=rider&password=secret")
    //to可以理解成生產(chǎn)者:表示將數(shù)據(jù)發(fā)送給jms
    .to("jms:incomingOrders");
image.png

endpoint URI

可以簡單理解成消息的地址

  • 對于消費者(from方法)來說,表示消息從哪里來
  • 對于生產(chǎn)者(to方法)來說莹妒,表示消息到哪里去
image.png

如上圖所示

Scheme:指明使用的是FtpComponent

Context path: ftp服務(wù)和端口號,以及文件路徑

Options:一些操作配置绰上,每個組件都不同

Exchange

Message的容器旨怠,其的內(nèi)部屬性,如下圖所示

image.png

Message

消息數(shù)據(jù)的基本實體

MEP

Exchange支持多種消息交換模式 (MEPs)渔期,通過其內(nèi)部持有的pattern屬性進行區(qū)分

下面介紹2種常用的交互模式

  • InOnly :單向消息模式(也稱為事件消息)运吓,簡言之:不需要等待對方的響應(yīng)
  • InOut : 請求響應(yīng)模式,例如:基于http的傳輸疯趟,通常是此模式拘哨,客戶端請求web頁面,等待服務(wù)端的回應(yīng)

InOut模式包含In message 與 Out message信峻,而InOnly模式只包含In message

Exception

如果路由期間發(fā)生錯誤倦青,此屬性將被賦值

Properties

Exchange的消息頭,Camel本身和開發(fā)者可以設(shè)置或讀取屬性值

Endpoints

Endpoints是模擬通道末端的camel抽象盹舞,充當(dāng)一個工廠产镐,用于創(chuàng)建消息的producer和consumer

Component

創(chuàng)建Endpoints的工廠隘庄,一個Component的實現(xiàn),通常有一些傳輸屬性需要設(shè)置癣亚。例如丑掺,JMS-Component要求在其上設(shè)置ConnectionFactory,以便對所有JMS通信使用相同的消息代理

Component述雾,Endpoints和Exchange的關(guān)系如下圖所示:

image.png

內(nèi)部組件介紹

Direct Component

基于內(nèi)存的同步消息組件

使用Direct組件街州,生產(chǎn)者直接調(diào)用消費者。因此使用Direct組件的唯一開銷是方法調(diào)用玻孟。

Direct的線程模型

由于生產(chǎn)者直接調(diào)用消費者

因此:調(diào)用者與camel的消費者共用一個線程

image.png

SEDA Component

基于內(nèi)存的異步消息組件:生產(chǎn)者和消費者通過BlockingQueue交換消息唆缴,生產(chǎn)者與消費者是不同的線程

如果VM在消息尚未處理時終止,則seda不會實現(xiàn)消息的持久化或恢復(fù)黍翎,因此有丟失消息的風(fēng)險

消費者視角

Consumer thread pool

SedaConsumer內(nèi)部持有一個線程池面徽,默認是1個線程,可以通過concurrentConsumers指定線程數(shù)

代碼如下所示

from("seda:start?concurrentConsumers=2")
    .to("log:A")
    .to("log:B");

image.png

Threads thread pool

Consumer thread pool中的每個線程匣掸,還可以開啟新的線程池趟紊,代碼如下所示

from("seda:start?concurrentConsumers=2")
            .to("log:A")
            // create a thread pool with a pool size of 5 and a maxi- mum size of 10.
            .threads(5, 10) 
            .to("log:B");

image.png

如上圖所示:consumer線程執(zhí)行完"log:A"后,將后續(xù)任務(wù)提交給"Threads thead pool",然后就直接返回了

生產(chǎn)者視角

異步發(fā)送消息

生產(chǎn)者發(fā)完消息旺聚,立刻返回织阳,不需要等待消息消費成功

 //InOnly消息模式
 producerTemplate.sendBody("seda:start", body);

同步發(fā)送消息

生產(chǎn)者發(fā)完消息,會阻塞砰粹,直到消費成功

 //InOut消息模式
 producerTemplate.requestBody("seda:start", body);

實現(xiàn)原理:SedaProducer通過CountDownLatch信號量進行等待唧躲,當(dāng)數(shù)據(jù)消費成功后,消費者修改CountDownLatch信號量碱璃,喚醒SedaProducer弄痹,然后消費者才返回。

Camel使用

消息發(fā)送

Camel可以使用ProducerTemplate將消息發(fā)送到endpoint嵌器,或從endpoint請求數(shù)據(jù)

我們可以使用@Produce創(chuàng)建ProducerTemplate肛真,代碼如下

   import org.apache.camel.Produce;
   import org.apache.camel.ProducerTemplate;
   public class ProducePojo {
     @Produce
     private ProducerTemplate template;
     public String sayHello(String name) {
       //發(fā)消息到一個activemq端點
       return template.requestBody("activemq:queue:sayhello",
                                   name, String.class);
   } } 

為了確保ProducerTemplate可以注入到ProducePojo類,需要將ProducePojo配置到spring上下文

    <beans xmlns="http://www.springframework.org/schema/beans" ...>
     <bean id="activemq"
           class="org.apache.activemq.camel.component
                  .ActiveMQComponent">
       <property name="brokerURL"
                 value="tcp://localhost:61616"/>
     </bean>
     <bean id="producer"
           class="org.camelcookbook.extend.produce
                  .ProducePojo"/>
     <camelContext xmlns="http://camel.apache.org/schema/spring"/>
   </beans>

方法調(diào)用

比如我要調(diào)用MyBean的myMethon,可以通過注解或java DSL

如果參數(shù)是對象類型爽航,camel也會自動轉(zhuǎn)型

以下代碼表示接收到someEndpoint的消息后蚓让,調(diào)用myBean.myMethod方法

    //注意:要確保MyBean被camelContext或springContext加載
  public class MyBean {
     //注解的方式
     @Consume(uri="someEndpoint")
     public String myMethod(ParamBean message) {
       //...
  } } 
  //java DSL
  from("someEndpoint")
     .bean(MyBean.class, "myMethod");

  //通過ProducerTemplate調(diào)用此方法
  ParamBean param = genTestParam();
  template.requestBody("someEndpoint", param);

這里其實用的的是camel的內(nèi)部組件Bean Component,具體用法可以參考如下官方文檔

Bean Component: http://camel.apache.org/bean.html

關(guān)于參數(shù)的傳遞讥珍,可以參考

Bean Binding: http://camel.apache.org/bean-binding.html

自定義Processor

Processor是camel中的基本功能元素历极,自定義Processor非常易于在路由中編寫和使用
定義一個將訂單數(shù)據(jù)轉(zhuǎn)成csv格式的Processor

public class OrderToCsvProcessor implements Processor {

    public void process(Exchange exchange) throws Exception {
        String custom = exchange.getIn().getBody(String.class);

        String id = custom.substring(0, 9);
        String customerId = custom.substring(10, 19);
        String date = custom.substring(20, 29);
        String items = custom.substring(30);
        String[] itemIds = items.split("@");

        StringBuilder csv = new StringBuilder();
        csv.append(id.trim());
        csv.append(",").append(date.trim());
        csv.append(",").append(customerId.trim());
        for (String item : itemIds) {
            csv.append(",").append(item.trim());
        }

        exchange.getIn().setBody(csv.toString());
    }

}

定義路由規(guī)則

from("quartz://report?cron=0+0+6+*+*+?")
    .to("http://riders.com/orders/cmd=received&date=yesterday")
    .process(new OrderToCsvProcessor())
    .to("file://riders/orders?fileName=report-${header.Date}.csv");

異常處理

基本用法

camel支持"異步重試,延遲重試"等多種處理方式

//通用異常處理
errorHandler(defaultErrorHandler()
                    //異步重試(默認同步)
                    .asyncDelayedRedelivery()
                    .maximumRedeliveries(2)
                    .redeliveryDelay(1000)
                    .retryAttemptedLogLevel(LoggingLevel.WARN));

//如果是JmsException衷佃,則只需自定義processer
onException(JmsException.class)
    .handled(true)
    .process(new GenerateFailueResponse());

//如果IOException,則重試3次趟卸,依舊失敗,則執(zhí)行to對應(yīng)的動作
onException(IOException.class).maximumRedeliveries(3)
    .handled(true)
    .to("ftp://gear@ftp.rider.com?password=secret");

from("file:/rider/files/upload?delay=3600000")
    .to("http://rider.com?user=gear&password=secret");

上面的代碼,其異常處理的作用域是整個context

Camel也支持route作用域的異常處理锄列,如下代碼所示

from("direct:step1")
        .bean(Step1.class, "success")
         //異常處理图云,作用域是當(dāng)前路由
        .onCompletion().onFailureOnly()
         //如果失敗,則執(zhí)行onFailure方法
        .bean(Step1.class, "onFailure")
        .end()
        .to("direct:step2");

注意onCompletion的方式邻邮,是異步的竣况,如果想同步處理異常可以參考camel的Synchronization使用方式

一個異常處理的例子

場景描述

順序執(zhí)行step1,step2,step3,如果某一步失敗饶囚,回滾之前的每一步

比如step3執(zhí)行失敗帕翻,回滾step2,step1

解決方案

通過 onCompletion().onFailureOnly()方法對每一步設(shè)置失敗回調(diào)函數(shù),

下面的代碼模擬了step3執(zhí)行失敗的場景,從日志可以看出camel按順序執(zhí)行了step2和step1的失敗回調(diào)方法

public class RollbackTest extends CamelTestSupport {
  
    @Override
    public void setUp() throws Exception {
        deleteDirectory("target/mail/backup");
        super.setUp();
    }

    @Test
    public void testRollback() throws Exception {
        template.sendBodyAndHeader("direct:step1", "bumper", "to", "FATAL");
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:step1")
                        .bean(Step1.class, "success")
                        .onCompletion().onFailureOnly()
                         //如果失敗萝风,調(diào)用step1的onFailure方法
                        .bean(Step1.class, "onFailure")
                        .end()
                        .to("direct:step2");

                from("direct:step2")
                        .bean(Step2.class, "success")
                        .onCompletion().onFailureOnly()
                        .bean(Step2.class, "onFailure")
                        .end()
                        .to("direct:step3");

                from("direct:step3")
                        .bean(Step3.class, "fail")
                        .onCompletion().onFailureOnly()
                        .bean(Step3.class, "onFailure")
                        .end()
                        .log("888:end");
            }
        };
    }
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市紫岩,隨后出現(xiàn)的幾起案子规惰,更是在濱河造成了極大的恐慌,老刑警劉巖泉蝌,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件歇万,死亡現(xiàn)場離奇詭異,居然都是意外死亡勋陪,警方通過查閱死者的電腦和手機贪磺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來诅愚,“玉大人寒锚,你說我怎么就攤上這事∥バⅲ” “怎么了刹前?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長雌桑。 經(jīng)常有香客問我喇喉,道長,這世上最難降的妖魔是什么校坑? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任拣技,我火速辦了婚禮,結(jié)果婚禮上耍目,老公的妹妹穿的比我還像新娘膏斤。我一直安慰自己,他們只是感情好制妄,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布掸绞。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪衔掸。 梳的紋絲不亂的頭發(fā)上烫幕,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天,我揣著相機與錄音敞映,去河邊找鬼较曼。 笑死,一個胖子當(dāng)著我的面吹牛振愿,可吹牛的內(nèi)容都是我干的捷犹。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼冕末,長吁一口氣:“原來是場噩夢啊……” “哼萍歉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起档桃,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤枪孩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后藻肄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蔑舞,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年嘹屯,在試婚紗的時候發(fā)現(xiàn)自己被綠了攻询。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡州弟,死狀恐怖钧栖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情呆馁,我是刑警寧澤桐经,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站浙滤,受9級特大地震影響阴挣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜纺腊,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一畔咧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧揖膜,春花似錦誓沸、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宿百。三九已至,卻和暖如春洪添,著一層夾襖步出監(jiān)牢的瞬間垦页,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工干奢, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留痊焊,地道東北人。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓忿峻,卻偏偏與公主長得像薄啥,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子逛尚,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,894評論 2 11
  • Camel路由簡介 Camel最重要的特色之一就是路由垄惧,沒有它Camel本質(zhì)上只是一個傳輸連接器庫。本篇文章將從各...
    God_Moon閱讀 2,501評論 0 3
  • JAVA面試題 1黑低、作用域public,private,protected,以及不寫時的區(qū)別答:區(qū)別如下:作用域 ...
    JA尐白閱讀 1,146評論 1 0
  • 原文鏈接:https://docs.spring.io/spring-boot/docs/1.4.x/refere...
    pseudo_niaonao閱讀 4,680評論 0 9
  • 今天天無意中看到了這樣的一個報道:主人公在我的家鄉(xiāng)赘艳,報道是這個樣子的: 兒子癱瘓在床、大伯哥年邁腿傷克握、侄子智障眼盲...
    鹿縝閱讀 181評論 3 3