基于docker部署的微服務(wù)架構(gòu)(八): 日志數(shù)據(jù)保存到MongoDB數(shù)據(jù)庫

前言

上一篇 基于docker部署的微服務(wù)架構(gòu)(七): 部署ELK日志統(tǒng)計(jì)分析系統(tǒng) 中盹兢,已經(jīng)把日志數(shù)據(jù)輸出到 elasticsearch 并用 kibana 做展現(xiàn)漂佩。
實(shí)際項(xiàng)目中還有把日志數(shù)據(jù)保存到數(shù)據(jù)庫澡罚,做進(jìn)一步分析的需求尝抖。由于數(shù)據(jù)分析的需求有可能變動(dòng)(例如:可能一開始只需要統(tǒng)計(jì)服務(wù)使用總量次伶,后來隨著業(yè)務(wù)的擴(kuò)展婉支,需要按不同地區(qū)或不同服務(wù)進(jìn)行統(tǒng)計(jì)),保存日志的數(shù)據(jù)結(jié)構(gòu)也會(huì)相應(yīng)的變動(dòng)痢毒,如果使用傳統(tǒng)的關(guān)系型數(shù)據(jù)庫送矩,就需要對(duì)表結(jié)構(gòu)進(jìn)行修改,一般日志的數(shù)據(jù)量會(huì)很大哪替,修改表結(jié)構(gòu)很耗時(shí)栋荸,對(duì)開發(fā)也不友好,這時(shí)候應(yīng)該考慮 nosql 數(shù)據(jù)庫凭舶。
nosql 的使用情況來看晌块,MongoDB 是使用最多的,一方面由于良好的性能库快,尤其在收購了 wiredtiger 引擎之后摸袁,提供了文檔級(jí)鎖钥顽,寫入性能大大提高义屏;另一方面就是 MongoDB 比較容易上手,hbase 依賴 hadoop 環(huán)境,部署起來比較麻煩闽铐,MongoDB 就很簡(jiǎn)單了蝶怔。

本文將會(huì)介紹,在 docker 環(huán)境下搭建 MongoDB兄墅,搭建 MongoDB 的web客戶端 mongo-express踢星,MongoDB 簡(jiǎn)單使用,使用 spring kafka 創(chuàng)建 kafka 消費(fèi)者接受消息隙咸,以及使用 spring data 操作 MongoDB沐悦。

MongoDB的優(yōu)缺點(diǎn)

先說優(yōu)點(diǎn),nosql 數(shù)據(jù)庫的優(yōu)點(diǎn)五督, MongoDB 都具備:高擴(kuò)展性藏否、高性能、松散的數(shù)據(jù)結(jié)構(gòu)充包、天然支持分片和集群等等副签。
MongoDB 在此之上還提供了非常豐富的查詢功能,不像 hbase 只能全表或 row key 查詢基矮。MongoDB 還提供了二級(jí)索引淆储,并且還支持 MapReduceMongoDB 還提供了一個(gè)分布式文件系統(tǒng) GridFS家浇,用來存儲(chǔ)超過16MB的數(shù)據(jù)本砰。
缺點(diǎn),不支持事務(wù)蓝谨,這是 nosql 數(shù)據(jù)庫的通病灌具,也是 nosql 的基因所致。nosql 從誕生就 不是為了 處理結(jié)構(gòu)化的強(qiáng)一致性數(shù)據(jù)的譬巫。
像日志數(shù)據(jù)這種咖楣,結(jié)構(gòu)松散、不要求一致性芦昔、數(shù)據(jù)量大的數(shù)據(jù)诱贿,保存到 MongoDB 是個(gè)不錯(cuò)的選擇。

在docker環(huán)境中部署MongoDB

登錄 docker 節(jié)點(diǎn)咕缎,運(yùn)行 docker pull mongo:3.2.11 下載目前最新的 MongoDB 鏡像(建議在拉取鏡像時(shí)使用具體的版本號(hào)珠十,不要用 latest,避免版本兼容的問題凭豪,也更清楚具體用的哪個(gè)版本)焙蹭。
創(chuàng)建數(shù)據(jù)掛載卷,mkdir -p /mongodb/data嫂伞。
啟動(dòng)容器

