利用大數(shù)據(jù)技術(shù)分析新浪財(cái)經(jīng)美股行情

學(xué)習(xí)大數(shù)據(jù)技術(shù)有一段時(shí)間了,之前也寫(xiě)過(guò)一些零零散散的博客排苍,作為自己學(xué)習(xí)的一些記錄,不過(guò)每篇博客都只是涵蓋部分技術(shù)学密。這次想寫(xiě)一篇比較完整的博客淘衙,記錄一個(gè)完整的項(xiàng)目從頭到尾生產(chǎn)的過(guò)程,也是對(duì)自己學(xué)習(xí)的一個(gè)總結(jié)

廢話不多說(shuō)则果,直入正題

這次的項(xiàng)目涉及了兩條流程

一條是離線處理幔翰。爬蟲(chóng)爬到股票數(shù)據(jù)后盟劫,先交給 Map Reduce 清洗一下周拐,生成格式化的數(shù)據(jù),然后倒入 hive 進(jìn)行分析帝洪,之后交給 sqoop 導(dǎo)出至 mysql 并用 echarts 可視化展現(xiàn)


離線處理

另一條是實(shí)時(shí)處理款青。爬蟲(chóng)一直爬取數(shù)據(jù)做修,flume 監(jiān)控爬蟲(chóng)爬下來(lái)的文件所在目錄,并不斷傳送給 kafka,spark streaming 會(huì)定期從 kafka 那里拿到數(shù)據(jù)饰及,實(shí)時(shí)分析并將數(shù)據(jù)保存到 mysql蔗坯,最后可視化。


實(shí)時(shí)處理

離線流程

網(wǎng)頁(yè)結(jié)構(gòu)分析

本次爬取 新浪財(cái)經(jīng)美股實(shí)時(shí)行情燎含,頁(yè)面長(zhǎng)這樣

新浪財(cái)經(jīng)美股實(shí)時(shí)行情

F12宾濒,打開(kāi)開(kāi)發(fā)者工具,選擇 network 面板屏箍,F(xiàn)5 刷新頁(yè)面绘梦,找到股票的 json 數(shù)據(jù)的 api 接口。

這是 api 接口

不同的網(wǎng)站尋找 api 接口的方式不太一樣赴魁,給大家一個(gè)小訣竅卸奉,一般的接口都是 xhr 或 script 類(lèi)型,而且它的 url 后面一般都會(huì)跟著一個(gè) page 參數(shù)颖御,代表著這是第幾頁(yè)

雙擊 url 之后來(lái)到了一個(gè)新的頁(yè)面

股票的 json 格式數(shù)據(jù)

這里可以看到返回的數(shù)據(jù)不是標(biāo)準(zhǔn)的 json 格式榄棵,前面跟著一串 IO.XSRV2.CallbackList['QGNtUNkM_FleaeT1'] ,而且我們也可以在 url 里面看到這一串字符潘拱,現(xiàn)在在 url 里他刪掉疹鳄,結(jié)果就變成了下面這樣子。

基本標(biāo)準(zhǔn)的 json 格式數(shù)據(jù)

現(xiàn)在數(shù)據(jù)的格式基本標(biāo)準(zhǔn)了芦岂,只不過(guò)最前面多了兩對(duì)小括號(hào)尚辑,我們?cè)跁?huì)在爬蟲(chóng)程序里面去掉它。根據(jù)上面拿到的的 url 盔腔,開(kāi)始編寫(xiě)我們的爬蟲(chóng)。

爬取數(shù)據(jù)

爬蟲(chóng)程序我寫(xiě)了兩種方案月褥,一種是用 python 語(yǔ)言寫(xiě)的弛随;還有一種是使用 java 語(yǔ)言實(shí)現(xiàn)的 webmagic 框架寫(xiě)的,由于篇幅問(wèn)題宁赤,python 的方案就不在這篇博客里面采用了舀透,以后可能會(huì)單開(kāi)一篇博客介紹 python 版的股票爬蟲(chóng)。

WebMagic 是一個(gè)國(guó)人寫(xiě)的簡(jiǎn)單靈活的Java爬蟲(chóng)框架决左。

要使用 webmagic 愕够,首先下載它的依賴包 webmagic-0.7.3-all.tar.gz
在 eclipse 里面新建一個(gè) Java Project,在工程根目錄下新建一個(gè)文件夾佛猛,將依賴包解壓至文件夾中惑芭,全選之后添加到 Build Path

Add to Build Path

然后就可以寫(xiě)爬蟲(chóng)代碼了

private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(2000)
            .setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");

首先構(gòu)建要請(qǐng)求的 Site ,這里注意一點(diǎn)继找,代碼最上面的 setDomain("stock.finance.sina.com.cn") 里面的這個(gè) "stock.finance.sina.com.cn" 就是下圖中的 Host遂跟,同時(shí)也是爬蟲(chóng)程序下載的網(wǎng)頁(yè)所存放的目錄

Host

然后編寫(xiě)一個(gè)方法,過(guò)濾掉前面提到的 json 數(shù)據(jù)外邊的兩對(duì)小括號(hào)

public String regexJson(String source) {      
//用于去掉包裹 json 數(shù)據(jù)的兩對(duì)小括號(hào)
        String regex = "\\{.*\\}";
        String result;
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(source);
        if (matcher.find()) {
            result = matcher.group(0);
        } else {
            result = null;
        }
        return result;
}

在 process 方法中編寫(xiě)爬蟲(chóng)邏輯

@Override
public void process(Page page) {
    // TODO Auto-generated method stub
    page.putField("sixty", regexJson(page.getJson().toString()));
}

主方法

public static void main(String[] args) {
        String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
    "/US_CategoryService.getList?num=60&sort=&asc=0&page=1";
        String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
            "/US_CategoryService.getList?num=60&sort=&asc=0&page=";
        String output = "/data/edu1/tmp/";
        QueueScheduler scheduler = new QueueScheduler();
        Spider spider = Spider.create(new GetStock())
                .addUrl(url_init)
                .setScheduler(scheduler)
                .addPipeline(new JsonFilePipeline(output))
                .addPipeline(new ConsolePipeline());
        for (int i = 1; i < 140; i++) {
            Request request = new Request();
            request.setUrl(url_pattern + i);
            scheduler.push(request, spider);
        }
        spider.thread(50).run();
}

