基礎(chǔ)代碼來自:Spring Boot + Apache Camel SQL component + MySQL - Hello World Example | JavaInUse
在文未有代碼下載鏈接 https://www.javainuse.com/zip/camel/boot-camel-sql.rar
-
準(zhǔn)備工作:
1)修改application.properties文件中Mysql數(shù)據(jù)庫的相關(guān)配置
2)啟動(dòng)主程序纽帖,添加一條記錄 {"empId":"002","empName":"keven"}
3)查一下結(jié)果:(剛才多添加了一條同樣的記錄)
4)再將application.properties中spring.datasource.initialization-mode=always這行注釋掉镰吵,否則每次重啟時(shí)它都會(huì)重建數(shù)據(jù)庫,又要重新添加記錄
從上圖可以看出:本程序提供了兩個(gè)功能,從接收瀏覽器Get/Post兩個(gè)方法(端點(diǎn))凤跑,分別路由到“插入/查詢所有記錄”兩個(gè)路徑豺瘤,執(zhí)行對應(yīng)功能。
以下做一點(diǎn)擴(kuò)展:
- 發(fā)送到本地文件
- 在EmployeeServiceImpl類中添加如下路由:
//write,Mysql--->File
from("direct:write").to("sql:select * from employee").process(new Processor() {
public void process(Exchange xchg) throws Exception {
ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
List<Employee> employees = new ArrayList<Employee>();
System.out.println(dataList);
StringBuilder sb=new StringBuilder();
for (Map<String, String> data : dataList) {
sb.append("empId:"+data.get("empId")+",");
sb.append("empName:"+data.get("empName"));
}
xchg.getIn().setBody(sb.toString());
}
}).to("file:data/outbox");
- 到控制類EmployeeController中加一條
//write
@RequestMapping(value = "/write", method = RequestMethod.GET)
public boolean write() {
producerTemplate.requestBody("direct:write", null, List.class);
return true;
}
這樣,當(dāng)頁面中接收到write的請求時(shí)埠啃,程序會(huì)先查找記錄,再把結(jié)果輸出到程序的data/outbox目錄下
3)重啟一下伟恶,訪問http://localhost:8080/write
再到程序目錄下檢查一下
可以看到碴开,已經(jīng)輸出到指定目錄了
- 發(fā)送到kafka隊(duì)列
1)準(zhǔn)備工作
在poem.xml文件中添加kafka依賴
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
<version>2.16.3</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
到服務(wù)實(shí)體類EmployeeServiceImpl中添加kafka定義(也可放到屬性文件中去)
String topicName = "topic=camel-topic";
String kafkaServer = "kafka:CDH-04:9092";
String zooKeeperHost = "zookeeperHost=CDH-05&zookeeperPort=2181";
String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&")
.append(zooKeeperHost).append("&").append(serializerClass).toString();
2)修改代碼
添加到kafka的路由
//Kafka,Mysql--->Kafka
from("direct:kafka").to("sql:select * from employee").process(new Processor() {
public void process(Exchange xchg) throws Exception {
ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
List<Employee> employees = new ArrayList<Employee>();
System.out.println(dataList);
for (Map<String, String> data : dataList) {
Employee employee = new Employee();
employee.setEmpId(data.get("empId"));
employee.setEmpName(data.get("empName"));
employees.add(employee);
}
xchg.getIn().setBody(employees.toString());
}
}).to(toKafka).process(new Processor() {
public void process(Exchange exchange) throws Exception {
System.out.println("it is :"+toKafka);
}
});
添加觸發(fā)控制(EmployeeController類)
//kafka
@RequestMapping(value = "/kafka", method = RequestMethod.GET)
public boolean kafka() {
producerTemplate.requestBody("direct:kafka", null, List.class);
return true;
}
3)訪問一下 http://localhost:8080/kafka
4)查看一下隊(duì)列
可以看到,已經(jīng)發(fā)送到隊(duì)列了