docker run -d --name mongodb --volume /mongodb/data:/data/db \
--publish 27017:27017 \
mongo:3.2.11

運(yùn)行 docker exec -it mongodb mongo 進(jìn)入 Mongo shell孔厉,運(yùn)行 show dbs 查看當(dāng)前所有的數(shù)據(jù)庫拯钻。

簡(jiǎn)單介紹下 MongoDB 的基礎(chǔ)概念:

  • database 和 mysql 類似,表示數(shù)據(jù)庫
  • collection 相當(dāng)于 mysql 中的表撰豺,用來存放數(shù)據(jù)粪般,不同的是 collection 不需要定義表結(jié)構(gòu)
  • document 相當(dāng)于 mysql 表中保存的一條數(shù)據(jù),BSON格式污桦,BSON類似于JSON

Mongo shell 中運(yùn)行 use add-service-demo-log亩歹,創(chuàng)建一個(gè) add-service-demo-log 數(shù)據(jù)庫,并切換到了這個(gè)數(shù)據(jù)庫凡橱。
這時(shí)候運(yùn)行 show dbs 并沒有顯示 add-service-demo-log 數(shù)據(jù)庫小作,因?yàn)樾陆ǖ倪@個(gè)數(shù)據(jù)庫中沒有 collection,新建一個(gè) collection稼钩,db.createCollection('addLog') 躲惰。
再運(yùn)行 show dbs 就可以看到 add-service-demo-log 數(shù)據(jù)庫了。
運(yùn)行 db变抽,查看當(dāng)前處于哪個(gè)數(shù)據(jù)庫础拨。
運(yùn)行 show collections 查看數(shù)據(jù)庫中所有的 collection

在docker環(huán)境中部署mongo-express

在使用 MongoDB 開發(fā)時(shí)绍载,通常需要一個(gè)客戶端工具幫助我們操作 MongoDB诡宗,有很多優(yōu)秀的客戶端工具可以選擇。這里我們部署一個(gè)web端的客戶端工具 mongo-express击儡,web工具的好處就是只要部署在服務(wù)端塔沃,所有開發(fā)人員都可以使用。
運(yùn)行 docker pull mongo-express:0.32.0阳谍,下載鏡像蛀柴。
啟動(dòng)容器

docker run -d --name mongo-express --link mongodb:mongo \
--publish 8081:8081 \
mongo-express:0.32.0

--link 的時(shí)候給 mongodb 一個(gè)別名:mongo,因?yàn)?mongo-express 默認(rèn)的MongoDB server是 mongo矫夯,這樣的話就不用指定 ME_CONFIG_MONGODB_SERVER 環(huán)境變量了鸽疾。

啟動(dòng)完成之后訪問 http://宿主機(jī)IP:8081,打開 mongo-express 的頁面训貌。

mongo-express
mongo-express

修改add-service-demo把日志發(fā)送到kafka的add-log topic

修改 log4j2.xml 配置文件制肮,新增一個(gè) kafka appender ,日志的輸出格式使用 JsonLayout

    <Kafka name="addLog" topic="add-log">
        <JsonLayout complete="false" compact="true"/>
        <Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
    </Kafka>

把日志數(shù)據(jù)輸出到 kafkaadd-log topic 下递沪。對(duì)這個(gè) appender 進(jìn)行異步包裝:

    <Async name="addLogAsync">
        <AppenderRef ref="addLog"/>
    </Async>

最后增加一個(gè) Logger豺鼻,使用之前配置的異步 appender

    <Logger name="addLogger" level="info">
        <appender-ref ref="addLogAsync"/>
    </Logger>