爬蟲(chóng)運(yùn)行結(jié)束后,會(huì)在 /data/edu1/tmp/stock.finance.sina.com.cn 下面生成許多 json 文件幻锁,查看某一個(gè)文件凯亮,可以看到里面的 json 字符串。

json 文件內(nèi)容

接下來(lái)我們把這些文件上傳到 hdfs 上面哄尔,然后開(kāi)始編寫(xiě) MapReduce 程序清洗臟數(shù)據(jù)

hadoop fs -put /data/edu1/tmp/stock.finance.sina.com.cn/* /mystock/in

MapReduce清洗

數(shù)據(jù)清洗從名字上也看的出就是把“臟”的數(shù)據(jù)“洗掉”假消,指發(fā)現(xiàn)并糾正數(shù)據(jù)文件中可識(shí)別的錯(cuò)誤的最后一道程序,包括檢查數(shù)據(jù)一致性岭接,處理無(wú)效值和缺失值等富拗。

格式化之后的 json

這里我們把 json 格式的數(shù)據(jù)最終洗成可以直接導(dǎo)入 hive 的以 '\t' 為分隔符文本格式。而且 json 數(shù)據(jù)中有的字段會(huì)有缺失的現(xiàn)象出現(xiàn)亿傅,所以我們還要填補(bǔ)空值媒峡,保持?jǐn)?shù)據(jù)的一致性

這里我們用到了阿里的 fastjson 庫(kù)來(lái)解析 json

map 代碼:

@Override
protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String initJsonString = value.toString();
            JSONObject initJson = JSONObject.parseObject(initJsonString);
            if (!initJsonString.contains("data")) {
                return;
            }

            JSONObject myjson = initJson.getJSONObject("sixty");

            JSONArray data = myjson.getJSONArray("data");

            for (int i = 0; i < data.size(); i++) {
                JSONObject stock = data.getJSONObject(i);
                String name = stock.getString("name").trim().equals("")?"null":stock.getString("name").trim().replaceAll("\r|\n|\t", "");
                String cname = stock.getString("cname").trim().equals("")?"null":stock.getString("cname").trim().replaceAll("\r|\n|\t", "");
                String category = stock.getString("category");
                if (category == null || category.equals("")) {
                    category = "null";
                } else {
                    category = category.toString().trim().replaceAll("\r|\n|\t", "");
                }
                System.out.println(category);
                String symbol = stock.getString("symbol").trim().equals("")?"null":stock.getString("symbol").trim().replaceAll("\r|\n|\t", "");
                String price = stock.getString("price").trim().equals("")?"null":stock.getString("price").trim().replaceAll("\r|\n|\t", "");
                String diff = stock.getString("diff").trim().equals("")?"null":stock.getString("diff").trim().replaceAll("\r|\n|\t", "");
                String chg = stock.getString("chg").trim().equals("")?"null":stock.getString("chg").trim().replaceAll("\r|\n|\t", "");
                String preclose = stock.getString("preclose").equals("")?"null":stock.getString("preclose").trim().replaceAll("\r|\n|\t", "");
                String open = stock.getString("open").trim().equals("")?"null":stock.getString("open").trim().replaceAll("\r|\n|\t", "");
                String high = stock.getString("high").trim().equals("")?"null":stock.getString("high").trim().replaceAll("\r|\n|\t", "");
                String low = stock.getString("low").trim().equals("")?"null":stock.getString("low").trim().replaceAll("\r|\n|\t", "");
                String amplitude = stock.getString("amplitude").trim().equals("")?"null":stock.getString("amplitude").trim().replaceAll("\r|\n|\t", "");
                String volume = stock.getString("volume").trim().equals("")?"null":stock.getString("volume").trim().replaceAll("\r|\n|\t", "");
                String mktcap = stock.getString("mktcap").trim().equals("")?"null":stock.getString("mktcap").trim().replaceAll("\r|\n|\t", "");
                String pe = stock.getString("pe");
                if (pe == null || pe.equals("")) {
                    pe = "null";
                } else {
                    pe = pe.trim().replaceAll("\r|\n|\t", "");
                }
                String market = stock.getString("market").trim().equals("")?"null":stock.getString("market").trim().replaceAll("\r|\n|\t", "");
                String category_id = stock.getString("category_id");
                if (category_id == null || category_id.equals("")) {
                    category_id = "null";
                } else {
                    category_id = category_id.toString().trim().replaceAll("\r|\n|\t", "");
                }
                StringBuffer sb = new StringBuffer();

                sb.append(name);            sb.append("\t");
                sb.append(cname);           sb.append("\t");
                sb.append(category);        sb.append("\t");
                sb.append(symbol);          sb.append("\t");
                sb.append(price);           sb.append("\t");
                sb.append(diff);            sb.append("\t");
                sb.append(chg);             sb.append("\t");
                sb.append(preclose);        sb.append("\t");
                sb.append(open);            sb.append("\t");
                sb.append(high);            sb.append("\t");
                sb.append(low);             sb.append("\t");
                sb.append(amplitude);       sb.append("\t");
                sb.append(volume);          sb.append("\t");
                sb.append(mktcap);          sb.append("\t");
                sb.append(pe);              sb.append("\t");
                sb.append(market);          sb.append("\t");
                sb.append(category_id);
                String result = sb.toString();
                context.write(new Text(result), new Text());
            }
}

這里解釋一下每個(gè)字段的含義

字段 含義
name # 英文名稱
cname # 中文名稱
category # 行業(yè)板塊
symbol # 代碼
price # 最新價(jià)
diff # 漲跌額
chg # 漲跌幅
preclose # 昨收
open # 今開(kāi)盤(pán)
high # 最高價(jià)
low # 最低價(jià)
amplitude # 振幅
volume # 成交量
mktcap # 市值(億)
pe # 市盈率
market # 上市地
category_id # 板塊ID

main 方法:

    public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("QingXiStock");
            job.setJarByClass(QingXiStock.class);

            job.setMapperClass(doMapper.class);
            //job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            Path in = new Path("hdfs://localhost:9000//mystock/in");
            Path out = new Path("hdfs://localhost:9000//mystock/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

清洗結(jié)束后,會(huì)在 hdfs 的 /mystock/out 目錄下生成新文件

清洗后的數(shù)據(jù)

導(dǎo)入 Hive 葵擎,進(jìn)行分析

數(shù)據(jù)已經(jīng)沒(méi)問(wèn)題了谅阿,接下來(lái)可以直接導(dǎo)入 hive 來(lái)進(jìn)行分析

首先進(jìn)入 hive ,新建一個(gè)數(shù)據(jù)庫(kù)

create database sina;

然后新建一個(gè)外部表 stock 酬滤,之所以新建一個(gè)外部表签餐,是因?yàn)橥獠勘聿粫?huì)移動(dòng)數(shù)據(jù),它只是存放元數(shù)據(jù)盯串,當(dāng)外部表刪除后氯檐,只是刪除了元數(shù)據(jù),而數(shù)據(jù)不會(huì)被刪掉体捏,所以相對(duì)更安全

create external table if not exists stock (
    > name string,
    > cname string,
    > category string,
    > symbol string,
    > price float,
    > diff float,
    > chg float,
    > preclose float,
    > open float,
    > high float,
    > low float,
    > amplitude string,
    > volume bigint,
    > mktcap bigint,
    > pe float,
    > market string,
    > category_id int
    > ) row format delimited
    > fields terminated by '\t'
    > lines terminated by '\n'
    > stored as textfile
    > location '/mystock/out/';

接下來(lái)查看一下前十條數(shù)據(jù)

select * form stock limit 10;

前十條數(shù)據(jù)

查看一下各個(gè)板塊包含的股票數(shù)量

select category,count(category) as num from stock group by category order by num desc;
各個(gè)股票板塊的股票數(shù)量

查看市值最高的十支股票

select cname,mktcap from stock order by mktcap desc limit 10;
市值最高的十支股票

查看各個(gè)上市地區(qū)的股票數(shù)量

select market,count(market) as num from stock group by market order by num desc;
各個(gè)上市地區(qū)的股票數(shù)量

查看成交量最高的十支股票

select cname,symbol,volume from stock order by volume desc limit 10;
成交量最高的十支股票

查看市盈率最高的十支股票

select cname,pe,symbol from stock order by pe desc limit 10;
市盈率最高的十支股票

漲跌幅最高的十支股票

select cname,symbol,chg from stock order by chg desc limit 10;
漲跌幅最高的十支股票

查看數(shù)據(jù)的總量一共有多少

select count(*) from stock;

8141條

Sqoop 導(dǎo)出至 MySQL

hive 分析完之后冠摄,接下來(lái)使用 sqoop 將 hive 中的數(shù)據(jù)導(dǎo)出到 mysql ,因?yàn)槲覀兊臄?shù)據(jù)量只有 8000 多條几缭,所以這里直接導(dǎo)出整個(gè)表河泳。

首先在 mysql 里面新建一個(gè)數(shù)據(jù)庫(kù) sina

create database if not exists sina default charset utf8 collate utf8_general_ci;

進(jìn)入數(shù)據(jù)庫(kù)后新建一個(gè)表

create table if not exists stock (
    -> name varchar(100),
    -> cname varchar(100),
    -> category varchar(100),
    -> symbol varchar(50),
    -> price float,
    -> diff float,
    -> chg float,
    -> preclose float,
    -> open float,
    -> high float,
    -> low float,
    -> amplitude varchar(50),
    -> volume bigint,
    -> mktcap bigint,
    -> pe float,
    -> market varchar(10),
    -> category_id int(10)
    -> ) default charset=utf8;

然后執(zhí)行 sqoop 導(dǎo)出命令

sqoop export \
> --connect jdbc:mysql://localhost:3306/sina?characterEncoding=UTF-8 \
> --username root \
> --password strongs \
> --table stock \
> --export-dir /user/hive/warehouse/sina.db/stock/part-r-00000 \
> --input-fields-terminated-by '\t'

導(dǎo)出過(guò)程執(zhí)行完畢后,查看一下 mysql 中的數(shù)據(jù)

select * from stock limit 10;

mysql 中的數(shù)據(jù)

查看一下數(shù)據(jù)總量年栓,可以看到和 hive 中的一樣拆挥,是 8141 條


8141條

數(shù)據(jù)可視化

可視化過(guò)程我之前寫(xiě)過(guò)一篇博客 利用ECharts可視化mysql數(shù)據(jù)庫(kù)中的數(shù)據(jù) 和這次的道理差不多,只不過(guò)這次的 echarts 用到了 ajax 動(dòng)態(tài)加載某抓,而且所有的請(qǐng)求都?xì)w到了一個(gè) servlet 纸兔,所以這里只貼一下 echarts 和 servlet 的代碼

工程目錄

可視化工程目錄

ServletBase.java 是一個(gè) servlet 抽象類(lèi),我們的 servlet 需要繼承它否副,然后實(shí)現(xiàn)里面的方法

package my.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;

/**
 * Created by teaGod on 2017/12/7.
 */
