大數(shù)據(jù)項(xiàng)目之仿天貓數(shù)據(jù)分析

簡(jiǎn)介

項(xiàng)目簡(jiǎn)介

此項(xiàng)目是實(shí)現(xiàn)仿大數(shù)據(jù)項(xiàng)目流程,包括沥阱,日志收集傳輸缎罢,日志格式化,數(shù)據(jù)實(shí)時(shí)分析喳钟,數(shù)據(jù)持久化到HDFS屁使,數(shù)據(jù)離線報(bào)表統(tǒng)計(jì),離線任務(wù)調(diào)度奔则,日志記錄搜索幾大部分

首先說(shuō)一下這個(gè)項(xiàng)目的大致流程蛮寂,這個(gè)項(xiàng)目是仿天貓數(shù)據(jù)分析,是自己模擬用戶購(gòu)買(mǎi)瀏覽商品易茬,生成日志酬蹋,對(duì)日志進(jìn)行收集,然后分兩部分抽莱,一部分對(duì)數(shù)據(jù)實(shí)時(shí)分析范抓,生成日用戶活躍地理位置,第二部分是數(shù)據(jù)持久化之后食铐,T+1對(duì)數(shù)據(jù)進(jìn)行分析匕垫,統(tǒng)計(jì)各省銷量以及各省活躍用戶數(shù)

項(xiàng)目數(shù)據(jù)流

  1. 日志數(shù)據(jù)生成

    日志格式: requestid, ts, userid, 城市,經(jīng)度,緯度,操作(瀏覽,購(gòu)買(mǎi))

  2. flume采集日志數(shù)據(jù)進(jìn)入kafka log topic

  3. kafkastream消費(fèi)log topic日志,寫(xiě)入process topic

    對(duì)數(shù)據(jù)進(jìn)行格式化處理,以及過(guò)濾數(shù)據(jù)虐呻。格式化后的數(shù)據(jù)格式: requestid, ts, userid, 城市,經(jīng)度,緯度,操作

  4. 實(shí)時(shí)模塊

    sparkstreaming處理process topic日志象泵,扔進(jìn)realtime topic
    格式化數(shù)據(jù),寫(xiě)到process topic中斟叼,得到城市和用戶id

  5. flume采集process topic數(shù)據(jù)偶惠,寫(xiě)入hdfs

    持久化日志到hdfs中

  6. report模塊(離線處理模塊)

    T+1離線處理模塊,spark計(jì)算hdfs中的數(shù)據(jù),統(tǒng)計(jì)前一天的各省銷售記錄,寫(xiě)入mysql

  7. azkaban調(diào)度任務(wù)

  8. ElasticSearch查詢歷史記錄

  9. web頁(yè)面實(shí)時(shí)展示活躍用戶朗涩,和報(bào)表頁(yè)面

然而忽孽。。電腦垃圾,扛不住兄一,只做了一部分

azkaban厘线,elasticsearch,web頁(yè)面沒(méi)有做

項(xiàng)目構(gòu)成

  1. logbuilder

模擬日志生成(后面為了方便瘾腰,寫(xiě)了一個(gè)shell用于日志生成)

  1. kafkastream

    kafkastream清洗日志

  2. sparkstream

    sparkstream實(shí)時(shí)處理日志顯示操作用戶的地理位置

  3. report

    T+1報(bào)表項(xiàng)目,批處理日志皆的,分析各省銷售量對(duì)比,寫(xiě)入mysql

實(shí)現(xiàn)

我們按照數(shù)據(jù)流的方式來(lái)寫(xiě)實(shí)現(xiàn)

1. 日志生成

使用SpringBoot做了一個(gè)日志生成器覆履。模擬生成日志蹋盆,日志格式

