I've been learning big data for a while now. Along the way I wrote some scattered blog posts as records of my study, but each one only covered part of the stack. This time I want to write a more complete post that documents a full project from start to finish, as a summary of what I've learned.
Without further ado, let's get to it.
This project involves two pipelines.
The first is offline (batch) processing: after the crawler fetches stock data, MapReduce cleans it into a structured format, the result is loaded into Hive for analysis, then Sqoop exports it to MySQL, and ECharts visualizes it.
The second is real-time processing: the crawler keeps fetching data, Flume monitors the directory the crawler writes to and continuously forwards new files to Kafka, Spark Streaming periodically pulls the data from Kafka, analyzes it in real time, saves the results to MySQL, and the data is finally visualized.
Offline Pipeline
Analyzing the page structure
This time we crawl Sina Finance's real-time US stock quotes. The page looks like this:
Press F12 to open the developer tools, switch to the Network panel, refresh with F5, and find the API endpoint that returns the stock data as JSON.
Different sites expose their APIs differently, but here is a small tip: the endpoint is usually an xhr or script request, and its URL usually carries a page parameter indicating the page number.
Double-clicking the URL opens a new page.
Here you can see the returned data is not standard JSON; it is prefixed with
IO.XSRV2.CallbackList['QGNtUNkM_FleaeT1']
and the same string also appears in the URL. Deleting it from the URL gives the result shown below.
Now the data is close to standard JSON, except for the two pairs of parentheses at the front, which we will strip in the crawler program. With the URL in hand, let's write the crawler.
Crawling the data
I wrote the crawler two ways: one in Python, and one in Java on the WebMagic framework. For reasons of length, the Python version is not covered in this post; I may write a separate post about the Python stock crawler later.
WebMagic is a simple, flexible Java crawler framework written by a Chinese developer.
To use WebMagic, first download its dependency bundle webmagic-0.7.3-all.tar.gz.
Create a new Java Project in Eclipse, create a folder under the project root, extract the bundle into it, select all the jars, and add them to the Build Path.
Now we can write the crawler code.
private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(2000)
.setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");
First build the Site to request. Note that "stock.finance.sina.com.cn" in setDomain("stock.finance.sina.com.cn") at the top of the code is the Host shown in the screenshot below, and it is also the directory name under which the crawler stores the downloaded pages.
Next, write a method that strips the two pairs of parentheses around the JSON data mentioned earlier.
public String regexJson(String source) {
// Strip the two pairs of parentheses wrapping the JSON data
String regex = "\\{.*\\}";
String result;
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(source);
if (matcher.find()) {
result = matcher.group(0);
} else {
result = null;
}
return result;
}
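To see what regexJson keeps, here is a minimal, self-contained sketch; the sample callback payload is made up for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexJsonDemo {
    // Same logic as regexJson above: keep everything from the first '{' to the last '}'
    static String regexJson(String source) {
        Matcher matcher = Pattern.compile("\\{.*\\}").matcher(source);
        return matcher.find() ? matcher.group(0) : null;
    }

    public static void main(String[] args) {
        // A made-up response wrapped in a JSONP callback and two pairs of parentheses
        String raw = "IO.XSRV2.CallbackList['QGNtUNkM_FleaeT1']((({\"count\":\"8341\",\"data\":[]})))";
        System.out.println(regexJson(raw));  // {"count":"8341","data":[]}
    }
}
```

The greedy `.*` is what makes this work: the match runs from the first opening brace to the last closing brace, so the callback prefix and the wrapping parentheses fall away in one step.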
Write the crawler logic in the process method:
@Override
public void process(Page page) {
page.putField("sixty", regexJson(page.getJson().toString()));
}
The main method:
public static void main(String[] args) {
String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" +
"/US_CategoryService.getList?num=60&sort=&asc=0&page=1";
String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" +
"/US_CategoryService.getList?num=60&sort=&asc=0&page=";
String output = "/data/edu1/tmp/";
QueueScheduler scheduler = new QueueScheduler();
Spider spider = Spider.create(new GetStock())
.addUrl(url_init)
.setScheduler(scheduler)
.addPipeline(new JsonFilePipeline(output))
.addPipeline(new ConsolePipeline());
for (int i = 1; i < 140; i++) {
Request request = new Request();
request.setUrl(url_pattern + i);
scheduler.push(request, spider);
}
spider.thread(50).run();
}
When the crawler finishes, many JSON files are generated under /data/edu1/tmp/stock.finance.sina.com.cn
Open any one of them and you can see the JSON string inside.
Next we upload these files to HDFS and write a MapReduce job to clean the dirty data:
hadoop fs -put /data/edu1/tmp/stock.finance.sina.com.cn/* /mystock/in
Cleaning with MapReduce
Data cleaning is, as the name suggests, "washing the dirt out of the data": it is the last step that detects and corrects recognizable errors in data files, including checking data consistency and handling invalid and missing values.
Here we clean the JSON data into a '\t'-delimited text format that can be loaded straight into Hive. Some fields may be missing in the JSON, so we also fill in placeholder values to keep the data consistent.
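The per-field cleaning rule (trim, strip embedded \r, \n, and \t, and substitute the literal string "null" for empty or missing values) boils down to one small function; a minimal sketch:

```java
public class CleanFieldDemo {
    // Normalize one field the way the map task below does:
    // missing or empty -> "null"; otherwise trim and strip \r \n \t
    static String clean(String s) {
        if (s == null || s.trim().isEmpty()) {
            return "null";
        }
        return s.trim().replaceAll("\r|\n|\t", "");
    }

    public static void main(String[] args) {
        System.out.println(clean("  AAPL\t "));  // AAPL
        System.out.println(clean(""));           // null
        System.out.println(clean(null));         // null
    }
}
```

Using the string "null" rather than an actual empty cell keeps every row at the same column count, which is what Hive's '\t'-delimited loader expects.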
We use Alibaba's fastjson library to parse the JSON.
The map code:
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String initJsonString = value.toString();
// Skip records that clearly carry no data before attempting to parse them
if (!initJsonString.contains("data")) {
return;
}
JSONObject initJson = JSONObject.parseObject(initJsonString);
JSONObject myjson = initJson.getJSONObject("sixty");
JSONArray data = myjson.getJSONArray("data");
// Normalize a field: trim it, strip \r \n \t, and substitute "null" for empty or missing values
java.util.function.Function<String, String> clean = s ->
(s == null || s.trim().isEmpty()) ? "null" : s.trim().replaceAll("\r|\n|\t", "");
for (int i = 0; i < data.size(); i++) {
JSONObject stock = data.getJSONObject(i);
String name = clean.apply(stock.getString("name"));
String cname = clean.apply(stock.getString("cname"));
String category = clean.apply(stock.getString("category"));
String symbol = clean.apply(stock.getString("symbol"));
String price = clean.apply(stock.getString("price"));
String diff = clean.apply(stock.getString("diff"));
String chg = clean.apply(stock.getString("chg"));
String preclose = clean.apply(stock.getString("preclose"));
String open = clean.apply(stock.getString("open"));
String high = clean.apply(stock.getString("high"));
String low = clean.apply(stock.getString("low"));
String amplitude = clean.apply(stock.getString("amplitude"));
String volume = clean.apply(stock.getString("volume"));
String mktcap = clean.apply(stock.getString("mktcap"));
String pe = clean.apply(stock.getString("pe"));
String market = clean.apply(stock.getString("market"));
String category_id = clean.apply(stock.getString("category_id"));
String result = String.join("\t", name, cname, category, symbol, price, diff, chg,
preclose, open, high, low, amplitude, volume, mktcap, pe, market, category_id);
context.write(new Text(result), new Text());
}
}
Here is the meaning of each field:
Field | Meaning |
---|---|
name | English name |
cname | Chinese name |
category | Industry sector |
symbol | Ticker symbol |
price | Latest price |
diff | Price change |
chg | Percent change |
preclose | Previous close |
open | Open |
high | Day high |
low | Day low |
amplitude | Amplitude |
volume | Trading volume |
mktcap | Market cap (hundred millions) |
pe | P/E ratio |
market | Listing venue |
category_id | Sector ID |
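Each cleaned record is a single tab-separated line carrying the 17 fields above in order. A quick sketch of building one and splitting it back apart (the sample values are made up):

```java
public class TsvRecordDemo {
    public static void main(String[] args) {
        // A made-up cleaned record: 17 fields joined by '\t', as emitted by the map task
        String line = String.join("\t",
                "Apple Inc.", "苹果公司", "计算机", "AAPL", "172.5", "1.2", "0.7",
                "171.3", "171.8", "173.0", "171.0", "1.17%", "25000000", "8900",
                "18.4", "NYSE", "1");
        String[] fields = line.split("\t", -1);  // -1 keeps trailing empty fields
        System.out.println(fields.length);       // 17
        System.out.println(fields[3]);           // AAPL
    }
}
```

The `-1` limit matters when splitting: without it, Java drops trailing empty strings, and a row whose last fields were empty would come back with fewer than 17 columns.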
The main method:
public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJobName("QingXiStock");
job.setJarByClass(QingXiStock.class);
job.setMapperClass(doMapper.class);
//job.setReducerClass(doReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path in = new Path("hdfs://localhost:9000//mystock/in");
Path out = new Path("hdfs://localhost:9000//mystock/out");
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
When the cleaning job finishes, new files appear under /mystock/out
on HDFS.
Loading into Hive for analysis
Now that the data is in good shape, we can load it straight into Hive for analysis.
First start Hive and create a database:
create database sina;
Then create an external table named stock. We use an external table because an external table does not move the data; it only stores metadata. When an external table is dropped, only the metadata is deleted and the data itself survives, which makes it relatively safer.
create external table if not exists stock (
> name string,
> cname string,
> category string,
> symbol string,
> price float,
> diff float,
> chg float,
> preclose float,
> open float,
> high float,
> low float,
> amplitude string,
> volume bigint,
> mktcap bigint,
> pe float,
> market string,
> category_id int
> ) row format delimited
> fields terminated by '\t'
> lines terminated by '\n'
> stored as textfile
> location '/mystock/out/';
Now look at the first ten rows:
select * from stock limit 10;
Count the stocks in each sector:
select category,count(category) as num from stock group by category order by num desc;
The ten stocks with the highest market cap:
select cname,mktcap from stock order by mktcap desc limit 10;
Count the stocks by listing venue:
select market,count(market) as num from stock group by market order by num desc;
The ten stocks with the highest trading volume:
select cname,symbol,volume from stock order by volume desc limit 10;
The ten stocks with the highest P/E ratio:
select cname,pe,symbol from stock order by pe desc limit 10;
The ten stocks with the largest percent change:
select cname,symbol,chg from stock order by chg desc limit 10;
Finally, the total number of rows:
select count(*) from stock;
Exporting to MySQL with Sqoop
After the Hive analysis, we use Sqoop to export the data from Hive to MySQL. Since we only have about 8,000 rows, we simply export the whole table.
First create a database named sina in MySQL:
create database if not exists sina default charset utf8 collate utf8_general_ci;
Enter the database and create a table:
create table if not exists stock (
-> name varchar(100),
-> cname varchar(100),
-> category varchar(100),
-> symbol varchar(50),
-> price float,
-> diff float,
-> chg float,
-> preclose float,
-> open float,
-> high float,
-> low float,
-> amplitude varchar(50),
-> volume bigint,
-> mktcap bigint,
-> pe float,
-> market varchar(10),
-> category_id int(10)
-> ) default charset=utf8;
Then run the Sqoop export command:
sqoop export \
> --connect jdbc:mysql://localhost:3306/sina?characterEncoding=UTF-8 \
> --username root \
> --password strongs \
> --table stock \
> --export-dir /user/hive/warehouse/sina.db/stock/part-r-00000 \
> --input-fields-terminated-by '\t'
When the export completes, check the data in MySQL:
select * from stock limit 10;
Check the total row count; it matches Hive at 8,141 rows.
Data Visualization
I covered the visualization approach in an earlier post, 利用ECharts可視化mysql數(shù)據(jù)庫(kù)中的數(shù)據(jù) (visualizing MySQL data with ECharts), and the idea is much the same here, except that this time ECharts loads its data dynamically via Ajax and all requests go through a single servlet, so I'll only show the ECharts and servlet code.
Project layout
ServletBase.java is an abstract servlet base class; our servlets extend it and implement its abstract method.
package my.servlet;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;
/**
* Created by teaGod on 2017/12/7.
*/
@WebServlet(name = "ServletBase")
public abstract class ServletBase extends HttpServlet {
private static final long serialVersionUID = 1L;
@Override
public void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
String methodName = request.getParameter("cmd");
if(methodName==null || methodName.trim().equals("")){
methodName="execute";
}
try{
Method method = this.getClass()
.getMethod(methodName,
HttpServletRequest.class,
HttpServletResponse.class);
method.invoke(this,request,response);
}catch(Exception e){
throw new RuntimeException(e);
}
}
public abstract void execute(HttpServletRequest request, HttpServletResponse response)
throws Exception;
}
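The trick ServletBase uses is plain reflection: it looks up a public method whose name matches the cmd parameter and invokes it, falling back to execute when no cmd is given. Stripped of the servlet API, the pattern looks like this (the Handler class and its methods are made-up stand-ins):

```java
import java.lang.reflect.Method;

public class DispatchDemo {
    public static class Handler {
        public String execute() { return "default"; }
        public String hello()   { return "hello";   }
    }

    // Resolve a method by name at runtime, falling back to "execute",
    // and wrap any failure in a RuntimeException just as ServletBase does
    static String dispatch(Handler h, String cmd) {
        try {
            String name = (cmd == null || cmd.trim().isEmpty()) ? "execute" : cmd;
            Method m = h.getClass().getMethod(name);
            return (String) m.invoke(h);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Handler h = new Handler();
        System.out.println(dispatch(h, null));     // default
        System.out.println(dispatch(h, "hello"));  // hello
    }
}
```

This keeps URL routing in one place: adding a new action is just adding a new public method, with no extra mapping table to maintain.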
ServletStock.java decides which logic to run based on the opt parameter passed in from each JSP page.
package my.servlet;
import my.entity.*;
import my.manager.StockManager;
import net.sf.json.JSONArray;
import org.apache.log4j.Logger;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
/**
* Created by teaGod on 2017/12/7.
*/
@WebServlet(name = "ServletStock", value = "/ServletStock")
public class ServletStock extends ServletBase {
private static final long serialVersionUID = 1L;
public static Logger logger = Logger.getLogger(ServletStock.class);
@Override
public void execute(HttpServletRequest request, HttpServletResponse response) throws Exception {
response.setContentType("text/html;charset=utf-8");
String opt = request.getParameter("opt");
StockManager stockManager = new StockManager();
if (opt.equals("marketCount")) {
List<MarketCount> list;
list = stockManager.getMarketCountList();
writeJson(list, response);
} else if (opt.equals("categoryCount")) {
List<CategoryCount> list;
list = stockManager.getCategoryCountList();
writeJson(list, response);
} else if (opt.equals("cnameMktnum")) {
List<CnameMktnum> list;
list = stockManager.getCnameMktnumList();
writeJson(list, response);
} else if (opt.equals("marketSumchg")) {
List<MarketSumchg> list;
list = stockManager.getMarketSumchgList();
writeJson(list, response);
} else if (opt.equals("cnameVolume")) {
List<CnameVolume> list;
list = stockManager.getCnameVolumeList();
writeJson(list, response);
} else if (opt.equals("cnameHigh")) {
List<CnameHigh> list;
list = stockManager.getCnameHighList();
writeJson(list, response);
}
}
private void writeJson(List list, HttpServletResponse response) throws IOException {
JSONArray jsonArray = JSONArray.fromObject(list);
//System.err.println(jsonArray);
PrintWriter out = response.getWriter();
out.print(jsonArray);
out.flush();
out.close();
}
}
Share of each listing venue. You can see that stocks listed on the New York Stock Exchange already account for more than half.
The ECharts code:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>Share of Listing Venues</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<%@include file="../basic/cssjs.jsp"%>
<!-- Include the jQuery library -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
<!-- Include the ECharts 3 library -->
<script type="text/javascript" src="../js/echarts.min.js"></script>
</head>
<body>
<%@include file="../basic/top.jsp"%>
<!-- content start -->
<div class="container-fluid">
<div class="row-fluid">
<div class="span3">
<!-- left navbar start -->
<%@include file="../basic/left.jsp"%>
<!-- left navbar end -->
</div>
<!-- right content start -->
<div class="span9">
<div class="session well">A pie chart presents a simple part-to-whole composition, letting viewers see how each item relates to the whole data set.</div>
<div class="session">
<div id="main" style="width: 100%; height: 600px;"></div>
</div>
</div>
<!-- right content end -->
</div>
</div>
<!-- content end -->
</body>
</html>
<script type="text/javascript">
// Initialize the ECharts instance on the prepared DOM node
var myChart = echarts.init(document.getElementById('main'));
// Specify the chart options and data
myChart.setOption({
    title : {
        text: 'Share of Listing Venues',
        x:'center'
    },
    tooltip : {
        trigger: 'item',
        formatter: '{a} <br/>{b} : {c} ({d}%)'
},
legend: {
orient: 'vertical',
y:'100',
data: []
},
toolbox: {
show : true,
feature : {
mark : {show: true},
dataView : {show: true, readOnly: false},
restore : {show: true},
saveAsImage : {show: true}
}
},
series : [
{
name: 'Listing venue share',
type: 'pie',
radius : '55%',
center: ['50%', '60%'],
//data:[{}],
data:[],
itemStyle: {
emphasis: {
shadowBlur: 10,
shadowOffsetX: 0,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}
]
});
// Load data asynchronously
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];
var info = {"opt":"marketCount"};
$.post("../ServletStock", info, function(data){
mapOnlyKey.length=0;
mapKeyValue.length=0;
mapOnlyValue.length=0;
for(var i=0; i < data.length; i++){
mapOnlyKey.push(data[i].market);
mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].market});
mapOnlyValue.push( data[i].count );
}
//console.log(mapOnlyKey);
//console.log(mapKeyValue);
// console.log(mapOnlyValue);
//return false;
// Fill in the data
myChart.setOption({
legend: {
// categories
data: mapOnlyKey
},
series: [{
// Matched to the series by name
name: 'Count',
data:mapKeyValue
}]
});
// Render the chart with the options and data just set.
}, 'json');
</script>
Percent-change totals by listing venue. The New York Stock Exchange has the largest total swing, though that is partly because it lists the most stocks. Note that at every venue the total decline exceeds the total gain, so you can see the stock market is not an easy game to win.
The ECharts code:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>Price Change Statistics by Listing Venue</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<%@include file="../basic/cssjs.jsp"%>
<!-- Include the jQuery library -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
<!-- Include the ECharts 3 library -->
<script type="text/javascript" src="../js/echarts.min.js"></script>
</head>
<body>
<%@include file="../basic/top.jsp"%>
<!-- content start -->
<div class="container-fluid">
<div class="row-fluid">
<div class="span3">
<!-- left navbar start -->
<%@include file="../basic/left.jsp"%>
<!-- left navbar end -->
</div>
<!-- right content start -->
<div class="span9">
<div class="session well">A line chart is suited to showing the trend of one item, or to comparing trends across several items.</div>
<div class="session">
<div id="main" style="width: 100%; height: 600px;"></div>
</div>
</div>
<!-- right content end -->
</div>
</div>
<!-- content end -->
</body>
</html>
<script type="text/javascript">
var data = [];
var markLineData = [];
for ( var i = 1; i < data.length; i++) {
markLineData.push([ {
xAxis : i - 1,
yAxis : data[i - 1],
value : (data[i] + data[i - 1]).toFixed(2)
}, {
xAxis : i,
yAxis : data[i]
} ]);
}
option = {
title : {
text : 'Price Change by Listing Venue',
subtext : 'Unit: %'
},
tooltip : {
trigger : 'axis'
},
toolbox: {
show : true,
feature : {
mark : {show: true},
dataView : {show: true, readOnly: false},
magicType : {show: true, type: ['bar']},
restore : {show: true},
saveAsImage : {show: true}
}
},
xAxis : {
name: 'Listing venue',
data : []
},
yAxis : {},
series : [ {
type : 'line',
data : data,
markPoint : {
data : [ {
    type : 'max',
    name : 'Max'
}, {
    type : 'min',
    name : 'Min'
} ]
},
markLine : {
smooth : true,
effect : {
show : true
},
distance : 10,
label : {
normal : {
position : 'middle'
}
},
symbol : [ 'none', 'none' ],
data : markLineData
}
} ]
};
var myChart = echarts.init(document.getElementById('main'));
myChart.setOption(option);
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];
var info = {
"opt" : "marketSumchg"
};
$.post("../ServletStock", info, function(data) {
mapOnlyKey.length = 0;
mapKeyValue.length = 0;
mapOnlyValue.length = 0;
for ( var i = 0; i < data.length; i++) {
mapOnlyKey.push(data[i].market);
mapKeyValue.push({
"value" : Math.round(data[i].sumchg),
"name" : data[i].market
});
mapOnlyValue.push(data[i].sumchg);
}
myChart.setOption({
legend : {
data : mapOnlyKey
},
xAxis : [ {
data : mapOnlyKey
} ],
series : [ {
name : 'Percent',
data : mapKeyValue
} ]
});
}, 'json');
</script>
The ten most popular sectors. You can see that equity ranks first, banks second, and software fifth.
The ECharts code:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>Top 10 Most Popular Sectors</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<%@include file="../basic/cssjs.jsp"%>
<!-- Include the jQuery library -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
<!-- Include the ECharts 3 library -->
<script type="text/javascript" src="../js/echarts.min.js"></script>
</head>
<body>
<%@include file="../basic/top.jsp"%>
<!-- content start -->
<div class="container-fluid">
<div class="row-fluid">
<div class="span3">
<!-- left navbar start -->
<%@include file="../basic/left.jsp"%>
<!-- left navbar end -->
</div>
<!-- right content start -->
<div class="span9">
<div class="session well">A bar chart uses rectangles along an axis to display and compare values.</div>
<div class="session">
<div id="main" style="width: 100%; height: 600px;"></div>
</div>
</div>
<!-- right content end -->
</div>
</div>
<!-- content end -->
</body>
</html>
<script type="text/javascript">
// Initialize the ECharts instance on the prepared DOM node
var myChart = echarts.init(document.getElementById('main'));
// Specify the chart options and data
myChart.setOption({
    title: {
        text: 'Top 10 Most Popular Sectors'
},
tooltip: {
show: true
},
legend: {
data:[]
},
xAxis : [
    {
        name: 'Sector',
        type : 'category',
        data : []
    }
],
yAxis : [
    {
        name: 'Count',
        type : 'value'
    }
],
series : [
{
name:'Count',
type:'bar',
data: []
}
]
});
// Load data asynchronously
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];
var info = {"opt": "categoryCount"};
$.post("../ServletStock", info, function(data){
mapOnlyKey.length=0;
mapKeyValue.length=0;
mapOnlyValue.length=0;
for(var i=0; i < data.length; i++){
mapOnlyKey.push( data[i].category);
mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].category});
mapOnlyValue.push( data[i].count );
}
console.log(mapOnlyKey);
console.log(mapKeyValue);
console.log(mapOnlyValue);
// Fill in the data
myChart.setOption({
    legend: {
        // categories
        data: mapOnlyKey
    },
    xAxis : [
        {
            data : mapOnlyKey
        }
    ],
    series: [{
        // Matched to the series by name
        name: 'Count',
        data: mapKeyValue
    }]
});
// Render the chart with the options and data just set.
}, 'json');
</script>
The ten stocks with the highest market cap. Apple leads with over 800 billion; one stock with the long name "HSBC Holdings, plc. Perpetual Sub Cap Secs" crowds out the other labels, and Alibaba ranks tenth.
The ten stocks with the highest trading volume.
That wraps up the offline pipeline. Hadoop suits batch workloads that are not latency-sensitive; when we need real-time processing, we turn to Storm or Spark Streaming.
Real-time Pipeline
Here is the real-time processing flow diagram again.
This time we crawl each stock's day high in real time and display it as a live ECharts chart, so the crawler needs some changes.
Crawling the day high
First define an entity class to hold each stock's name and day high.
CnameHigh.java:
package my.webmagic;
public class CnameHigh {
private String cname;
private Float high;
public CnameHigh() {
super();
}
public CnameHigh(String cname, Float high) {
super();
this.cname = cname;
this.high = high;
}
public String getCname() {
return cname;
}
public void setCname(String cname) {
this.cname = cname;
}
public Float getHigh() {
return high;
}
public void setHigh(Float high) {
this.high = high;
}
@Override
public String toString() {
return cname + ":" + high;
}
}
Here the crawler sleeps 1,500 ms between requests, 500 ms less than before, because the ECharts live chart I set up refreshes every 2,000 ms, and the data must update at least as fast as it is displayed.
private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(1500)
.setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");
Modify the crawler logic in the process method. This time we don't need the whole JSON payload, only the cname (stock name) and high (day high) fields, so the crawler parses the JSON and extracts just those two fields.
public void process(Page page) {
// page.putField("sixty", regexJson(page.getJson().toString()));
String stockJson = regexJson(page.getJson().toString());
if (!stockJson.contains("data") || !stockJson.contains("high")) {
return;
}
JSONObject myjson = JSON.parseObject(stockJson);
JSONArray dataArray = myjson.getJSONArray("data");
ArrayList<CnameHigh> highList = new ArrayList<>();
String cname;
Float high;
for (int i = 0; i < dataArray.size(); i++) {
JSONObject jsonObject = dataArray.getJSONObject(i);
cname = jsonObject.getString("cname");
high = jsonObject.getFloatValue("high");
CnameHigh cnameHigh = new CnameHigh(cname, high);
highList.add(cnameHigh);
}
page.putField("high", highList);
}
In the main method we change the url's num parameter from 60 records per request to 20, so the crawler runs longer and the live chart stays animated longer. We also change the output directory, and swap the original JsonFilePipeline for a FilePipeline, which reduces the amount of real-time processing code needed in Spark Streaming.
public static void main(String[] args) {
String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" +
"/US_CategoryService.getList?num=20&sort=&asc=0&page=1";
String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" +
"/US_CategoryService.getList?num=20&sort=&asc=0&page=";
String output = "/data/edu6/tmp/";
QueueScheduler scheduler = new QueueScheduler();
Spider spider = Spider.create(new GetStockHigh())
.addUrl(url_init)
.setScheduler(scheduler)
.addPipeline(new FilePipeline(output))
.addPipeline(new ConsolePipeline());
for (int i = 1; i < 418; i++) {
Request request = new Request();
request.setUrl(url_pattern + i);
scheduler.push(request, spider);
}
spider.thread(1).run();
}
Run the crawler first to see what the data looks like.
The first two lines of each crawled file are not what we need, so we filter them out in Spark Streaming, store the remaining stock names and day highs in the database, and display them live with ECharts.
To filter out the first two lines, we spotted a pattern: splitting each line on ":" breaks the first line into three segments and the second line into one, while every data line we want splits into exactly two. We use this rule in the Spark Streaming code to keep only the lines we need.
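The ':'-split rule is easy to check in isolation; the sample lines below only imitate the crawler output (two header-like lines followed by cname:high records):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterLinesDemo {
    // Keep only lines that split into exactly two ':' segments,
    // the same rule the Spark Streaming job applies
    static List<String> keepDataLines(List<String> lines) {
        return lines.stream()
                .filter(l -> l.split(":").length == 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Made-up sample imitating one crawled file: a URL header line (three segments
        // because of the ':' in "http://"), a bare header line (one segment),
        // then the cname:high records we want (two segments each)
        List<String> lines = Arrays.asList(
                "url:\thttp://stock.finance.sina.com.cn/usstock/api/jsonp.php",
                "high",
                "苹果公司:172.5",
                "阿里巴巴:185.1");
        System.out.println(keepDataLines(lines));
    }
}
```

Note that the URL line is rejected precisely because "http://" contributes an extra ':', which is what makes the segment-count heuristic reliable here.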
Writing the Spark Streaming job
The Spark Streaming job continuously reads the data Kafka delivers, filters it, and stores it in MySQL.
First define the Kafka configuration:
val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topics = Set("flumesendkafka")
val brokers = "localhost:9092"
val zkQuorum = "localhost:2181"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"serializer.class" -> "kafka.serializer.StringEncoder")
Then define the database connection settings:
val db_host = "localhost"
val db_name = "sina"
val db_user = "root"
val db_passwd = "strongs"
val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd
Then define the DStream:
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
Finally run the computation on the DStream. Note one point: we open one database connection per RDD partition. Opening a connection per record would mean constantly creating and closing connections, which is expensive and slows down real-time processing, while opening a single connection per RDD raises a not-serializable exception, since the connection object cannot be shipped to the workers.
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRdd =>
var conn: Connection = DriverManager.getConnection(db_connection_str)
val lines = partitionOfRdd.filter(_.split(':').length == 2) // keep only the lines we need
try {
lines.foreach ( line => {
var strs = line.mkString.split(':')
if (strs.length == 2) {
var ps: PreparedStatement = null
val sql = "insert into cnamehigh (cname, high) values (?, ?)"
val cname = strs(0)
val high = strs(1).toFloat
println("cname:" + cname + ",high:" + high)
ps = conn.prepareStatement(sql)
ps.setString(1, cname)
ps.setFloat(2, high)
ps.execute()
if (ps != null) {
ps.close()
}
}
})
} catch {
case e: Exception => println("MySQL Exception")// todo: handle error
} finally {
if (conn != null) {
conn.close()
}
}
}
}
The complete code, StockMonitor.scala:
package my.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
object StockMonitor {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topics = Set("flumesendkafka")
val brokers = "localhost:9092"
val zkQuorum = "localhost:2181"
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"serializer.class" -> "kafka.serializer.StringEncoder")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
val db_host = "localhost"
val db_name = "sina"
val db_user = "root"
val db_passwd = "strongs"
val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd
dstream.foreachRDD {rdd =>
rdd.foreachPartition {partitionOfRdd =>
var conn: Connection = DriverManager.getConnection(db_connection_str)
val lines = partitionOfRdd.filter(_.split(':').length == 2) // keep only the lines we need
try {
lines.foreach ( line => {
var strs = line.mkString.split(':')
if (strs.length == 2) {
var ps: PreparedStatement = null
val sql = "insert into cnamehigh (cname, high) values (?, ?)"
val cname = strs(0)
val high = strs(1).toFloat
println("cname:" + cname + ",high:" + high)
ps = conn.prepareStatement(sql)
ps.setString(1, cname)
ps.setFloat(2, high)
ps.execute()
if (ps != null) {
ps.close()
}
}
})
} catch {
case e: Exception => println("MySQL Exception")// todo: handle error
} finally {
if (conn != null) {
conn.close()
}
}
}
}
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
Testing Spark Streaming with Kafka
ZooKeeper must be started before Kafka:
/apps/zookeeper/bin/zkServer.sh start
Start Kafka:
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
Then, in another terminal, create a new topic in Kafka:
/apps/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --topic flumesendkafka --partitions 1
List the topics to confirm it was created:
/apps/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Then open MySQL, switch to the sina database, and create a new table. This time we add an auto-increment id column: the visualization must keep showing the newest data, and ordering by id descending guarantees each query can pick up rows it has not seen before.
create table cnamehigh (id int not null auto_increment, cname varchar(100), high float, primary key(id));
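Why ordering by the auto-increment id surfaces the newest row can be illustrated with a tiny in-memory stand-in for the table (the sample rows are made up):

```java
import java.util.TreeMap;

public class LatestRowDemo {
    public static void main(String[] args) {
        // Simulate the cnamehigh table: auto-increment id -> "cname:high" record.
        // Each insert gets a strictly larger id than every earlier insert.
        TreeMap<Integer, String> table = new TreeMap<>();
        table.put(1, "苹果公司:172.5");
        table.put(2, "阿里巴巴:185.1");
        table.put(3, "谷歌:1040.6");
        // "order by id desc limit 1" picks the row with the largest id,
        // i.e. whatever was inserted most recently
        System.out.println(table.lastEntry().getValue());  // 谷歌:1040.6
    }
}
```

Timestamps would work too, but an auto-increment key is monotonic even when two rows arrive within the same millisecond, so it is the simpler "newest row" marker here.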
Check the table; it contains no data yet:
select * from cnamehigh;
We first use the console as a Kafka producer and feed in some mock data to verify that it is inserted into MySQL correctly.
Run the Spark Streaming job.
In another terminal, start a console producer:
/apps/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flumesendkafka
Then type in some mock data.
In the Spark Streaming console you can see that only the first line was output.
The new rows also show up in MySQL.
Good. Since the test inserted mock data into the table, we drop it and recreate an identical empty table for later use:
create table cnamehigh1 like cnamehigh;
drop table cnamehigh;
alter table cnamehigh1 rename cnamehigh;
Next we configure Flume.
Flume Configuration
Create a Flume configuration file under the /data/edu1
directory:
vim /data/edu1/spooldir_mem_kafka.conf
Fill in the following configuration:
agent1.sources = src
agent1.channels = ch
agent1.sinks = des
agent1.sources.src.type = spooldir
agent1.sources.src.restart = true
agent1.sources.src.spoolDir = /data/edu6/tmp/stock.finance.sina.com.cn
agent1.channels.ch.type = memory
agent1.sinks.des.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.des.brokerList = localhost:9092
agent1.sinks.des.topic = flumesendkafka
agent1.sinks.des.batchSize = 1
agent1.sinks.des.requiredAcks = 1
agent1.sources.src.channels = ch
agent1.sinks.des.channel = ch
Here the directory Flume monitors is the new crawler's output directory.
We set batchSize = 1 so the database updates promptly, making the changes visible in the live chart.
The last piece of work is the ECharts live chart.
ECharts live chart
The visualization has already been covered at length, so without repeating myself, here is the code for the live chart:
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>Real-time Stock High Prices</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<%@include file="../basic/cssjs.jsp" %>
<!-- Include the jQuery library -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
<!-- Include the ECharts 3 library -->
<script type="text/javascript" src="../js/echarts.min.js"></script>
</head>
<body>
<%@include file="../basic/top.jsp" %>
<!-- content start -->
<div class="container-fluid">
<div class="row-fluid">
<!-- right content start -->
<div class="span12">
<div class="session">
<div id="main" style="width: 100%;height:600px;"></div>
</div>
</div>
<!-- right content end -->
</div>
</div>
<!-- content end -->
</body>
</html>
<script type="text/javascript">
var data = [];
var markLineData = [];
for ( var i = 1; i < data.length; i++) {
markLineData.push([ {
xAxis : i - 1,
yAxis : data[i - 1],
value : (data[i] + data[i - 1]).toFixed(2)
}, {
xAxis : i,
yAxis : data[i]
} ]);
}
option = {
title : {
text : 'Real-time Stock High Prices',
},
tooltip : {
trigger : 'axis'
},
xAxis : {
name: 'Stock name',
data : (function (){
    var res = [];
    var len = 10;
    while (len--) {
        res.push("");
    }
    return res;
})()
},
yAxis: [
{
type: 'value',
scale: true,
name: 'High price',
max: 500,
min: 0,
boundaryGap: [0.2, 0.2]
}
],
dataZoom: [
{
id: 'dataZoomX',
type: 'slider',
xAxisIndex: [0],
filterMode: 'filter'
},
],
series : [ {
type : 'line',
data : (function (){
var res = [];
var len = 0;
while (len < 10) {
res.push(0);
len++;
}
return res;
})(),
markPoint : {
data : [ {
    type : 'max',
    name : 'Max'
}, {
    type : 'min',
    name : 'Min'
} ]
},
markLine : {
smooth : true,
effect : {
show : true
},
distance : 10,
label : {
normal : {
position : 'middle'
}
},
symbol : [ 'none', 'none' ],
data : markLineData
}
} ]
};
setInterval(function () { // poll the database periodically (start)
var myChart = echarts.init(document.getElementById('main'));
myChart.setOption(option);
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];
var info = {
"opt" : "cnameHigh"
};
$.post("../ServletStock", info, function(data) {
mapOnlyKey.length = 0;
mapKeyValue.length = 0;
mapOnlyValue.length = 0;
for ( var i = 0; i < data.length; i++) {
mapOnlyKey.push(data[i].cname);
mapKeyValue.push({
"value" : Math.round(data[i].high),
"name" : data[i].cname
});
mapOnlyValue.push(data[i].high);
}
var data1 = option.series[0].data;
data1.shift();
data1.push(mapOnlyValue.shift());
option.xAxis.data.shift();
option.xAxis.data.push(mapOnlyKey.shift());
myChart.setOption(option);
}, 'json')
}, 2000); // poll the database periodically (end)
</script>
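The chart body above keeps a fixed-size window: every two seconds it shifts the oldest point off and pushes the newest one on. The same bookkeeping in Java, with a made-up stream of prices:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingWindowDemo {
    // Drop the oldest value and append the newest, keeping the window size constant,
    // just like data1.shift() / data1.push(...) in the script above
    static void slide(Deque<Double> window, double newest) {
        window.pollFirst();
        window.addLast(newest);
    }

    public static void main(String[] args) {
        Deque<Double> window = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            window.addLast(0.0);  // the chart starts with ten zero points
        }
        for (double price : new double[] {172.5, 185.1, 310.2}) {
            slide(window, price);
        }
        System.out.println(window.size());      // 10
        System.out.println(window.peekLast());  // 310.2
    }
}
```

Because the window size never changes, the x-axis stays stable while the data scrolls, which is what produces the "moving" effect on screen.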
One thing to note: I wrote the DAO-layer SQL like this, so that each query returns a row not seen before:
select cname,high from cnamehigh order by id desc limit 1;
But because the database updates quickly, the id may have grown by tens or hundreds between two queries, so strictly speaking this is not fully real-time; the principle is the same, though.
With all the preparation done, it is finally time to run the whole project. The exciting moment has arrived!
The startup order is: start the visualization => start Kafka => start Spark Streaming => start Flume => start the crawler.
Start the visualization
The live chart has not changed yet.
Start Kafka
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
Start Spark Streaming
Start Flume
flume-ng agent -c /data/edu1/ -f /data/edu1/spooldir_mem_kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console
Finally, start the crawler.
The live chart in action.
Summary
After four months of studying big data, I have learned a lot of interesting things, both consolidating what I already knew and getting a taste of the appeal of this young industry. Thanks to my teachers for their careful guidance and to my fellow learners for all the discussions. I hope we all keep our curiosity and thirst for knowledge, and stay forever young.