@WebServlet(name = "ServletBase")
public abstract class ServletBase extends HttpServlet {
    private static final long serialVersionUID = 1L;

    @Override
    public void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String methodName = request.getParameter("cmd");
        if(methodName==null || methodName.trim().equals("")){
            methodName="execute";
        }
        try{
            Method method = this.getClass()
                    .getMethod(methodName,
                            HttpServletRequest.class,
                            HttpServletResponse.class);
            method.invoke(this,request,response);
        }catch(Exception e){
            throw new RuntimeException(e);
        }
    }

    public abstract void execute(HttpServletRequest request, HttpServletResponse response)
            throws Exception;
}

ServletStock.java 根據(jù)各個(gè) jsp 傳過(guò)來(lái)的 opt 參數(shù)來(lái)確定執(zhí)行哪些邏輯

package my.servlet;

import my.entity.*;
import my.manager.StockManager;
import net.sf.json.JSONArray;
import org.apache.log4j.Logger;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

/**
 * Created by teaGod on 2017/12/7.
 */
@WebServlet(name = "ServletStock", value = "/ServletStock")
public class ServletStock extends ServletBase {
    private static final long serialVersionUID = 1L;

    public static Logger logger = Logger.getLogger(ServletStock.class);

    @Override
    public void execute(HttpServletRequest request, HttpServletResponse response) throws Exception {

        response.setContentType("text/html;charset=utf-8");

        String opt = request.getParameter("opt");
        StockManager stockManager = new StockManager();

        if (opt.equals("marketCount")) {
            List<MarketCount> list;
            list = stockManager.getMarketCountList();
            writeJson(list, response);
        } else if (opt.equals("categoryCount")) {
            List<CategoryCount> list;
            list = stockManager.getCategoryCountList();
            writeJson(list, response);
        } else if (opt.equals("cnameMktnum")) {
            List<CnameMktnum> list;
            list = stockManager.getCnameMktnumList();
            writeJson(list, response);
        } else if (opt.equals("marketSumchg")) {
            List<MarketSumchg> list;
            list = stockManager.getMarketSumchgList();
            writeJson(list, response);
        } else if (opt.equals("cnameVolume")) {
            List<CnameVolume> list;
            list = stockManager.getCnameVolumeList();
            writeJson(list, response);
        } else if (opt.equals("cnameHigh")) {
              List<CnameHigh> list;
              list = stockManager.getCnameHighList();
              writeJson(list, response);
         }
    }

