轉(zhuǎn)自千峰王溯老師
1仙粱、用戶畫(huà)像項(xiàng)目簡(jiǎn)介
1.1 什么是用戶畫(huà)像
所謂的用戶畫(huà)像就是給用戶貼一些標(biāo)簽丁侄,通過(guò)標(biāo)簽說(shuō)明用戶是一個(gè)什么樣的人俊扭。
具體來(lái)說(shuō)棋凳,給用戶貼一些標(biāo)簽之后拦坠,根據(jù)用戶的目標(biāo)、行為和觀點(diǎn)的差異剩岳,將他們區(qū)分為不同的類型贞滨,然后從每種類型中抽取典型特征,賦予名字拍棕、照片晓铆、一些人口統(tǒng)計(jì)學(xué)要素、場(chǎng)景等描述绰播,形成了一個(gè)人物原型骄噪。
過(guò)程就是:通過(guò)客戶信息抽象為用戶畫(huà)像進(jìn)而抽象出對(duì)客戶的認(rèn)知。
1.2 用戶畫(huà)像主要維度
人口屬性:用戶是誰(shuí)(性別幅垮、年齡腰池、職業(yè)等基本信息)
消費(fèi)需求:消費(fèi)習(xí)慣和消費(fèi)偏好
購(gòu)買(mǎi)能力:收入、購(gòu)買(mǎi)力忙芒、購(gòu)買(mǎi)頻次示弓、渠道
興趣愛(ài)好:品牌偏好、個(gè)人興趣
社交屬性:用戶活躍場(chǎng)景(社交媒體等)
1.3 用戶畫(huà)像的數(shù)據(jù)類型
數(shù)據(jù)有動(dòng)態(tài)數(shù)據(jù)和靜態(tài)數(shù)據(jù)呵萨,所謂的靜態(tài)數(shù)據(jù)如性別和年齡等短期無(wú)法改變的數(shù)據(jù)奏属;而動(dòng)態(tài)數(shù)據(jù)就是如短期行為相關(guān)的數(shù)據(jù),比如說(shuō)今天我想買(mǎi)件裙子潮峦,明天我就去看褲子了囱皿,這種數(shù)據(jù)特征就是比較多變勇婴。
1.4 用戶畫(huà)像的用途
殺熟、推薦(非常多)【用戶畫(huà)像是推薦系統(tǒng)的重要數(shù)據(jù)源】嘱腥、市場(chǎng)營(yíng)銷(xiāo)耕渴、客服
讓用戶和企業(yè)雙贏。讓用戶快速找到想要的商品齿兔,讓企業(yè)找到為產(chǎn)品買(mǎi)單的人橱脸。
(一)微觀層面
在產(chǎn)品設(shè)計(jì)時(shí),通過(guò)用戶畫(huà)像來(lái)描述用戶的需求分苇。
在數(shù)據(jù)應(yīng)用上添诉,可以用來(lái)推薦、搜索医寿、風(fēng)控
將定性分析和定量分析結(jié)合栏赴,進(jìn)行數(shù)據(jù)化運(yùn)營(yíng)和用戶分析
進(jìn)行精準(zhǔn)化營(yíng)銷(xiāo)
(二)宏觀層面
確定發(fā)展的戰(zhàn)略、戰(zhàn)術(shù)方向
進(jìn)行市場(chǎng)細(xì)分與用戶分群靖秩,以市場(chǎng)為導(dǎo)向
(三)畫(huà)像建模預(yù)測(cè)
進(jìn)行人口屬性細(xì)分:明確是誰(shuí)须眷,購(gòu)買(mǎi)了什么,為什么
購(gòu)買(mǎi)行為細(xì)分:提供市場(chǎng)機(jī)會(huì)、市場(chǎng)規(guī)模等關(guān)鍵信息
產(chǎn)品需求細(xì)分:提供更具差異化競(jìng)爭(zhēng)力的產(chǎn)品規(guī)格和業(yè)務(wù)價(jià)值
興趣態(tài)度細(xì)分:提供人群類別畫(huà)像:渠道策略,定價(jià)策略斗这,產(chǎn)品策略,品牌策略
1.5 用戶畫(huà)像的步驟
(一)確定畫(huà)像的目標(biāo)
在產(chǎn)品不同生命周期捎稚,或者不同使用途徑,目標(biāo)不同求橄,對(duì)畫(huà)像的需求也有所不同今野,所以進(jìn)行畫(huà)像之前需要明確目標(biāo)是什么,需求是什么罐农。
(二)確定所需用戶畫(huà)像的維度
根據(jù)目標(biāo)確定用戶畫(huà)像所需要的維度条霜,比如說(shuō)想進(jìn)行商品推薦,就需要能影響用戶選擇商品的因素作為畫(huà)像維度涵亏。比如用戶維度(用戶的年齡宰睡、性別會(huì)影響用戶的選擇),資產(chǎn)維度(用戶的收入等因素會(huì)影響用戶對(duì)價(jià)格選擇)气筋,行為維度(用戶最近巢鹉冢看的應(yīng)該是想買(mǎi)的)等等。
(三)確定畫(huà)像的層級(jí)
用戶畫(huà)像層級(jí)越多宠默,說(shuō)明畫(huà)像粒度越小麸恍,對(duì)用戶的理解也越清晰。比如說(shuō)用戶維度,可以分為新用戶和老用戶抹沪,進(jìn)而劃分用戶的性別刻肄、年齡等。這個(gè)需要根據(jù)目標(biāo)需求進(jìn)行劃分融欧。
(四)通過(guò)原始數(shù)據(jù)敏弃,采用機(jī)器學(xué)習(xí)算法為用戶貼上標(biāo)簽
因?yàn)槲覀儷@得的原始數(shù)據(jù)是一些雜亂無(wú)章的數(shù)據(jù),所以就需要算法通過(guò)某些特征為用戶貼上標(biāo)簽
(五)通過(guò)機(jī)器學(xué)習(xí)算法將標(biāo)簽變?yōu)闃I(yè)務(wù)的輸出
每個(gè)人會(huì)有很多很多的標(biāo)簽噪馏,需要進(jìn)一步將這些標(biāo)簽轉(zhuǎn)化為對(duì)用戶的理解权她。需要對(duì)不同的標(biāo)簽建不同的權(quán)重,從而得出對(duì)業(yè)務(wù)的輸出逝薪。比如說(shuō)具有一些標(biāo)簽的用戶會(huì)喜歡什么樣的產(chǎn)品。
(六)業(yè)務(wù)產(chǎn)生數(shù)據(jù)蝴罪,數(shù)據(jù)反哺業(yè)務(wù)董济,不斷循環(huán)的閉環(huán)
1.6 常見(jiàn)的用戶畫(huà)像標(biāo)簽
2、系統(tǒng)架構(gòu)
2.1 整體架構(gòu)(線下項(xiàng)目)
2.2 數(shù)據(jù)處理流程(要做什么事)
ETL(Extract Tranform Load)用來(lái)描述數(shù)據(jù)從來(lái)源端要门,經(jīng)過(guò) 抽取虏肾、轉(zhuǎn)換、加載 到目的端的過(guò)程欢搜;
ODS(Operational Data Store)操作數(shù)據(jù)存儲(chǔ)封豪。此層數(shù)據(jù)無(wú)任何更改,直接沿用外圍系統(tǒng)數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)炒瘟,不對(duì)外開(kāi)放吹埠;為臨時(shí)存儲(chǔ)層,是接口數(shù)據(jù)的臨時(shí)存儲(chǔ)區(qū)域疮装,為后一步的數(shù)據(jù)處理做準(zhǔn)備缘琅。
要實(shí)現(xiàn)的主要步驟:ETL、報(bào)表統(tǒng)計(jì)(數(shù)據(jù)分析)廓推、生成商圈庫(kù)刷袍、數(shù)據(jù)標(biāo)簽化(核心)
2.3 主要數(shù)據(jù)集(要分析的日志數(shù)據(jù)文件)說(shuō)明
- 為整合后的日志數(shù)據(jù),每天一份樊展,json格式(離線處理)
- 這個(gè)數(shù)據(jù)集整合了內(nèi)部呻纹、外部的數(shù)據(jù),以及競(jìng)價(jià)信息(與廣告相關(guān))
- 數(shù)據(jù)的列非常多专缠,接近百個(gè)
字段 | 解釋 |
---|---|
ip | 設(shè)備的真實(shí)IP |
sessionid | 會(huì)話標(biāo)識(shí) |
advertisersid | 廣告主ID |
adorderid | 廣告ID |
adcreativeid | 廣告創(chuàng)意ID( >= 200000 : dsp , < 200000 oss) |
adplatformproviderid | 廣告平臺(tái)商ID(>= 100000: rtb , < 100000 : api ) |
sdkversionnumber | SDK版本號(hào) |
adplatformkey | 平臺(tái)商key |
putinmodeltype | 針對(duì)廣告主的投放模式,1:展示量投放 2:點(diǎn)擊量投放 |
requestmode | 數(shù)據(jù)請(qǐng)求方式(1:請(qǐng)求雷酪、2:展示、3:點(diǎn)擊) |
adprice | 廣告價(jià)格 |
adppprice | 平臺(tái)商價(jià)格 |
requestdate | 請(qǐng)求時(shí)間,格式為:yyyy-m-dd hh:mm:ss |
appid | 應(yīng)用id |
appname | 應(yīng)用名稱 |
uuid | 設(shè)備唯一標(biāo)識(shí)藤肢,比如imei或者androidid等 |
device | 設(shè)備型號(hào)太闺,如htc、iphone |
client | 設(shè)備類型 (1:android 2:ios 3:wp) |
osversion | 設(shè)備操作系統(tǒng)版本嘁圈,如4.0 |
density | 備屏幕的密度 android的取值為0.75省骂、1蟀淮、1.5,ios的取值為:1、2 |
pw | 設(shè)備屏幕寬度 |
ph | 設(shè)備屏幕高度 |
provincename | 設(shè)備所在省份名稱 |
cityname | 設(shè)備所在城市名稱 |
ispid | 運(yùn)營(yíng)商id |
ispname | 運(yùn)營(yíng)商名稱 |
networkmannerid | 聯(lián)網(wǎng)方式id |
networkmannername | 聯(lián)網(wǎng)方式名稱 |
iseffective | 有效標(biāo)識(shí)(有效指可以正常計(jì)費(fèi)的)(0:無(wú)效 1:有效) |
isbilling | 是否收費(fèi)(0:未收費(fèi) 1:已收費(fèi)) |
adspacetype | 廣告位類型(1:banner 2:插屏 3:全屏) |
adspacetypename | 廣告位類型名稱(banner钞澳、插屏怠惶、全屏) |
devicetype | 設(shè)備類型(1:手機(jī) 2:平板) |
processnode | 流程節(jié)點(diǎn)(1:請(qǐng)求量kpi 2:有效請(qǐng)求 3:廣告請(qǐng)求) |
apptype | 應(yīng)用類型id |
district | 設(shè)備所在縣名稱 |
paymode | 針對(duì)平臺(tái)商的支付模式,1:展示量投放(CPM) 2:點(diǎn)擊量投放(CPC) |
isbid | 是否rtb |
bidprice | rtb競(jìng)價(jià)價(jià)格 |
winprice | rtb競(jìng)價(jià)成功價(jià)格 |
iswin | 是否競(jìng)價(jià)成功 |
cur | values:usd|rmb等 |
rate | 匯率 |
cnywinprice | rtb競(jìng)價(jià)成功轉(zhuǎn)換成人民幣的價(jià)格 |
imei | 手機(jī)串碼 |
mac | 手機(jī)MAC碼 |
idfa | 手機(jī)APP的廣告碼 |
openudid | 蘋(píng)果設(shè)備的識(shí)別碼 |
androidid | 安卓設(shè)備的識(shí)別碼 |
rtbprovince | rtb 省 |
rtbcity | rtb 市 |
rtbdistrict | rtb 區(qū) |
rtbstreet | rtb 街道 |
storeurl | app的市場(chǎng)下載地址 |
realip | 真實(shí)ip |
isqualityapp | 優(yōu)選標(biāo)識(shí) |
bidfloor | 底價(jià) |
aw | 廣告位的寬 |
ah | 廣告位的高 |
imeimd5 | imei_md5 |
macmd5 | mac_md5 |
idfamd5 | idfa_md5 |
openudidmd5 | openudid_md5 |
androididmd5 | androidid_md5 |
imeisha1 | imei_sha1 |
macsha1 | mac_sha1 |
idfasha1 | idfa_sha1 |
openudidsha1 | openudid_sha1 |
androididsha1 | androidid_sha1 |
uuidunknow | uuid_unknow UUID密文 |
userid | 平臺(tái)用戶id |
iptype | 表示ip庫(kù)類型轧粟,1為點(diǎn)媒ip庫(kù)策治,2為廣告協(xié)會(huì)的ip地理信息標(biāo)準(zhǔn)庫(kù),默認(rèn)為1 |
initbidprice | 初始出價(jià) |
adpayment | 轉(zhuǎn)換后的廣告消費(fèi)(保留小數(shù)點(diǎn)后6位) |
agentrate | 代理商利潤(rùn)率 |
lomarkrate | 代理利潤(rùn)率 |
adxrate | 媒介利潤(rùn)率 |
title | 標(biāo)題 |
keywords | 關(guān)鍵字 |
tagid | 廣告位標(biāo)識(shí)(當(dāng)視頻流量時(shí)值為視頻ID號(hào)) |
callbackdate | 回調(diào)時(shí)間 格式為:YYYY/mm/dd hh:mm:ss |
channelid | 頻道ID |
mediatype | 媒體類型 |
用戶email | |
tel | 用戶電話號(hào)碼 |
sex | 用戶性別 |
age | 用戶年齡 |
3兰吟、創(chuàng)建工程【第二天重點(diǎn)】
3.1 步驟
-
新建一個(gè)Maven項(xiàng)目通惫,用于處理數(shù)據(jù)
Maven 管理項(xiàng)目中用到的所有jar
-
修改 pom.xml 文件,增加:
- 定義依賴版本
- 導(dǎo)入依賴
- 定義配置文件
創(chuàng)建scala目錄(src混蔼、test中分別創(chuàng)建)
在scala目錄中創(chuàng)建包 cn.itbigdata.dmp
編寫(xiě)一個(gè)主程序的架構(gòu) (DMPApp)
增加配置文件 dev/application.conf
新增目錄 utils履腋,新增參數(shù)解析類 ConfigHolder
3.2 修改pom.xml文件【重點(diǎn)】
-
設(shè)置依賴版本信息
<properties> <scala.version>2.11.8</scala.version> <scala.version.simple>2.11</scala.version.simple> <hadoop.version>2.6.1</hadoop.version> <spark.version>2.3.3</spark.version> <hive.version>1.1.0</hive.version> <fastjson.version>1.2.44</fastjson.version> <geoip.version>1.3.0</geoip.version> <geoip2.version>2.12.0</geoip2.version> <config.version>1.2.1</config.version> </properties>
-
導(dǎo)入計(jì)算引擎的依賴
<!-- scala --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-xml</artifactId> <version>2.11.0-M4</version> </dependency> <!-- hadoop --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- spark core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.version.simple}</artifactId> <version>${spark.version}</version> </dependency> <!-- spark sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version.simple}</artifactId> <version>${spark.version}</version> </dependency> <!-- spark graphx --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_${scala.version.simple}</artifactId> <version>${spark.version}</version> </dependency>
-
<font color=red>導(dǎo)入存儲(chǔ)引擎的依賴(可省略)</font>
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> <exclusions> <exclusion> <groupId>org.apache.hive</groupId> <artifactId>hive-service-rpc</artifactId> </exclusion> <exclusion> <groupId>org.apache.hive</groupId> <artifactId>hive-service</artifactId> </exclusion> </exclusions> <version>${hive.version}</version> </dependency>
-
導(dǎo)入工具依賴
<!-- 用于IP地址轉(zhuǎn)換(經(jīng)度、維度) --> <dependency> <groupId>com.maxmind.geoip</groupId> <artifactId>geoip-api</artifactId> <version>${geoip.version}</version> </dependency> <dependency> <groupId>com.maxmind.geoip2</groupId> <artifactId>geoip2</artifactId> <version>${geoip2.version}</version> </dependency> <!-- 將經(jīng)緯度轉(zhuǎn)換為編碼 --> <dependency> <groupId>ch.hsr</groupId> <artifactId>geohash</artifactId> <version>${geoip.version}</version> </dependency> <!-- scala解析json --> <dependency> <groupId>org.json4s</groupId> <artifactId>json4s-jackson_${scala.version.simple}</artifactId> <version>3.6.5</version> </dependency> <!-- 管理配置文件 --> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>${config.version}</version> </dependency>
-
<font color=red>導(dǎo)入編譯配置(可省略)</font>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <mainClass>cn.itcast.dmp.processing.App</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build>
3.3 創(chuàng)建scala代碼包
在 src/main/ 下 創(chuàng)建scala代碼包
在scala包中創(chuàng)建 cn.itbigdata.dmp 包
-
在 cn.itbigdata.dmp 包下創(chuàng)建
- beans(存放類的定義)
- etl(etl相關(guān)處理)
- report(報(bào)表處理)
- tradingarea(商圈庫(kù))
- tags(標(biāo)簽處理)
- customtrait(存放接口定義)
- utils(存放工具類)
在整個(gè)工程中建立data目錄惭嚣,存放要處理的數(shù)據(jù)
3.4 DmpApp 主程序(初始化部分)
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// 項(xiàng)目的主程序遵湖,在這里完成相關(guān)的任務(wù)
object DmpApp {
def main(args: Array[String]): Unit = {
// 初始化
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("DmpApp")
.set("spark.worker.timeout", "600s")
.set("spark.cores.max", "10")
.set("spark.rpc.askTimeout", "600s")
.set("spark.network.timeout", "600s")
.set("spark.task.maxFailures", "5")
.set("spark.speculation", "true")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.buffer.pageSize", "8m")
.set("park.debug.maxToStringFields", "200")
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 關(guān)閉資源
spark.close()
}
}
3.5 spark相關(guān)參數(shù)解釋
參數(shù)名 | 默認(rèn)值 | 定義值 |
---|---|---|
spark.worker.timeout | 60 | 500 |
spark.network.timeout | 120s | 600s |
spark.rpc.askTimeout | spark.network.timeout | 600s |
spark.cores.max | 10 | |
spark.task.maxFailures | 4 | 5 |
spark.speculation | false | true |
spark.driver.allowMultipleContexts | false | true |
spark.serializer | org.apache.spark.serializer.JavaSerializer | org.apache.spark.serializer.KryoSerializer |
spark.buffer.pageSize | 1M - 64M,系統(tǒng)計(jì)算 | 8M |
- spark.worker.timeout: 網(wǎng)絡(luò)故障導(dǎo)致心跳長(zhǎng)時(shí)間不上報(bào)給master晚吞,經(jīng)過(guò)spark.worker.timeout(秒)時(shí)間后延旧,master檢測(cè)到worker異常,標(biāo)識(shí)為DEAD狀態(tài)槽地,同時(shí)移除掉worker信息以及其上面的executor信息迁沫;
- spark.network.timeout:所有網(wǎng)絡(luò)交互的默認(rèn)超時(shí)。由網(wǎng)絡(luò)或者 gc 引起捌蚊,worker或executor沒(méi)有接收到executor或task的心跳反饋弯洗。提高 spark.network.timeout 的值,根據(jù)情況改成300(5min)或更高逢勾;
- spark.rpc.askTimeout: rpc 調(diào)用的超時(shí)時(shí)間牡整;
- spark.cores.max:每個(gè)應(yīng)用程序所能申請(qǐng)的最大CPU核數(shù);
- spark.task.maxFailures:當(dāng)task執(zhí)行失敗時(shí)溺拱,并不會(huì)直接導(dǎo)致整個(gè)應(yīng)用程序down掉逃贝,只有在重試了 spark.task.maxFailures 次后任然失敗的情況下才會(huì)使程序down掉;
- spark.speculation:推測(cè)執(zhí)行是指對(duì)于一個(gè)Stage里面運(yùn)行慢的Task迫摔,會(huì)在其他節(jié)點(diǎn)的Executor上再次啟動(dòng)這個(gè)task沐扳,如果其中一個(gè)Task實(shí)例運(yùn)行成功則將這個(gè)最先完成的Task的計(jì)算結(jié)果作為最終結(jié)果,同時(shí)會(huì)干掉其他Executor上運(yùn)行的實(shí)例句占,從而加快運(yùn)行速度沪摄;
- spark.driver.allowMultipleContexts: SparkContext默認(rèn)只有一個(gè)實(shí)例,設(shè)置為true允許有多個(gè)實(shí)例;
- spark.serializer:在Spark的架構(gòu)中杨拐,在網(wǎng)絡(luò)中傳遞的或者緩存在內(nèi)存祈餐、硬盤(pán)中的對(duì)象需要進(jìn)行序列化操作【發(fā)給Executor上的Task;需要緩存的RDD(前提是使用序列化方式緩存)哄陶;廣播變量帆阳;shuffle過(guò)程中的數(shù)據(jù)緩存等】;默認(rèn)的Java序列化方式性能不高屋吨,同時(shí)序列化后占用的字節(jié)數(shù)也較多蜒谤;官方也推薦使用Kryo的序列化庫(kù)。官方文檔介紹至扰,Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右鳍徽;
- spark.buffer.pageSize: spark內(nèi)存分配的單位,無(wú)默認(rèn)值敢课,大小在1M-64M之間旬盯,spark根據(jù)jvm堆內(nèi)存大小計(jì)算得到;值過(guò)小翎猛,內(nèi)存分配效率低;值過(guò)大接剩,造成內(nèi)存的浪費(fèi)切厘;
3.6 開(kāi)發(fā)環(huán)境參數(shù)配置文件
application.conf
// 開(kāi)發(fā)環(huán)境參數(shù)配置文件
# App 信息
spark.appname="dmpApp"
# spark 信息
spark.master="local[*]"
spark.worker.timeout="120"
spark.cores.max="10"
spark.rpc.askTimeout="600s"
spark.network.timeout="600s"
spark.task.maxFailures="5"
spark.speculation="true"
spark.driver.allowMultipleContexts="true"
spark.serializer="org.apache.spark.serializer.KryoSerializer"
spark.buffer.pageSize="8m"
# kudu 信息
kudu.master="node1:7051,node2:7051,node3:7051"
# 輸入數(shù)據(jù)的信息
addata.path="data/dataset_main.json"
ipdata.geo.path="data/dataset_geoLiteCity.dat"
qqwrydat.path="data/dataset_qqwry.dat"
installDir.path="data"
# 對(duì)應(yīng)ETL輸出信息
ods.prefix="ods"
ad.data.tablename="adinfo"
# 輸出報(bào)表對(duì)應(yīng):地域統(tǒng)計(jì)、廣告地域懊缺、APP疫稿、設(shè)備、網(wǎng)絡(luò)鹃两、運(yùn)營(yíng)商遗座、渠道 7個(gè)分析
report.region.stat.tablename="RegionStatAnalysis"
report.region.tablename="AdRegionAnalysis"
report.app.tablename="AppAnalysis"
report.device.tablename="DeviceAnalysis"
report.network.tablename="NetworkAnalysis"
report.isp.tablename="IspAnalysis"
report.channel.tablename="ChannelAnalysis"
# 高德API
gaoDe.app.key="a94274923065a14222172c9b933f4a4e"
gaoDe.url="https://restapi.amap.com/v3/geocode/regeo?"
# GeoHash (key的長(zhǎng)度)
geohash.key.length=10
# 商圈庫(kù)
trading.area.tablename="tradingArea"
# tags
non.empty.field="imei,mac,idfa,openudid,androidid,imeimd5,macmd5,idfamd5,openudidmd5,androididmd5,imeisha1,macsha1,idfasha1,openudidsha1,androididsha1"
appname.dic.path="data/dic_app"
device.dic.path="data/dic_device"
tags.table.name.prefix="tags"
# 標(biāo)簽衰減系數(shù)
tag.coeff="0.92"
# es 相關(guān)參數(shù)
es.cluster.name="cluster_es"
es.index.auto.create="true"
es.Nodes="192.168.40.164"
es.port="9200"
es.index.reads.missing.as.empty="true"
es.nodes.discovery="false"
es.nodes.wan.only="true"
es.http.timeout="2000000"
3.7 配置文件解析類
// 解析參數(shù)文件幫助類
import com.typesafe.config.ConfigFactory
object ConfigHolder {
private val config = ConfigFactory.load()
// App Info
lazy val sparkAppName: String = config.getString("spark.appname")
// Spark parameters
lazy val sparkMaster: String = config.getString("spark.master")
lazy val sparkParameters: List[(String, String)] = List(
("spark.worker.timeout", config.getString("spark.worker.timeout")),
("spark.cores.max", config.getString("spark.cores.max")),
("spark.rpc.askTimeout", config.getString("spark.rpc.askTimeout")),
("spark.network.timeout", config.getString("spark.network.timeout")),
("spark.task.maxFailures", config.getString("spark.task.maxFailures")),
("spark.speculation", config.getString("spark.speculation")),
("spark.driver.allowMultipleContexts", config.getString("spark.driver.allowMultipleContexts")),
("spark.serializer", config.getString("spark.serializer")),
("spark.buffer.pageSize", config.getString("spark.buffer.pageSize"))
)
// kudu parameters
lazy val kuduMaster: String = config.getString("kudu.master")
// input dataset
lazy val adDataPath: String = config.getString("addata.path")
lazy val ipsDataPath: String = config.getString("ipdata.geo.path")
def ipToRegionFilePath: String = config.getString("qqwrydat.path")
def installDir: String = config.getString("installDir.path")
// output dataset
private lazy val delimiter = "_"
private lazy val odsPrefix: String = config.getString("ods.prefix")
private lazy val adInfoTableName: String = config.getString("ad.data.tablename")
// lazy val ADMainTableName = s"$odsPrefix$delimiter$adInfoTableName$delimiter${DateUtils.getTodayDate()}"
// report
lazy val Report1RegionStatTableName: String = config.getString("report.region.stat.tablename")
lazy val ReportRegionTableName: String = config.getString("report.region.tablename")
lazy val ReportAppTableName: String = config.getString("report.app.tablename")
lazy val ReportDeviceTableName: String = config.getString("report.device.tablename")
lazy val ReportNetworkTableName: String = config.getString("report.network.tablename")
lazy val ReportIspTableName: String = config.getString("report.isp.tablename")
lazy val ReportChannelTableName: String = config.getString("report.channel.tablename")
// GaoDe API
private lazy val gaoDeKey: String = config.getString("gaoDe.app.key")
private lazy val gaoDeTempUrl: String = config.getString("gaoDe.url")
lazy val gaoDeUrl: String = s"$gaoDeTempUrl&key=$gaoDeKey"
// GeoHash
lazy val keyLength: Int = config.getInt("geohash.key.length")
// 商圈庫(kù)
lazy val tradingAreaTableName: String =config.getString("trading.area.tablename")
// tags
lazy val idFields: String = config.getString("non.empty.field")
lazy val filterSQL: String = idFields
.split(",")
.map(field => s"$field is not null ")
.mkString(" or ")
lazy val appNameDic: String = config.getString("appname.dic.path")
lazy val deviceDic: String = config.getString("device.dic.path")
lazy val tagsTableNamePrefix: String = config.getString("tags.table.name.prefix") + delimiter
lazy val tagCoeff: Double = config.getDouble("tag.coeff")
// 加載 elasticsearch 相關(guān)參數(shù)
lazy val ESSparkParameters = List(
("cluster.name", config.getString("es.cluster.name")),
("es.index.auto.create", config.getString("es.index.auto.create")),
("es.nodes", config.getString("es.Nodes")),
("es.port", config.getString("es.port")),
("es.index.reads.missing.as.empty", config.getString("es.index.reads.missing.as.empty")),
("es.nodes.discovery", config.getString("es.nodes.discovery")),
("es.nodes.wan.only", config.getString("es.nodes.wan.only")),
("es.http.timeout", config.getString("es.http.timeout"))
)
def main(args: Array[String]): Unit = {
println(ConfigHolder.sparkParameters)
println(ConfigHolder.installDir)
}
}
3.8 DmpApp主程序(使用配置文件)
import cn.itbigdata.dmp.utils.ConfigHolder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DmpApp {
def main(args: Array[String]): Unit = {
// 1、初始化(SparkConf俊扳、SparkSession)
val conf = new SparkConf()
.setAppName(ConfigHolder.sparkAppName)
.setMaster(ConfigHolder.sparkMaster)
.setAll(ConfigHolder.sparkParameters)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
println("OK!")
// 1途蒋、ETL
// 2、報(bào)表
// 3馋记、生成商圈庫(kù)
// 4号坡、標(biāo)簽化
// 關(guān)閉資源
spark.close()
}
}
4、ETL開(kāi)發(fā)
需求:
-
將數(shù)據(jù)文件每一行中的 ip 地址梯醒,轉(zhuǎn)換為經(jīng)度宽堆、維度、省茸习、市的信息畜隶;
ip => 經(jīng)度、維度、省籽慢、市
保存轉(zhuǎn)換后的數(shù)據(jù)文件(每天一個(gè)文件)
處理步驟:
- 讀數(shù)據(jù)
- 數(shù)據(jù)處理
- 找出每一行數(shù)據(jù)中的ip地址
- 根據(jù)ip地址浸遗,算出對(duì)應(yīng)的省、市嗡综、經(jīng)度乙帮、緯度,添加到每行數(shù)據(jù)的尾部
- 保存數(shù)據(jù)
- 其他需求:數(shù)據(jù)每日加載一次极景,每天的數(shù)據(jù)單獨(dú)存放在一個(gè)文件中
難點(diǎn)問(wèn)題:處理數(shù)據(jù)(IP地址如何轉(zhuǎn)化為省察净、市、經(jīng)度盼樟、緯度)
4.1 搭建ETL架構(gòu)
新建trait(Processor)氢卡,為數(shù)據(jù)處理提供一個(gè)統(tǒng)一的接口類
import org.apache.spark.sql.SparkSession
// 數(shù)據(jù)處理接口
// SparkSession 用于數(shù)據(jù)的加載和處理
// KuduContext 用于數(shù)據(jù)的保存
trait Processor {
def process(spark: SparkSession)
}
新建 ETLProcessor ,負(fù)責(zé)ETL處理
import cn.itbigdata.dmp.customtrait.Processor
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
object ETLProcessor extends Processor{
override def process(spark: SparkSession): Unit = {
// 定義參數(shù)
val sourceDataFile = ConfigHolder.adLogPath
val sinkDataPath = ""
// 1 讀數(shù)據(jù)
val sourceDF: DataFrame = spark.read.json(sourceDataFile)
// 2 處理數(shù)據(jù)
// 2.1 找到ip
// 2.2 將ip 轉(zhuǎn)為 省晨缴、市译秦、經(jīng)度、維度
val rdd = sourceDF.rdd
.map(row => {
val ip: String = row.getAs[String]("ip")
ip
})
// 2.3 將省击碗、市筑悴、經(jīng)度、維度放在原數(shù)據(jù)的最后
// 3 保存數(shù)據(jù)
}
}
4.2 IP地址轉(zhuǎn)換為經(jīng)緯度
- 使用GeoIP稍途,將ip地址轉(zhuǎn)為經(jīng)緯度
- GeoIP阁吝,是一套含IP數(shù)據(jù)庫(kù)的軟件工具
- Geo根據(jù)來(lái)訪者的IP, 定位該IP所在經(jīng)緯度械拍、國(guó)家/地區(qū)突勇、省市、和街道等位置信息
- GeoIP有兩個(gè)版本坷虑,一個(gè)免費(fèi)版甲馋,一個(gè)收費(fèi)版本
- 收費(fèi)版本的準(zhǔn)確率高一些,更新頻率也更頻繁
- 因?yàn)镚eoIP讀取的是本地的二進(jìn)制IP數(shù)據(jù)庫(kù)迄损,所以效率很高
4.3 IP地址轉(zhuǎn)換為省市
- 純真數(shù)據(jù)庫(kù)定躏,將ip轉(zhuǎn)為省、市
- 純真數(shù)據(jù)庫(kù)收集了包括中國(guó)電信芹敌、中國(guó)移動(dòng)共屈、中國(guó)聯(lián)通、長(zhǎng)城寬帶党窜、聚友寬帶等 ISP 的 IP 地址數(shù)據(jù)
- 純真數(shù)據(jù)庫(kù)是二進(jìn)制文件拗引,有開(kāi)源的java代碼,簡(jiǎn)單的修改幌衣,調(diào)用就可以了
case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float)
private def ipToLocation(ip: String): Location ={
// 1 獲取service
val service = new LookupService("data/geoLiteCity.dat")
// 2 獲取Location
val longAndLatLocation = service.getLocation(ip)
// 3 獲取經(jīng)度矾削、維度
val longitude = longAndLatLocation.longitude
val latitude = longAndLatLocation.latitude
// 4 利用純真數(shù)據(jù)庫(kù)獲取省市
val ipService = new IPAddressUtils
val regeinLocation: IPLocation = ipService.getregion(ip)
val region = regeinLocation.getRegion
val city = regeinLocation.getCity
Location(ip, region, city, longitude, latitude)
}
需要實(shí)現(xiàn)幫助類:
-
計(jì)算當(dāng)天日期
import java.util.{Calendar, Date} import org.apache.commons.lang.time.FastDateFormat object DateUtils { def getToday: String = { val now = new Date FastDateFormat.getInstance("yyyyMMdd").format(now) } def getYesterday: String = { val calendar: Calendar = Calendar.getInstance calendar.set(Calendar.HOUR_OF_DAY, -24) FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime()) } def main(args: Array[String]): Unit = { println(getToday) println(getYesterday) } }
4.4 ETL完整實(shí)現(xiàn)
import java.util.Calendar
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.util.iplocation.{IPAddressUtils, IPLocation}
import com.maxmind.geoip.LookupService
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object ETLProcessor extends Processor{
// 定義參數(shù)
private val sourceDataFile: String = "data/data.json"
private val sinkDataPath: String = s"outputdata/maindata.${getYesterday}"
private val geoFilePath: String = "data/geoLiteCity.dat"
override def process(spark: SparkSession): Unit = {
// 1 讀數(shù)據(jù)
val sourceDF: DataFrame = spark.read.json("data/data.json")
// 2 處理數(shù)據(jù)
// 2.1 找到ip
// 2.2 將ip 轉(zhuǎn)為 省壤玫、市、經(jīng)度哼凯、維度
import spark.implicits._
val ipDF: DataFrame = sourceDF.rdd
.map { row =>
val ip = row.getAs[String]("ip")
// 將ip轉(zhuǎn)換為 省欲间、市、經(jīng)度断部、緯度
ip2Location(ip)
}.toDF
// 2.2 ipDF 與 sourceDF 做join猎贴,給每一行增加省、市蝴光、經(jīng)緯度
val sinkDF: DataFrame = sourceDF.join(ipDF, Seq("ip"), "inner")
// 3 保存數(shù)據(jù)
sinkDF.write.mode(SaveMode.Overwrite).json(sinkDataPath)
}
case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float)
private def ip2Location(ip: String): Location ={
// 1 獲取service
val service = new LookupService(geoFilePath)
// 2 獲取Location
val longAndLatLocation = service.getLocation(ip)
// 3 獲取經(jīng)度她渴、維度
val longitude = longAndLatLocation.longitude
val latitude = longAndLatLocation.latitude
// 4 利用純真數(shù)據(jù)庫(kù)獲取省市
val ipService = new IPAddressUtils
val regionLocation: IPLocation = ipService.getregion(ip)
val region = regionLocation.getRegion
val city = regionLocation.getCity
Location(ip, region, city, longitude, latitude)
}
private def getYesterday: String = {
val calendar: Calendar = Calendar.getInstance
calendar.set(Calendar.HOUR_OF_DAY, -24)
FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime())
}
}
5、報(bào)表開(kāi)發(fā)(數(shù)據(jù)分析--SparkSQL)
需要處理的報(bào)表
- 統(tǒng)計(jì)各地域的數(shù)量分布情況(RegionStatProcessor)
- 廣告投放的地域分布情況統(tǒng)計(jì)(RegionAnalysisProcessor)
- APP分布情況統(tǒng)計(jì)(AppAnalysisProcessor)
- 手機(jī)設(shè)備類型分布情況統(tǒng)計(jì)(DeviceAnalysisProcessor)
- 網(wǎng)絡(luò)類型分布情況統(tǒng)計(jì)(NetworkAnalysisProcessor)
- 網(wǎng)絡(luò)運(yùn)營(yíng)商分布情況統(tǒng)計(jì)(IspAnalysisProcessor)
- 渠道分布情況統(tǒng)計(jì)(ChannelAnalysisProcessor)
5.1 數(shù)據(jù)地域分布
- 報(bào)表處理的步驟
- 了解業(yè)務(wù)需求:根據(jù)省蔑祟、市分組趁耗,求數(shù)據(jù)量的分布情況
- 源數(shù)據(jù):為每天的日志數(shù)據(jù),即ETL的結(jié)果數(shù)據(jù)疆虚;
- 目標(biāo)數(shù)據(jù):保存在本地文件中苛败,每個(gè)報(bào)表對(duì)應(yīng)目錄;
- 編寫(xiě)SQL径簿,并測(cè)試
- 代碼實(shí)現(xiàn)
- 定義 RegionStatProcessor 繼承自Processor罢屈,實(shí)現(xiàn)process方法。具體實(shí)現(xiàn)步驟如下:
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.{ConfigHolder, DateUtils}
import org.apache.spark.sql.{SaveMode, SparkSession}
object RegionStatProcessor extends Processor{
override def process(spark: SparkSession): Unit = {
// 定義參數(shù)
val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}"
val sinkDataPath = "output/regionstat"
// 讀文件
val sourceDF = spark.read.json(sourceDataPath)
sourceDF.createOrReplaceTempView("adinfo")
// 處理數(shù)據(jù)
val RegionSQL1 =
"""
|select to_date(now()) as statdate, region, city, count(*) as infocount
| from adinfo
|group by region, city
|""".stripMargin
val sinkDF = spark.sql(RegionSQL1)
sinkDF.show()
// 寫(xiě)文件
sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath)
}
}
5.2 廣告投放地域分布
按照需求篇亭,完成以下模式的報(bào)表
備注:要求3個(gè)率:競(jìng)價(jià)成功率缠捌、廣告點(diǎn)擊率,媒體點(diǎn)擊率
指標(biāo)計(jì)算邏輯
指標(biāo) | 說(shuō)明 | adplatformproviderid | requestmode | processnode | iseffective | isbilling | isbid | iswin | adorderid | adcreativeid |
---|---|---|---|---|---|---|---|---|---|---|
原始請(qǐng)求 | 發(fā)來(lái)的所有原始請(qǐng)求數(shù) | 1 | >=1 | |||||||
有效請(qǐng)求 | 滿足有效體檢的數(shù)量 | 1 | >=2 | |||||||
廣告請(qǐng)求 | 滿足廣告請(qǐng)求的請(qǐng)求數(shù)量 | 1 | 3 | |||||||
參與競(jìng)價(jià)數(shù) | 參與競(jìng)價(jià)的次數(shù) | >=100000 | 1 | 1 | 1 | !=0 | ||||
競(jìng)價(jià)成功數(shù) | 成功競(jìng)價(jià)的次數(shù) | >=100000 | 1 | 1 | 1 | |||||
(廣告主)展示數(shù) | 針對(duì)廣告主統(tǒng)計(jì):廣告最終在終端被展示的數(shù)量 | 2 | 1 | |||||||
(廣告主)點(diǎn)擊數(shù) | 針對(duì)廣告主統(tǒng)計(jì):廣告被展示后暗赶,實(shí)際被點(diǎn)擊的數(shù)量 | 3 | 1 | |||||||
(媒介)展示數(shù) | 針對(duì)媒介統(tǒng)計(jì):廣告在終端被展示的數(shù)量 | 2 | 1 | 1 | ||||||
(媒介)點(diǎn)擊數(shù) | 針對(duì)媒介統(tǒng)計(jì):展示的廣告實(shí)際被點(diǎn)擊的數(shù)量 | 3 | 1 | 1 | ||||||
DSP廣告消費(fèi) | winprice/1000 | >=100000 | 1 | 1 | 1 | >200000 | >200000 | |||
DSP廣告成本 | Adptment/1000 | >=100000 | 1 | 1 | 1 | >200000 | >200000 |
DSP廣告消費(fèi) = DSP的RTB的錢(qián)
DSP廣告成本 = 廣告主付給DSP的錢(qián)
DSP的盈利 = DSP廣告成本 - DSP廣告消費(fèi)
備注:對(duì)應(yīng)字段:
OriginalRequest、ValidRequest肃叶、adRequest
bidsNum蹂随、bidsSus、bidRate
adDisplayNum因惭、adClickNum岳锁、adClickRate
MediumDisplayNum、MediumClickNum蹦魔、MediumClickRate
adconsume激率、adcost
代碼實(shí)現(xiàn)
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.DateUtils
import org.apache.spark.sql.{SaveMode, SparkSession}
object RegionAnalysisProcessor extends Processor{
override def process(spark: SparkSession): Unit = {
// 定義參數(shù)
val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}"
val sinkDataPath = "outputdata/regionanalysis"
// 讀文件
val sourceDF = spark.read.json(sourceDataPath)
sourceDF.createOrReplaceTempView("adinfo")
// 處理數(shù)據(jù)
val RegionSQL1 =
"""
|select to_date(now()) statdate, region, city,
| sum(case when requestmode=1 and processnode>=1 then 1 else 0 end) as OriginalRequest,
| sum(case when requestmode=1 and processnode>=2 then 1 else 0 end) as ValidRequest,
| sum(case when requestmode=1 and processnode=3 then 1 else 0 end) as adRequest,
| sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0
| then 1 else 0 end) as bidsNum,
| sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1
| then 1 else 0 end) as bidsSus,
| sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) as adDisplayNum,
| sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) as adClickNum,
| sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumDisplayNum,
| sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumClickNum,
| sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1
| and iswin=1 and adorderid>200000 and adcreativeid>200000
| then winprice/1000 else 0 end) as adconsume,
| sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1
| and iswin=1 and adorderid>200000 and adcreativeid>200000
| then adpayment/1000 else 0 end) as adcost
| from adinfo
|group by region, city
| """.stripMargin
spark.sql(RegionSQL1).createOrReplaceTempView("tabtemp")
val RegionSQL2 =
"""
|select statdate, region, city,
| OriginalRequest, ValidRequest, adRequest,
| bidsNum, bidsSus, bidsSus/bidsNum as bidRate,
| adDisplayNum, adClickNum, adClickNum/adDisplayNum as adClickRate,
| MediumDisplayNum, MediumClickNum, MediumClickNum/MediumDisplayNum as mediumClickRate,
| adconsume, adcost
| from tabtemp
""".stripMargin
val sinkDF = spark.sql(RegionSQL2)
// 寫(xiě)文件
sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath)
}
}
6、數(shù)據(jù)標(biāo)簽化
6.1 什么是數(shù)據(jù)標(biāo)簽化
-
為什么要給數(shù)據(jù)打標(biāo)簽
- 分析數(shù)據(jù)的需求
- 用戶對(duì)與數(shù)據(jù)搜索的需求勿决,支持定向人群的條件篩選乒躺。如:
- 地域,甚至是商圈
- 性別
- 年齡
- 興趣
- 設(shè)備
-
數(shù)據(jù)格式
目標(biāo)數(shù)據(jù):(用戶id低缩, 所有標(biāo)簽)嘉冒。標(biāo)簽如下所示:
(CH@123485 -> 1.0, KW@word -> 1.0, CT@Beijing -> 1.0, GD@女 -> 1.0, AGE@40 -> 1.0, TA@北海 -> 1.0, TA@沙灘 -> 1.0)
- Tag 數(shù)據(jù)組織形式Map[String, Double]
- 前綴+標(biāo)簽曹货;1.0為權(quán)重
-
需要制作的標(biāo)簽
- 廣告類型
- 渠道
- App名稱
- 性別
- 地理位置
- 設(shè)備
- 關(guān)鍵詞
- 年齡
- 商圈(暫時(shí)不管)
-
日志數(shù)據(jù)的標(biāo)簽化
- 計(jì)算標(biāo)簽(廣告類型、渠道讳推、AppName顶籽、性別 ... ...)
- 提取用戶標(biāo)識(shí)
- 統(tǒng)一用戶識(shí)別
- 標(biāo)簽數(shù)據(jù)落地
6.2 搭建框架
object TagProcessor extends Processor{
override def process(spark: SparkSession, kudu: KuduContext): Unit = {
// 定義參數(shù)
val sourceTableName = ConfigHolder.ADMainTableName
val sinkTableName = ""
val keys = ""
// 1 讀數(shù)據(jù)
val sourceDF = spark.read
.option("kudu.master", ConfigHolder.kuduMaster)
.option("kudu.table", sourceTableName)
.kudu
// 2 處理數(shù)據(jù)
sourceDF.rdd
.map(row => {
// 廣告類型、渠道银觅、App名稱
val adTags = AdTypeTag.make(row)
// 性別礼饱、地理位置、設(shè)備
// 關(guān)鍵詞究驴、年齡镊绪、商圈
})
// 3 保存數(shù)據(jù)
}
}
定義接口類:
import org.apache.spark.sql.Row
trait TagMaker {
def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double]
}
6.3 打標(biāo)簽
6.3.1 廣告類型(AdTypeTag)
字段意義 1:banner; 2:插屏纳胧; 3:全屏
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.spark.sql.Row
object AdTypeTag extends TagMaker{
private val adPrefix = "adtype@"
override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
// 1 取值
val adType: Long = row.getAs[Long]("adspacetype")
// 1:banner镰吆; 2:插屏; 3:全屏
// 2 計(jì)算標(biāo)簽
adType match {
case 1 => Map(s"${adPrefix}banner" -> 1.0)
case 2 => Map(s"${adPrefix}插屏" -> 1.0)
case 3 => Map(s"${adPrefix}全屏" -> 1.0)
case _ => Map[String, Double]()
}
}
}
6.3.2 渠道(ChannelTag)
字段:channelid
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object ChannelTag extends TagMaker{
private val channelPrefix = "channel@"
override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
// 1 取值
val channelid = row.getAs[String]("channelid")
// 2 計(jì)算標(biāo)簽
if (StringUtils.isNotBlank(channelid)){
Map(s"${channelPrefix}channelid" -> 1.0)
}
else
Map[String, Double]()
}
def main(args: Array[String]): Unit = {
// 判斷某字符串是否不為空跑慕,且長(zhǎng)度不為0万皿,且不由空白符(空格)構(gòu)成
if (!StringUtils.isNotBlank(null)) println("blank1 !")
if (!StringUtils.isNotBlank("")) println("blank2 !")
if (!StringUtils.isNotBlank(" ")) println("blank3 !")
}
}
6.3.3 App名稱(AppNameTag)
字段:appid;要將 appid 轉(zhuǎn)為 appname
查給定的字典表:dicapp
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object AppNameTag extends TagMaker{
// 獲取前綴
private val appNamePrefix = "appname@"
override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
// 1 獲取地段信息
val appId = row.getAs[String]("appid")
// 2 計(jì)算并返回標(biāo)簽
val appName = dic.getOrElse(appId, "")
if (StringUtils.isNotBlank(appName))
Map(s"${appNamePrefix}$appName" -> 1.0)
else
Map[String, Double]()
}
}
6.3.4 性別(SexTag)
字段:sex核行;
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object SexTag extends TagMaker{
private val sexPrefix: String = "sex@"
override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
// 獲取標(biāo)簽信息
val sexid: String = row.getAs[String]("sex")
val sex = sexid match {
case "1" => "男"
case "2" => "女"
case _ => "待填寫(xiě)"
}
// 計(jì)算返回標(biāo)簽
if (StringUtils.isNotBlank(sex))
Map(s"$sexPrefix$sex" -> 1.0)
else
Map[String, Double]()
}
}
6.3.5 地理位置(GeoTag)
字段:region牢硅、city
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object GeoTag extends TagMaker{
private val regionPrefix = "region@"
private val cityPrefix = "city@"
override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
// 獲取標(biāo)簽信息
val region = row.getAs[String]("region")
val city = row.getAs[String]("city")
// 計(jì)算并返回標(biāo)簽信息
val regionTag = if (StringUtils.isNotBlank(region))
Map(s"$regionPrefix$region" -> 1.0)
else
Map[String, Double]()
val cityTag = if (StringUtils.isNotBlank(city))
Map(s"$cityPrefix$city" -> 1.0)
else
Map[String, Double]()
regionTag ++ cityTag
}
}
6.3.6 設(shè)備(DeviceTag)
字段:client、networkmannername芝雪、ispname减余;
數(shù)據(jù)字典:dicdevice
- client:設(shè)備類型 (1:android 2:ios 3:wp 4:others)
- networkmannername:聯(lián)網(wǎng)方式名稱(2G、3G惩系、4G位岔、其他)
- ispname:運(yùn)營(yíng)商名稱(電信、移動(dòng)堡牡、聯(lián)通...)
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object DeviceTag extends TagMaker{
val clientPrefix = "client@"
val networkPrefix = "network@"
val ispPrefix = "isp@"
override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
// 獲取標(biāo)簽信息
val clientName: String = row.getAs[Long]("client").toString
val networkName: String = row.getAs[Long]("networkmannername").toString
val ispName: String = row.getAs[Long]("ispname").toString
// 計(jì)算并返回標(biāo)簽
val clientId = dic.getOrElse(clientName, "D00010004")
val networkId = dic.getOrElse(networkName, "D00020005")
val ispId = dic.getOrElse(ispName, "D00030004")
val clientTag = if (StringUtils.isNotBlank(clientId))
Map(s"$clientPrefix$clientId" -> 1.0)
else
Map[String, Double]()
val networkTag = if (StringUtils.isNotBlank(networkId))
Map(s"$networkPrefix$networkId" -> 1.0)
else
Map[String, Double]()
val ispTag = if (StringUtils.isNotBlank(ispId))
Map(s"$ispPrefix$ispId" -> 1.0)
else
Map[String, Double]()
clientTag ++ networkTag ++ ispTag
}
}
6.3.7 關(guān)鍵詞(KeywordTag)
字段:keywords
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object KeywordsTag extends TagMaker{
private val keywordPrefix = "keyword@"
override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
row.getAs[String]("keywords")
.split(",")
.filter(word => StringUtils.isNotBlank(word))
.map(word => s"$keywordPrefix$word" -> 1.0)
.toMap
}
}
6.3.8 年齡(AgeTag)
字段:age
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
object AgeTag extends TagMaker{
private val agePrefix = "age@"
override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
val age = row.getAs[String]("age")
if (StringUtils.isNotBlank(age))
Map(s"$agePrefix$age" -> 1.0)
else
Map[String, Double]()
}
}
6.3.9 主處理程序(TagProcessor)
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.DateUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
object TagProcessor extends Processor{
override def process(spark: SparkSession): Unit = {
// 定義參數(shù)
val sourceTableName = s"outputdata/maindata-${DateUtils.getYesterday}"
val appdicFilePath = "data/dicapp"
val deviceFilePath = "data/dicdevice"
val sinkTableName = ""
// 1 讀數(shù)據(jù)
val sourceDF = spark.read.json(sourceTableName)
// 讀app信息(文件)抒抬,轉(zhuǎn)換為廣播變量(優(yōu)化)
val appdicMap = spark.sparkContext.textFile(appdicFilePath)
.map(line => {
val arr: Array[String] = line.split("##")
(arr(0), arr(1))
}).collectAsMap()
val appdicBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(appdicMap)
// 讀字典信息(文件),轉(zhuǎn)換為廣播變量(優(yōu)化)
val deviceMap = spark.sparkContext.textFile(deviceFilePath)
.map(line => {
val arr: Array[String] = line.split("##")
(arr(0), arr(1))
}).collectAsMap()
val deviceBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(deviceMap)
// 2 處理數(shù)據(jù)
sourceDF.printSchema()
sourceDF.rdd
.map(row => {
// 廣告類型晤柄、渠道擦剑、App名稱
val adTags: Map[String, Double] = AdTypeTag.make(row)
val channelTags: Map[String, Double] = ChannelTag.make(row)
val appNameTags: Map[String, Double] = AppNameTag.make(row, appdicBC.value)
// 性別、地理位置芥颈、設(shè)備類型
val sexTags = SexTag.make(row)
val geoTags = GeoTag.make(row)
val deviceTags = DeviceTag.make(row, deviceBC.value)
// 關(guān)鍵詞惠勒、年齡
val keywordsTags = KeywordsTag.make(row)
val ageTags = AgeTag.make(row)
// 將所有數(shù)據(jù)組成一個(gè)大的 Map 返回
val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordsTags ++ ageTags
tags
}).collect.foreach(println)
// 3 保存數(shù)據(jù)
}
}
6.4 提取用戶標(biāo)識(shí)
日志數(shù)據(jù)針對(duì)某個(gè)用戶單次特定的瀏覽行為
一個(gè)用戶一天可能存在多條數(shù)據(jù)
標(biāo)簽是針對(duì)人的
-
存在的問(wèn)題:
- 在數(shù)據(jù)集中抽出人的概念、讓一個(gè)人能對(duì)應(yīng)一條數(shù)據(jù)
- 在日志信息中找不到可用的用戶id爬坑,只能退而求其次纠屋,找設(shè)備的信息,用設(shè)備的信息標(biāo)識(shí)用戶:
- IMEI:國(guó)際移動(dòng)設(shè)備識(shí)別碼(International Mobile Equipment Identity盾计,IMEI)巾遭,即通常所說(shuō)的手機(jī)序列號(hào)肉康、手機(jī)“串號(hào)”,用于在移動(dòng)電話網(wǎng)絡(luò)中識(shí)別每一部獨(dú)立的手機(jī)等移動(dòng)通信設(shè)備灼舍,相當(dāng)于移動(dòng)電話的身份證吼和。IMEI是寫(xiě)在主板上的,重裝APP不會(huì)改變IMEI骑素。Android 6.0以上系統(tǒng)需要用戶授予read_phone_state權(quán)限炫乓,如果用戶拒絕就無(wú)法獲得;
- IDFA:于iOS 6 時(shí)面世献丑,可以監(jiān)控廣告效果末捣,同時(shí)保證用戶設(shè)備不被APP追蹤的折中方案〈撮希可能發(fā)生變化箩做,如系統(tǒng)重置、在設(shè)置里還原廣告標(biāo)識(shí)符妥畏。用戶可以在設(shè)置里打開(kāi)“限制廣告跟蹤”邦邦;
- mac地址:硬件標(biāo)識(shí)符,包括WiFi mac地址和藍(lán)牙m(xù)ac地址醉蚁。iOS 7 之后被禁止燃辖;
- OpenUDID:在iOS 5發(fā)布時(shí),UDID被棄用了网棍,這引起了廣大開(kāi)發(fā)者需要尋找一個(gè)可以替代UDID黔龟,并且不受蘋(píng)果控制的方案。由此OpenUDID成為了當(dāng)時(shí)使用最廣泛的開(kāi)源UDID替代方案滥玷。OpenUDID在工程中實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單氏身,并且還支持一系列的廣告提供商;
- Android ID:在設(shè)備首次啟動(dòng)時(shí)惑畴,系統(tǒng)會(huì)隨機(jī)生成一個(gè)64位的數(shù)字蛋欣,并把這個(gè)數(shù)字以16進(jìn)制字符串的形式保存下來(lái),這個(gè)16進(jìn)制的字符串就是ANDROID_ID桨菜,當(dāng)設(shè)備被wipe后該值會(huì)被重置豁状;
- 日志數(shù)據(jù)中可用于標(biāo)識(shí)用戶的字段包括:
- imei捉偏、mac倒得、idfa、openudid夭禽、androidid
- imeimd5霞掺、macmd5、idfamd5讹躯、openudidmd5菩彬、androididmd5
- imeisha1缠劝、macsha1、idfasha1骗灶、openudidsha1惨恭、androididsha1
- 什么是無(wú)效數(shù)據(jù):以上15個(gè)字段全部為空锣笨,那么這條數(shù)據(jù)不能與任何用戶發(fā)生關(guān)聯(lián)决采,這條數(shù)據(jù)對(duì)我們來(lái)說(shuō)沒(méi)有任何用處,它是無(wú)效數(shù)據(jù)唇辨。這些數(shù)據(jù)需要除去免都。
// 15個(gè)字段同時(shí)為空時(shí)需要過(guò)濾 lazy val filterSQL: String = idFields .split(",") .map(field => s"$field != ''") .mkString(" or ") // 抽取用戶標(biāo)識(shí) val userIds = getUserIds(row) // 返回標(biāo)簽 (userIds.head, (userIds, tags)) // 提取用戶標(biāo)識(shí) private def getUserIds(row: Row): List[String] = { val userIds: List[String] = idFields.split(",") .map(field => (field, row.getAs[String](field))) .filter { case (key, value) => StringUtils.isNotBlank(value) } .map { case (key, value) => s"$key::$value" }.toList userIds }
6.5 用戶識(shí)別
- 使用十五個(gè)字段(非空)聯(lián)合標(biāo)識(shí)用戶
- 數(shù)據(jù)采集過(guò)程中:
- 每次采集的數(shù)據(jù)可能是不同的字段
- 某些字段還可能發(fā)生變化
- 如何識(shí)別相同用戶的數(shù)據(jù)锉罐?
6.6 用戶識(shí)別&數(shù)據(jù)聚合與合并
// 統(tǒng)一用戶識(shí)別;數(shù)據(jù)聚合與合并
private def graphxAnalysis(rdd: RDD[(List[String], List[(String, Double)])]): RDD[(List[String], List[(String, Double)])] ={
// 1 定義頂點(diǎn)(數(shù)據(jù)結(jié)構(gòu):Long, ""绕娘; 算法:List中每個(gè)元素都可作為頂點(diǎn)脓规,List本身也可作為頂點(diǎn))
val dotsRDD: RDD[(String, List[String])] = rdd.flatMap{ case (lst1, _) => lst1.map(elem => (elem, lst1)) }
val vertexes: RDD[(Long, String)] = dotsRDD.map { case (id, ids) => (id.hashCode.toLong, "") }
// 2 定義邊(數(shù)據(jù)結(jié)構(gòu): Edge(Long, Long, 0))
val edges: RDD[Edge[Int]] = dotsRDD.map { case (id, ids) => Edge(id.hashCode.toLong, ids.mkString.hashCode.toLong, 0) }
// 3 生成圖
val graph = Graph(vertexes, edges)
// 4 強(qiáng)連通圖
val idRDD: VertexRDD[VertexId] = graph.connectedComponents()
.vertices
// 5 定義數(shù)據(jù)(ids與tags)
val idsRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] =
rdd.map { case (ids, tags) => (ids.mkString.hashCode.toLong, (ids, tags)) }
// 6 步驟4的結(jié)果 與 步驟5的結(jié)果 做join,將全部的數(shù)據(jù)做了分類【一個(gè)用戶一個(gè)分類】
val joinRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = idRDD.join(idsRDD)
.map { case (key, value) => value }
// 7 數(shù)據(jù)的聚合(相同用戶的數(shù)據(jù)放在一起)
val aggRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = joinRDD.reduceByKey { case ((bufferIds, bufferTags), (ids, tags)) =>
(bufferIds ++ ids, bufferTags ++ tags)
}
// 8 數(shù)據(jù)的合并(對(duì)于id,去重险领;對(duì)tags侨舆,合并權(quán)重)
val resultRDD: RDD[(List[String], List[(String, Double)])] = aggRDD.map { case (key, (ids, tags)) =>
val newTags = tags.groupBy(x => x._1)
.mapValues(lst => lst.map { case (word, weight) => weight }.sum)
.toList
(ids.distinct, newTags)
}
resultRDD
}
6.7 標(biāo)簽落地
數(shù)據(jù)保存到kudu中,請(qǐng)注意:
1舷暮、每天保存一張表(需要新建)态罪,表名:usertags_當(dāng)天日期
2、數(shù)據(jù)類型轉(zhuǎn)換 RDD [(List[String], List[(String, Double)])] => RDD[(String, String)] => DataFrame
- 將 List[String] 轉(zhuǎn)為 String下面;分隔符的定義要注意
- 將 List[(String, Double)] 轉(zhuǎn)為String复颈,分隔符的定義要注意
- 分隔符:不能與數(shù)據(jù)中的符號(hào)重復(fù);分隔符保證要能加上沥割,還要能去掉耗啦。
// 3 數(shù)據(jù)落地(kudu)
// 將List數(shù)據(jù)類型變?yōu)镾tring
import spark.implicits._
val resultDF = mergeRDD.map{ case (ids, tags) =>
(ids.mkString("||"), tags.map{case (key, value) => s"$key->$value"}.mkString("||"))
}.toDF("ids", "tags")
DBUtils.appendData(kudu, resultDF, sinkTableName, keys)
}
// 獲取昨天日期
6.8 標(biāo)簽處理代碼(TagProcessor)
package cn.itcast.dmp.tags
import cn.itcast.dmp.Processor
import cn.itcast.dmp.utils.ConfigHolder
import org.apache.commons.lang3.StringUtils
import org.apache.kudu.spark.kudu.{KuduContext, KuduDataFrameReader}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
object TagProcessor extends Processor{
override def process(spark: SparkSession, kudu: KuduContext): Unit = {
// 定義參數(shù)
val sourceTableName = ConfigHolder.ADMainTableName
val sinkTableName = ""
val keys = ""
val dicAppPath = ConfigHolder.appNameDic
val dicDevicePath = ConfigHolder.deviceDic
val tradingAreaTableName = ConfigHolder.tradingAreaTableName
val filterSQL = ConfigHolder.filterSQL
val idFields = ConfigHolder.idFields
// 1 讀數(shù)據(jù)
val sc = spark.sparkContext
val sourceDF = spark.read
.option("kudu.master", ConfigHolder.kuduMaster)
.option("kudu.table", sourceTableName)
.kudu
// 讀app字典信息
val appDic = sc.textFile(dicAppPath)
.map(line => {
val arr = line.split("##")
(arr(0), arr(1))
})
.collect()
.toMap
val appDicBC = sc.broadcast(appDic)
// 讀device字典信息
val deviceDic = sc.textFile(dicDevicePath)
.map(line => {
val arr = line.split("##")
(arr(0), arr(1))
})
.collect()
.toMap
val deviceDicBC = sc.broadcast(deviceDic)
// 讀商圈信息(讀;過(guò)濾机杜;轉(zhuǎn)為rdd帜讲;取數(shù);收集數(shù)據(jù)到driver椒拗;轉(zhuǎn)為map)
// 限制條件:商圈表的信息不能過(guò)大(過(guò)濾后的大小小于20M為宜)
val tradingAreaDic: Map[String, String] = spark.read
.option("kudu.master", ConfigHolder.kuduMaster)
.option("kudu.table", tradingAreaTableName)
.kudu
.filter("areas!=''")
.rdd
.map { case Row(geohash: String, areas: String) => (geohash, areas) }
.collect()
.toMap
val tradingAreaBC = sc.broadcast(tradingAreaDic)
// 2 處理數(shù)據(jù)
// 過(guò)濾15個(gè)標(biāo)識(shí)字段都為空的數(shù)據(jù)
val userTagsRDD: RDD[(List[String], List[(String, Double)])] = sourceDF.filter(filterSQL)
.rdd
.map(row => {
// 廣告類型似将、渠道、App名稱
val adTags = AdTypeTag.make(row)
val channelTags = ChannelTag.make(row)
val appNameTags = AppNameTag.make(row, appDicBC.value)
// 性別蚀苛、地理位置在验、設(shè)備
val sexTags = SexTag.make(row, appDicBC.value)
val geoTags = GeoTag.make(row, appDicBC.value)
val deviceTags = DeviceTag.make(row, deviceDicBC.value)
// 關(guān)鍵詞、年齡堵未、商圈
val keywordTags = KeywordTag.make(row, appDicBC.value)
val ageTags = AgeTag.make(row, appDicBC.value)
val tradingAreaTags = tradingAreaTag.make(row, tradingAreaDic.value)
// 標(biāo)簽合并
val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordTags ++ ageTags ++ tradingAreaTags
// 抽取用戶標(biāo)識(shí)
val userIds: List[String] = idFields.split(",")
.map(field => (field, row.getAs[String](field)))
.filter { case (key, value) => StringUtils.isNotBlank(value) }
.map { case (key, value) => s"$key::$value" }.toList
// 返回標(biāo)簽
(userIds, tags)
})
userTagsRDD.foreach(println)
// 3 統(tǒng)一用戶識(shí)別腋舌,合并數(shù)據(jù)
val mergeRDD: RDD[(List[String], List[(String, Double)])] = graphxAnalysis(logTagsRDD)
// 4 數(shù)據(jù)落地(kudu)
// 將List數(shù)據(jù)類型變?yōu)镾tring
import spark.implicits._
val resultDF = mergeRDD.map{ case (ids, tags) =>
(ids.mkString("|||"), tags.map{case (key, value) => s"$key->$value"}.mkString("|||"))
}.toDF("ids", "tags")
DBUtils.createTableAndsaveData(kudu, resultDF, sinkTableName, keys)
// 關(guān)閉資源
sc.stop()
}
}
7、Spark GraphX
7.1 圖計(jì)算基本概念
圖是用于表示對(duì)象之間模型關(guān)系的數(shù)學(xué)結(jié)構(gòu)渗蟹。圖由頂點(diǎn)和連接頂點(diǎn)的邊構(gòu)成块饺。頂點(diǎn)是對(duì)象赞辩,而邊是對(duì)象之間的關(guān)系。
有向圖是頂點(diǎn)之間的邊是有方向的授艰。有向圖的例子如 Twitter 上的關(guān)注者辨嗽。用戶 Bob 關(guān)注了用戶 Carol ,而 Carol 并沒(méi)有關(guān)注 Bob淮腾。
就是圖召庞,通過(guò)點(diǎn)(對(duì)象)和邊(路徑),構(gòu)成了不同對(duì)象之間的關(guān)系
7.2 圖計(jì)算應(yīng)用場(chǎng)景
1)最短路徑
最短路徑在社交網(wǎng)絡(luò)里面来破,有一個(gè)六度空間的理論篮灼,表示你和任何一個(gè)陌生人之間所間隔的人不會(huì)超過(guò)五個(gè),也就是說(shuō),最多通過(guò)五個(gè)中間人你就能夠認(rèn)識(shí)任何一個(gè)陌生人。這也是圖算法的一種徘禁,也就是說(shuō)诅诱,任何兩個(gè)人之間的最短路徑都是小于等于6。
2)社群發(fā)現(xiàn)
社群發(fā)現(xiàn)用來(lái)發(fā)現(xiàn)社交網(wǎng)絡(luò)中三角形的個(gè)數(shù)(圈子)送朱,可以分析出哪些圈子更穩(wěn)固娘荡,關(guān)系更緊密,用來(lái)衡量社群耦合關(guān)系的緊密程度驶沼。一個(gè)人的社交圈子里面炮沐,三角形個(gè)數(shù)越多,說(shuō)明他的社交關(guān)系越穩(wěn)固回怜、緊密大年。像Facebook、Twitter等社交網(wǎng)站玉雾,常用到的的社交分析算法就是社群發(fā)現(xiàn)翔试。
3)推薦算法(ALS)
推薦算法(ALS)ALS是一個(gè)矩陣分解算法,比如購(gòu)物網(wǎng)站要給用戶進(jìn)行商品推薦复旬,就需要知道哪些用戶對(duì)哪些商品感興趣垦缅,這時(shí),可以通過(guò)ALS構(gòu)建一個(gè)矩陣圖驹碍,在這個(gè)矩陣圖里壁涎,假如被用戶購(gòu)買(mǎi)過(guò)的商品是1,沒(méi)有被用戶購(gòu)買(mǎi)過(guò)的是0志秃,這時(shí)我們需要計(jì)算的就是有哪些0有可能會(huì)變成1
GraphX 通過(guò)彈性分布式屬性圖擴(kuò)展了 Spark RDD怔球。
通常,在圖計(jì)算中洽损,基本的數(shù)據(jù)結(jié)構(gòu)表達(dá)是:
-
Graph = (Vertex庞溜,Edge)
- Vertex (頂點(diǎn)/節(jié)點(diǎn)) (VertexId: Long, info: Any)
- Edge (邊)Edge(srcId: VertexId, dstId: VertexId, attr) 【attr 權(quán)重】
7.3 Spark GraphX例子一(強(qiáng)連通體)
ID | 關(guān)鍵詞 | AppName |
---|---|---|
1 | 卡羅拉 | 團(tuán)車(chē) |
2 | 印度尼西亞,巴厘島 | 去哪兒旅游 |
3 | 善導(dǎo)大師 | 知乎 |
4 | 王的女人,美人無(wú)淚 | 優(yōu)酷 |
5 | 世界杯 | 搜狐 |
6 | 劉嘉玲,港臺(tái)娛樂(lè) | 鳳凰網(wǎng) |
7 | 日韓娛樂(lè) | 花椒直播 |
9 | AK47 | 絕地求生:刺激戰(zhàn)場(chǎng) |
10 | 搞笑 | YY直播 |
11 | 文學(xué),時(shí)政 | 知乎 |
ID | IDS |
---|---|
1 | 43125 |
2 | 43125 |
3 | 43125 |
4 | 43125 |
5 | 43125 |
4 | 4567 |
5 | 4567 |
6 | 4567 |
7 | 4567 |
9 | 91011 |
10 | 91011 |
11 | 91011 |
Connected Components算法(連通體算法):
1革半、定義頂點(diǎn)
2碑定、定義邊
3流码、生成圖
4、用標(biāo)注圖中每個(gè)連通體延刘,將連通體中序號(hào)最小的頂點(diǎn)的id作為連通體的id
任務(wù):
- 定義頂點(diǎn)
- 定義邊
- 生成圖
- 生成強(qiáng)連通圖
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphXDemo1 {
def main(args: Array[String]): Unit = {
// 1漫试、初始化sparkcontext
val conf = new SparkConf()
.setAppName("GraphXDemo1")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 2、定義頂點(diǎn) (VertexId: Long, info: Any)
val vertexes: RDD[(VertexId, Map[String, Double])] = sc.makeRDD(List(
(1L, Map("keyword:卡羅拉" -> 1.0, "AppName:團(tuán)車(chē)" -> 1.0)),
(2L, Map("keyword:印度尼西亞" -> 1.0, "keyword:巴厘島" -> 1.0, "AppName:去哪兒旅游" -> 1.0)),
(3L, Map("keyword:善導(dǎo)大師" -> 1.0, "AppName:知乎" -> 1.0)),
(4L, Map("keyword:王的女人" -> 1.0, "keyword:美人無(wú)淚" -> 1.0, "AppName:優(yōu)酷" -> 1.0)),
(5L, Map("keyword:世界杯" -> 1.0, "AppName:搜狐" -> 1.0)),
(6L, Map("keyword:劉嘉玲" -> 1.0, "keyword:港臺(tái)娛樂(lè)" -> 1.0, "AppName:鳳凰網(wǎng)" -> 1.0)),
(7L, Map("keyword:日韓娛樂(lè)" -> 1.0, "AppName:花椒直播" -> 1.0)),
(9L, Map("keyword:AK47" -> 1.0, "AppName:絕地求生:刺激戰(zhàn)場(chǎng)" -> 1.0)),
(10L, Map("keyword:搞笑" -> 1.0, "AppName:YY直播" -> 1.0)),
(11L, Map("keyword:文學(xué)" -> 1.0, "keyword:時(shí)政" -> 1.0, "AppName:知乎" -> 1.0))
))
// 3碘赖、定義邊 Edge(srcId: VertexId, dstId: VertexId, attr)
val edges: RDD[Edge[Int]] = sc.makeRDD(List(
Edge(1L, 42125L, 0),
Edge(2L, 42125L, 0),
Edge(3L, 42125L, 0),
Edge(4L, 42125L, 0),
Edge(5L, 42125L, 0),
Edge(4L, 4567L, 0),
Edge(5L, 4567L, 0),
Edge(6L, 4567L, 0),
Edge(7L, 4567L, 0),
Edge(9L, 91011, 0),
Edge(10L, 91011, 0),
Edge(11L, 91011, 0)
))
// 4驾荣、生成圖;生成強(qiáng)聯(lián)通圖
Graph(vertexes, edges)
.connectedComponents()
.vertices
.sortBy(_._2)
.collect()
.foreach(println)
// 5普泡、資源釋放
sc.stop()
}
}
7.4 Spark GraphX例子二(用戶識(shí)別&數(shù)據(jù)合并)
根據(jù)前面的例子播掷,我們已經(jīng)知道根據(jù)規(guī)則如何識(shí)別用戶,程序如何處理呢撼班?
數(shù)據(jù)的定義:
備注:
1歧匈、這里定義的數(shù)據(jù)格式與我們程序中的數(shù)據(jù)格式完全一致
2、RDD中是一個(gè)元組砰嘁,第一個(gè)元素代表用戶的各種 id 件炉;第二個(gè)元素代表用戶的標(biāo)簽
任務(wù):
1、6條數(shù)據(jù)代表多少個(gè)用戶
2矮湘、合并相同用戶的數(shù)據(jù)
val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List(
(List("a1", "b1", "c1"), List("keyword$北京" -> 1.0, "keyword$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List("b1", "c2", "d1"), List("keyword$上海" -> 1.0, "keyword$天津" -> 1.0, "area$回龍觀" -> 1.0)),
(List("d1"), List("keyword$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List("a2", "b2", "c3"), List("keyword$大數(shù)據(jù)" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0)),
(List("b2", "c4", "d4"), List("keyword$spark" -> 1.0, "area$五道口" -> 1.0)),
(List("c3", "e3"), List("keyword$hive" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0))
))
完整的處理步驟:
- 定義頂點(diǎn)
- 定義邊
- 生成圖
- 找強(qiáng)連通體
- 找需要合并的數(shù)據(jù)
- 數(shù)據(jù)聚合
- 數(shù)據(jù)合并
處理程序:
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object GraphXDemo {
def main(args: Array[String]): Unit = {
// 初始化
val conf: SparkConf = new SparkConf().setAppName("GraphXDemo").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("error")
// 定義數(shù)據(jù)
val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List(
(List("a1", "b1", "c1"), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List("b1", "c2", "d1"), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
(List("d1"), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
(List("a2", "b2", "c3"), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
(List("b2", "c4", "d4"), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
(List("c3", "e3"), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
))
val value: RDD[(String, List[String], List[(String, Double)])] = dataRDD.flatMap { case (allIds: List[String], tags: List[(String, Double)]) => {
allIds.map { case elem: String => (elem, allIds, tags) }
}
}
// 1 將標(biāo)識(shí)信息中的每一個(gè)元素抽取出來(lái)斟冕,作為id
// 備注1、這里使用了flatMap缅阳,將元素壓平磕蛇;
// 備注2、這里丟掉了標(biāo)簽信息十办,因?yàn)檫@個(gè)RDD主要用于構(gòu)造頂點(diǎn)孤里、邊,tags信息用不
// 備注3橘洞、頂點(diǎn)捌袜、邊的數(shù)據(jù)要求Long,所以這里做了數(shù)據(jù)類型轉(zhuǎn)換
val dotRDD: RDD[(VertexId, VertexId)] = dataRDD.flatMap { case (allids, tags) =>
// 方法一:好理解炸枣,不好寫(xiě)
// for (id <- allids) yield {
// (id.hashCode.toLong, allids.mkString.hashCode.toLong)
// }
// 方法二:不好理解虏等,好寫(xiě)。兩方法等價(jià)
allids.map(id => (id.hashCode.toLong, allids.mkString.hashCode.toLong))
}
// 2 定義頂點(diǎn)
val vertexesRDD: RDD[(VertexId, String)] = dotRDD.map { case (id, ids) => (id, "") }
// 3 定義邊(id: 單個(gè)的標(biāo)識(shí)信息适肠;ids: 全部的標(biāo)識(shí)信息)
val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, ids) => Edge(id, ids, 0) }
// 4 生成圖
val graph = Graph(vertexesRDD, edgesRDD)
// 5 找到強(qiáng)連通體
val connectRDD: VertexRDD[VertexId] = graph.connectedComponents()
.vertices
// 6 定義中心點(diǎn)的數(shù)據(jù)
val centerVertexRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = dataRDD.map { case (allids, tags) =>
(allids.mkString.hashCode.toLong, (allids, tags))
}
// 7 步驟5霍衫、6的數(shù)據(jù)做join,獲取需要合并的數(shù)據(jù)
val allInfoRDD = connectRDD.join(centerVertexRDD)
.map { case (id1, (id2, (allIds, tags))) => (id2, (allIds, tags)) }
// 8 數(shù)據(jù)聚合(即將同一個(gè)用戶的標(biāo)識(shí)侯养、標(biāo)簽放在一起)
val mergeInfoRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = allInfoRDD.reduceByKey { case ((bufferList, bufferMap), (allIds, tags)) =>
val newList = bufferList ++ allIds
// map 的合并
val newMap = bufferMap ++ tags
(newList, newMap)
}
// 9 數(shù)據(jù)合并(allIds:去重敦跌;tags:合并權(quán)重)
val resultRDD: RDD[(List[String], Map[String, Double])] = mergeInfoRDD.map { case (key, (allIds, tags)) =>
val newIds = allIds.distinct
// 按照key做聚合;然后對(duì)聚合得到的lst第二個(gè)元素做累加
val newTags = tags.groupBy(x => x._1).mapValues(lst => lst.map(x => x._2).sum)
(newIds, newTags)
}
resultRDD.foreach(println)
sc.stop()
}
// def main(args: Array[String]): Unit = {
// val lst = List(
// ("kw$大數(shù)據(jù)",1.0),
// ("kw$spark",1.0),
// ("area$西二旗",1.0),
// ("kw$spark",1.0),
// ("area$五道口",1.0),
// ("kw$hive",1.0),
// ("kw$spark",1.0),
// ("area$西二旗",1.0)
// )
//
// lst.groupBy(x=> x._1).map{case (key, value) => (key, value.map(x=>x._2).sum)}.foreach(println)
// println("************************************************************")
//
// lst.groupBy(x=> x._1).mapValues(lst => lst.map(x=>x._2).sum).foreach(println)
// println("************************************************************")
}