前言
上一篇 基于docker部署的微服務(wù)架構(gòu)(七): 部署ELK日志統(tǒng)計(jì)分析系統(tǒng) 中盹兢,已經(jīng)把日志數(shù)據(jù)輸出到 elasticsearch 并用 kibana 做展現(xiàn)漂佩。
實(shí)際項(xiàng)目中還有把日志數(shù)據(jù)保存到數(shù)據(jù)庫澡罚,做進(jìn)一步分析的需求尝抖。由于數(shù)據(jù)分析的需求有可能變動(dòng)(例如:可能一開始只需要統(tǒng)計(jì)服務(wù)使用總量次伶,后來隨著業(yè)務(wù)的擴(kuò)展婉支,需要按不同地區(qū)或不同服務(wù)進(jìn)行統(tǒng)計(jì)),保存日志的數(shù)據(jù)結(jié)構(gòu)也會(huì)相應(yīng)的變動(dòng)痢毒,如果使用傳統(tǒng)的關(guān)系型數(shù)據(jù)庫送矩,就需要對(duì)表結(jié)構(gòu)進(jìn)行修改,一般日志的數(shù)據(jù)量會(huì)很大哪替,修改表結(jié)構(gòu)很耗時(shí)栋荸,對(duì)開發(fā)也不友好,這時(shí)候應(yīng)該考慮 nosql 數(shù)據(jù)庫凭舶。
從 nosql 的使用情況來看晌块,MongoDB 是使用最多的,一方面由于良好的性能库快,尤其在收購了 wiredtiger 引擎之后摸袁,提供了文檔級(jí)鎖钥顽,寫入性能大大提高义屏;另一方面就是 MongoDB 比較容易上手,hbase 依賴 hadoop 環(huán)境,部署起來比較麻煩闽铐,MongoDB 就很簡(jiǎn)單了蝶怔。
本文將會(huì)介紹,在 docker 環(huán)境下搭建 MongoDB兄墅,搭建 MongoDB 的web客戶端 mongo-express踢星,MongoDB 簡(jiǎn)單使用,使用 spring kafka 創(chuàng)建 kafka 消費(fèi)者接受消息隙咸,以及使用 spring data 操作 MongoDB沐悦。
MongoDB的優(yōu)缺點(diǎn)
先說優(yōu)點(diǎn),nosql 數(shù)據(jù)庫的優(yōu)點(diǎn)五督, MongoDB 都具備:高擴(kuò)展性藏否、高性能、松散的數(shù)據(jù)結(jié)構(gòu)充包、天然支持分片和集群等等副签。
MongoDB 在此之上還提供了非常豐富的查詢功能,不像 hbase 只能全表或 row key 查詢基矮。MongoDB 還提供了二級(jí)索引淆储,并且還支持 MapReduce。MongoDB 還提供了一個(gè)分布式文件系統(tǒng) GridFS家浇,用來存儲(chǔ)超過16MB的數(shù)據(jù)本砰。
缺點(diǎn),不支持事務(wù)蓝谨,這是 nosql 數(shù)據(jù)庫的通病灌具,也是 nosql 的基因所致。nosql 從誕生就 不是為了 處理結(jié)構(gòu)化的強(qiáng)一致性數(shù)據(jù)的譬巫。
像日志數(shù)據(jù)這種咖楣,結(jié)構(gòu)松散、不要求一致性芦昔、數(shù)據(jù)量大的數(shù)據(jù)诱贿,保存到 MongoDB 是個(gè)不錯(cuò)的選擇。
在docker環(huán)境中部署MongoDB
登錄 docker 節(jié)點(diǎn)咕缎,運(yùn)行 docker pull mongo:3.2.11
下載目前最新的 MongoDB 鏡像(建議在拉取鏡像時(shí)使用具體的版本號(hào)珠十,不要用 latest,避免版本兼容的問題凭豪,也更清楚具體用的哪個(gè)版本)焙蹭。
創(chuàng)建數(shù)據(jù)掛載卷,mkdir -p /mongodb/data
嫂伞。
啟動(dòng)容器
docker run -d --name mongodb --volume /mongodb/data:/data/db \
--publish 27017:27017 \
mongo:3.2.11
運(yùn)行 docker exec -it mongodb mongo
進(jìn)入 Mongo shell孔厉,運(yùn)行 show dbs
查看當(dāng)前所有的數(shù)據(jù)庫拯钻。
簡(jiǎn)單介紹下 MongoDB 的基礎(chǔ)概念:
- database 和 mysql 類似,表示數(shù)據(jù)庫
- collection 相當(dāng)于 mysql 中的表撰豺,用來存放數(shù)據(jù)粪般,不同的是 collection 不需要定義表結(jié)構(gòu)
- document 相當(dāng)于 mysql 表中保存的一條數(shù)據(jù),BSON格式污桦,BSON類似于JSON
在 Mongo shell 中運(yùn)行 use add-service-demo-log
亩歹,創(chuàng)建一個(gè) add-service-demo-log 數(shù)據(jù)庫,并切換到了這個(gè)數(shù)據(jù)庫凡橱。
這時(shí)候運(yùn)行 show dbs
并沒有顯示 add-service-demo-log 數(shù)據(jù)庫小作,因?yàn)樾陆ǖ倪@個(gè)數(shù)據(jù)庫中沒有 collection,新建一個(gè) collection稼钩,db.createCollection('addLog')
躲惰。
再運(yùn)行 show dbs
就可以看到 add-service-demo-log 數(shù)據(jù)庫了。
運(yùn)行 db
变抽,查看當(dāng)前處于哪個(gè)數(shù)據(jù)庫础拨。
運(yùn)行 show collections
查看數(shù)據(jù)庫中所有的 collection。
在docker環(huán)境中部署mongo-express
在使用 MongoDB 開發(fā)時(shí)绍载,通常需要一個(gè)客戶端工具幫助我們操作 MongoDB诡宗,有很多優(yōu)秀的客戶端工具可以選擇。這里我們部署一個(gè)web端的客戶端工具 mongo-express击儡,web工具的好處就是只要部署在服務(wù)端塔沃,所有開發(fā)人員都可以使用。
運(yùn)行 docker pull mongo-express:0.32.0
阳谍,下載鏡像蛀柴。
啟動(dòng)容器
docker run -d --name mongo-express --link mongodb:mongo \
--publish 8081:8081 \
mongo-express:0.32.0
在 --link
的時(shí)候給 mongodb 一個(gè)別名:mongo,因?yàn)?mongo-express 默認(rèn)的MongoDB server是 mongo矫夯,這樣的話就不用指定 ME_CONFIG_MONGODB_SERVER 環(huán)境變量了鸽疾。
啟動(dòng)完成之后訪問 http://宿主機(jī)IP:8081,打開 mongo-express 的頁面训貌。

