Storm+HBase Real-Time Ad Statistics

This article walks through a real-time ad analytics system built on Kafka + Storm + HBase. The web dashboard that displays the results is built with SpringBoot + Vue + ElementUI + ECharts.

Contents:

  • 1. Requirements
  • 2. Log format
  • 3. HBase table design
  • 4. Writing the Storm program
  • 5. Receiving messages with Kafka
  • 6. Querying HBase data
  • 7. References

1. Requirements

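  • 1. Current impressions of a given ad in a given province
  • 2. Current impressions of a given ad in a given city
  • 3. Current impressions of a given ad on a given user's client
  • 4. Historical impression trend of a given ad in a given province, accumulated over a period of time
  • 5. Historical impression trend of a given ad in a given city, accumulated over a period of time
  • 6. Historical impression trend of a given ad on a given client, accumulated over a period of time
  • 7. Current clicks of a given ad
  • 8. Click trend of a given ad, accumulated over a period of time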
Preview 1

Preview 2

2. Log format

2014-01-13\t19:11:55\t{"adid":"31789","uid":"9871","action":"view"}\t63.237.239.3\t北京\t北京

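The six fields are separated by tab characters (\t):

Date: 2014-01-13
Time: 19:11:55
JSON (keeps the format easy to extend):
  adid: ad ID
  uid: user ID
  action: user action, either click or view
IP: 63.237.239.3
Province: 北京 (Beijing)
City: 北京 (Beijing)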
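As a dependency-free illustration of how one line breaks into these fields, here is a minimal sketch. The class name AdLogLine is hypothetical, and the regex-based JSON extraction is a simplification that assumes flat, unescaped string values; the original code (LogToModelBolt below) parses the JSON with Gson instead. Like the original, it splits with a limit of -1 so trailing empty fields are kept and a line with an empty city still has six fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits one ad log line into its six tab-separated fields.
class AdLogLine {
    // Matches "key":"value" pairs; assumes flat JSON with unescaped string values
    private static final Pattern PAIR = Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    final String date;              // yyyyMMdd, dashes removed so it can go into row keys
    final String time;
    final Map<String, String> json; // adid, uid, action
    final String ip;
    final String province;
    final String city;

    private AdLogLine(String[] f, Map<String, String> json) {
        this.date = f[0].trim().replace("-", "");
        this.time = f[1].trim();
        this.json = json;
        this.ip = f[3].trim();
        this.province = f[4].trim();
        this.city = f[5].trim();
    }

    /** Returns null unless the line has exactly six tab-separated fields. */
    static AdLogLine parse(String line) {
        // limit -1 keeps trailing empty strings instead of dropping them
        String[] f = line.split("\t", -1);
        if (f.length != 6) {
            return null;
        }
        Map<String, String> json = new HashMap<>();
        Matcher m = PAIR.matcher(f[2]);
        while (m.find()) {
            json.put(m.group(1), m.group(2));
        }
        return new AdLogLine(f, json);
    }

    public static void main(String[] args) {
        String line = "2014-01-13\t19:11:55\t{\"adid\":\"31789\",\"uid\":\"9871\",\"action\":\"view\"}"
                + "\t63.237.239.3\t北京\t北京";
        AdLogLine log = parse(line);
        System.out.println(log.date + " " + log.json.get("adid") + " " + log.json.get("action"));
    }
}
```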

3. HBase table design

Table name: realtime_ad_stat
Row keys: ADID_Province_20181212, ADID_City_20181212, ADID_UID_20181212
Column family: stat
Columns: view_cnt, click_cnt
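Each log event therefore updates up to three rows, one per dimension. The minimal sketch below builds those keys with a hypothetical AdRowKeys helper that mirrors the three patterns above, and it exposes a pitfall of this layout: for regions whose province and city names are identical (北京/北京, 上海/上海 and so on), the province key and the city key are the same string, so that row is incremented twice per event. Adding a dimension tag to each key (for example ADID_P_Province_yyyyMMdd and ADID_C_City_yyyyMMdd) would be one way to keep them apart; that is a suggested change, not part of the original design.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the row-key patterns of realtime_ad_stat:
// ADID_Province_yyyyMMdd, ADID_City_yyyyMMdd, ADID_UID_yyyyMMdd.
class AdRowKeys {
    static List<String> forEvent(String adid, String province, String city, String uid, String date) {
        List<String> keys = new ArrayList<>();
        if (!isEmpty(province)) {
            keys.add(adid + "_" + province + "_" + date);
        }
        if (!isEmpty(city)) {
            keys.add(adid + "_" + city + "_" + date);
        }
        if (!isEmpty(uid)) {
            keys.add(adid + "_" + uid + "_" + date);
        }
        return keys;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    public static void main(String[] args) {
        // Province and city are both 北京, so the first two keys are identical
        System.out.println(forEvent("31789", "北京", "北京", "9871", "20140113"));
    }
}
```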
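# Create the table; VERSIONS => 2147483647 (Integer.MAX_VALUE) keeps every cell version
create 'realtime_ad_stat',{NAME => 'stat',VERSIONS => 2147483647}

# List tables
list

# Remove all data
truncate 'realtime_ad_stat'

# Drop the table (it must be disabled first)
disable 'realtime_ad_stat'
drop 'realtime_ad_stat'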

4. Writing the Storm program

4.1.AdTopology

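import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class AdTopology {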
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                KafkaSpoutConfig.builder("hadoop1:9092,hadoop2:9092,hadoop3:9092", "AD")
                        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "STORM_AD_GROUP")
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        .build();
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(kafkaSpoutConfig), 2);
        topologyBuilder.setBolt("me.jinkun.ad.storm.LogToModelBolt", new LogToModelBolt(), 2).localOrShuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("me.jinkun.ad.storm.ToHbaseBolt", new ToHbaseBolt(), 4).localOrShuffleGrouping("me.jinkun.ad.storm.LogToModelBolt");

        StormTopology topology = topologyBuilder.createTopology();
        Config config = new Config();
        config.setDebug(false);

        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name
            config.setNumWorkers(4);
            // Reuse the topology built above instead of building it a second time
            StormSubmitter.submitTopology(args[0], config, topology);
        } else {
            // Local mode for development and testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("AdTopology", config, topology);
        }
    }
}

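AdTopology reads the latest log messages from the Kafka topic AD (consumer group STORM_AD_GROUP, starting from the LATEST offset on the first poll) and sends them to LogToModelBolt, whose output in turn goes to ToHbaseBolt.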
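Both bolts are connected with localOrShuffleGrouping, so tuples for the same row key can reach any of the four ToHbaseBolt tasks. That is safe because HBase's incrementColumnValue is an atomic read-modify-write on the server. The sketch below simulates the situation in plain Java under stated assumptions: four threads stand in for the bolt tasks, and ConcurrentHashMap.merge stands in for the HBase increment. The class ShuffleCountDemo is hypothetical. A non-atomic get, add, put sequence would lose updates under the same load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only: worker threads play the role of ToHbaseBolt tasks and
// ConcurrentHashMap.merge plays the role of HBase's atomic incrementColumnValue.
class ShuffleCountDemo {
    static Map<String, Long> count(List<String> rowkeys, int tasks) throws InterruptedException {
        Map<String, Long> store = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int t = 0; t < tasks; t++) {
            final int task = t;
            pool.submit(() -> {
                // Round-robin split: the same row key is handled by several tasks
                for (int i = task; i < rowkeys.size(); i += tasks) {
                    store.merge(rowkeys.get(i), 1L, Long::sum); // atomic per key
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return store;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            keys.add("1_Province" + (i % 3) + "_20180810");
        }
        System.out.println(count(keys, 4));
    }
}
```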

4.2.LogToModelBolt

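import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Ad is the author's model class with adid/uid/action fields (not shown in the article)
public class LogToModelBolt extends BaseBasicBolt {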

    private static final Logger LOG = LoggerFactory.getLogger(LogToModelBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        // 2014-01-13   19:11:55    {"adid":"31789","uid":"9871","action":"view"}    63.237.239.3    北京 北京
        String line = input.getStringByField("value");
        if (LOG.isInfoEnabled()) {
            LOG.info("line:[{}]", line);
        }
        String[] arr = line.split("\t", -1);
        if (arr.length == 6) {
            String date = arr[0].trim().replace("-", "");
            String time = arr[1].trim();
            String json = arr[2].trim();
            String ip = arr[3].trim();
            String province = arr[4].trim();
            String city = arr[5].trim();

            if (StringUtils.isNotEmpty(json)) {
                Ad ad;
                try {
                    ad = new Gson().fromJson(json, Ad.class);
                } catch (com.google.gson.JsonSyntaxException e) {
                    // Skip malformed JSON instead of letting the exception crash the worker
                    LOG.warn("invalid json:[{}]", json);
                    return;
                }
                if (null != ad && StringUtils.isNotEmpty(ad.getAdid())) {
                    // Province: ADID_Province_yyyyMMdd
                    if (StringUtils.isNotEmpty(province)) {
                        String rowkey = ad.getAdid() + "_" + province + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }

                    // City: ADID_City_yyyyMMdd
                    if (StringUtils.isNotEmpty(city)) {
                        String rowkey = ad.getAdid() + "_" + city + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }

                    // Client (user): ADID_UID_yyyyMMdd; the check must be on uid, not province
                    if (StringUtils.isNotEmpty(ad.getUid())) {
                        String rowkey = ad.getAdid() + "_" + ad.getUid() + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("action", "rowkey", "cnt"));
    }
}

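LogToModelBolt parses each log line, converts its JSON part into an Ad model, and emits one (action, rowkey, cnt) tuple per dimension (province, city, user client) to ToHbaseBolt.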
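The article does not show the Ad class that Gson fills in. A plausible minimal shape, offered as an assumption rather than the author's code, is a plain model whose field names match the JSON keys adid, uid and action, since Gson's default mapping binds JSON keys to fields of the same name; the getters are the ones LogToModelBolt calls.

```java
// Hypothetical Ad model: field names match the JSON keys (adid, uid, action),
// which Gson's default field mapping relies on.
class Ad {
    private String adid;
    private String uid;
    private String action;

    // Gson uses a no-arg constructor when one is available
    Ad() {
    }

    Ad(String adid, String uid, String action) {
        this.adid = adid;
        this.uid = uid;
        this.action = action;
    }

    public String getAdid() {
        return adid;
    }

    public String getUid() {
        return uid;
    }

    public String getAction() {
        return action;
    }

    @Override
    public String toString() {
        return "Ad{adid=" + adid + ", uid=" + uid + ", action=" + action + "}";
    }

    public static void main(String[] args) {
        System.out.println(new Ad("31789", "9871", "view"));
    }
}
```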

4.3.ToHbaseBolt

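import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

public class ToHbaseBolt extends BaseBasicBolt {

    private static final Logger LOG = LoggerFactory.getLogger(ToHbaseBolt.class);

    private Connection conn;
    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
            conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf("realtime_ad_stat"));
        } catch (IOException e) {
            // Fail fast: without a table every execute() call would throw a NullPointerException
            throw new RuntimeException("Failed to connect to HBase", e);
        }
    }

    @Override
    public void cleanup() {
        // Release the table and the connection when the bolt shuts down
        try {
            if (table != null) {
                table.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (IOException e) {
            LOG.warn("Failed to close HBase connection", e);
        }
    }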

    public void execute(Tuple input, BasicOutputCollector collector) {
        String action = input.getStringByField("action");
        String rowkey = input.getStringByField("rowkey");
        Long pv = input.getLongByField("cnt");

        try {
            if ("view".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"), pv);
            }
            if ("click".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"), pv);
            }
        } catch (IOException e) {
            LOG.error("Failed to increment rowkey:[{}]", rowkey, e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

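ToHbaseBolt writes the processed data into the HBase table: for each tuple it atomically increments stat:view_cnt or stat:click_cnt of the row named by rowkey, depending on the action.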
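incrementColumnValue stores each counter as an 8-byte big-endian long, which is why the query code in section 6 decodes the cell with Bytes.toLong. The sketch below reproduces that byte layout with java.nio.ByteBuffer (big-endian by default) so it runs without HBase; the class name CounterBytes is hypothetical. One practical consequence: counter columns should only be written through increments, because HBase refuses to increment a cell whose value is not 8 bytes wide.

```java
import java.nio.ByteBuffer;

// Hypothetical helper reproducing the 8-byte big-endian layout of HBase counter
// cells (the same layout as Bytes.toBytes(long) / Bytes.toLong).
class CounterBytes {
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decode(byte[] bytes) {
        if (bytes == null || bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("counter cells are exactly 8 bytes");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] b = encode(13L);
        System.out.println(b.length + " " + b[7] + " " + decode(b)); // prints "8 13 13"
    }
}
```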

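5. Kafka

5.1. Create a topic named AD

The commands below use the --zookeeper option (with the /kafka chroot) found in older Kafka releases; newer releases connect through --bootstrap-server instead.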

# Describe topics
kafka-topics.sh --describe \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

# Create the AD topic (3 partitions, replication factor 3)
kafka-topics.sh --create \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--partitions 3 \
--replication-factor 3

# Consume the AD topic from the console
kafka-console-consumer.sh \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--from-beginning

# Delete the AD topic
kafka-topics.sh --delete \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD
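The AD topic has 3 partitions, while AdTopology runs KafkaSpout with a parallelism hint of 2. Within one consumer group each partition is read by at most one consumer, so one spout task ends up reading two partitions, and raising the spout parallelism above 3 would leave tasks idle. The sketch below makes that concrete with a hypothetical round-robin assignment (partition p goes to task p % tasks); the actual assignment is decided by storm-kafka-client and may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin assignment of topic partitions to spout tasks.
// Only the "one task per partition" property is guaranteed by Kafka; the
// exact mapping here is an assumption for illustration.
class PartitionAssignment {
    static List<List<Integer>> assign(int partitions, int tasks) {
        List<List<Integer>> result = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % tasks).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 2)); // [[0, 2], [1]]
        System.out.println(assign(3, 4)); // [[0], [1], [2], []]
    }
}
```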

5.2. Simulating messages

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

public class ProducerClient {

    private static final Logger LOG = LoggerFactory.getLogger(ProducerClient.class);
    // Province and city, tab-separated so they become the last two log fields
    private static final String[] PROVINCES_CITIES = new String[]{
            "山東\t濟南",
            "河北\t石家莊",
            "吉林\t長春",
            "黑龍江\t哈爾濱",
            "遼寧\t沈陽",
            "內蒙古\t呼和浩特",
            "新疆\t烏魯木齊",
            "甘肅\t蘭州",
            "寧夏\t銀川",
            "山西\t太原",
            "陜西\t西安",
            "河南\t鄭州",
            "安徽\t合肥",
            "江蘇\t南京",
            "浙江\t杭州",
            "福建\t福州",
            "廣東\t廣州",
            "江西\t南昌",
            "海南\t海口",
            "廣西\t南寧",
            "貴州\t貴陽",
            "湖南\t長沙",
            "湖北\t武漢",
            "四川\t成都",
            "云南\t昆明",
            "西藏\t拉薩",
            "青海\t西寧",
            "天津\t天津",
            "上海\t上海",
            "重慶\t重慶",
            "北京\t北京",
            "臺灣\t臺北",
            "香港\t香港",
            "澳門\t澳門"
    };
    private static final String[] ACTIONS = new String[]{
            "view", "click"
    };
    private static final String[] ADIDS = new String[]{
            "1", "2", "3", "4", "5"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One Random instance for all fields instead of a new one per call
        Random random = new Random();
        // try-with-resources closes the producer (and flushes pending records) at the end
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 2000; i++) {
                // Build one log line, e.g.
                // 2018-08-10\t12:00:00\t{"adid":"1","uid":"42","action":"view"}\t10.1.2.3\t北京\t北京
                StringBuilder sb = new StringBuilder();
                // Fixed date and time, so all generated data falls into the 20180810 rows
                sb.append("2018-08-10").append('\t');
                sb.append("12:00:00").append('\t');
                sb.append("{\"adid\":\"").append(ADIDS[random.nextInt(ADIDS.length)])
                        .append("\",\"uid\":\"").append(random.nextInt(200))
                        .append("\",\"action\":\"").append(ACTIONS[random.nextInt(ACTIONS.length)])
                        .append("\"}").append('\t');
                // Random IPv4 address, each octet in 0..255
                sb.append(random.nextInt(256)).append('.').append(random.nextInt(256)).append('.')
                        .append(random.nextInt(256)).append('.').append(random.nextInt(256)).append('\t');
                sb.append(PROVINCES_CITIES[random.nextInt(PROVINCES_CITIES.length)]);
                kafkaProducer.send(new ProducerRecord<>("AD", sb.toString()));
            }
            // Block until every buffered record has been sent
            kafkaProducer.flush();
            if (LOG.isInfoEnabled()) {
                LOG.info("{}", "Finished sending messages");
            }
        }
    }
}
Screenshot: partial log output
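A quick sanity check on the data ProducerClient generates: 2000 events spread uniformly over 5 ad IDs, 34 province/city pairs and 2 actions give an expected count of about 2000 / (5 × 34 × 2) ≈ 5.88 per (adid, province, action) row. For the six pairs whose province and city names match (天津, 上海, 重慶, 北京, 香港, 澳門) both increments land on the same row key, so about twice that is expected. The sample response in section 6 appears consistent with this: the other 28 provinces average roughly 5.9 views, while those six average roughly 12. The class ExpectedCounts is a hypothetical helper for this arithmetic.

```java
import java.util.Locale;

// Hypothetical helper: expected count per (adid, province, action) row when
// events are drawn uniformly at random, as in ProducerClient.
class ExpectedCounts {
    static double perRow(int events, int adids, int regions, int actions, int rowsPerEvent) {
        return (double) events * rowsPerEvent / ((double) adids * regions * actions);
    }

    public static void main(String[] args) {
        // 2000 events, 5 ad IDs, 34 province/city pairs, 2 actions
        System.out.println(String.format(Locale.ROOT, "%.2f", perRow(2000, 5, 34, 2, 1))); // 5.88
        // Province and city share one row key when their names match, e.g. 北京/北京
        System.out.println(String.format(Locale.ROOT, "%.2f", perRow(2000, 5, 34, 2, 2))); // 11.76
    }
}
```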

6. Querying HBase data

public Map<String, Object> get(Table table, String adid, String date, String province) {
  try {
    if (StringUtils.isNotEmpty(date)) {
      date = date.replace("-", "");
    }

    Map<String, Object> map = Maps.newHashMapWithExpectedSize(5);
    map.put("adid", adid);
    map.put("date", date);
    map.put("province", province);

    // adid_province_date or adid_city_date
    String rowKey = adid + "_" + province + "_" + date;

    Get get = new Get(Bytes.toBytes(rowKey));
    Result result = table.get(get);

    // Read stat:view_cnt (an 8-byte long written by incrementColumnValue)
    long viewCnt = 0L;
    byte[] viewBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"));
    if (viewBytes != null) {
      viewCnt = Bytes.toLong(viewBytes);
    }
    map.put("view", viewCnt);

    // Read stat:click_cnt
    long clickCnt = 0L;
    byte[] clickBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"));
    if (clickBytes != null) {
      clickCnt = Bytes.toLong(clickBytes);
    }
    map.put("click", clickCnt);
    return map;
  } catch (IOException e) {
    e.printStackTrace();
    throw new ServiceException("Failed to query list");
  }
}

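The HBase client reads the matching row from realtime_ad_stat, the counters are wrapped in a Map, and the result is returned to the frontend as JSON. A sample response for ad 1 on 2018-08-10, one entry per province: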

{
    "data":[
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"山東",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"河北",
            "click":8
        },
        {
            "date":"20180810",
            "view":2,
            "adid":"1",
            "province":"吉林",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"黑龍江",
            "click":2
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"遼寧",
            "click":7
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"內蒙古",
            "click":5
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"新疆",
            "click":6
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"甘肅",
            "click":5
        },
        {
            "date":"20180810",
            "view":11,
            "adid":"1",
            "province":"寧夏",
            "click":5
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"山西",
            "click":5
        },
        {
            "date":"20180810",
            "view":7,
            "adid":"1",
            "province":"陜西",
            "click":5
        },
        {
            "date":"20180810",
            "view":3,
            "adid":"1",
            "province":"河南",
            "click":6
        },
        {
            "date":"20180810",
            "view":1,
            "adid":"1",
            "province":"安徽",
            "click":8
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"江蘇",
            "click":10
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"浙江",
            "click":5
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"福建",
            "click":2
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"廣東",
            "click":13
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"江西",
            "click":6
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"海南",
            "click":1
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"廣西",
            "click":7
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"貴州",
            "click":11
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"湖南",
            "click":8
        },
        {
            "date":"20180810",
            "view":9,
            "adid":"1",
            "province":"湖北",
            "click":4
        },
        {
            "date":"20180810",
            "view":6,
            "adid":"1",
            "province":"四川",
            "click":8
        },
        {
            "date":"20180810",
            "view":2,
            "adid":"1",
            "province":"云南",
            "click":7
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"西藏",
            "click":4
        },
        {
            "date":"20180810",
            "view":4,
            "adid":"1",
            "province":"青海",
            "click":3
        },
        {
            "date":"20180810",
            "view":16,
            "adid":"1",
            "province":"天津",
            "click":4
        },
        {
            "date":"20180810",
            "view":12,
            "adid":"1",
            "province":"上海",
            "click":12
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"重慶",
            "click":16
        },
        {
            "date":"20180810",
            "view":10,
            "adid":"1",
            "province":"北京",
            "click":14
        },
        {
            "date":"20180810",
            "view":5,
            "adid":"1",
            "province":"臺灣",
            "click":4
        },
        {
            "date":"20180810",
            "view":18,
            "adid":"1",
            "province":"香港",
            "click":10
        },
        {
            "date":"20180810",
            "view":8,
            "adid":"1",
            "province":"澳門",
            "click":12
        }
    ],
    "message":"操作成功!",
    "resultCode":"00000"
}
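The get method above returns a single day. Because the date is the last part of every row key, the trend requirements (4 to 6 and 8) can be served by listing one row key per day and fetching them, for example in one batch with Table.get(List&lt;Get&gt;). The sketch below only generates the daily keys with java.time; the class name TrendRowKeys is hypothetical and the HBase call is left out so it runs on its own.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: daily row keys ADID_Dimension_yyyyMMdd for a closed date range,
// where Dimension is a province, a city or a uid.
class TrendRowKeys {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    static List<String> between(String adid, String dimension, LocalDate from, LocalDate to) {
        List<String> keys = new ArrayList<>();
        for (LocalDate d = from; !d.isAfter(to); d = d.plusDays(1)) {
            keys.add(adid + "_" + dimension + "_" + d.format(DAY));
        }
        return keys;
    }

    public static void main(String[] args) {
        // [1_山東_20180809, 1_山東_20180810, 1_山東_20180811]
        System.out.println(between("1", "山東", LocalDate.of(2018, 8, 9), LocalDate.of(2018, 8, 11)));
    }
}
```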

7. References:

ECharts
HBase企業應用開發實戰 (HBase Enterprise Application Development in Practice), Chapter 8
Hadoop cluster setup (three nodes)
ZooKeeper cluster installation
Storm: WordCount
HBase: environment setup
Kafka: cluster installation

© Copyright belongs to the author. Please contact the author for reprints or content collaboration.