Introduction
Project overview
This project imitates a full big-data pipeline, covering log collection and transport, log formatting, real-time analysis, persistence to HDFS, offline report statistics, offline job scheduling, and log record search.
The overall flow: the project mimics Tmall-style data analysis. Simulated users browse and purchase products, which generates logs. The logs are collected and then split into two paths: one path analyzes the data in real time to produce the geographic locations of the day's active users; the other persists the data and runs a T+1 batch job that computes per-province sales and per-province active-user counts.
Project data flow
- Log data generation
  Log format: requestid, ts, userid, city, longitude, latitude, action (browse / purchase)
  Flume collects the log data into the Kafka log topic
- Kafka Streams consumes the log topic and writes to the process topic
  It formats and filters the records. Formatted record: requestid, ts, userid, city, longitude, latitude, action
- Real-time module
  Spark Streaming processes the process topic and pushes results into the realtime topic
  It parses each record from the process topic and extracts the city and user id for downstream real-time display
- Flume collects the process topic and writes it to HDFS
  Persists the logs in HDFS
- Report module (offline processing)
  T+1 batch job: Spark computes over the data in HDFS, aggregates the previous day's per-province sales, and writes the result to MySQL
- Azkaban schedules the jobs
- Elasticsearch serves history-record queries
- A web page displays active users in real time, plus the report pages
However... my machine is too weak to run all of this, so only part of it was built:
Azkaban, Elasticsearch, and the web pages were not implemented.
Project modules
- logbuilder
  Simulated log generation (later, for convenience, a shell script was written to generate logs instead)
- kafkastream
  Kafka Streams job that cleans the logs
- sparkstream
  Spark Streaming job that processes the logs in real time to show the geographic locations of active users
- report
  T+1 report job: batch-processes the logs, compares sales volume across provinces, and writes the result to MySQL
Implementation
The implementation is described below in the order of the data flow.
1. Log generation
A small Spring Boot application simulates the log generation. Log format:
logger:>>>> requestid, ts, userid, city, longitude, latitude, action (0 = browse, 1 = purchase)
public static void main(String[] args) {
    SpringApplication.run(LogBuilderApplication.class);
    // city index -> {name, longitude, latitude}
    HashMap<Integer, String[]> map = new HashMap<>();
    map.put(0, new String[]{"海門", "121.15", "31.89"});
    map.put(1, new String[]{"鹽城", "120.13", "33.38"});
    map.put(2, new String[]{"上海", "121.48", "31.22"});
    map.put(3, new String[]{"廈門", "118.1", "24.46"});
    Random random = new Random();
    while (true) {
        int i1 = random.nextInt(map.size());
        String[] o = map.get(i1);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long ts = System.currentTimeMillis();
        String requestid = UUID.randomUUID().toString();
        int userid = random.nextInt(100000);
        // requestid, ts, userid, city, longitude, latitude, action (0 = browse, 1 = purchase)
        logger.info("logger:>>>>{},{},{},{},{},{},{}", requestid, ts, userid, o[0], o[1], o[2], random.nextInt(2));
    }
}
To avoid repackaging and restarting the Spring Boot app, I also wrote a shell log generator, which is easier to work with:
#!/bin/bash
# "2-D array": each element is a space-separated record of city, longitude, latitude
city=('海門 121.15 31.89' '鹽城 120.13 33.38' '上海 121.48 31.22' '廈門 118.1 24.46')
# array length (printed for debugging)
echo ${#city[@]}
len=$((${#city[@]}-1))
echo $len
# infinite loop: pick a random index between 0 and len
while :
do
    rand=`shuf -i0-${len} -n1`
    echo $rand
    echo ${city[${rand}]}
    # split the chosen record into city / longitude / latitude
    c=(${city[${rand}]})
    ci=${c[0]}
    cj=${c[1]}
    cw=${c[2]}
    req=$(cat /proc/sys/kernel/random/uuid)
    date=`date -d 'day' +%Y%m%d`
    # timestamp (seconds) derived from the date string
    ts=`date -d $date +%s`
    userid=`shuf -i1-100000 -n1`
    action=`shuf -i0-1 -n1`
    echo "logger:>>>>$req,$ts,$userid,${ci},${cj},${cw},${action}"
    sleep 3
done
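Assuming the script is saved as, say, logbuilder.sh (a hypothetical name), it can simply be left running in the background with its output appended to the file that the Flume agent in the next step tails:
nohup bash logbuilder.sh >> /root/test_flume_file/test_flume_file &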
2. Log collection
Flume collects the log file into Kafka: an exec source tails the generator's output file and a Kafka sink writes each line to the log topic.
# Tail a local file and write it to Kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test_flume_file/test_flume_file
# Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = log
a1.sinks.k1.kafka.bootstrap.servers = 192.168.33.4:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Wiring (note: the sink property is "channel", singular)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
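Assuming the configuration above is saved as, say, file-to-kafka.conf (a hypothetical file name), the agent can be started with the standard Flume launcher; the agent name a1 must match the config:
bin/flume-ng agent -c conf -f conf/file-to-kafka.conf -n a1 -Dflume.root.logger=INFO,console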
3. Data cleaning with Kafka Streams
This step cleans the log data and filters out records that do not match the expected format.
A cleaned record looks like:
requestid, ts, userid, city, longitude, latitude, action (0 = browse, 1 = purchase)
public static void main(String[] args) {
    String from = "log";
    String to = "process";
    Properties properties = new Properties();
    // application id
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-tmall");
    // Kafka brokers
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");
    // Build the topology: source topic -> processing logic -> sink topic
    Topology topology = new Topology();
    topology.addSource("SOURCE", from)
            .addProcessor("PROCESS", () -> new LogProcesser(), "SOURCE")
            .addSink("SINK", to, "PROCESS");
    // The older, now deprecated API looked like this:
    // TopologyBuilder builder = new TopologyBuilder();
    // builder.addSource("SOURCE", from)
    //        .addProcessor("PROCESS", () -> new LogProcesser(), "SOURCE")
    //        .addSink("SINK", to);
    // Create and start the streams application
    KafkaStreams streams = new KafkaStreams(topology, properties);
    streams.start();
    System.out.println("kafkaStream is started!!!");
}
public class LogProcesser implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // Core logic: keep only lines produced by the generator and strip the prefix
        String line = new String(value);
        if (line.contains("logger:>>>>")) {
            System.out.println("LogProcess process data:" + line);
            String[] split = line.split("logger:>>>>");
            // forward the cleaned record downstream to the sink
            context.forward("LogProcess".getBytes(), split[1].trim().getBytes());
        }
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
}
This consumes the Kafka log topic, formats each record, and writes the result to the process topic.
4. Real-time processing with Spark Streaming
The real-time part consumes the process topic. Every record is a user action from the current day, so the city (and its coordinates) is extracted from each record and pushed to the realtime topic, where a later consumer could push it to a front end for live display (that part was not built; the page is hard to write).
public class SparkStreamingProcesser {
    private static final String brokers = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
    private static final String group_id = "tmall_online";
    private static final List<String> topic = new ArrayList<String>(Arrays.asList("process"));
    private static final String toTopic = "realtime";

    public static void main(String[] args) {
        // 1. Spark configuration
        SparkConf conf = new SparkConf().setAppName("tmall_online").setMaster("local[*]");
        // 2. Streaming context: process the collected data every 10 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // 3. Kafka consumer parameters
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        // broker list
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // consumer group
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        // key/value deserializers
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 4. Create a direct Kafka stream from those parameters
        JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topic, kafkaParams));
        // 5. Extract the payload and forward city, longitude, latitude to the realtime topic
        JavaDStream<String> map = stream.map(msg -> msg.value().toString());
        JavaDStream<Object> map1 = map.map(x -> {
            KafkaProducerUtils producerUtils = new KafkaProducerUtils();
            String[] split = x.split(",");
            System.out.println("received: " + split[1] + "," + split[2]);
            producerUtils.sendMessage(toTopic, split[3] + "," + split[4] + "," + split[5]);
            return "a";
        });
        // an output action is required to trigger the job
        map1.print();
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            jssc.close();
        }
    }
}
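The KafkaProducerUtils helper called above is not shown in the post; its name and the sendMessage(topic, value) signature come from the snippet, but the body below is only a minimal sketch of what it might look like, assuming a plain string producer against the same brokers:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerUtils {
    // hypothetical implementation: broker list copied from the Spark job above
    private static final String BROKERS = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
    private final KafkaProducer<String, String> producer;

    public KafkaProducerUtils() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    // send a single value to the given topic (fire-and-forget)
    public void sendMessage(String topic, String value) {
        producer.send(new ProducerRecord<>(topic, value));
        producer.flush();
    }
}
In a production job the producer would be created once per executor (for example with foreachPartition) instead of once per record as in the map above, but the sketch keeps the shape it is called with here.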
5. Log persistence
The formatted user-action logs need to be persisted to HDFS, so a second Flume agent collects the Kafka process topic into HDFS.
# Read from Kafka and write to HDFS
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092
a1.sources.r1.kafka.topics = process
a1.sources.r1.kafka.consumer.group.id = c_flume
# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/customfile/tmall/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hadoop1_%H_events_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize = 102400000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.idleTimeout = 5400
# use the local timestamp for the %Y%m%d / %H escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# write plain-text files
a1.sinks.k1.hdfs.fileType = DataStream
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Wiring (note: the sink property is "channel", singular)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6. Report job
Once each day's data has landed in HDFS, a T+1 job processes the logs and produces two report metrics:
- active users per province
- sales volume per province
The report is a Spark program written in Scala. It does not write to MySQL; instead it writes CSV output back to HDFS, which can be imported into MySQL later (my machine could not handle running MySQL on top of everything else).
object ReportStatistics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("report_statistics")
      .enableHiveSupport()
      .getOrCreate()
    // the partition date (yyyyMMdd) is passed in as the first argument
    val date = args(0)
    // 1. Read the previous day's data and drop malformed lines
    import spark.implicits._
    val df = spark.sparkContext.textFile(s"/user/customfile/tmall/dt=$date")
      .filter(x => {
        val strings: Array[String] = x.split(",")
        strings.length == 7
      })
      .map(x => {
        val strings: Array[String] = x.split(",")
        (strings(0), strings(1), strings(2), strings(3), strings(6))
      }).toDF("requestid", "ts", "userid", "provice", "action")
    df.cache().createOrReplaceTempView("t1")
    // 2. Daily active users per province, plus an "all" total
    spark.sql(
      s"""
         |select count(distinct(userid)) as users, provice from t1 group by provice
         |union all
         |select count(distinct(userid)) as users, "all" as provice from t1
      """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/users")
    // 3. Sales per province (action = 1 means purchase), plus an "all" total
    spark.sql(
      s"""
         |select count(requestid) as cnt, provice from t1 where action = 1 group by provice
         |union all
         |select count(requestid) as cnt, "all" as provice from t1 where action = 1
      """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/sales")
  }
}
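The job takes the partition date as its only argument, matching the dt=%Y%m%d directories written by Flume. A hypothetical invocation for yesterday's partition, with an assumed jar name and a YARN master, would look something like:
# report.jar is an assumed artifact name; ReportStatistics is the object defined above
spark-submit --class ReportStatistics --master yarn report.jar $(date -d yesterday +%Y%m%d)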
Summary
That is as far as the project got. Still to do: Azkaban job scheduling, Flume collecting the data into Elasticsearch for log record search, and the web display pages.
On the display side, the reports are essentially done; a web project that reads MySQL and renders them would be enough. If anyone wants to complete it, feel free.
Problems encountered
- Flume produced a large number of small files when writing to HDFS; this was resolved by adjusting the roll settings (rollSize / rollCount / rollInterval) shown in the config above.
Source code: https://gitee.com/zhangqiye/tmall
Questions are welcome in the QQ group: 552113611