    private void writeJson(List list, HttpServletResponse response) throws IOException {
        JSONArray jsonArray = JSONArray.fromObject(list);
        //System.err.println(jsonArray);
        PrintWriter out = response.getWriter();
        out.print(jsonArray);
        out.flush();
        out.close();
    }
}

不同的上市地所占比例汉矿,可以看到選擇紐約證券交易所上市的股票已經(jīng)占了一半以上

不同的上市地所占比例

echarts 代碼

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>不同上市地所占比例</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">餅狀圖可用來(lái)展現(xiàn)相對(duì)簡(jiǎn)單的比例構(gòu)成關(guān)系,讓觀者能從中熟悉某個(gè)項(xiàng)目與整個(gè)數(shù)據(jù)組間所存在的比例關(guān)系副编。</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
// 基于準(zhǔn)備好的dom负甸,初始化echarts實(shí)例
var myChart = echarts.init(document.getElementById('main'));
// 指定圖表的配置項(xiàng)和數(shù)據(jù)
myChart.setOption({
    title : {
        text: '不同上市地所占比例',
        x:'center'
    },
    tooltip : {
        trigger: 'item',
        formatter: '{a} <br/>流强 : {c} (kguikeq%)'
    },
    legend: {
        orient: 'vertical',
        y:'100',
         data: []
    },
    toolbox: {
        show : true,
        feature : {
            mark : {show: true},
            dataView : {show: true, readOnly: false},
            restore : {show: true},
            saveAsImage : {show: true}
        }
    },
    series : [
        {
            name: '上市地之間比例',
            type: 'pie',
            radius : '55%',
            center: ['50%', '60%'],
            //data:[{}],
            data:[],
            itemStyle: {
                emphasis: {
                    shadowBlur: 10,
                    shadowOffsetX: 0,
                    shadowColor: 'rgba(0, 0, 0, 0.5)'
                }
            }
        }
    ]
});

//異步加載數(shù)據(jù)
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];

    var info = {"opt":"marketCount"};
    $.post("../ServletStock", info, function(data){
            mapOnlyKey.length=0;
            mapKeyValue.length=0;
            mapOnlyValue.length=0;

            for(var i=0; i < data.length; i++){
                mapOnlyKey.push(data[i].market);
                mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].market});
                mapOnlyValue.push( data[i].count );
            }
            
              //console.log(mapOnlyKey);
                //console.log(mapKeyValue);
               // console.log(mapOnlyValue);
                //return false; 
                // 填入數(shù)據(jù)
                myChart.setOption({
                    legend: {
                        //類(lèi)別
                        data: mapOnlyKey
                    },
                    series: [{
                        // 根據(jù)名字對(duì)應(yīng)到相應(yīng)的系列
                        name: '數(shù)量',
                        data:mapKeyValue
                    }]
                });
            // 使用剛指定的配置項(xiàng)和數(shù)據(jù)顯示圖表。
            }, 'json');
</script>

不同上市地的漲跌幅統(tǒng)計(jì)呻待,可以看到紐約證券交易所的漲跌幅總量最大打月,但這也和在這里上市的股票最多有關(guān)系,不過(guò)每個(gè)上市地都是跌幅大于漲幅的蚕捉,所以你可以看到股票并不是很容易就可以玩的溜的

不同上市地的漲跌幅統(tǒng)計(jì)