修改完之后 log4j2.xml 的文件內(nèi)容:

  <?xml version="1.0" encoding="UTF-8"?>
  <Configuration>
      <Properties>
          <Property name="logFormat">
              %d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
          </Property>
          <Property name="kafkaBootstrapServers">
              @kafka.bootstrap.servers@
          </Property>
      </Properties>
      <Appenders>
          <Console name="STDOUT" target="SYSTEM_OUT">
              <PatternLayout pattern="${logFormat}"/>
          </Console>

          <RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
                       filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
              <PatternLayout>
                  <Pattern>
                      ${logFormat}
                  </Pattern>
              </PatternLayout>
              <Policies>
                  <TimeBasedTriggeringPolicy/>
              </Policies>
          </RollingFile>

          <Kafka name="basicLog" topic="basic-log">
              <PatternLayout>
                  <Pattern>
                      ${logFormat}
                  </Pattern>
              </PatternLayout>
              <Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
          </Kafka>

          <Kafka name="addLog" topic="add-log">
              <JsonLayout complete="false" compact="true"/>
              <Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
          </Kafka>

          <Async name="FileAsync">
              <AppenderRef ref="RollingFile"/>
          </Async>

          <Async name="basicLogAsync">
              <AppenderRef ref="basicLog"/>
          </Async>

          <Async name="addLogAsync">
              <AppenderRef ref="addLog"/>
          </Async>
      </Appenders>
      <Loggers>
          <Logger name="org.apache.kafka" level="info"/>
          <Logger name="addLogger" level="info">
              <appender-ref ref="addLogAsync"/>
          </Logger>
          <Root level="info">
              <AppenderRef ref="STDOUT"/>
              <AppenderRef ref="FileAsync"/>
              <AppenderRef ref="basicLogAsync"/>
          </Root>
      </Loggers>
  </Configuration>

AddController.java 中使用 addLogger 輸出日志:

  @RestController
  @RefreshScope
  public class AddController {
      private static final Logger addLogger = LoggerFactory.getLogger("addLogger");

      @Value("${my.info.str}")
      private String infoStr;

      @RequestMapping(value = "/add", method = RequestMethod.GET)
      public Map<String, Object> add(Integer a, Integer b) {
          System.out.println("端口為8100的實(shí)例被調(diào)用");
          System.out.println("infoStr : " + infoStr);
          Map<String, Object> returnMap = new HashMap<>();
          returnMap.put("code", 200);
          returnMap.put("msg", "操作成功");
          Integer result = a + b;
          returnMap.put("result", result);

          addLogger.info("a : " + a + ", b : " + b + ", a + b :" + result);
          return returnMap;
      }

  }

這樣在調(diào)用 AddController 中的 add 方法時(shí),會(huì)輸出一條日志到 kafkaadd-log topic 中款慨,只要?jiǎng)?chuàng)建一個(gè)消費(fèi)者訂閱 add-log儒飒,把日志數(shù)據(jù)保存到 MongoDB 即可。

創(chuàng)建日志消費(fèi)者

新建一個(gè) maven 項(xiàng)目檩奠,修改 pom.xml 增加需要的依賴:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<properties>
    <!-- 指定java版本 -->
    <java.version>1.8</java.version>
    <!-- 鏡像前綴桩了,推送鏡像到遠(yuǎn)程庫時(shí)需要届良,這里配置了一個(gè)阿里云的私有庫 -->
    <docker.image.prefix>
        registry.cn-hangzhou.aliyuncs.com/ztecs
    </docker.image.prefix>
    <!-- docker鏡像的tag -->
    <docker.tag>demo</docker.tag>

    <!-- 激活的profile -->
    <activatedProperties></activatedProperties>

    <kafka.bootstrap.servers>10.47.160.238:9092</kafka.bootstrap.servers>
</properties>

<profiles>
    <!-- docker環(huán)境 -->
    <profile>
        <id>docker</id>

        <properties>
            <activatedProperties>docker</activatedProperties>
            <docker.tag>docker-demo-${project.version}</docker.tag>

            <kafka.bootstrap.servers>kafka:9092</kafka.bootstrap.servers>
        </properties>
    </profile>
</profiles>

