什么是Camel彻亲?
Camel框架的核心是一個路由引擎蚕断,或者更確切地說是一個路由引擎構(gòu)建器。它允許您定義自己的路由規(guī)則匿垄,決定從哪個源接收消息,并確定如何處理這些消息并將其發(fā)送到其他目標(biāo)归粉。
Camel提供更高層次的抽象椿疗,使您可以使用相同的API與各種系統(tǒng)進行交互,而不管系統(tǒng)使用的協(xié)議或數(shù)據(jù)類型如何糠悼。 Camel中的組件提供了針對不同協(xié)議和數(shù)據(jù)類型的API的特定實現(xiàn)届榄。開箱即用,Camel支持80多種協(xié)議和數(shù)據(jù)類型倔喂。
Getting started
源碼地址:https://github.com/camelinaction/camelinaction.git
下面是一個拷貝文件的例子铝条,將文件從data/inbox拷貝到data/outbox
1 添加maven依賴
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.15.6</version>
</dependency>
</dependencies>
2 代碼
public class FileCopierWithCamel {
public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();
// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
public void configure() {
/**
file: 表示使用文件Component
from 表示從哪里獲取數(shù)據(jù),進行消費
to 表示將數(shù)據(jù)生產(chǎn)到哪里
*/
from("file:data/inbox?noop=true").to("file:data/outbox");
}
});
// start the route and let it do its work
context.start();
Thread.sleep(10000);
// stop the CamelContext
context.stop();
}
}
Camel概念
CamelContext
Camel的容器席噩,通過CamelContext可以訪問內(nèi)部服務(wù):Components班缰,Endpoints,Endpoints悼枢,Registry等等
Routes
通過路由可以實現(xiàn):客戶端與服務(wù)端埠忘,生產(chǎn)者與消費者的解耦
比如:從ftp服務(wù)上獲取訂單信息,將其發(fā)送到JMS隊列馒索,可以通過如下路由表示
//from可以理解成消費者:表示從ftp服務(wù)上獲取數(shù)據(jù)進行消費
from("ftp://rider.com/orders?username=rider&password=secret")
//to可以理解成生產(chǎn)者:表示將數(shù)據(jù)發(fā)送給jms
.to("jms:incomingOrders");
endpoint URI
可以簡單理解成消息的地址
- 對于消費者(from方法)來說,表示消息從哪里來
- 對于生產(chǎn)者(to方法)來說莹妒,表示消息到哪里去
如上圖所示
Scheme:指明使用的是FtpComponent
Context path: ftp服務(wù)和端口號,以及文件路徑
Options:一些操作配置绰上,每個組件都不同
Exchange
Message的容器旨怠,其的內(nèi)部屬性,如下圖所示
Message
消息數(shù)據(jù)的基本實體
MEP
Exchange支持多種消息交換模式 (MEPs)渔期,通過其內(nèi)部持有的pattern屬性進行區(qū)分
下面介紹2種常用的交互模式
- InOnly :單向消息模式(也稱為事件消息)运吓,簡言之:不需要等待對方的響應(yīng)
- InOut : 請求響應(yīng)模式,例如:基于http的傳輸疯趟,通常是此模式拘哨,客戶端請求web頁面,等待服務(wù)端的回應(yīng)
InOut模式包含In message 與 Out message信峻,而InOnly模式只包含In message
Exception
如果路由期間發(fā)生錯誤倦青,此屬性將被賦值
Properties
Exchange的消息頭,Camel本身和開發(fā)者可以設(shè)置或讀取屬性值
Endpoints
Endpoints是模擬通道末端的camel抽象盹舞,充當(dāng)一個工廠产镐,用于創(chuàng)建消息的producer和consumer
Component
創(chuàng)建Endpoints的工廠隘庄,一個Component的實現(xiàn),通常有一些傳輸屬性需要設(shè)置癣亚。例如丑掺,JMS-Component要求在其上設(shè)置ConnectionFactory,以便對所有JMS通信使用相同的消息代理
Component述雾,Endpoints和Exchange的關(guān)系如下圖所示:
內(nèi)部組件介紹
Direct Component
基于內(nèi)存的同步消息組件
使用Direct組件街州,生產(chǎn)者直接調(diào)用消費者。因此使用Direct組件的唯一開銷是方法調(diào)用玻孟。
Direct的線程模型
由于生產(chǎn)者直接調(diào)用消費者
因此:調(diào)用者與camel的消費者共用一個線程
SEDA Component
基于內(nèi)存的異步消息組件:生產(chǎn)者和消費者通過BlockingQueue交換消息唆缴,生產(chǎn)者與消費者是不同的線程
如果VM在消息尚未處理時終止,則seda不會實現(xiàn)消息的持久化或恢復(fù)黍翎,因此有丟失消息的風(fēng)險
消費者視角
Consumer thread pool
SedaConsumer內(nèi)部持有一個線程池面徽,默認是1個線程,可以通過concurrentConsumers指定線程數(shù)
代碼如下所示
from("seda:start?concurrentConsumers=2")
.to("log:A")
.to("log:B");
Threads thread pool
Consumer thread pool中的每個線程匣掸,還可以開啟新的線程池趟紊,代碼如下所示
from("seda:start?concurrentConsumers=2")
.to("log:A")
// create a thread pool with a pool size of 5 and a maxi- mum size of 10.
.threads(5, 10)
.to("log:B");
如上圖所示:consumer線程執(zhí)行完"log:A"后,將后續(xù)任務(wù)提交給"Threads thead pool",然后就直接返回了
生產(chǎn)者視角
異步發(fā)送消息
生產(chǎn)者發(fā)完消息旺聚,立刻返回织阳,不需要等待消息消費成功
//InOnly消息模式
producerTemplate.sendBody("seda:start", body);
同步發(fā)送消息
生產(chǎn)者發(fā)完消息,會阻塞砰粹,直到消費成功
//InOut消息模式
producerTemplate.requestBody("seda:start", body);
實現(xiàn)原理:SedaProducer通過CountDownLatch信號量進行等待唧躲,當(dāng)數(shù)據(jù)消費成功后,消費者修改CountDownLatch信號量碱璃,喚醒SedaProducer弄痹,然后消費者才返回。
Camel使用
消息發(fā)送
Camel可以使用ProducerTemplate將消息發(fā)送到endpoint嵌器,或從endpoint請求數(shù)據(jù)
我們可以使用@Produce創(chuàng)建ProducerTemplate肛真,代碼如下
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
public class ProducePojo {
@Produce
private ProducerTemplate template;
public String sayHello(String name) {
//發(fā)消息到一個activemq端點
return template.requestBody("activemq:queue:sayhello",
name, String.class);
} }
為了確保ProducerTemplate可以注入到ProducePojo類,需要將ProducePojo配置到spring上下文
<beans xmlns="http://www.springframework.org/schema/beans" ...>
<bean id="activemq"
class="org.apache.activemq.camel.component
.ActiveMQComponent">
<property name="brokerURL"
value="tcp://localhost:61616"/>
</bean>
<bean id="producer"
class="org.camelcookbook.extend.produce
.ProducePojo"/>
<camelContext xmlns="http://camel.apache.org/schema/spring"/>
</beans>
方法調(diào)用
比如我要調(diào)用MyBean的myMethon,可以通過注解或java DSL
如果參數(shù)是對象類型爽航,camel也會自動轉(zhuǎn)型
以下代碼表示接收到someEndpoint的消息后蚓让,調(diào)用myBean.myMethod方法
//注意:要確保MyBean被camelContext或springContext加載
public class MyBean {
//注解的方式
@Consume(uri="someEndpoint")
public String myMethod(ParamBean message) {
//...
} }
//java DSL
from("someEndpoint")
.bean(MyBean.class, "myMethod");
//通過ProducerTemplate調(diào)用此方法
ParamBean param = genTestParam();
template.requestBody("someEndpoint", param);
這里其實用的的是camel的內(nèi)部組件Bean Component,具體用法可以參考如下官方文檔
Bean Component: http://camel.apache.org/bean.html
關(guān)于參數(shù)的傳遞讥珍,可以參考
Bean Binding: http://camel.apache.org/bean-binding.html
自定義Processor
Processor是camel中的基本功能元素历极,自定義Processor非常易于在路由中編寫和使用
定義一個將訂單數(shù)據(jù)轉(zhuǎn)成csv格式的Processor
public class OrderToCsvProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
String custom = exchange.getIn().getBody(String.class);
String id = custom.substring(0, 9);
String customerId = custom.substring(10, 19);
String date = custom.substring(20, 29);
String items = custom.substring(30);
String[] itemIds = items.split("@");
StringBuilder csv = new StringBuilder();
csv.append(id.trim());
csv.append(",").append(date.trim());
csv.append(",").append(customerId.trim());
for (String item : itemIds) {
csv.append(",").append(item.trim());
}
exchange.getIn().setBody(csv.toString());
}
}
定義路由規(guī)則
from("quartz://report?cron=0+0+6+*+*+?")
.to("http://riders.com/orders/cmd=received&date=yesterday")
.process(new OrderToCsvProcessor())
.to("file://riders/orders?fileName=report-${header.Date}.csv");
異常處理
基本用法
camel支持"異步重試,延遲重試"等多種處理方式
//通用異常處理
errorHandler(defaultErrorHandler()
//異步重試(默認同步)
.asyncDelayedRedelivery()
.maximumRedeliveries(2)
.redeliveryDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
//如果是JmsException衷佃,則只需自定義processer
onException(JmsException.class)
.handled(true)
.process(new GenerateFailueResponse());
//如果IOException,則重試3次趟卸,依舊失敗,則執(zhí)行to對應(yīng)的動作
onException(IOException.class).maximumRedeliveries(3)
.handled(true)
.to("ftp://gear@ftp.rider.com?password=secret");
from("file:/rider/files/upload?delay=3600000")
.to("http://rider.com?user=gear&password=secret");
上面的代碼,其異常處理的作用域是整個context
Camel也支持route作用域的異常處理锄列,如下代碼所示
from("direct:step1")
.bean(Step1.class, "success")
//異常處理图云,作用域是當(dāng)前路由
.onCompletion().onFailureOnly()
//如果失敗,則執(zhí)行onFailure方法
.bean(Step1.class, "onFailure")
.end()
.to("direct:step2");
注意onCompletion的方式邻邮,是異步的竣况,如果想同步處理異常可以參考camel的Synchronization使用方式
一個異常處理的例子
場景描述
順序執(zhí)行step1,step2,step3,如果某一步失敗饶囚,回滾之前的每一步
比如step3執(zhí)行失敗帕翻,回滾step2,step1
解決方案
通過 onCompletion().onFailureOnly()方法對每一步設(shè)置失敗回調(diào)函數(shù),
下面的代碼模擬了step3執(zhí)行失敗的場景,從日志可以看出camel按順序執(zhí)行了step2和step1的失敗回調(diào)方法
public class RollbackTest extends CamelTestSupport {
@Override
public void setUp() throws Exception {
deleteDirectory("target/mail/backup");
super.setUp();
}
@Test
public void testRollback() throws Exception {
template.sendBodyAndHeader("direct:step1", "bumper", "to", "FATAL");
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:step1")
.bean(Step1.class, "success")
.onCompletion().onFailureOnly()
//如果失敗萝风,調(diào)用step1的onFailure方法
.bean(Step1.class, "onFailure")
.end()
.to("direct:step2");
from("direct:step2")
.bean(Step2.class, "success")
.onCompletion().onFailureOnly()
.bean(Step2.class, "onFailure")
.end()
.to("direct:step3");
from("direct:step3")
.bean(Step3.class, "fail")
.onCompletion().onFailureOnly()
.bean(Step3.class, "onFailure")
.end()
.log("888:end");
}
};
}
}