echarts 代碼:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>不同上市地區(qū)漲跌幅統(tǒng)計(jì)</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">折線圖適合用來(lái)展現(xiàn)某個(gè)項(xiàng)目的發(fā)展趨勢(shì)奏篙,或展現(xiàn)并比較多個(gè)項(xiàng)目的發(fā)展趨勢(shì)。</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
    var data = [];
    var markLineData = [];
    for ( var i = 1; i < data.length; i++) {
        markLineData.push([ {
            xAxis : i - 1,
            yAxis : data[i - 1],
            value : (data[i] + data[i - 1]).toFixed(2)
        }, {
            xAxis : i,
            yAxis : data[i]
        } ]);
    }
    option = {
        title : {
            text : '不同上市地漲跌幅統(tǒng)計(jì)',
            subtext : '單位:%'
        },
        tooltip : {
            trigger : 'axis'
        },
        toolbox: {
            show : true,
            feature : {
                mark : {show: true},
                dataView : {show: true, readOnly: false},
                magicType : {show: true, type: ['bar']},
                restore : {show: true},
                saveAsImage : {show: true}
            }
        },
        xAxis : {
            name: '上市地',
            data : []
        },
        yAxis : {},
        series : [ {
            type : 'line',
            data : data,
            markPoint : {
                data : [ {
                    type : 'max',
                    name : '最大值'
                }, {
                    type : 'min',
                    name : '最小值'
                } ]
            },
            markLine : {
                smooth : true,
                effect : {
                    show : true
                },
                distance : 10,
                label : {
                    normal : {
                        position : 'middle'
                    }
                },
                symbol : [ 'none', 'none' ],
                data : markLineData
            }
        } ]
    };
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setOption(option);
    var mapOnlyKey = [];
    var mapKeyValue = [];
    var mapOnlyValue = [];
    var info = {
        "opt" : "marketSumchg"
    };
    $.post("../ServletStock", info, function(data) {
        mapOnlyKey.length = 0;
        mapKeyValue.length = 0;
        mapOnlyValue.length = 0;
        for ( var i = 0; i < data.length; i++) {
            mapOnlyKey.push(data[i].market);
            mapKeyValue.push({
                "value" : Math.round(data[i].sumchg),
                "name" : data[i].market
            });
            mapOnlyValue.push(data[i].sumchg);
        }
        myChart.setOption({
            legend : {
                data : mapOnlyKey
            },
            xAxis : [ {
                data : mapOnlyKey
            } ],
            series : [ {
                name : '百分比',
                data : mapKeyValue
            } ]
        });
    }, 'json');
</script>

最受歡迎的十大股票板塊迫淹,可以看到股權(quán)是排在第一的秘通,排在第二的是銀行,軟件排在第五

最受歡迎的十大股票板塊

echarts 代碼:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>最受歡迎的十大股票板塊</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">柱狀圖以坐標(biāo)軸上的長(zhǎng)方形元素作為變量敛熬,以此來(lái)達(dá)到展現(xiàn)并比較數(shù)據(jù)情況的目的</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
        // 基于準(zhǔn)備好的dom肺稀,初始化echarts實(shí)例
        var myChart = echarts.init(document.getElementById('main'));
        // 指定圖表的配置項(xiàng)和數(shù)據(jù)
        myChart.setOption({
            title: {
                text: '最受歡迎的十大股票板塊'
            },
            tooltip: {
                show: true
            },
            legend: {
                data:[]
            },
            xAxis : [
                {
                    name: '行業(yè)板塊',
                    type : 'category',
                    data : []
                }
            ],
            yAxis : [
                {
                    name: '市值',
                    type : 'value'
                }
            ],
            series : [
                {
                    name:'數(shù)量',
                    type:'bar',
                    data: []
                }
            ]
        });
        
        // 異步加載數(shù)據(jù)
        var mapOnlyKey = [];
        var mapKeyValue = [];
        var mapOnlyValue = [];
        var info = {"opt": "categoryCount"};
        $.post("../ServletStock", info, function(data){
            mapOnlyKey.length=0;
            mapKeyValue.length=0;
            mapOnlyValue.length=0;
            for(var i=0; i < data.length; i++){
                mapOnlyKey.push( data[i].category);
                mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].category});
                mapOnlyValue.push( data[i].count );
            }
            console.log(mapOnlyKey);
            console.log(mapKeyValue);
            console.log(mapOnlyValue);
            
            // 填入數(shù)據(jù)
            myChart.setOption({
                legend: {
                    //類(lèi)別
                    data: mapOnlyKey
                },
                xAxis : [
                    {
                        data : mapOnlyKey
                    }
                ],
                series: [{
                    // 根據(jù)名字對(duì)應(yīng)到相應(yīng)的系列
                    name: '數(shù)量',
                    data: mapKeyValue
                }]
            });
        // 使用剛指定的配置項(xiàng)和數(shù)據(jù)顯示圖表。
        }, 'json');
        
    </script>

市值最高的十支股票应民,可以看到蘋(píng)果公司以超過(guò) 8000 億的數(shù)量領(lǐng)先话原,有個(gè)股票的名字 “HSBC Holdings, plc. Perpetual Sub Cap Secs” 比較長(zhǎng),把別的股票名字都蓋住了诲锹,阿里巴巴排在第十

市值最高的十支股票

成交量最高的十支股票


成交量最高的十支股票

到這里離線分析的流程就告一段落了繁仁,hadoop 適用于對(duì)時(shí)延要求不高的離線處理,而當(dāng)我們需要實(shí)時(shí)處理的時(shí)候归园,就需要用到 Storm 或者 Spark Streaming 了

實(shí)時(shí)流程

再放一遍前面的實(shí)時(shí)處理流程圖


實(shí)時(shí)處理

這次我們要實(shí)時(shí)爬取各只股票的最高價(jià)黄虱,然后以 echarts 動(dòng)態(tài)圖的形式展現(xiàn)出來(lái),所以這次需要修改一下爬蟲(chóng)

爬取最高價(jià)

首先我們定義一個(gè)實(shí)體類(lèi)庸诱,來(lái)封裝各只股票的名稱與最高價(jià)

CnameHigh.java:

package my.webmagic;

public class CnameHigh {
    private String cname;
    private Float high;
    public CnameHigh() {
        super();
    }
    public CnameHigh(String cname, Float high) {
        super();
        this.cname = cname;
        this.high = high;
    }
    public String getCname() {
        return cname;
    }
    public void setCname(String cname) {
        this.cname = cname;
    }
    public Float getHigh() {
        return high;
    }
    public void setHigh(Float high) {
        this.high = high;
    }
    @Override
    public String toString() {
        return cname + ":" + high;
    }
    
}

這里我們?cè)O(shè)置了爬蟲(chóng)每爬一次就睡 1500 毫秒捻浦,比之前的爬蟲(chóng)少睡了 500 毫秒,因?yàn)槲以O(shè)置的 echarts 動(dòng)態(tài)圖是每 2000 毫秒刷新一次桥爽,所以至少要保證數(shù)據(jù)更新的速度要比顯示的速度快默勾。

    private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(1500)
            .setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");