<build>
    <defaultGoal>install</defaultGoal>
    <finalName>${project.artifactId}</finalName>

    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
        </resource>
    </resources>

    <plugins>
        <!-- 配置spring boot maven插件,把項(xiàng)目打包成可運(yùn)行的jar包 -->
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <executable>true</executable>
            </configuration>
        </plugin>

        <!-- 打包時(shí)跳過單元測(cè)試 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <configuration>
                <skipTests>true</skipTests>
            </configuration>
        </plugin>

        <!-- 配置docker maven插件圣猎,綁定install生命周期,在運(yùn)行maven install時(shí)生成docker鏡像 -->
        <plugin>
            <groupId>com.spotify</groupId>
            <artifactId>docker-maven-plugin</artifactId>
            <version>0.4.13</version>
            <executions>
                <execution>
                    <phase>install</phase>
                    <goals>
                        <goal>build</goal>
                        <goal>tag</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <!-- 修改這里的docker節(jié)點(diǎn)ip乞而,需要打開docker節(jié)點(diǎn)的遠(yuǎn)程管理端口2375送悔,
                具體如何配置可以參照之前的docker安裝和配置的文章 -->
                <dockerHost>http://docker節(jié)點(diǎn)ip:2375</dockerHost>
                <imageName>${docker.image.prefix}/${project.build.finalName}</imageName>
                <baseImage>java</baseImage>
                <!-- 這里的entryPoint定義了容器啟動(dòng)時(shí)的運(yùn)行命令,容器啟動(dòng)時(shí)運(yùn)行
                java -jar 包名 , -Djava.security.egd這個(gè)配置解決tomcat8啟動(dòng)時(shí)因?yàn)樾枰占h(huán)境噪聲來生成安全隨機(jī)數(shù)導(dǎo)致啟動(dòng)過慢的問題-->
                <entryPoint>["java", "-Djava.security.egd=file:/dev/./urandom", "-jar", "/${project.build.finalName}.jar"]
                </entryPoint>
                <resources>
                    <resource>
                        <targetPath>/</targetPath>
                        <directory>${project.build.directory}</directory>
                        <include>${project.build.finalName}.jar</include>
                    </resource>
                </resources>
                <image>${docker.image.prefix}/${project.build.finalName}</image>
                <newName>${docker.image.prefix}/${project.build.finalName}:${docker.tag}</newName>
                <forceTags>true</forceTags>
                <pushImage>false</pushImage>
            </configuration>
        </plugin>
    </plugins>
</build>

看過之前文章的朋友應(yīng)該對(duì)這些配置比較熟悉了爪模,和其他項(xiàng)目不同的是欠啤,這里引入了 spring-kafkajackson屋灌、 spring-boot-starter-data-mongodb 洁段。
使用 spring-kafka 創(chuàng)建 kafka 訂閱者,使用 jackson 對(duì)日志數(shù)據(jù)進(jìn)行轉(zhuǎn)換共郭,使用 spring-boot-starter-data-mongodb 完成 MongoDB 的相關(guān)操作祠丝。

resources 目錄下創(chuàng)建 bootstrap.yml,因?yàn)榕渲眯畔⑹菑?config-server 中獲取的除嘹,所以 bootstrap.yml 的內(nèi)容和其他項(xiàng)目一樣:

  spring:
    application:
      name: @project.artifactId@
    profiles:
      active: @activatedProperties@
    cloud:
      config:
        profile: dev
        label: master
        discovery:
          enabled: true
          serviceId: CONFIG-SERVER-DEMO
        failFast: true
        retry:
          initialInterval: 10000
          multiplier: 2
          maxInterval: 60000
          maxAttempts: 10

  eureka:
    client:
      serviceUrl:
        defaultZone: http://localhost:8000/eureka/

在git倉庫中創(chuàng)建配置文件 log-persist-demo-dev.yml 写半,:

  spring:
    rabbitmq:
      host: 10.47.160.238
      port: 5673
      username: guest
      password: guest
    data:
      mongodb:
        uri: mongodb://10.47.160.114:27017/add-service-demo-log

  kafka:
    bootstrapServers: 10.47.160.114:9092
    groupId: mongo
    enableAutoCommit: true
    autoCommitIntervalMs: 100
    sessionTimeOutMs: 15000