修改add-service-demo把日志發(fā)送到kafka的add-log topic
修改 log4j2.xml 配置文件制肮,新增一個(gè) kafka appender ,日志的輸出格式使用 JsonLayout
:
<Kafka name="addLog" topic="add-log">
<JsonLayout complete="false" compact="true"/>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
把日志數(shù)據(jù)輸出到 kafka 的 add-log topic 下递沪。對(duì)這個(gè) appender 進(jìn)行異步包裝:
<Async name="addLogAsync">
<AppenderRef ref="addLog"/>
</Async>
最后增加一個(gè) Logger豺鼻,使用之前配置的異步 appender:
<Logger name="addLogger" level="info">
<appender-ref ref="addLogAsync"/>
</Logger>
修改完之后 log4j2.xml 的文件內(nèi)容:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Properties>
<Property name="logFormat">
%d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
</Property>
<Property name="kafkaBootstrapServers">
@kafka.bootstrap.servers@
</Property>
</Properties>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="${logFormat}"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
<Kafka name="basicLog" topic="basic-log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Kafka name="addLog" topic="add-log">
<JsonLayout complete="false" compact="true"/>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Async name="FileAsync">
<AppenderRef ref="RollingFile"/>
</Async>
<Async name="basicLogAsync">
<AppenderRef ref="basicLog"/>
</Async>
<Async name="addLogAsync">
<AppenderRef ref="addLog"/>
</Async>
</Appenders>
<Loggers>
<Logger name="org.apache.kafka" level="info"/>
<Logger name="addLogger" level="info">
<appender-ref ref="addLogAsync"/>
</Logger>
<Root level="info">
<AppenderRef ref="STDOUT"/>
<AppenderRef ref="FileAsync"/>
<AppenderRef ref="basicLogAsync"/>
</Root>
</Loggers>
</Configuration>
在 AddController.java 中使用 addLogger 輸出日志:
@RestController
@RefreshScope
public class AddController {
private static final Logger addLogger = LoggerFactory.getLogger("addLogger");
@Value("${my.info.str}")
private String infoStr;
@RequestMapping(value = "/add", method = RequestMethod.GET)
public Map<String, Object> add(Integer a, Integer b) {
System.out.println("端口為8100的實(shí)例被調(diào)用");
System.out.println("infoStr : " + infoStr);
Map<String, Object> returnMap = new HashMap<>();
returnMap.put("code", 200);
returnMap.put("msg", "操作成功");
Integer result = a + b;
returnMap.put("result", result);
addLogger.info("a : " + a + ", b : " + b + ", a + b :" + result);
return returnMap;
}
}
這樣在調(diào)用 AddController 中的 add 方法時(shí),會(huì)輸出一條日志到 kafka 的 add-log topic 中款慨,只要?jiǎng)?chuàng)建一個(gè)消費(fèi)者訂閱 add-log儒飒,把日志數(shù)據(jù)保存到 MongoDB 即可。
創(chuàng)建日志消費(fèi)者
新建一個(gè) maven 項(xiàng)目檩奠,修改 pom.xml 增加需要的依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<properties>
<!-- 指定java版本 -->
<java.version>1.8</java.version>
<!-- 鏡像前綴桩了,推送鏡像到遠(yuǎn)程庫時(shí)需要届良,這里配置了一個(gè)阿里云的私有庫 -->
<docker.image.prefix>
registry.cn-hangzhou.aliyuncs.com/ztecs
</docker.image.prefix>
<!-- docker鏡像的tag -->
<docker.tag>demo</docker.tag>
<!-- 激活的profile -->
<activatedProperties></activatedProperties>
<kafka.bootstrap.servers>10.47.160.238:9092</kafka.bootstrap.servers>
</properties>
<profiles>
<!-- docker環(huán)境 -->
<profile>
<id>docker</id>
<properties>
<activatedProperties>docker</activatedProperties>
<docker.tag>docker-demo-${project.version}</docker.tag>
<kafka.bootstrap.servers>kafka:9092</kafka.bootstrap.servers>
</properties>
</profile>
</profiles>
<build>
<defaultGoal>install</defaultGoal>
<finalName>${project.artifactId}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- 配置spring boot maven插件,把項(xiàng)目打包成可運(yùn)行的jar包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<executable>true</executable>
</configuration>
</plugin>
<!-- 打包時(shí)跳過單元測(cè)試 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<!-- 配置docker maven插件圣猎,綁定install生命周期,在運(yùn)行maven install時(shí)生成docker鏡像 -->
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.4.13</version>
<executions>
<execution>
<phase>install</phase>
<goals>
<goal>build</goal>
<goal>tag</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- 修改這里的docker節(jié)點(diǎn)ip乞而,需要打開docker節(jié)點(diǎn)的遠(yuǎn)程管理端口2375送悔,
具體如何配置可以參照之前的docker安裝和配置的文章 -->
<dockerHost>http://docker節(jié)點(diǎn)ip:2375</dockerHost>
<imageName>${docker.image.prefix}/${project.build.finalName}</imageName>
<baseImage>java</baseImage>
<!-- 這里的entryPoint定義了容器啟動(dòng)時(shí)的運(yùn)行命令,容器啟動(dòng)時(shí)運(yùn)行
java -jar 包名 , -Djava.security.egd這個(gè)配置解決tomcat8啟動(dòng)時(shí)因?yàn)樾枰占h(huán)境噪聲來生成安全隨機(jī)數(shù)導(dǎo)致啟動(dòng)過慢的問題-->
<entryPoint>["java", "-Djava.security.egd=file:/dev/./urandom", "-jar", "/${project.build.finalName}.jar"]
</entryPoint>
<resources>
<resource>
<targetPath>/</targetPath>
<directory>${project.build.directory}</directory>
<include>${project.build.finalName}.jar</include>
</resource>
</resources>
<image>${docker.image.prefix}/${project.build.finalName}</image>
<newName>${docker.image.prefix}/${project.build.finalName}:${docker.tag}</newName>
<forceTags>true</forceTags>
<pushImage>false</pushImage>
</configuration>
</plugin>
</plugins>
</build>
看過之前文章的朋友應(yīng)該對(duì)這些配置比較熟悉了爪模,和其他項(xiàng)目不同的是欠啤,這里引入了 spring-kafka 、jackson屋灌、 spring-boot-starter-data-mongodb 洁段。
使用 spring-kafka 創(chuàng)建 kafka 訂閱者,使用 jackson 對(duì)日志數(shù)據(jù)進(jìn)行轉(zhuǎn)換共郭,使用 spring-boot-starter-data-mongodb 完成 MongoDB 的相關(guān)操作祠丝。
在 resources 目錄下創(chuàng)建 bootstrap.yml
,因?yàn)榕渲眯畔⑹菑?config-server 中獲取的除嘹,所以 bootstrap.yml
的內(nèi)容和其他項(xiàng)目一樣:
spring:
application:
name: @project.artifactId@
profiles:
active: @activatedProperties@
cloud:
config:
profile: dev
label: master
discovery:
enabled: true
serviceId: CONFIG-SERVER-DEMO
failFast: true
retry:
initialInterval: 10000
multiplier: 2
maxInterval: 60000
maxAttempts: 10
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8000/eureka/
在git倉庫中創(chuàng)建配置文件 log-persist-demo-dev.yml 写半,:
spring:
rabbitmq:
host: 10.47.160.238
port: 5673
username: guest
password: guest
data:
mongodb:
uri: mongodb://10.47.160.114:27017/add-service-demo-log
kafka:
bootstrapServers: 10.47.160.114:9092
groupId: mongo
enableAutoCommit: true
autoCommitIntervalMs: 100
sessionTimeOutMs: 15000
創(chuàng)建 log4j2.xml 配置文件,內(nèi)容也和其他項(xiàng)目相同:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Properties>
<Property name="logFormat">
%d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
</Property>
<Property name="kafkaBootstrapServers">
@kafka.bootstrap.servers@
</Property>
</Properties>
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="${logFormat}"/>
</Console>
<RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Policies>
<TimeBasedTriggeringPolicy/>
</Policies>
</RollingFile>
<Kafka name="basicLog" topic="basic-log">
<PatternLayout>
<Pattern>
${logFormat}
</Pattern>
</PatternLayout>
<Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
</Kafka>
<Async name="FileAsync">
<AppenderRef ref="RollingFile"/>
</Async>
<Async name="basicLogAsync">
<AppenderRef ref="basicLog"/>
</Async>
</Appenders>
<Loggers>
<Logger name="org.apache.kafka" level="info"/>
<Root level="info">
<AppenderRef ref="STDOUT"/>
<AppenderRef ref="FileAsync"/>
<AppenderRef ref="basicLogAsync"/>
</Root>
</Loggers>
</Configuration>
創(chuàng)建一個(gè) demo
包尉咕,在 demo
包下創(chuàng)建啟動(dòng)入口 LogPersistDemoApplication.java
:
@SpringBootApplication
public class LogPersistDemoApplication {
public static void main(String[] args) {
SpringApplication.run(LogPersistDemoApplication.class, args);
}
}
在 demo
下創(chuàng)建一個(gè)子包 config
叠蝇,用來存放 java config 配置。創(chuàng)建 KafkaConfig.java 配置 kafka :
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrapServers}")
private String bootstrapServers;
@Value("${kafka.groupId}")
private String groupId;
@Value("${kafka.enableAutoCommit}")
private Boolean enableAutoCommit;
@Value("${kafka.autoCommitIntervalMs}")
private Integer autoCommitIntervalMs;
@Value("${kafka.sessionTimeOutMs}")
private Integer sessionTimeOutMs;
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOutMs);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
簡(jiǎn)單看下 KafkaConfig.java 年缎,在類級(jí)別使用 @EnableKafka
讓 spring boot 自動(dòng)幫我們初始化 kafka 相關(guān)的bean悔捶。配置類中用到的配置信息使用 @Value
從 log-persist-demo-dev.yml 讀取數(shù)據(jù),并配置了 kafkaListenerContainerFactory
单芜,用來創(chuàng)建 kafka 消費(fèi)者蜕该。
在 config
包下創(chuàng)建 MongoConfig.java
配置 MongoDB :
@Configuration
public class MongoConfig {
@Value("${spring.data.mongodb.uri}")
private String mongoClientUri;
@Bean
public MongoDbFactory mongoDbFactory() {
SimpleMongoDbFactory mongoDbFactory = null;
try {
mongoDbFactory = new SimpleMongoDbFactory(new MongoClientURI(mongoClientUri));
mongoDbFactory.setWriteConcern(WriteConcern.UNACKNOWLEDGED);
} catch (UnknownHostException e) {
e.printStackTrace();
}
return mongoDbFactory;
}
}
這里配置了 MongoClientURI
,并且設(shè)置了寫安全級(jí)別(WriteConcern)為 UNACKNOWLEDGED洲鸠,關(guān)于 MongoDB 寫安全級(jí)別蛇损,簡(jiǎn)單介紹:
- Errors Ignored(-1) 忽略所有異常,包括網(wǎng)絡(luò)異常坛怪。
- Unacknowledged(0) 忽略寫入異常淤齐,但是會(huì)檢測(cè)網(wǎng)絡(luò)異常。
- Acknowledged(1) 默認(rèn)級(jí)別袜匿,可以捕獲到寫入異常更啄。 MongoDB 保存數(shù)據(jù)時(shí),先把數(shù)據(jù)寫入內(nèi)存居灯,定期 fsync 保存到硬盤祭务,如果數(shù)據(jù)寫入內(nèi)存之后沒來得及寫入硬盤内狗,服務(wù)掛了,數(shù)據(jù)就丟了义锥。
- Journaled(1, journal=true) 增加 journal 日志柳沙,數(shù)據(jù)寫入內(nèi)存的同時(shí)記錄日志,服務(wù)down了可以通過 journal 日志還原操作
- majority(>1) 在副本集模式下拌倍,保證多數(shù)節(jié)點(diǎn)(超過半數(shù))數(shù)據(jù)寫入赂鲤。
從上到下,安全級(jí)別由低到高柱恤,寫入效率由高到低数初。由于記錄的是日志數(shù)據(jù),數(shù)據(jù)量大梗顺,對(duì)寫入效率要求較高泡孩,并且允許部分?jǐn)?shù)據(jù)丟失,所以配置了 Unacknowledged 級(jí)別寺谤,最低級(jí)別會(huì)忽略網(wǎng)絡(luò)異常仑鸥,一般不建議使用。
在 demo
包下創(chuàng)建一個(gè) model
子包变屁,用來存放數(shù)據(jù)模型锈候。創(chuàng)建 AddLog.java
數(shù)據(jù)模型:
public class AddLog {
@Id
private String id;
private Long timeMillis;
private String thread;
private String level;
private String loggerName;
private String message;
private Boolean endOfBatch;
private String loggerFqcn;
private Integer threadId;
private Integer threadPriority;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimeMillis() {
return timeMillis;
}
public void setTimeMillis(Long timeMillis) {
this.timeMillis = timeMillis;
}
public String getThread() {
return thread;
}
public void setThread(String thread) {
this.thread = thread;
}
public String getLevel() {
return level;
}
public void setLevel(String level) {
this.level = level;
}
public String getLoggerName() {
return loggerName;
}
public void setLoggerName(String loggerName) {
this.loggerName = loggerName;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Boolean getEndOfBatch() {
return endOfBatch;
}
public void setEndOfBatch(Boolean endOfBatch) {
this.endOfBatch = endOfBatch;
}
public String getLoggerFqcn() {
return loggerFqcn;
}
public void setLoggerFqcn(String loggerFqcn) {
this.loggerFqcn = loggerFqcn;
}
public Integer getThreadId() {
return threadId;
}
public void setThreadId(Integer threadId) {
this.threadId = threadId;
}
public Integer getThreadPriority() {
return threadPriority;
}
public void setThreadPriority(Integer threadPriority) {
this.threadPriority = threadPriority;
}
}
創(chuàng)建數(shù)據(jù)訪問層 AddLogDao.dao
,使用 spring data 對(duì) MongoDB 的封裝 :
@Repository
public interface AddLogDao extends MongoRepository<AddLog, String> {
}
創(chuàng)建 KafkaConsumer.java
接受 kafka 消息:
@Component
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@Autowired
private AddLogDao addLogDao;
@KafkaListener(topics = {"add-log"})
public void receivePersistLog(String data) {
logger.info("接收到需要保存到MongoDB的日志數(shù)據(jù), data : " + data);
ObjectMapper objectMapper = new ObjectMapper();
try {
AddLog addLog = objectMapper.readValue(data, AddLog.class);
addLogDao.save(addLog);
logger.info("成功保存日志數(shù)據(jù), data : " + data);
} catch (IOException e) {
e.printStackTrace();
}
}
}
這里使用 @KafkaListener(topics = {"add-log"})
接受 add-log topic 的消息敞贡,把收到的消息保存到 MongoDB泵琳。
運(yùn)行 LogPersistDemoApplication.java
的 main
方法啟動(dòng),訪問 add-service-demo 提供的 add
接口誊役,會(huì)在 MongoDB 中插入一條日志記錄获列。
使用docker-maven-plugin打包并生成docker鏡像
這部分內(nèi)容和前面幾篇文章基本相同,都是把容器間的訪問地址和 --link
參數(shù)對(duì)應(yīng)蛔垢,不再贅述击孩。
最后
本文簡(jiǎn)單介紹了 MongoDB 的相關(guān)內(nèi)容, 以及使用 spring kafka 接受kafka消息鹏漆,并把數(shù)據(jù)插入 MongoDB巩梢。