在 process 方法中修改爬蟲(chóng)的邏輯,這次我們不需要全部的 json 數(shù)據(jù)聚谁,只需要其中的 cname(股票名稱) 和 high(最高價(jià)) 字段,所以需要在爬蟲(chóng)代碼中解析 json 數(shù)據(jù)滞诺,將這兩個(gè)字段取出來(lái)

public void process(Page page) {
//      page.putField("sixty", regexJson(page.getJson().toString()));
        String stockJson = regexJson(page.getJson().toString());
        if (!stockJson.contains("data") || !stockJson.contains("high")) {
            return;
        }
        JSONObject myjson = JSON.parseObject(stockJson);
        
        JSONArray dataArray = myjson.getJSONArray("data");
        
        ArrayList<CnameHigh> highList = new ArrayList<>();
        
        String cname;
        Float high;
        for (int i = 0; i < dataArray.size(); i++) {
            JSONObject jsonObject = dataArray.getJSONObject(i);
            cname = jsonObject.getString("cname");
            high = jsonObject.getFloatValue("high");
            CnameHigh cnameHigh = new CnameHigh(cname, high);
            highList.add(cnameHigh);
        }
        page.putField("high", highList);
}

在 main 方法里我們修改了 url 里的 num 參數(shù)形导,由原來(lái)的每次爬 60 條數(shù)據(jù)改成每次爬 20 條數(shù)據(jù),這是為了讓爬蟲(chóng)跑的時(shí)間長(zhǎng)一點(diǎn)习霹,以使動(dòng)態(tài)圖可以顯示的久一點(diǎn)朵耕。此外,我們還修改了輸出目錄淋叶,然后我們將原來(lái)的 JsonFilePipeline 改成了 FilePipeline 阎曹,這樣可以減少 Spark Streaming 中的實(shí)時(shí)處理代碼量

public static void main(String[] args) {
        String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
    "/US_CategoryService.getList?num=20&sort=&asc=0&page=1";
        String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
            "/US_CategoryService.getList?num=20&sort=&asc=0&page=";
        String output = "/data/edu6/tmp/";
        QueueScheduler scheduler = new QueueScheduler();
        Spider spider = Spider.create(new GetStockHigh())
                .addUrl(url_init)
                .setScheduler(scheduler)
                .addPipeline(new FilePipeline(output))
                .addPipeline(new ConsolePipeline());
        for (int i = 1; i < 418; i++) {
            Request request = new Request();
            request.setUrl(url_pattern + i);
            scheduler.push(request, spider);
        }
        spider.thread(1).run();
}

先讓爬蟲(chóng)跑一下看看得到的數(shù)據(jù)是什么樣子

爬蟲(chóng)下載到的包含股票名稱和最高價(jià)的文件

可以看到我們爬到的文件最前面兩行不是我們需要的,所以我們需要在 Spark Streaming 中過(guò)濾掉,然后將剩下的股票名稱和最高價(jià)存到數(shù)據(jù)庫(kù)里面处嫌,同時(shí)用 echarts 實(shí)時(shí)展示

為了過(guò)濾掉前兩行栅贴,我們發(fā)現(xiàn)了一個(gè)規(guī)律,如果以 ":" 將每行文本分割熏迹,第一行被分成三段檐薯,第二行被分成一段,而其余我們需要的數(shù)據(jù)都被分成了兩段注暗。以這個(gè)規(guī)律坛缕,在 Spark Streaming 中編寫(xiě)代碼來(lái)過(guò)濾出我們需要的數(shù)據(jù)。

編寫(xiě) Spark Streaming

在 Spark Streaming 中我們要做的工作就是不斷地讀取 kafka 傳送過(guò)來(lái)的數(shù)據(jù)捆昏,過(guò)濾后存到 mysql 里面

首先定義 kafka 的配置

val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topics = Set("flumesendkafka")
val brokers = "localhost:9092"
val zkQuorum = "localhost:2181"
val kafkaParams = Map[String, String](
        "metadata.broker.list" -> brokers,
        "serializer.class" -> "kafka.serializer.StringEncoder")

然后定義數(shù)據(jù)庫(kù)的連接配置

val db_host = "localhost"
val db_name = "sina"
val db_user = "root"
val db_passwd = "strongs"
val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd

然后定義 Dstream

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

最后對(duì) Dstream 執(zhí)行計(jì)算赚楚,這里注意一點(diǎn),我們是對(duì)每個(gè) RDD 分區(qū)建立一個(gè)數(shù)據(jù)庫(kù)連接骗卜。因?yàn)槿绻麑?duì)每一行建立一個(gè)連接的話宠页,頻繁的新建和關(guān)閉數(shù)據(jù)庫(kù)連接對(duì)系統(tǒng)開(kāi)銷(xiāo)很大,影響實(shí)時(shí)處理的速度膨俐;而對(duì)直接每個(gè) RDD 建立一個(gè)連接的話又會(huì)報(bào)不能序列化的異常勇皇。

dstream.foreachRDD {rdd =>
          rdd.foreachPartition {partitionOfRdd =>
               var conn: Connection = DriverManager.getConnection(db_connection_str)
               
                val lines = partitionOfRdd.filter(_.split(':').length == 2)     //過(guò)濾出我們需要的數(shù)據(jù)
                try {
                    lines.foreach ( line => {
                      var strs = line.mkString.split(':')
                      
                      if (strs.length == 2) {
                        var ps: PreparedStatement = null
                        val sql = "insert into cnamehigh (cname, high) values (?, ?)"
                        val cname = strs(0)
                        val high = strs(1).toFloat
                        println("cname:" + cname + ",high:" + high)
                          
                        ps = conn.prepareStatement(sql)
                        ps.setString(1, cname)
                        ps.setFloat(2, high)
                        ps.execute()
                        if (ps != null) {
                            ps.close()
                        }
                      }
                    })
                } catch {
                    case e: Exception => println("MySQL Exception")// todo: handle error
                } finally {
                    if (conn != null) {
                        conn.close()
                    }
                }
            }
}

完整代碼 StockMonitor.scala:

package my.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager

object StockMonitor {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint("checkpoint")
        val topics = Set("flumesendkafka")
        val brokers = "localhost:9092"
        val zkQuorum = "localhost:2181"
        val kafkaParams = Map[String, String](
            "metadata.broker.list" -> brokers,
            "serializer.class" -> "kafka.serializer.StringEncoder")
        val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
        
        val db_host = "localhost"
        val db_name = "sina"
        val db_user = "root"
        val db_passwd = "strongs"
        val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd
        
        dstream.foreachRDD {rdd =>
          rdd.foreachPartition {partitionOfRdd =>
               var conn: Connection = DriverManager.getConnection(db_connection_str)
               
                val lines = partitionOfRdd.filter(_.split(':').length == 2)     //過(guò)濾出我們需要的數(shù)據(jù)
                try {
                    lines.foreach ( line => {
                      var strs = line.mkString.split(':')
                      
                      if (strs.length == 2) {
                        var ps: PreparedStatement = null
                        val sql = "insert into cnamehigh (cname, high) values (?, ?)"
                        val cname = strs(0)
                        val high = strs(1).toFloat
                        println("cname:" + cname + ",high:" + high)
                          
                        ps = conn.prepareStatement(sql)
                        ps.setString(1, cname)
                        ps.setFloat(2, high)
                        ps.execute()
                        if (ps != null) {
                            ps.close()
                        }
                      }
                    })
                } catch {
                    case e: Exception => println("MySQL Exception")// todo: handle error
                } finally {
                    if (conn != null) {
                        conn.close()
                    }
                }
            }
        }
        
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

使用 Kafka 對(duì) SparkStreaming 進(jìn)行測(cè)試

開(kāi)啟 kafka 之前要先開(kāi)啟 zookeeper

/apps/zookeeper/bin/zkServer.sh start

開(kāi)啟 kafka

/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties

然后另開(kāi)一個(gè)終端在 kafka 中新建一個(gè) topic

/apps/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --topic flumesendkafka --partitions 1

查看剛才新建的 topic

/apps/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
剛剛新建的 topic

然后打開(kāi) mysql 切換到 sina 數(shù)據(jù)庫(kù),新建一個(gè)表焚刺,這里我們新增了一個(gè)自增的 id 字段敛摘,因?yàn)樵谧隹梢暬臅r(shí)候,要一直顯示最新的數(shù)據(jù)乳愉,這時(shí)我們就可以按照 id 來(lái)降序查找兄淫,以保證每次查到的數(shù)據(jù)都不同

create table cnamehigh (id int not null auto_increment, cname varchar(100), high float, primary key(id));

查看一下數(shù)據(jù)庫(kù),可以看到里面還沒(méi)有數(shù)據(jù)

select * from cnamehigh;

查看數(shù)據(jù)庫(kù)

我們先用控制臺(tái)來(lái)當(dāng) kafka 的 producer蔓姚,模擬輸入一些數(shù)據(jù)捕虽,看看能不能正確的插入到 mysql 中

運(yùn)行 spark streaming

另開(kāi)一個(gè)終端,新建一個(gè) console producer

/apps/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flumesendkafka

然后輸入一些模擬數(shù)據(jù)

其中只有第一行是正確的數(shù)據(jù)

在 spark streaming 的控制臺(tái)中看到坡脐,只輸出了第一行數(shù)據(jù)

spark streaming 輸出結(jié)果

而且在 mysql 中也可以看到新增進(jìn)來(lái)的數(shù)據(jù)

mysql 中新增的數(shù)據(jù)

好了泄私,因?yàn)閯偛诺谋聿迦肓诵碌臄?shù)據(jù),所以我們把剛才的表刪掉备闲,重新創(chuàng)建一個(gè)相同的表備用

create table cnamehigh1 like cnamehigh;
drop table cnamehigh;
alter table cnamehigh1 rename cnamehigh;

接下來(lái)我們配置 flume

Flume 配置

/data/edu1 目錄下新建一個(gè) flume 配置文件

vim /data/edu1/spooldir_mem_kafka.conf

將下列配置填寫(xiě)進(jìn)去

agent1.sources = src
agent1.channels = ch
agent1.sinks = des

agent1.sources.src.type = spooldir
agent1.sources.src.restart = true
agent1.sources.src.spoolDir = /data/edu6/tmp/stock.finance.sina.com.cn

agent1.channels.ch.type = memory

agent1.sinks.des.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.des.brokerList = localhost:9092
agent1.sinks.des.topic = flumesendkafka
agent1.sinks.des.batchSize = 1
agent1.sinks.des.requiredAcks = 1

agent1.sources.src.channels = ch
agent1.sinks.des.channel = ch

這里我們?cè)O(shè)置 flume 監(jiān)控的目錄為新爬蟲(chóng)的輸出目錄
然后設(shè)置 batchSize = 1 是為了讓數(shù)據(jù)庫(kù)更新的及時(shí)一點(diǎn)晌端,以便我們可以觀測(cè)到動(dòng)態(tài)圖的變化

最后一點(diǎn)工作就是 echarts 動(dòng)態(tài)圖的完成

echarts 動(dòng)態(tài)圖