創(chuàng)建 log4j2.xml 配置文件,內(nèi)容也和其他項(xiàng)目相同:

  <?xml version="1.0" encoding="UTF-8"?>
  <Configuration>
      <Properties>
          <Property name="logFormat">
              %d{yyyy-MM-dd HH:mm:ss.SSS}{GMT+8} [@project.artifactId@] [%thread] %-5level %logger{35} - %msg %n
          </Property>
          <Property name="kafkaBootstrapServers">
              @kafka.bootstrap.servers@
          </Property>
      </Properties>
      <Appenders>
          <Console name="STDOUT" target="SYSTEM_OUT">
              <PatternLayout pattern="${logFormat}"/>
          </Console>

          <RollingFile name="RollingFile" fileName="logs/@project.artifactId@.log"
                       filePattern="logs/@project.artifactId@.%d{yyyy-MM-dd}.log">
              <PatternLayout>
                  <Pattern>
                      ${logFormat}
                  </Pattern>
              </PatternLayout>
              <Policies>
                  <TimeBasedTriggeringPolicy/>
              </Policies>
          </RollingFile>

          <Kafka name="basicLog" topic="basic-log">
              <PatternLayout>
                  <Pattern>
                      ${logFormat}
                  </Pattern>
              </PatternLayout>
              <Property name="bootstrap.servers">${kafkaBootstrapServers}</Property>
          </Kafka>

          <Async name="FileAsync">
              <AppenderRef ref="RollingFile"/>
          </Async>

          <Async name="basicLogAsync">
              <AppenderRef ref="basicLog"/>
          </Async>

      </Appenders>
      <Loggers>
          <Logger name="org.apache.kafka" level="info"/>
          <Root level="info">
              <AppenderRef ref="STDOUT"/>
              <AppenderRef ref="FileAsync"/>
              <AppenderRef ref="basicLogAsync"/>
          </Root>
      </Loggers>
  </Configuration>

創(chuàng)建一個(gè) demo 包尉咕,在 demo 包下創(chuàng)建啟動(dòng)入口 LogPersistDemoApplication.java

        @SpringBootApplication
        public class LogPersistDemoApplication {

            public static void main(String[] args) {
                SpringApplication.run(LogPersistDemoApplication.class, args);
            }

        }

demo 下創(chuàng)建一個(gè)子包 config叠蝇,用來存放 java config 配置。創(chuàng)建 KafkaConfig.java 配置 kafka

  @Configuration
  @EnableKafka
  public class KafkaConfig {

      @Value("${kafka.bootstrapServers}")
      private String bootstrapServers;

      @Value("${kafka.groupId}")
      private String groupId;

      @Value("${kafka.enableAutoCommit}")
      private Boolean enableAutoCommit;

      @Value("${kafka.autoCommitIntervalMs}")
      private Integer autoCommitIntervalMs;

      @Value("${kafka.sessionTimeOutMs}")
      private Integer sessionTimeOutMs;

      @Bean
      ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                  new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          return factory;
      }

      @Bean
      public ConsumerFactory<Integer, String> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs());
      }

      @Bean
      public Map<String, Object> consumerConfigs() {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
          props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
          props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeOutMs);
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          return props;
      }
  }

簡(jiǎn)單看下 KafkaConfig.java 年缎,在類級(jí)別使用 @EnableKafkaspring boot 自動(dòng)幫我們初始化 kafka 相關(guān)的bean悔捶。配置類中用到的配置信息使用 @Valuelog-persist-demo-dev.yml 讀取數(shù)據(jù),并配置了 kafkaListenerContainerFactory单芜,用來創(chuàng)建 kafka 消費(fèi)者蜕该。

