filebeat + Kafka + Flink + ElasticSearch + Spring Boot + AMap (高德地图) heat-map project

I've recently been studying ELK and Flink, the latest real-time computation framework, so I decided to rebuild my old heat-map project (flume + kafka + SparkStreaming + mysql + ssm + AMap heat map) with these components. The result is no better; if anything it's more cumbersome and slower. This is purely an exercise in applying the new components and the new computation framework I've been studying.

Project environment:

filebeat 6.2.0
kafka 0.8.2
Flink 1.6.1
ElasticSearch 6.4.0
springboot 2.1.5
scala 2.11

Project walkthrough:

1. Write a Python script to simulate the data. In the real world, every phone continuously reports its longitude and latitude; that's why, when you take a train into another province, you get a text message like "Welcome to Jinan, Shandong". Same principle here.

import random
import time

phone = [
    "13869555210",
    "18542360152",
    "15422556663",
    "18852487210",
    "13993584664",
    "18754366522",
    "15222436542",
    "13369568452",
    "13893556666",
    "15366698558"
]

location = [
    "123.449169, 41.740567",
    "123.450169, 41.740705",
    "123.451169, 41.741405",
    "123.452169, 41.741805",
    "123.453654, 41.742405",
    "123.454654, 41.742805",
    "123.455654, 41.743405",
    "123.458654, 41.743705"
]


def sample_phone():
    return random.sample(phone, 1)[0]


def sample_location():
    return random.sample(location, 1)[0]


def generator_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Append `count` tab-separated records (phone \t "lng, lat" \t timestamp).
    # The with-block closes (and flushes) the file after each batch, so filebeat
    # sees the new lines promptly.
    with open("/var/log/lyh.log", "a+") as f:
        while count >= 1:
            query_log = "{phone}\t{location}\t{date}".format(phone=sample_phone(),
                                                             location=sample_location(),
                                                             date=time_str)
            f.write(query_log + "\n")
            count = count - 1


if __name__ == '__main__':
    while True:
        generator_log(100)
        time.sleep(5)

Upload the script to the Linux box and run it. Every 5 seconds it appends 100 randomly generated records to /var/log/lyh.log; each record is a phone number, a longitude/latitude pair, and a timestamp (see the sample line below). The Flink job later extracts the coordinates from these records.
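
A generated line looks like this (fields are tab-separated; the same sample appears as a comment in the Flink code below):

18542360152   116.410588, 39.880172   2019-05-24 23:43:38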

2. Use filebeat to tail the growing /var/log/lyh.log file and ship the data to Kafka

filebeat is the log shipper of the ELK stack; here we use it to pick up the data generated by the Python script above.
Edit filebeat.yml to configure which file to watch and where to send the output:

filebeat.prospectors:
- type: log      # 'log' input: tail plain log files; each line is shipped as a JSON document
  paths:
    - /var/log/lyh.log    # the file to watch

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# If you don't filter the logs through logstash, filebeat can also write straight to es:
#output.elasticsearch:
#  hosts: ["172.24.112.17:9200"]

# Output to Kafka instead:
output.kafka:
  hosts: ["hadoop1:9092", "hadoop2:9092", "hadoop3:9092"]
  topic: 'log'
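
Note that filebeat does not ship the raw line: it wraps each line in a JSON envelope and puts the original text into the message field, which is why the Flink job below first parses the JSON and then extracts message. A trimmed, illustrative example of what actually lands in Kafka:

{"@timestamp":"2019-05-26T11:03:59.281Z","message":"18542360152\t116.410588, 39.880172\t2019-05-24 23:43:38","source":"/var/log/lyh.log", ...}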

Note: the filebeat and Kafka versions must be compatible, otherwise you'll get errors; for the compatibility matrix see the official docs at https://www.elastic.co/guide/en/beats/filebeat/6.4/kafka-output.html
Start filebeat with:

sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"
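
To sanity-check that events are reaching Kafka before wiring up Flink, you can attach a console consumer to the topic (Kafka 0.8 tooling; adjust the ZooKeeper address to your cluster):

bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --topic log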

3. Write the Flink job: consume the data from Kafka, clean it down to the fields we need, and store it in ElasticSearch

pom dependencies:

<dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.36</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
      <version>1.6.1</version>
    </dependency>

  </dependencies>

Flink code:

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests


object Flink_kafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Crucial: be sure to enable checkpointing!
    env.enableCheckpointing(5000)

    // Kafka connection settings
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.199.128:9092,192.168.199.131:9092,192.168.199.132:9092")
    props.setProperty("zookeeper.connect", "192.168.199.128:2181,192.168.199.131:2181,192.168.199.132:2181")
    props.setProperty("group.id", "test")
    // Read from Kafka; the first argument is the topic, i.e. the 'log' topic configured in filebeat.yml above
    val consumer = new FlinkKafkaConsumer08[String]("log", new SimpleStringSchema(), props)
    // Read only the latest data
    consumer.setStartFromLatest()
    // Add Kafka as the source. Each event's message field looks like:
    //18542360152   116.410588, 39.880172   2019-05-24 23:43:38
    val stream = env.addSource(consumer).map(
      x=>{
        JSON.parseObject(x)
      }
    ).map(x=>{
      x.getString("message")
    }).map(x=>{
      // The location field is "lng, lat". Note the swapped names: `wei` actually holds
      // the longitude and `jing` the latitude. The swap is consistent end to end
      // (the front end maps wei -> lng and jing -> lat), so the result is still correct.
      val jingwei=x.split("\\t")(1)
      val wei=jingwei.split(",")(0).trim
      val jing=jingwei.split(",")(1).trim
      // Adjust the time format: ES stores dates as UTC by default; the literal +0800
      // marks the value as Beijing time
      val sdf=new SimpleDateFormat("yyyy-MM-dd\'T\'HH:mm:ss.SSS+0800")
      val time=sdf.format(new Date())
      val resultStr=wei+","+jing+","+time
      resultStr
    })

    stream.print() // after cleaning, records look like: 123.450169,41.740705,2019-05-26T19:03:59.281+0800
    // Sink the cleaned data into ES
    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("192.168.199.128", 9200, "http")) // the ES client connects over HTTP for all CRUD operations

    val esSinkBuilder = new ElasticsearchSink.Builder[String](
      httpHosts,
      new ElasticsearchSinkFunction[String]{    // `element` below is one cleaned record in the format printed above
        def createIndexRequest(element: String):IndexRequest={
          val json = new java.util.HashMap[String, String]
          json.put("wei", element.split(",")(0))
          json.put("jing", element.split(",")(1))
          json.put("time", element.split(",")(2))

          return Requests.indexRequest()
            .index("location-index")
            .`type`("location")
            .source(json)
        }

        override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          requestIndexer.add(createIndexRequest(element))
        }
      }
    )

    // Bulk flush config: flush after every single element instead of buffering requests
    esSinkBuilder.setBulkFlushMaxActions(1)

    stream.addSink(esSinkBuilder.build())

    env.execute("Kafka_Flink")
  }
}

Note: how ES stores times, and the timezone pitfall.
Elasticsearch supports the date type natively; in JSON, dates are represented as strings.
So when you submit a date to Elasticsearch as JSON, ES converts it implicitly: any string it recognizes as a date is stored directly as a date.

The date type carries timezone information. If the date string in the JSON doesn't explicitly specify a timezone, ES itself has no problem,
but when the data is displayed through Kibana, the times come out 8 hours off.

Kibana uses JavaScript in the browser to detect the client machine's timezone (UTC+8 here) and shifts the UTC dates it receives from ES into that zone, so a Beijing local time stored without an offset (and therefore interpreted as UTC) is displayed 8 hours off.
Best practice: when submitting dates to ES, include explicit timezone information in the date string,
e.g. "2016-07-15T12:58:17.136+0800". This is an ISO 8601 date with an explicit UTC offset, a format ES parses by default.
Java formatting:

String FULL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS+0800";
Date now = new Date();
String formatted = new SimpleDateFormat(FULL_FORMAT).format(now);

4. With the data now flowing into ES, create a Spring Boot web project that queries ES and renders the results dynamically in the AMap heat-map component on the front end

pom dependencies for the whole web project:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- JavaServer Pages Standard Tag Library (JSTL) -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <!-- JSP support for the embedded Tomcat, needed to compile JSPs -->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.4.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.36</version>
        </dependency>
    </dependencies>

4.1 We want to query ES for all records within 20 seconds of the current time, aggregated by coordinate pair with a count per bucket.
The ES query looks as follows.
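
Reconstructed here from the search template in the Controller code below (the original post showed it as a screenshot):

GET location-index/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "time": { "gte": "now-20s", "lt": "now" }
        }
      }
    }
  },
  "aggs": {
    "group_by_jing": {
      "terms": {
        "script": "doc['jing'].values +'#split#'+ doc['wei'].values"
      }
    }
  }
}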

Before running the aggregation you must adjust the mapping and set fielddata to true on the jing and wei fields; the default is false, and aggregating on them without this change throws an error (a sketch of the request follows below).
The query first selects all records from the last 20 seconds, then aggregates on jing and wei (the equivalent of SQL's GROUP BY) and counts each bucket. The count is the number of people at that coordinate pair: the larger the number, the more crowded the area.
In the web code we reimplement this query in Java through the ES API.
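
A minimal sketch of that mapping change (assuming jing and wei were dynamically mapped as text when the Flink sink first wrote to location-index):

PUT location-index/_mapping/location
{
  "properties": {
    "jing": { "type": "text", "fielddata": true },
    "wei":  { "type": "text", "fielddata": true }
  }
}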

First create a Location entity class to hold the query results: the count, the longitude, and the latitude.

public class Location {
    private Integer count;
    private double wei;
    private double jing;

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public double getWei() {
        return wei;
    }

    public void setWei(double wei) {
        this.wei = wei;
    }

    public double getJing() {
        return jing;
    }

    public void setJing(double jing) {
        this.jing = jing;
    }
}

Next, write an ES utility class that creates the client connection, provides some basic CRUD helpers, and implements the query above in Java:

import com.test.flink_web_show.controller.Location;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RestClientUtils {

    private RestHighLevelClient client = null;

    public RestClientUtils() {
        // `client` is an instance field, so the double-checked-locking idiom from the
        // original singleton pattern adds nothing here: each RestClientUtils instance
        // simply builds its own client, and the caller closes it after use (see the Controller).
        if (client == null){
            client = getClient();
        }
    }

    private RestHighLevelClient getClient(){
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.199.128", 9200, "http"),
                        new HttpHost("192.168.199.128", 9201, "http")));

        return client;
    }

    public void closeClient(){
        try {
            if (client != null){
                client.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /*------------------------------------------------ search API: templated query ----------------------------------------------*/
    /**
     * Run a search template query and map the aggregation buckets to Location objects
     * @throws Exception
     */
    public List<Location> searchTemplate(String indexName, String JsonStr, Map<String, Object> scriptParams) throws Exception{
        //Inline Templates
        SearchTemplateRequest request = new SearchTemplateRequest();
        request.setRequest(new SearchRequest(indexName));
        request.setScriptType(ScriptType.INLINE);
        request.setScript(JsonStr);

        request.setScriptParams(scriptParams);

        //Synchronous Execution
        SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);

        //SearchTemplate Response
        SearchResponse searchResponse = response.getResponse();
        //Retrieving SearchHits: pull the result data
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits();
        float maxScore = hits.getMaxScore();
        System.out.println("totalHits: " + totalHits);
        System.out.println("maxScore: " + maxScore);
        System.out.println("------------------------------------------");
        SearchHit[] searchHits = hits.getHits();
        /*for (SearchHit hit : searchHits) {
            // do something with the SearchHit
            String index = hit.getIndex();
            String type = hit.getType();
            String id = hit.getId();
            float score = hit.getScore();

            String sourceAsString = hit.getSourceAsString();
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("id: " + id);
            System.out.println("score: " + score);
            System.out.println(sourceAsString);
            System.out.println("------------------------------------------");
        }*/
        // Extract the contents of the aggregations section
        ArrayList<Location> locations = new ArrayList<>();
        Aggregations aggregations = searchResponse.getAggregations();
        if(aggregations!=null){
            Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
            Terms companyAggregation = (Terms) aggregationMap.get("group_by_jing");
            List<? extends Terms.Bucket> buckets = companyAggregation.getBuckets();
            for(Terms.Bucket bk:buckets){
                Location location = new Location();
                Object key = bk.getKey();
                long docCount = bk.getDocCount();
                System.out.println("key: "+key.toString());
                System.out.println("doc_count: "+docCount);
                // The bucket key comes from the script doc['jing'].values +'#split#'+ doc['wei'].values,
                // so it looks like "[41.740705]#split#[123.450169]": split on the marker and
                // strip the surrounding brackets before parsing the doubles.
                String jingdu = key.toString().split("#split#")[0];
                String substring_jing = jingdu.substring(1, jingdu.length() - 1);
                location.setJing(Double.parseDouble(substring_jing));
                String weidu = key.toString().split("#split#")[1];
                String substring_wei = weidu.substring(1, weidu.length() - 1);
                location.setWei(Double.parseDouble(substring_wei));
                location.setCount((int)docCount);

                locations.add(location);
            }
        }
        return locations;
    }
}

The ES Java API is fairly involved; for details see my other post, ElasticSearch java API.

Controller code:

import com.alibaba.fastjson.JSON;
import com.test.flink_web_show.es_utils.RestClientUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Controller
public class HomeController {

    @RequestMapping("/")
    public ModelAndView home()
    {
        ModelAndView modelAndView = new ModelAndView();

        modelAndView.setViewName("index");
        return modelAndView;
    }
    @RequestMapping("/get_map")
    public void getMap(HttpServletResponse response) throws Exception{
        RestClientUtils restClientUtils = new RestClientUtils();
        String searchJSON="{\n" +
                "  \"query\": {\n" +
                "    \"bool\": {\n" +
                "      \"filter\": {\n" +
                "        \"range\": {\n" +
                "          \"{{time}}\": {\n" +
                "            \"{{gte}}\": \"{{value1}}\", \n" +
                "            \"{{lt}}\": \"{{now}}\"\n" +
                "          }\n" +
                "        }\n" +
                "      }\n" +
                "    }\n" +
                "  },\n" +
                "  \"aggs\": {\n" +
                "    \"{{group_by_jing}}\": {\n" +
                "      \"terms\": {\n" +
                "        \"script\": \"{{doc['jing'].values +'#split#'+ doc['wei'].values}}\"\n" +
                "      }\n" +
                "    }\n" +
                "  }\n" +
                "}";
        // The template substitutes every {{...}} placeholder from this map. Only
        // value1 is a real parameter (now-20s); the rest simply map to themselves,
        // so the rendered query is exactly the one shown in section 4.1.
        Map<String, Object> map = new HashMap<>();
        map.put("time","time");
        map.put("gte","gte");
        map.put("value1","now-20s");
        map.put("lt","lt");
        map.put("now","now");
        map.put("group_by_jing","group_by_jing");
        map.put("doc['jing'].values +'#split#'+ doc['wei'].values","doc['jing'].values +'#split#'+ doc['wei'].values");

        List<Location> locations = restClientUtils.searchTemplate("location-index", searchJSON, map);
        restClientUtils.closeClient();
        String json = JSON.toJSONString(locations);
        response.setContentType("application/json;charset=UTF-8");
        response.getWriter().print(json);
    }
}
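
The endpoint returns a JSON array the front end consumes directly, one element per coordinate bucket; an illustrative response:

[{"count":12,"jing":41.740705,"wei":123.450169},{"count":7,"jing":41.743405,"wei":123.455654}]

Remember the swapped naming from section 3: wei holds the longitude and jing the latitude, which is why the JavaScript below maps wei to lng and jing to lat.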

Front-end JSP code:

<%--
  Created by IntelliJ IDEA.
  User: ttc
  Date: 2018/7/6
  Time: 14:06
  To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>高德地圖</title>
</head>
<body>
<script src="https://cdn.bootcss.com/echarts/4.1.0.rc2/echarts.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="http://webapi.amap.com/maps?v=1.4.9&amp;key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div>

<script>
    // Define the map's initial center position and zoom
    var map = new AMap.Map("container", {
        resizeEnable: true,
        center: [123.453169, 41.742567],   // lng/lat of the center point
        zoom: 17                        // initial zoom level
    });

    var heatmap;
    map.plugin(["AMap.Heatmap"],function() {      // load the heat-map plugin
        heatmap = new AMap.Heatmap(map,{
            radius:50,       // note: the original "raduis" was a typo, silently ignored by the plugin
            opacity:[0,0.7]
        });    // overlay the heat map on the map object
        // see the AMap heat-map docs for the full option list
    });
    // Every second, fire an ajax request that queries ES via /get_map, assigns the
    // result to `points`, and pushes it into the heatmap as its new data set
    setInterval(function (args) {
        var points =(function a(){  //<![CDATA[
            var city=[];
            $.ajax({
                type:"POST",
                url:"/get_map",
                dataType:'json',
                async:false,        // synchronous on purpose, so `city` is filled before it is returned
                success:function(result){
                    for(var i=0;i<result.length;i++){
                        // wei is the longitude, jing the latitude (see the note in section 3)
                        city.push({"lng":result[i].wei,"lat":result[i].jing,"count":result[i].count});
                    }

                }
            });
            return city;
        })();//]]>
        heatmap.setDataSet({data:points,max:70}); // update the heat-map data set
    },1000)
    // var map = new AMap.Map('container', {
    //    pitch:75, // map pitch angle, valid range 0-83 degrees
    //    viewMode:'3D' // 3D map mode
    //});
</script>

</body>
</html>

That's all of the code. Now start the whole pipeline, in order.
Start ZooKeeper

zkServer.sh start

Start Kafka

bin/kafka-server-start.sh config/server.properties
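
If topic auto-creation is disabled on the brokers, create the 'log' topic first (Kafka 0.8 tooling; the partition and replication counts here are only an example):

bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 1 --partitions 3 --topic log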

Start ES

sudo -u elk bin/elasticsearch

Start filebeat

sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"

Start the Python script that generates the mock data

python phoneData_every5second.py

Start the Flink job, which receives and processes the data in real time and writes it into ES
Start the web project to get the live map display (example commands below)
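
These last two steps have no single canonical command. Assuming the Flink job is packaged as a jar (the jar name below is hypothetical) and the web app is a standard Spring Boot Maven project, something like:

bin/flink run -c Flink_kafka flink-heatmap-1.0.jar
mvn spring-boot:run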


(Screenshot in the original post: the finished heat map rendered over the AMap base map.)

I can't capture an animated GIF, and the mock data isn't very realistic, but the colored circles of the heat map keep changing as the simulated data streams in.