可視化的部分已經(jīng)花了很大的篇幅講過(guò)了,這里就不啰嗦了恬砂,直接貼上 echarts 動(dòng)態(tài)圖的代碼

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <meta charset="utf-8">
    <title>股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì)</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    
    <%@include file="../basic/cssjs.jsp" %>
    
     <!-- 引入Jquery包 -->
    <script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
    
    <!-- 引入Echarts3包 -->
    <script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp" %>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">
        
            
            
            <!-- right content start -->
            <div class="span12">
                <div class="session">
                    <div id="main" style="width: 100%;height:600px;"></div>
                </div>
            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">

    var data = [];
    var markLineData = [];
    for ( var i = 1; i < data.length; i++) {
        markLineData.push([ {
            xAxis : i - 1,
            yAxis : data[i - 1],
            value : (data[i] + data[i - 1]).toFixed(2)
        }, {
            xAxis : i,
            yAxis : data[i]
        } ]);
    }
    option = {
        title : {
            text : '股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì)',
        },
        tooltip : {
            trigger : 'axis'
        },
        xAxis : {
            name: '股票名稱',
            data : (function (){
                var now = new Date();
                var res = [];
                var len = 10;
                while (len--) {
                    res.push("");
                }
                return res;
            })()
        },
        yAxis: [
                {
                    type: 'value',
                    scale: true,
                    name: '最高價(jià)',
                    max: 500,
                    min: 0,
                    boundaryGap: [0.2, 0.2]
                }
            ],
        dataZoom: [
                   {
                       id: 'dataZoomX',
                       type: 'slider',
                       xAxisIndex: [0],
                       filterMode: 'filter'
                   },
               ],
        series : [ {
            type : 'line',
            data : (function (){
                var res = [];
                var len = 0;
                while (len < 10) {
                    res.push(0);
                    len++;
                }
                return res;
            })(),
            markPoint : {
                data : [ {
                    type : 'max',
                    name : '最大值'
                }, {
                    type : 'min',
                    name : '最小值'
                } ]
            },
            markLine : {
                smooth : true,
                effect : {
                    show : true
                },
                distance : 10,
                label : {
                    normal : {
                        position : 'middle'
                    }
                },
                symbol : [ 'none', 'none' ],
                data : markLineData
            }
        } ]
    };
    setInterval(function () {//實(shí)現(xiàn)定時(shí)訪問(wèn)數(shù)據(jù)庫(kù)添加地方1
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setOption(option);
    var mapOnlyKey = [];
    var mapKeyValue = [];
    var mapOnlyValue = [];
    var info = {
        "opt" : "cnameHigh"
    };
    $.post("../ServletStock", info, function(data) {
        mapOnlyKey.length = 0;
        mapKeyValue.length = 0;
        mapOnlyValue.length = 0;
        
        
        for ( var i = 0; i < data.length; i++) {
            mapOnlyKey.push(data[i].cname);
            mapKeyValue.push({
                "value" : Math.round(data[i].high),
                "name" : data[i].cname
            });
            mapOnlyValue.push(data[i].high);
        }
        
        var data1 = option.series[0].data;
        data1.shift();
        data1.push(mapOnlyValue.shift());
        
        option.xAxis.data.shift();
        option.xAxis.data.push(mapOnlyKey.shift());
        
        myChart.setOption(option);
    
    }, 'json')
    }, 2000);//實(shí)現(xiàn)定時(shí)訪問(wèn)數(shù)據(jù)庫(kù)添加地方2
</script>

不過(guò)有一點(diǎn)需要注意咧纠, dao 層的 sql 我是這么寫(xiě)的,以保證每次查到的數(shù)據(jù)都不同

select cname,high from cnamehigh order by id desc limit 1;

但是因?yàn)閿?shù)據(jù)庫(kù)更新的比較快泻骤,所以我們每查一次可能 id 已經(jīng)漲了幾十上百了漆羔,所以嚴(yán)格來(lái)說(shuō)這也不太算實(shí)時(shí)梧奢,不過(guò)道理還是一樣的

所有準(zhǔn)備工作都做完后,最后就是讓工程跑起來(lái)演痒,終于到了激動(dòng)人心的時(shí)刻亲轨!

工程執(zhí)行的順序依次為:開(kāi)啟可視化=>開(kāi)啟kafka=>開(kāi)啟spark streaming=>開(kāi)啟flume=>開(kāi)啟爬蟲(chóng)程序

開(kāi)啟可視化

開(kāi)啟可視化

可以看到動(dòng)態(tài)圖還沒(méi)有變化

動(dòng)態(tài)圖

開(kāi)啟 kafka

/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties

開(kāi)啟 spark streaming

開(kāi)啟 spark streaming

開(kāi)啟 flume

flume-ng agent -c /data/edu1/ -f /data/edu1/spooldir_mem_kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console

最后開(kāi)啟爬蟲(chóng)

開(kāi)啟爬蟲(chóng)

實(shí)時(shí)的動(dòng)態(tài)圖

股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì).gif

總結(jié)

經(jīng)歷了四個(gè)月的大數(shù)據(jù)學(xué)習(xí),我學(xué)到了很多有趣的東西嫡霞,其中既有對(duì)已有知識(shí)的鞏固瓶埋,也領(lǐng)略到了大數(shù)據(jù)這個(gè)新興行業(yè)的魅力。感謝各位老師的悉心指導(dǎo)诊沪,還有各位小伙伴的互相交流养筒,希望大家一直保持著旺盛的好奇心與求知欲,永遠(yuǎn)年輕端姚。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末晕粪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子渐裸,更是在濱河造成了極大的恐慌巫湘,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昏鹃,死亡現(xiàn)場(chǎng)離奇詭異尚氛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)洞渤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)阅嘶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人载迄,你說(shuō)我怎么就攤上這事讯柔。” “怎么了护昧?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵魂迄,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我惋耙,道長(zhǎng)捣炬,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任绽榛,我火速辦了婚禮遥金,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蒜田。我一直安慰自己,他們只是感情好选泻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布冲粤。 她就那樣靜靜地躺著美莫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪梯捕。 梳的紋絲不亂的頭發(fā)上厢呵,一...
    開(kāi)封第一講書(shū)人閱讀 51,155評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音傀顾,去河邊找鬼襟铭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛短曾,可吹牛的內(nèi)容都是我干的寒砖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼嫉拐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼哩都!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起婉徘,我...
    開(kāi)封第一講書(shū)人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤漠嵌,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后盖呼,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體儒鹿,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年几晤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了约炎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡锌仅,死狀恐怖章钾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情热芹,我是刑警寧澤贱傀,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站伊脓,受9級(jí)特大地震影響府寒,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜报腔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一株搔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧纯蛾,春花似錦纤房、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)捌刮。三九已至,卻和暖如春舒岸,著一層夾襖步出監(jiān)牢的瞬間绅作,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工蛾派, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留俄认,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓洪乍,卻偏偏與公主長(zhǎng)得像眯杏,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子典尾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容