config 包下創(chuàng)建 MongoConfig.java 配置 MongoDB

  @Configuration
  public class MongoConfig {

      @Value("${spring.data.mongodb.uri}")
      private String mongoClientUri;

      @Bean
      public MongoDbFactory mongoDbFactory() {
          SimpleMongoDbFactory mongoDbFactory = null;
          try {
              mongoDbFactory = new SimpleMongoDbFactory(new MongoClientURI(mongoClientUri));
              mongoDbFactory.setWriteConcern(WriteConcern.UNACKNOWLEDGED);
          } catch (UnknownHostException e) {
              e.printStackTrace();
          }
          return mongoDbFactory;
      }
  }

這里配置了 MongoClientURI,并且設(shè)置了寫安全級(jí)別(WriteConcern)為 UNACKNOWLEDGED洲鸠,關(guān)于 MongoDB 寫安全級(jí)別蛇损,簡(jiǎn)單介紹:

  • Errors Ignored(-1) 忽略所有異常,包括網(wǎng)絡(luò)異常坛怪。
  • Unacknowledged(0) 忽略寫入異常淤齐,但是會(huì)檢測(cè)網(wǎng)絡(luò)異常。
  • Acknowledged(1) 默認(rèn)級(jí)別袜匿,可以捕獲到寫入異常更啄。 MongoDB 保存數(shù)據(jù)時(shí),先把數(shù)據(jù)寫入內(nèi)存居灯,定期 fsync 保存到硬盤祭务,如果數(shù)據(jù)寫入內(nèi)存之后沒來得及寫入硬盤内狗,服務(wù)掛了,數(shù)據(jù)就丟了义锥。
  • Journaled(1, journal=true) 增加 journal 日志柳沙,數(shù)據(jù)寫入內(nèi)存的同時(shí)記錄日志,服務(wù)down了可以通過 journal 日志還原操作
  • majority(>1) 在副本集模式下拌倍,保證多數(shù)節(jié)點(diǎn)(超過半數(shù))數(shù)據(jù)寫入赂鲤。

從上到下,安全級(jí)別由低到高柱恤,寫入效率由高到低数初。由于記錄的是日志數(shù)據(jù),數(shù)據(jù)量大梗顺,對(duì)寫入效率要求較高泡孩,并且允許部分?jǐn)?shù)據(jù)丟失,所以配置了 Unacknowledged 級(jí)別寺谤,最低級(jí)別會(huì)忽略網(wǎng)絡(luò)異常仑鸥,一般不建議使用。

demo 包下創(chuàng)建一個(gè) model 子包变屁,用來存放數(shù)據(jù)模型锈候。創(chuàng)建 AddLog.java 數(shù)據(jù)模型:

  public class AddLog {
      @Id
      private String id;

      private Long timeMillis;
      private String thread;
      private String level;
      private String loggerName;
      private String message;
      private Boolean endOfBatch;
      private String loggerFqcn;
      private Integer threadId;
      private Integer threadPriority;

      public String getId() {
          return id;
      }

      public void setId(String id) {
          this.id = id;
      }

      public Long getTimeMillis() {
          return timeMillis;
      }

      public void setTimeMillis(Long timeMillis) {
          this.timeMillis = timeMillis;
      }

      public String getThread() {
          return thread;
      }

      public void setThread(String thread) {
          this.thread = thread;
      }

      public String getLevel() {
          return level;
      }

      public void setLevel(String level) {
          this.level = level;
      }

      public String getLoggerName() {
          return loggerName;
      }

      public void setLoggerName(String loggerName) {
          this.loggerName = loggerName;
      }

      public String getMessage() {
          return message;
      }

      public void setMessage(String message) {
          this.message = message;
      }

      public Boolean getEndOfBatch() {
          return endOfBatch;
      }

      public void setEndOfBatch(Boolean endOfBatch) {
          this.endOfBatch = endOfBatch;
      }

      public String getLoggerFqcn() {
          return loggerFqcn;
      }

      public void setLoggerFqcn(String loggerFqcn) {
          this.loggerFqcn = loggerFqcn;
      }

      public Integer getThreadId() {
          return threadId;
      }

      public void setThreadId(Integer threadId) {
          this.threadId = threadId;
      }

      public Integer getThreadPriority() {
          return threadPriority;
      }

      public void setThreadPriority(Integer threadPriority) {
          this.threadPriority = threadPriority;
      }

  }