logger:>>>> requestid,userid,ts,城市,經(jīng)度硝全,維度栖雾,操作(0瀏覽 1購(gòu)買(mǎi))
public static void main(String[] args) {

        SpringApplication.run(LogBuilderApplication.class);

        HashMap map = new HashMap<Integer, String[]>();

        // 存儲(chǔ)城市,經(jīng)緯度
        map.put(0,new String[]{"海門(mén)","121.15","31.89"});
        map.put(1,new String[]{"鹽城","120.13","33.38"});
        map.put(2,new String[]{"上海","121.48","31.22"});
        map.put(3,new String[]{"廈門(mén)","118.1","24.46"});
        Random random = new Random();

        while (true){
            int i1 = random.nextInt(map.size());
            String[] o = (String[]) map.get(i1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long ts = System.currentTimeMillis();
            String requestid = UUID.randomUUID().toString();

            int userid = random.nextInt(100000);
            // requestid,userid, 城市,經(jīng)度,緯度,操作(瀏覽,購(gòu)買(mǎi))0 瀏覽 1 購(gòu)買(mǎi)
            logger.info("logger:>>>>{},{},{},{},{},{},{}",requestid, ts, userid, o[0],o[1],o[2],random.nextInt(2));
        }
    }

由于不想打包伟众,重啟析藕,就自己用shell寫(xiě)了一個(gè)日志生成器,更好使用

#!/bin/bash
#二維數(shù)組
city=('海門(mén) 121.15 31.89' '鹽城 120.13 33.38' '上海 121.48 31.22' '廈門(mén) 118.1 24.46')
#city=('海門(mén) 121.15" "31.89"' '"鹽城" "120.13" "33.38"' '"上海" "121.48" "31.22"' '"廈門(mén)" "118.1" "24.46"')
#獲取數(shù)組長(zhǎng)度
echo ${#city[@]}
len=$((${#city[@]}-1))
echo $len
#死循環(huán)凳厢,隨機(jī)數(shù)0-數(shù)組長(zhǎng)度
while :
do
    rand=`shuf -i0-${len} -n1`
    echo $rand
    echo ${city[${rand}]}
 c=(${city[${rand}]})
ci=${c[0]}
cj=${c[1]}
cw=${c[2]}
  req=$(cat /proc/sys/kernel/random/uuid)
date=`date -d 'day' +%Y%m%d`
ts=`date -d $date +%s`
userid=`shuf -i1-100000 -n1`
action=`shuf -i0-1 -n1`
    echo "logger:>>>>$req,$ts,$userid,${ci},${cj},${cw},${action}"
    sleep 3
done

2. 日志采集

日志采集使用Flume账胧,將日志文件數(shù)據(jù)采集到Kafka中,

# 監(jiān)控本地文件寫(xiě)到kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#設(shè)置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test_flume_file/test_flume_file

#設(shè)置Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = 192.168.33.4:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

#設(shè)置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#連接
a1.sources.r1.channels = c1
#注意先紫,是channel
a1.sinks.k1.channel = c1

3. KafkaStream進(jìn)行數(shù)據(jù)清洗

這一步主要是對(duì)日志數(shù)據(jù)進(jìn)行清洗治泥,過(guò)濾掉不符合規(guī)范的日志,

過(guò)濾后的日志

requestid,userid,ts,城市遮精,經(jīng)度居夹,維度,操作(0瀏覽 1購(gòu)買(mǎi))
 public static void main(String[] args) {

        String from = "log";
        String to = "process";
        Properties properties = new Properties();

        // 設(shè)置application id
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"stream-tmall");
        // 設(shè)置kafka地址
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");

        // 創(chuàng)建拓?fù)浣Y(jié)構(gòu)
        Topology topology = new Topology();
        // 構(gòu)建數(shù)據(jù)來(lái)源本冲,數(shù)據(jù)處理邏輯准脂,數(shù)據(jù)去向
        topology.addSource("SOURCE",from)
                .addProcessor("PROCESS", ()-> new LogProcesser(), "SOURCE")
                .addSink("SINK", to, "PROCESS");
        //過(guò)時(shí)
//        TopologyBuilder builder = new TopologyBuilder();
//        builder.addSource("SOURCE",from)
//                .addProcessor("PROCESS", ()->new LogProcesser(), "SOURCE")
//                .addSink("SINK",to);
//
//
//        // 創(chuàng)建kafkastream
        KafkaStreams streams = new KafkaStreams(topology,properties);
        // 開(kāi)啟流處理
        streams.start();
        System.out.println("kafkaStream is start!!!");


    }
    
    
public class LogProcesser implements Processor<byte[],byte[]> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {

        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {

        // 核心流程,處理日志
        String line = new String(value);
        if (line.contains("logger:>>>>")){
            System.out.println("LogProcess process data:" + line);
            String[] split = line.split("logger:>>>>");
            // 轉(zhuǎn)發(fā)
            context.forward("LogProcess".getBytes(), split[1].trim().getBytes());
        }
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

將Kafka log topic中的數(shù)據(jù)消費(fèi),將日志數(shù)據(jù)進(jìn)行格式化檬洞,寫(xiě)到Process topic中

4. SparkStreaming數(shù)據(jù)實(shí)時(shí)處理

我們實(shí)時(shí)部分狸膏,是消費(fèi)Kafka中process topic中的數(shù)據(jù),每一條數(shù)據(jù)都是今天用戶的操作添怔,所以我們將每一條日志的城市取出來(lái)湾戳,放到realtime topic中,等待后序消費(fèi)澎灸,把數(shù)據(jù)推送到前端進(jìn)行實(shí)時(shí)展示(這部分沒(méi)做院塞,頁(yè)面不好寫(xiě))

public class SparkStreamingProcesser {

    private static final String brokers = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
    private static final String group_id = "tmall_online";
    private static final List<String> topic = new ArrayList<String>(Arrays.asList("process"));

    private static final String toTopic = "realtime";


    public static void main(String[] args) {
        //1. 得到spark上下文
        SparkConf conf = new SparkConf().setAppName("tmall_online").setMaster("local[*]");

        //2. 創(chuàng)建sparkstreamingcontext。每隔2鐘會(huì)處理一次收集到的數(shù)據(jù)
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        //3. 創(chuàng)建kafka的參數(shù)
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();

        // 設(shè)置kafka集群地址
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // 設(shè)置消費(fèi)者組
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        // 設(shè)置key反序列化類
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);


        // 4. 通過(guò)參數(shù)創(chuàng)建一個(gè)kafka stream
        JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topic, kafkaParams));

        // 5.獲取數(shù)據(jù)并處理
        JavaDStream<String> map = stream.map(msg-> msg.value().toString());


        JavaDStream<Object> map1 = map.map(x -> {
            KafkaProducerUtils producerUtils = new KafkaProducerUtils();
            String[] split = x.split(",");
            System.out.println("接收到:" + split[1]+","+split[2]);
            producerUtils.sendMessage(toTopic, split[3]+","+split[4]+","+split[5]);
            return "a";
        });
        map1.print();


        jssc.start();

        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            jssc.close();
        }

    }


}

5. 日志持久化

我們需要將用戶操作日志持久化到HDFS中性昭,我們將格式化之后的數(shù)據(jù)采集到HDFS拦止,所以我們使用Flume將Kafka中process topic的數(shù)據(jù)采集到HDFS

# 監(jiān)控kafka寫(xiě)到hdfs
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#設(shè)置source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092
a1.sources.r1.kafka.topics = process
a1.sources.r1.kafka.consumer.group.id = c_flume

#設(shè)置Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/customfile/tmall/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hadoop1_%H_events_
a1.sinks.k1.hdfs.fileSuffix=.log
a1.sinks.k1.hdfs.rollSize = 102400000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.idleTimeout = 5400
#是否使用本地時(shí)間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#設(shè)置文件相關(guān) 文件類型為純文本
a1.sinks.k1.hdfs.fileType = DataStream
#設(shè)置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

#連接
a1.sources.r1.channels = c1
#注意,是channel
a1.sinks.k1.channel = c1

6. 報(bào)表項(xiàng)目

當(dāng)每天數(shù)據(jù)采集到HDFS之后,我們需要T+1處理這些日志汹族,產(chǎn)生報(bào)表萧求,我們這里主要產(chǎn)生兩個(gè)指標(biāo)

  1. 各省活躍用戶
  2. 各省銷售量

這里我們使用Scala編寫(xiě)Spark程序,做報(bào)表顶瞒,并沒(méi)有寫(xiě)到mysql夸政,將csv數(shù)據(jù)寫(xiě)到hdfs中,后序可以導(dǎo)入mysql榴徐,電腦帶不起了

object ReportStatistics {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("report_statistics")
      .enableHiveSupport()
      .getOrCreate()


    val date = args(0)
    //1. 讀取前一天數(shù)據(jù)

    import spark.implicits._

    val df = spark.sparkContext.textFile(s"/user/custom/tmall/dt=$date")
      .filter(x=>{
        val strings: Array[String] = x.split(",")
        strings.length == 7
      })
      .map(x=>{
        val strings: Array[String] = x.split(",")
        (strings(0),strings(1),strings(2),strings(3),strings(6))
      }).toDF("requestid", "ts", "userid", "provice", "action")


    df.cache().createOrReplaceTempView("t1")
    // 統(tǒng)計(jì)各省用戶日活
    spark.sql(
      s"""
         |select count(distinct(userid)) as users,provice from t1 group by provice
         |union all
         |select count(distinct(userid)) as users,"all" as provice
       """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/users")

    // 統(tǒng)計(jì)各省銷售量

    spark.sql(
      s"""
         |select count(requestid) as cnt,provice from t1 where action=0 group by provice
         |union all
         |select count(requestid) as cnt from t1 where action=0
       """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/sales")
  }

}

總結(jié)

項(xiàng)目現(xiàn)在就是做到現(xiàn)在這樣守问,后面還有Azkaban任務(wù)調(diào)度,F(xiàn)lume將數(shù)據(jù)采集到ElasticSearch中坑资,用于試試查詢,還有web顯示頁(yè)面仿便。

顯示的話攒巍,報(bào)表基本做完柒莉,一個(gè)web項(xiàng)目讀取mysql展示就行,如果有想完善的鲤氢,可以完善一下卷玉,

項(xiàng)目中遇到的問(wèn)題

  1. Flume將數(shù)據(jù)采集到HDFS的時(shí)候產(chǎn)生了大量的小文件相种,后面調(diào)整了配置參數(shù)

源碼看這里: https://gitee.com/zhangqiye/tmall

如有問(wèn)題品姓,可以加群: 552113611

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末衬潦,一起剝皮案震驚了整個(gè)濱河市植酥,隨后出現(xiàn)的幾起案子弦牡,更是在濱河造成了極大的恐慌漂羊,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件椭豫,死亡現(xiàn)場(chǎng)離奇詭異赏酥,居然都是意外死亡淤毛,警方通過(guò)查閱死者的電腦和手機(jī)算柳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)瞬项,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)囱淋,“玉大人妥衣,你說(shuō)我怎么就攤上這事税手÷梗” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵麻裳,是天一觀的道長(zhǎng)器钟。 經(jīng)常有香客問(wèn)我傲霸,道長(zhǎng),這世上最難降的妖魔是什么乃摹? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮播歼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘掰读。我一直安慰自己秘狞,他們只是感情好蹈集,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布烁试。 她就那樣靜靜地躺著拢肆,像睡著了一般减响。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上郭怪,一...
    開(kāi)封第一講書(shū)人閱讀 51,125評(píng)論 1 297
  • 那天支示,我揣著相機(jī)與錄音鄙才,去河邊找鬼。 笑死嘴纺,一個(gè)胖子當(dāng)著我的面吹牛栽渴,可吹牛的內(nèi)容都是我干的裆蒸。 我是一名探鬼主播熔萧,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼辙谜!你這毒婦竟也來(lái)了俺榆?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤装哆,失蹤者是張志新(化名)和其女友劉穎罐脊,沒(méi)想到半個(gè)月后定嗓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡萍桌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年宵溅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片上炎。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恃逻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出藕施,到底是詐尸還是另有隱情寇损,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布裳食,位于F島的核電站矛市,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏诲祸。R本人自食惡果不足惜浊吏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望烦绳。 院中可真熱鬧卿捎,春花似錦、人聲如沸径密。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)享扔。三九已至,卻和暖如春植袍,著一層夾襖步出監(jiān)牢的瞬間惧眠,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工于个, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留氛魁,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓厅篓,卻偏偏與公主長(zhǎng)得像秀存,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子羽氮,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容