This post walks through a real-time ad analytics system built on Kafka + Storm + HBase. The dashboard is served by Spring Boot + Vue + ElementUI + ECharts.
Contents:
- 1. Requirements
- 2. Log Format
- 3. HBase Table Design
- 4. The Storm Topology
- 5. Kafka
- 6. Querying HBase
- 7. References
1. Requirements
- 1. Current impression count of a given ad in a given province
- 2. Current impression count of a given ad in a given city
- 3. Current impression count of a given ad on a given user client
- 4. Historical impression trend of a given ad in a given province, accumulated over time
- 5. Historical impression trend of a given ad in a given city, accumulated over time
- 6. Historical impression trend of a given ad on a given client, accumulated over time
- 7. Current click count of a given ad
- 8. Click trend of a given ad, accumulated over time
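All eight metrics reduce to per-day counters keyed by ad ID plus one dimension (province, city, or uid): the "current" figures (1-3, 7) read today's counter, and the trend views (4-6, 8) read the same counters across a range of dates. This observation drives the HBase rowkey design in section 3.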
Dashboard preview 1
Dashboard preview 2
2. Log Format
2014-01-13\t19:11:55\t{"adid":"31789","uid":"9871","action":"view"}\t63.237.239.3\t北京\t北京
Date: 2014-01-13
Time: 19:11:55
JSON payload: kept as JSON so new fields are easy to add
adid: ad ID
uid: user ID
action: user behavior, click or view
IP: 63.237.239.3
Province: 北京
City: 北京
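The six fields are tab-separated and the third is JSON, so one split plus a JSON parse recovers everything; a minimal sketch, assuming Gson is on the classpath:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class LogLineDemo {
    public static void main(String[] args) {
        String line = "2014-01-13\t19:11:55\t{\"adid\":\"31789\",\"uid\":\"9871\",\"action\":\"view\"}\t63.237.239.3\t北京\t北京";
        String[] arr = line.split("\t", -1); // date, time, json, ip, province, city
        JsonObject ad = new JsonParser().parse(arr[2]).getAsJsonObject();
        System.out.println(ad.get("adid").getAsString()); // 31789
        System.out.println(arr[4] + "/" + arr[5]);        // 北京/北京
    }
}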
3. HBase Table Design
| Table name | realtime_ad_stat |
|---|---|
| Rowkey | ADID_Province_20181212 / ADID_City_20181212 / ADID_UID_20181212 |
| Column family | stat |
| Columns | view_cnt, click_cnt |
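For example, a view of ad 1 from user 9871 in 濟(jì)南, 山東 on 2018-08-10 touches three rows: 1_山東_20180810, 1_濟(jì)南_20180810 and 1_9871_20180810, incrementing stat:view_cnt in each; a click would increment stat:click_cnt instead.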
# Create the table
create 'realtime_ad_stat',{NAME => 'stat',VERSIONS => 2147483647}
# List tables
list
# Truncate the data
truncate 'realtime_ad_stat'
# Drop the table
disable 'realtime_ad_stat'
drop 'realtime_ad_stat'
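The counters can be exercised straight from the shell, which is a quick way to verify the schema before wiring up Storm (the rowkey here is just an example):

# Increment a view counter by 1 (the cell is created on first use)
incr 'realtime_ad_stat', '1_山東_20180810', 'stat:view_cnt', 1
# Read a counter back
get_counter 'realtime_ad_stat', '1_山東_20180810', 'stat:view_cnt'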
4. The Storm Topology
4.1.AdTopology
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class AdTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Consume the AD topic, starting from the latest offsets
        KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                KafkaSpoutConfig.builder("hadoop1:9092,hadoop2:9092,hadoop3:9092", "AD")
                        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "STORM_AD_GROUP")
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        .build();
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout<>(kafkaSpoutConfig), 2);
        topologyBuilder.setBolt("me.jinkun.ad.storm.LogToModelBolt", new LogToModelBolt(), 2).localOrShuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("me.jinkun.ad.storm.ToHbaseBolt", new ToHbaseBolt(), 4).localOrShuffleGrouping("me.jinkun.ad.storm.LogToModelBolt");
        StormTopology topology = topologyBuilder.createTopology();
        Config config = new Config();
        config.setDebug(false);
        if (args != null && args.length > 0) {
            // Cluster mode: args[0] is the topology name
            config.setNumWorkers(4);
            StormSubmitter.submitTopology(args[0], config, topology);
        } else {
            // Local mode for development
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("AdTopology", config, topology);
        }
    }
}
The KafkaSpout reads the latest messages from the AD topic and hands each log line to LogToModelBolt.
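The spout comes from the storm-kafka-client module. A sketch of the Maven dependencies this topology needs; the version numbers are assumptions, so align them with your cluster:

<!-- versions below are assumptions; match them to your cluster -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka-client</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>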
4.2.LogToModelBolt
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogToModelBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LogToModelBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        // 2014-01-13  19:11:55  {"adid":"31789","uid":"9871","action":"view"}  63.237.239.3  北京  北京
        String line = input.getStringByField("value");
        if (LOG.isInfoEnabled()) {
            LOG.info("line:[{}]", line);
        }
        String[] arr = line.split("\t", -1);
        if (arr.length == 6) {
            String date = arr[0].trim().replace("-", "");
            String time = arr[1].trim();
            String json = arr[2].trim();
            String ip = arr[3].trim();
            String province = arr[4].trim();
            String city = arr[5].trim();
            if (StringUtils.isNotEmpty(json)) {
                Ad ad = new Gson().fromJson(json, Ad.class);
                if (null != ad && StringUtils.isNotEmpty(ad.getAdid())) {
                    // Province dimension
                    if (StringUtils.isNotEmpty(province)) {
                        String rowkey = ad.getAdid() + "_" + province + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }
                    // City dimension
                    if (StringUtils.isNotEmpty(city)) {
                        String rowkey = ad.getAdid() + "_" + city + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }
                    // Client (user) dimension -- check the uid here, not the province
                    if (StringUtils.isNotEmpty(ad.getUid())) {
                        String rowkey = ad.getAdid() + "_" + ad.getUid() + "_" + date;
                        collector.emit(new Values(ad.getAction(), rowkey, 1L));
                    }
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("action", "rowkey", "cnt"));
    }
}
LogToModelBolt parses each log line into a model and emits one (action, rowkey, count) tuple per dimension to ToHbaseBolt.
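The Ad model class is not shown in the post; a minimal version consistent with the new Gson().fromJson(json, Ad.class) call above would be:

// Minimal sketch of the Ad model assumed above; Gson maps JSON keys to these field names.
public class Ad {
    private String adid;   // ad ID
    private String uid;    // user ID
    private String action; // "view" or "click"

    public String getAdid() { return adid; }
    public String getUid() { return uid; }
    public String getAction() { return action; }
}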
4.3.ToHbaseBolt
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

public class ToHbaseBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(ToHbaseBolt.class);
    private Table table;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // One HBase connection per bolt instance, opened once at startup
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
            Connection conn = ConnectionFactory.createConnection(conf);
            table = conn.getTable(TableName.valueOf("realtime_ad_stat"));
        } catch (IOException e) {
            LOG.error("failed to open HBase connection", e);
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String action = input.getStringByField("action");
        String rowkey = input.getStringByField("rowkey");
        Long pv = input.getLongByField("cnt");
        try {
            // incrementColumnValue is atomic on the server side, so concurrent
            // bolt instances can safely update the same counter
            if ("view".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"), pv);
            }
            if ("click".equals(action)) {
                table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"), pv);
            }
        } catch (IOException e) {
            LOG.error("failed to increment counter for rowkey [{}]", rowkey, e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing to declare
    }
}
ToHbaseBolt writes the aggregated counts into the HBase table through atomic counter increments.
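With the spout and both bolts in place, the topology can be packaged and submitted via the Storm CLI; a sketch, assuming the jar bundles its non-provided dependencies (the jar name is illustrative):

# Local mode: no extra argument, so main() takes the LocalCluster branch
storm jar ad-storm-1.0.jar me.jinkun.ad.storm.AdTopology
# Cluster mode: the first argument becomes the topology name
storm jar ad-storm-1.0.jar me.jinkun.ad.storm.AdTopology AdTopology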
5. Kafka
5.1. Create the AD Topic
# Describe existing topics
kafka-topics.sh --describe \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
# Create the AD topic
kafka-topics.sh --create \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--partitions 3 \
--replication-factor 3
# Console consumer for the AD topic
kafka-console-consumer.sh \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD \
--from-beginning
# Delete the AD topic
kafka-topics.sh --delete \
--zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
--topic AD
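For a quick end-to-end smoke test, paste a sample log line into the console producer and check that the consumer above echoes it:

# Console producer for manual testing
kafka-console-producer.sh \
--broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 \
--topic AD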
5.2. Simulating Log Messages
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

public class ProducerClient {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerClient.class);
    private static final String[] PROVINCES_CITIES = new String[]{
            "山東\t濟(jì)南",
            "河北\t石家莊",
            "吉林\t長(zhǎng)春",
            "黑龍江\t哈爾濱",
            "遼寧\t沈陽(yáng)",
            "內(nèi)蒙古\t呼和浩特",
            "新疆\t烏魯木齊",
            "甘肅\t蘭州",
            "寧夏\t銀川",
            "山西\t太原",
            "陜西\t西安",
            "河南\t鄭州",
            "安徽\t合肥",
            "江蘇\t南京",
            "浙江\t杭州",
            "福建\t福州",
            "廣東\t廣州",
            "江西\t南昌",
            "海南\t海口",
            "廣西\t南寧",
            "貴州\t貴陽(yáng)",
            "湖南\t長(zhǎng)沙",
            "湖北\t武漢",
            "四川\t成都",
            "云南\t昆明",
            "西藏\t拉薩",
            "青海\t西寧",
            "天津\t天津",
            "上海\t上海",
            "重慶\t重慶",
            "北京\t北京",
            "臺(tái)灣\t臺(tái)北",
            "香港\t香港",
            "澳門\t澳門"
    };
    private static final String[] ACTIONS = new String[]{
            "view", "click"
    };
    private static final String[] ADIDS = new String[]{
            "1", "2", "3", "4", "5"
    };

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        Random random = new Random();
        for (int i = 0; i < 2000; i++) {
            // Build one log line in the format:
            // 2014-01-13  19:11:55  {"adid":"31789","uid":"9871","action":"view"}  63.237.239.3  北京  北京
            StringBuilder sb = new StringBuilder();
            // Fixed date and time so all counters land on the same day
            sb.append("2018-08-10");
            sb.append("\t");
            sb.append("12:00:00");
            sb.append("\t");
            sb.append("{\"adid\":\"" + ADIDS[random.nextInt(ADIDS.length)] + "\",\"uid\":\"" + random.nextInt(200) + "\",\"action\":\"" + ACTIONS[random.nextInt(ACTIONS.length)] + "\"}");
            sb.append("\t");
            sb.append(random.nextInt(255) + "." + random.nextInt(255) + "." + random.nextInt(255) + "." + random.nextInt(255));
            sb.append("\t");
            sb.append(PROVINCES_CITIES[random.nextInt(PROVINCES_CITIES.length)]);
            kafkaProducer.send(new ProducerRecord<>("AD", sb.toString()));
        }
        Thread.sleep(1000);
        kafkaProducer.flush();
        if (LOG.isInfoEnabled()) {
            LOG.info("{}", "finished sending messages");
        }
        kafkaProducer.close();
    }
}
Excerpt of the producer log output (screenshot)
6. Querying HBase
public Map<String, Object> get(Table table, String adid, String date, String province) {
    try {
        if (StringUtils.isNotEmpty(date)) {
            date = date.replace("-", "");
        }
        Map<String, Object> map = Maps.newHashMapWithExpectedSize(5);
        map.put("adid", adid);
        map.put("date", date);
        map.put("province", province);
        // rowkey: adid_province_date or adid_city_date
        String rowKey = adid + "_" + province + "_" + date;
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        // Read stat:view_cnt (counters are stored as 8-byte longs)
        long viewCnt = 0L;
        byte[] viewBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"));
        if (viewBytes != null) {
            viewCnt = Bytes.toLong(viewBytes);
        }
        map.put("view", viewCnt);
        // Read stat:click_cnt
        long clickCnt = 0L;
        byte[] clickBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"));
        if (clickBytes != null) {
            clickCnt = Bytes.toLong(clickBytes);
        }
        map.put("click", clickCnt);
        return map;
    } catch (IOException e) {
        e.printStackTrace();
        throw new ServiceException("failed to query ad stats");
    }
}
The HBase client packs each row of realtime_ad_stat into a Map, and the result list is serialized to JSON for the frontend.
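On the Spring Boot side this might be exposed roughly as follows; a minimal sketch in which the controller name, request path, and StatService wrapper are all assumptions, since the post only shows the get() method:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/stat")
public class AdStatController {

    @Autowired
    private StatService statService; // assumed wrapper holding the HBase Table and the get() method above

    @GetMapping("/provinces")
    public Map<String, Object> provinces(@RequestParam String adid, @RequestParam String date) {
        List<Map<String, Object>> data = new ArrayList<>();
        // One counter lookup per province; names match the producer's list
        for (String province : new String[]{"山東", "河北", "吉林" /* ... */}) {
            data.add(statService.get(adid, date, province));
        }
        Map<String, Object> result = new HashMap<>();
        result.put("data", data);
        result.put("message", "操作成功!");
        result.put("resultCode", "00000");
        return result;
    }
}

A response then looks like this: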
{
"data":[
{
"date":"20180810",
"view":6,
"adid":"1",
"province":"山東",
"click":4
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"河北",
"click":8
},
{
"date":"20180810",
"view":2,
"adid":"1",
"province":"吉林",
"click":4
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"黑龍江",
"click":2
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"遼寧",
"click":7
},
{
"date":"20180810",
"view":6,
"adid":"1",
"province":"內(nèi)蒙古",
"click":5
},
{
"date":"20180810",
"view":10,
"adid":"1",
"province":"新疆",
"click":6
},
{
"date":"20180810",
"view":12,
"adid":"1",
"province":"甘肅",
"click":5
},
{
"date":"20180810",
"view":11,
"adid":"1",
"province":"寧夏",
"click":5
},
{
"date":"20180810",
"view":5,
"adid":"1",
"province":"山西",
"click":5
},
{
"date":"20180810",
"view":7,
"adid":"1",
"province":"陜西",
"click":5
},
{
"date":"20180810",
"view":3,
"adid":"1",
"province":"河南",
"click":6
},
{
"date":"20180810",
"view":1,
"adid":"1",
"province":"安徽",
"click":8
},
{
"date":"20180810",
"view":6,
"adid":"1",
"province":"江蘇",
"click":10
},
{
"date":"20180810",
"view":12,
"adid":"1",
"province":"浙江",
"click":5
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"福建",
"click":2
},
{
"date":"20180810",
"view":5,
"adid":"1",
"province":"廣東",
"click":13
},
{
"date":"20180810",
"view":8,
"adid":"1",
"province":"江西",
"click":6
},
{
"date":"20180810",
"view":5,
"adid":"1",
"province":"海南",
"click":1
},
{
"date":"20180810",
"view":6,
"adid":"1",
"province":"廣西",
"click":7
},
{
"date":"20180810",
"view":5,
"adid":"1",
"province":"貴州",
"click":11
},
{
"date":"20180810",
"view":8,
"adid":"1",
"province":"湖南",
"click":8
},
{
"date":"20180810",
"view":9,
"adid":"1",
"province":"湖北",
"click":4
},
{
"date":"20180810",
"view":6,
"adid":"1",
"province":"四川",
"click":8
},
{
"date":"20180810",
"view":2,
"adid":"1",
"province":"云南",
"click":7
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"西藏",
"click":4
},
{
"date":"20180810",
"view":4,
"adid":"1",
"province":"青海",
"click":3
},
{
"date":"20180810",
"view":16,
"adid":"1",
"province":"天津",
"click":4
},
{
"date":"20180810",
"view":12,
"adid":"1",
"province":"上海",
"click":12
},
{
"date":"20180810",
"view":10,
"adid":"1",
"province":"重慶",
"click":16
},
{
"date":"20180810",
"view":10,
"adid":"1",
"province":"北京",
"click":14
},
{
"date":"20180810",
"view":5,
"adid":"1",
"province":"臺(tái)灣",
"click":4
},
{
"date":"20180810",
"view":18,
"adid":"1",
"province":"香港",
"click":10
},
{
"date":"20180810",
"view":8,
"adid":"1",
"province":"澳門",
"click":12
}
],
"message":"操作成功!",
"resultCode":"00000"
}
7. References
ECharts
《HBase企業(yè)應(yīng)用開(kāi)發(fā)實(shí)戰(zhàn)》 (HBase Enterprise Application Development in Practice), Chapter 8
Hadoop cluster setup (three nodes)
ZooKeeper cluster installation
Storm WordCount
HBase environment setup
Kafka cluster installation