創(chuàng)建數(shù)據(jù)訪問層 AddLogDao.dao,使用 spring data 對(duì) MongoDB 的封裝 :

  @Repository
  public interface AddLogDao extends MongoRepository<AddLog, String> {
  }

創(chuàng)建 KafkaConsumer.java 接受 kafka 消息:

@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);


    @Autowired
    private AddLogDao addLogDao;

    @KafkaListener(topics = {"add-log"})
    public void receivePersistLog(String data) {
        logger.info("接收到需要保存到MongoDB的日志數(shù)據(jù), data : " + data);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            AddLog addLog = objectMapper.readValue(data, AddLog.class);
            addLogDao.save(addLog);
            logger.info("成功保存日志數(shù)據(jù), data : " + data);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

這里使用 @KafkaListener(topics = {"add-log"}) 接受 add-log topic 的消息敞贡,把收到的消息保存到 MongoDB泵琳。

運(yùn)行 LogPersistDemoApplication.javamain 方法啟動(dòng),訪問 add-service-demo 提供的 add 接口誊役,會(huì)在 MongoDB 中插入一條日志記錄获列。

使用docker-maven-plugin打包并生成docker鏡像

這部分內(nèi)容和前面幾篇文章基本相同,都是把容器間的訪問地址和 --link 參數(shù)對(duì)應(yīng)蛔垢,不再贅述击孩。

demo源碼 spring-cloud-3.0目錄

最后

本文簡(jiǎn)單介紹了 MongoDB 的相關(guān)內(nèi)容, 以及使用 spring kafka 接受kafka消息鹏漆,并把數(shù)據(jù)插入 MongoDB巩梢。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市艺玲,隨后出現(xiàn)的幾起案子括蝠,更是在濱河造成了極大的恐慌,老刑警劉巖饭聚,帶你破解...
    沈念sama閱讀 221,820評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忌警,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡秒梳,警方通過查閱死者的電腦和手機(jī)法绵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門箕速,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人朋譬,你說我怎么就攤上這事盐茎。” “怎么了徙赢?”我有些...
    開封第一講書人閱讀 168,324評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵字柠,是天一觀的道長。 經(jīng)常有香客問我犀忱,道長,這世上最難降的妖魔是什么扶关? 我笑而不...
    開封第一講書人閱讀 59,714評(píng)論 1 297
  • 正文 為了忘掉前任阴汇,我火速辦了婚禮,結(jié)果婚禮上节槐,老公的妹妹穿的比我還像新娘搀庶。我一直安慰自己,他們只是感情好铜异,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評(píng)論 6 397
  • 文/花漫 我一把揭開白布哥倔。 她就那樣靜靜地躺著,像睡著了一般揍庄。 火紅的嫁衣襯著肌膚如雪咆蒿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評(píng)論 1 310
  • 那天蚂子,我揣著相機(jī)與錄音沃测,去河邊找鬼。 笑死食茎,一個(gè)胖子當(dāng)著我的面吹牛蒂破,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播别渔,決...
    沈念sama閱讀 40,897評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼附迷,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了哎媚?” 一聲冷哼從身側(cè)響起喇伯,我...
    開封第一講書人閱讀 39,804評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拨与,沒想到半個(gè)月后艘刚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡截珍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評(píng)論 3 340
  • 正文 我和宋清朗相戀三年攀甚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了箩朴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,561評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡秋度,死狀恐怖炸庞,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情荚斯,我是刑警寧澤埠居,帶...
    沈念sama閱讀 36,238評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站事期,受9級(jí)特大地震影響滥壕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜兽泣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評(píng)論 3 334
  • 文/蒙蒙 一绎橘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧唠倦,春花似錦称鳞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至候齿,卻和暖如春熙暴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背慌盯。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評(píng)論 1 272
  • 我被黑心中介騙來泰國打工怨咪, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人润匙。 一個(gè)月前我還...
    沈念sama閱讀 48,983評(píng)論 3 376
  • 正文 我出身青樓诗眨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親孕讳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子匠楚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容