基于spark的用戶畫(huà)像項(xiàng)目實(shí)戰(zhàn)

轉(zhuǎn)自千峰王溯老師

1仙粱、用戶畫(huà)像項(xiàng)目簡(jiǎn)介

1.1 什么是用戶畫(huà)像

所謂的用戶畫(huà)像就是給用戶貼一些標(biāo)簽丁侄,通過(guò)標(biāo)簽說(shuō)明用戶是一個(gè)什么樣的人俊扭。

具體來(lái)說(shuō)棋凳,給用戶貼一些標(biāo)簽之后拦坠,根據(jù)用戶的目標(biāo)、行為和觀點(diǎn)的差異剩岳,將他們區(qū)分為不同的類型贞滨,然后從每種類型中抽取典型特征,賦予名字拍棕、照片晓铆、一些人口統(tǒng)計(jì)學(xué)要素、場(chǎng)景等描述绰播,形成了一個(gè)人物原型骄噪。

過(guò)程就是:通過(guò)客戶信息抽象為用戶畫(huà)像進(jìn)而抽象出對(duì)客戶的認(rèn)知。

1.2 用戶畫(huà)像主要維度

人口屬性:用戶是誰(shuí)(性別幅垮、年齡腰池、職業(yè)等基本信息)

消費(fèi)需求:消費(fèi)習(xí)慣和消費(fèi)偏好

購(gòu)買(mǎi)能力:收入、購(gòu)買(mǎi)力忙芒、購(gòu)買(mǎi)頻次示弓、渠道

興趣愛(ài)好:品牌偏好、個(gè)人興趣

社交屬性:用戶活躍場(chǎng)景(社交媒體等)

1.3 用戶畫(huà)像的數(shù)據(jù)類型

數(shù)據(jù)有動(dòng)態(tài)數(shù)據(jù)和靜態(tài)數(shù)據(jù)呵萨,所謂的靜態(tài)數(shù)據(jù)如性別和年齡等短期無(wú)法改變的數(shù)據(jù)奏属;而動(dòng)態(tài)數(shù)據(jù)就是如短期行為相關(guān)的數(shù)據(jù),比如說(shuō)今天我想買(mǎi)件裙子潮峦,明天我就去看褲子了囱皿,這種數(shù)據(jù)特征就是比較多變勇婴。

1.4 用戶畫(huà)像的用途

殺熟、推薦(非常多)【用戶畫(huà)像是推薦系統(tǒng)的重要數(shù)據(jù)源】嘱腥、市場(chǎng)營(yíng)銷(xiāo)耕渴、客服

讓用戶和企業(yè)雙贏。讓用戶快速找到想要的商品齿兔,讓企業(yè)找到為產(chǎn)品買(mǎi)單的人橱脸。

(一)微觀層面

在產(chǎn)品設(shè)計(jì)時(shí),通過(guò)用戶畫(huà)像來(lái)描述用戶的需求分苇。

在數(shù)據(jù)應(yīng)用上添诉,可以用來(lái)推薦、搜索医寿、風(fēng)控

將定性分析和定量分析結(jié)合栏赴,進(jìn)行數(shù)據(jù)化運(yùn)營(yíng)和用戶分析

進(jìn)行精準(zhǔn)化營(yíng)銷(xiāo)

(二)宏觀層面

確定發(fā)展的戰(zhàn)略、戰(zhàn)術(shù)方向

進(jìn)行市場(chǎng)細(xì)分與用戶分群靖秩,以市場(chǎng)為導(dǎo)向

(三)畫(huà)像建模預(yù)測(cè)

進(jìn)行人口屬性細(xì)分:明確是誰(shuí)须眷,購(gòu)買(mǎi)了什么,為什么

購(gòu)買(mǎi)行為細(xì)分:提供市場(chǎng)機(jī)會(huì)、市場(chǎng)規(guī)模等關(guān)鍵信息

產(chǎn)品需求細(xì)分:提供更具差異化競(jìng)爭(zhēng)力的產(chǎn)品規(guī)格和業(yè)務(wù)價(jià)值

興趣態(tài)度細(xì)分:提供人群類別畫(huà)像:渠道策略,定價(jià)策略斗这,產(chǎn)品策略,品牌策略

1.5 用戶畫(huà)像的步驟

(一)確定畫(huà)像的目標(biāo)

在產(chǎn)品不同生命周期捎稚,或者不同使用途徑,目標(biāo)不同求橄,對(duì)畫(huà)像的需求也有所不同今野,所以進(jìn)行畫(huà)像之前需要明確目標(biāo)是什么,需求是什么罐农。

(二)確定所需用戶畫(huà)像的維度

根據(jù)目標(biāo)確定用戶畫(huà)像所需要的維度条霜,比如說(shuō)想進(jìn)行商品推薦,就需要能影響用戶選擇商品的因素作為畫(huà)像維度涵亏。比如用戶維度(用戶的年齡宰睡、性別會(huì)影響用戶的選擇),資產(chǎn)維度(用戶的收入等因素會(huì)影響用戶對(duì)價(jià)格選擇)气筋,行為維度(用戶最近巢鹉冢看的應(yīng)該是想買(mǎi)的)等等。

(三)確定畫(huà)像的層級(jí)

用戶畫(huà)像層級(jí)越多宠默,說(shuō)明畫(huà)像粒度越小麸恍,對(duì)用戶的理解也越清晰。比如說(shuō)用戶維度,可以分為新用戶和老用戶抹沪,進(jìn)而劃分用戶的性別刻肄、年齡等。這個(gè)需要根據(jù)目標(biāo)需求進(jìn)行劃分融欧。

(四)通過(guò)原始數(shù)據(jù)敏弃,采用機(jī)器學(xué)習(xí)算法為用戶貼上標(biāo)簽

因?yàn)槲覀儷@得的原始數(shù)據(jù)是一些雜亂無(wú)章的數(shù)據(jù),所以就需要算法通過(guò)某些特征為用戶貼上標(biāo)簽

(五)通過(guò)機(jī)器學(xué)習(xí)算法將標(biāo)簽變?yōu)闃I(yè)務(wù)的輸出

每個(gè)人會(huì)有很多很多的標(biāo)簽噪馏,需要進(jìn)一步將這些標(biāo)簽轉(zhuǎn)化為對(duì)用戶的理解权她。需要對(duì)不同的標(biāo)簽建不同的權(quán)重,從而得出對(duì)業(yè)務(wù)的輸出逝薪。比如說(shuō)具有一些標(biāo)簽的用戶會(huì)喜歡什么樣的產(chǎn)品。

(六)業(yè)務(wù)產(chǎn)生數(shù)據(jù)蝴罪,數(shù)據(jù)反哺業(yè)務(wù)董济,不斷循環(huán)的閉環(huán)

1.6 常見(jiàn)的用戶畫(huà)像標(biāo)簽

2、系統(tǒng)架構(gòu)

2.1 整體架構(gòu)(線下項(xiàng)目)

2.2 數(shù)據(jù)處理流程(要做什么事)

ETL(Extract Tranform Load)用來(lái)描述數(shù)據(jù)從來(lái)源端要门,經(jīng)過(guò) 抽取虏肾、轉(zhuǎn)換、加載 到目的端的過(guò)程欢搜;

ODS(Operational Data Store)操作數(shù)據(jù)存儲(chǔ)封豪。此層數(shù)據(jù)無(wú)任何更改,直接沿用外圍系統(tǒng)數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)炒瘟,不對(duì)外開(kāi)放吹埠;為臨時(shí)存儲(chǔ)層,是接口數(shù)據(jù)的臨時(shí)存儲(chǔ)區(qū)域疮装,為后一步的數(shù)據(jù)處理做準(zhǔn)備缘琅。

要實(shí)現(xiàn)的主要步驟:ETL、報(bào)表統(tǒng)計(jì)(數(shù)據(jù)分析)廓推、生成商圈庫(kù)刷袍、數(shù)據(jù)標(biāo)簽化(核心)

2.3 主要數(shù)據(jù)集(要分析的日志數(shù)據(jù)文件)說(shuō)明

  • 為整合后的日志數(shù)據(jù),每天一份樊展,json格式(離線處理)
  • 這個(gè)數(shù)據(jù)集整合了內(nèi)部呻纹、外部的數(shù)據(jù),以及競(jìng)價(jià)信息(與廣告相關(guān))
  • 數(shù)據(jù)的列非常多专缠,接近百個(gè)
字段 解釋
ip 設(shè)備的真實(shí)IP
sessionid 會(huì)話標(biāo)識(shí)
advertisersid 廣告主ID
adorderid 廣告ID
adcreativeid 廣告創(chuàng)意ID( >= 200000 : dsp , < 200000 oss)
adplatformproviderid 廣告平臺(tái)商ID(>= 100000: rtb , < 100000 : api )
sdkversionnumber SDK版本號(hào)
adplatformkey 平臺(tái)商key
putinmodeltype 針對(duì)廣告主的投放模式,1:展示量投放 2:點(diǎn)擊量投放
requestmode 數(shù)據(jù)請(qǐng)求方式(1:請(qǐng)求雷酪、2:展示、3:點(diǎn)擊)
adprice 廣告價(jià)格
adppprice 平臺(tái)商價(jià)格
requestdate 請(qǐng)求時(shí)間,格式為:yyyy-m-dd hh:mm:ss
appid 應(yīng)用id
appname 應(yīng)用名稱
uuid 設(shè)備唯一標(biāo)識(shí)藤肢,比如imei或者androidid等
device 設(shè)備型號(hào)太闺,如htc、iphone
client 設(shè)備類型 (1:android 2:ios 3:wp)
osversion 設(shè)備操作系統(tǒng)版本嘁圈,如4.0
density 備屏幕的密度 android的取值為0.75省骂、1蟀淮、1.5,ios的取值為:1、2
pw 設(shè)備屏幕寬度
ph 設(shè)備屏幕高度
provincename 設(shè)備所在省份名稱
cityname 設(shè)備所在城市名稱
ispid 運(yùn)營(yíng)商id
ispname 運(yùn)營(yíng)商名稱
networkmannerid 聯(lián)網(wǎng)方式id
networkmannername 聯(lián)網(wǎng)方式名稱
iseffective 有效標(biāo)識(shí)(有效指可以正常計(jì)費(fèi)的)(0:無(wú)效 1:有效)
isbilling 是否收費(fèi)(0:未收費(fèi) 1:已收費(fèi))
adspacetype 廣告位類型(1:banner 2:插屏 3:全屏)
adspacetypename 廣告位類型名稱(banner钞澳、插屏怠惶、全屏)
devicetype 設(shè)備類型(1:手機(jī) 2:平板)
processnode 流程節(jié)點(diǎn)(1:請(qǐng)求量kpi 2:有效請(qǐng)求 3:廣告請(qǐng)求)
apptype 應(yīng)用類型id
district 設(shè)備所在縣名稱
paymode 針對(duì)平臺(tái)商的支付模式,1:展示量投放(CPM) 2:點(diǎn)擊量投放(CPC)
isbid 是否rtb
bidprice rtb競(jìng)價(jià)價(jià)格
winprice rtb競(jìng)價(jià)成功價(jià)格
iswin 是否競(jìng)價(jià)成功
cur values:usd|rmb等
rate 匯率
cnywinprice rtb競(jìng)價(jià)成功轉(zhuǎn)換成人民幣的價(jià)格
imei 手機(jī)串碼
mac 手機(jī)MAC碼
idfa 手機(jī)APP的廣告碼
openudid 蘋(píng)果設(shè)備的識(shí)別碼
androidid 安卓設(shè)備的識(shí)別碼
rtbprovince rtb 省
rtbcity rtb 市
rtbdistrict rtb 區(qū)
rtbstreet rtb 街道
storeurl app的市場(chǎng)下載地址
realip 真實(shí)ip
isqualityapp 優(yōu)選標(biāo)識(shí)
bidfloor 底價(jià)
aw 廣告位的寬
ah 廣告位的高
imeimd5 imei_md5
macmd5 mac_md5
idfamd5 idfa_md5
openudidmd5 openudid_md5
androididmd5 androidid_md5
imeisha1 imei_sha1
macsha1 mac_sha1
idfasha1 idfa_sha1
openudidsha1 openudid_sha1
androididsha1 androidid_sha1
uuidunknow uuid_unknow UUID密文
userid 平臺(tái)用戶id
iptype 表示ip庫(kù)類型轧粟,1為點(diǎn)媒ip庫(kù)策治,2為廣告協(xié)會(huì)的ip地理信息標(biāo)準(zhǔn)庫(kù),默認(rèn)為1
initbidprice 初始出價(jià)
adpayment 轉(zhuǎn)換后的廣告消費(fèi)(保留小數(shù)點(diǎn)后6位)
agentrate 代理商利潤(rùn)率
lomarkrate 代理利潤(rùn)率
adxrate 媒介利潤(rùn)率
title 標(biāo)題
keywords 關(guān)鍵字
tagid 廣告位標(biāo)識(shí)(當(dāng)視頻流量時(shí)值為視頻ID號(hào))
callbackdate 回調(diào)時(shí)間 格式為:YYYY/mm/dd hh:mm:ss
channelid 頻道ID
mediatype 媒體類型
email 用戶email
tel 用戶電話號(hào)碼
sex 用戶性別
age 用戶年齡

3兰吟、創(chuàng)建工程【第二天重點(diǎn)】

3.1 步驟

  • 新建一個(gè)Maven項(xiàng)目通惫,用于處理數(shù)據(jù)

    Maven 管理項(xiàng)目中用到的所有jar

  • 修改 pom.xml 文件,增加:

    • 定義依賴版本
    • 導(dǎo)入依賴
    • 定義配置文件
  • 創(chuàng)建scala目錄(src混蔼、test中分別創(chuàng)建)

  • 在scala目錄中創(chuàng)建包 cn.itbigdata.dmp

  • 編寫(xiě)一個(gè)主程序的架構(gòu) (DMPApp)

  • 增加配置文件 dev/application.conf

  • 新增目錄 utils履腋,新增參數(shù)解析類 ConfigHolder

3.2 修改pom.xml文件【重點(diǎn)】

  1. 設(shè)置依賴版本信息

    <properties>
        <scala.version>2.11.8</scala.version>
        <scala.version.simple>2.11</scala.version.simple>
        <hadoop.version>2.6.1</hadoop.version>
        <spark.version>2.3.3</spark.version>
        <hive.version>1.1.0</hive.version>
        <fastjson.version>1.2.44</fastjson.version>
        <geoip.version>1.3.0</geoip.version>
        <geoip2.version>2.12.0</geoip2.version>
        <config.version>1.2.1</config.version>
    </properties>
    
  2. 導(dǎo)入計(jì)算引擎的依賴

         <!-- scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-xml</artifactId>
                <version>2.11.0-M4</version>
            </dependency>
            <!-- hadoop -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- spark core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version.simple}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark sql -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version.simple}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <!-- spark graphx -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_${scala.version.simple}</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
  3. <font color=red>導(dǎo)入存儲(chǔ)引擎的依賴(可省略)</font>

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-service-rpc</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-service</artifactId>
            </exclusion>
        </exclusions>
        <version>${hive.version}</version>
    </dependency>
    
  4. 導(dǎo)入工具依賴

    <!-- 用于IP地址轉(zhuǎn)換(經(jīng)度、維度) -->
    <dependency>
        <groupId>com.maxmind.geoip</groupId>
        <artifactId>geoip-api</artifactId>
        <version>${geoip.version}</version>
    </dependency>
    <dependency>
        <groupId>com.maxmind.geoip2</groupId>
        <artifactId>geoip2</artifactId>
        <version>${geoip2.version}</version>
    </dependency>
    
    <!-- 將經(jīng)緯度轉(zhuǎn)換為編碼 -->
    <dependency>
        <groupId>ch.hsr</groupId>
        <artifactId>geohash</artifactId>
        <version>${geoip.version}</version>
    </dependency>
    
    <!-- scala解析json -->
    <dependency>
     <groupId>org.json4s</groupId>
     <artifactId>json4s-jackson_${scala.version.simple}</artifactId>
        <version>3.6.5</version>
    </dependency>
    
    <!-- 管理配置文件 -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>${config.version}</version>
    </dependency>
    
  1. <font color=red>導(dǎo)入編譯配置(可省略)</font>

       <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <mainClass>cn.itcast.dmp.processing.App</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    

3.3 創(chuàng)建scala代碼包

1558338673059.png
  • 在 src/main/ 下 創(chuàng)建scala代碼包

  • 在scala包中創(chuàng)建 cn.itbigdata.dmp 包

  • 在 cn.itbigdata.dmp 包下創(chuàng)建

    • beans(存放類的定義)
    • etl(etl相關(guān)處理)
    • report(報(bào)表處理)
    • tradingarea(商圈庫(kù))
    • tags(標(biāo)簽處理)
    • customtrait(存放接口定義)
    • utils(存放工具類)
  • 在整個(gè)工程中建立data目錄惭嚣,存放要處理的數(shù)據(jù)

3.4 DmpApp 主程序(初始化部分)

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// 項(xiàng)目的主程序遵湖,在這里完成相關(guān)的任務(wù)
object DmpApp {
  def main(args: Array[String]): Unit = {
    // 初始化
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("DmpApp")
      .set("spark.worker.timeout", "600s")
      .set("spark.cores.max", "10")
      .set("spark.rpc.askTimeout", "600s")
      .set("spark.network.timeout", "600s")
      .set("spark.task.maxFailures", "5")
      .set("spark.speculation", "true")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.buffer.pageSize", "8m")
      .set("park.debug.maxToStringFields", "200")


    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

    // 關(guān)閉資源
    spark.close()
  }
}

3.5 spark相關(guān)參數(shù)解釋

參數(shù)名 默認(rèn)值 定義值
spark.worker.timeout 60 500
spark.network.timeout 120s 600s
spark.rpc.askTimeout spark.network.timeout 600s
spark.cores.max 10
spark.task.maxFailures 4 5
spark.speculation false true
spark.driver.allowMultipleContexts false true
spark.serializer org.apache.spark.serializer.JavaSerializer org.apache.spark.serializer.KryoSerializer
spark.buffer.pageSize 1M - 64M,系統(tǒng)計(jì)算 8M
  • spark.worker.timeout: 網(wǎng)絡(luò)故障導(dǎo)致心跳長(zhǎng)時(shí)間不上報(bào)給master晚吞,經(jīng)過(guò)spark.worker.timeout(秒)時(shí)間后延旧,master檢測(cè)到worker異常,標(biāo)識(shí)為DEAD狀態(tài)槽地,同時(shí)移除掉worker信息以及其上面的executor信息迁沫;
  • spark.network.timeout:所有網(wǎng)絡(luò)交互的默認(rèn)超時(shí)。由網(wǎng)絡(luò)或者 gc 引起捌蚊,worker或executor沒(méi)有接收到executor或task的心跳反饋弯洗。提高 spark.network.timeout 的值,根據(jù)情況改成300(5min)或更高逢勾;
  • spark.rpc.askTimeout: rpc 調(diào)用的超時(shí)時(shí)間牡整;
  • spark.cores.max:每個(gè)應(yīng)用程序所能申請(qǐng)的最大CPU核數(shù);
  • spark.task.maxFailures:當(dāng)task執(zhí)行失敗時(shí)溺拱,并不會(huì)直接導(dǎo)致整個(gè)應(yīng)用程序down掉逃贝,只有在重試了 spark.task.maxFailures 次后任然失敗的情況下才會(huì)使程序down掉;
  • spark.speculation:推測(cè)執(zhí)行是指對(duì)于一個(gè)Stage里面運(yùn)行慢的Task迫摔,會(huì)在其他節(jié)點(diǎn)的Executor上再次啟動(dòng)這個(gè)task沐扳,如果其中一個(gè)Task實(shí)例運(yùn)行成功則將這個(gè)最先完成的Task的計(jì)算結(jié)果作為最終結(jié)果,同時(shí)會(huì)干掉其他Executor上運(yùn)行的實(shí)例句占,從而加快運(yùn)行速度沪摄;
  • spark.driver.allowMultipleContexts: SparkContext默認(rèn)只有一個(gè)實(shí)例,設(shè)置為true允許有多個(gè)實(shí)例;
  • spark.serializer:在Spark的架構(gòu)中杨拐,在網(wǎng)絡(luò)中傳遞的或者緩存在內(nèi)存祈餐、硬盤(pán)中的對(duì)象需要進(jìn)行序列化操作【發(fā)給Executor上的Task;需要緩存的RDD(前提是使用序列化方式緩存)哄陶;廣播變量帆阳;shuffle過(guò)程中的數(shù)據(jù)緩存等】;默認(rèn)的Java序列化方式性能不高屋吨,同時(shí)序列化后占用的字節(jié)數(shù)也較多蜒谤;官方也推薦使用Kryo的序列化庫(kù)。官方文檔介紹至扰,Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右鳍徽;
  • spark.buffer.pageSize: spark內(nèi)存分配的單位,無(wú)默認(rèn)值敢课,大小在1M-64M之間旬盯,spark根據(jù)jvm堆內(nèi)存大小計(jì)算得到;值過(guò)小翎猛,內(nèi)存分配效率低;值過(guò)大接剩,造成內(nèi)存的浪費(fèi)切厘;

3.6 開(kāi)發(fā)環(huán)境參數(shù)配置文件

application.conf

// 開(kāi)發(fā)環(huán)境參數(shù)配置文件
# App 信息
spark.appname="dmpApp"

# spark 信息
spark.master="local[*]"
spark.worker.timeout="120"
spark.cores.max="10"
spark.rpc.askTimeout="600s"
spark.network.timeout="600s"
spark.task.maxFailures="5"
spark.speculation="true"
spark.driver.allowMultipleContexts="true"
spark.serializer="org.apache.spark.serializer.KryoSerializer"
spark.buffer.pageSize="8m"

# kudu 信息
kudu.master="node1:7051,node2:7051,node3:7051"

# 輸入數(shù)據(jù)的信息
addata.path="data/dataset_main.json"
ipdata.geo.path="data/dataset_geoLiteCity.dat"
qqwrydat.path="data/dataset_qqwry.dat"
installDir.path="data"

# 對(duì)應(yīng)ETL輸出信息
ods.prefix="ods"
ad.data.tablename="adinfo"

# 輸出報(bào)表對(duì)應(yīng):地域統(tǒng)計(jì)、廣告地域懊缺、APP疫稿、設(shè)備、網(wǎng)絡(luò)鹃两、運(yùn)營(yíng)商遗座、渠道 7個(gè)分析
report.region.stat.tablename="RegionStatAnalysis"
report.region.tablename="AdRegionAnalysis"
report.app.tablename="AppAnalysis"
report.device.tablename="DeviceAnalysis"
report.network.tablename="NetworkAnalysis"
report.isp.tablename="IspAnalysis"
report.channel.tablename="ChannelAnalysis"

# 高德API
gaoDe.app.key="a94274923065a14222172c9b933f4a4e"
gaoDe.url="https://restapi.amap.com/v3/geocode/regeo?"

# GeoHash (key的長(zhǎng)度)
geohash.key.length=10

# 商圈庫(kù)
trading.area.tablename="tradingArea"

# tags
non.empty.field="imei,mac,idfa,openudid,androidid,imeimd5,macmd5,idfamd5,openudidmd5,androididmd5,imeisha1,macsha1,idfasha1,openudidsha1,androididsha1"
appname.dic.path="data/dic_app"
device.dic.path="data/dic_device"
tags.table.name.prefix="tags"

# 標(biāo)簽衰減系數(shù)
tag.coeff="0.92"

# es 相關(guān)參數(shù)
es.cluster.name="cluster_es"
es.index.auto.create="true"
es.Nodes="192.168.40.164"
es.port="9200"
es.index.reads.missing.as.empty="true"
es.nodes.discovery="false"
es.nodes.wan.only="true"
es.http.timeout="2000000"

3.7 配置文件解析類

// 解析參數(shù)文件幫助類
import com.typesafe.config.ConfigFactory

object ConfigHolder {
  private val config = ConfigFactory.load()
  //  App Info
  lazy val sparkAppName: String = config.getString("spark.appname")

  // Spark parameters
  lazy val sparkMaster: String = config.getString("spark.master")

  lazy val sparkParameters: List[(String, String)] = List(
    ("spark.worker.timeout", config.getString("spark.worker.timeout")),
    ("spark.cores.max", config.getString("spark.cores.max")),
    ("spark.rpc.askTimeout", config.getString("spark.rpc.askTimeout")),
    ("spark.network.timeout", config.getString("spark.network.timeout")),
    ("spark.task.maxFailures", config.getString("spark.task.maxFailures")),
    ("spark.speculation", config.getString("spark.speculation")),
    ("spark.driver.allowMultipleContexts", config.getString("spark.driver.allowMultipleContexts")),
    ("spark.serializer", config.getString("spark.serializer")),
    ("spark.buffer.pageSize", config.getString("spark.buffer.pageSize"))
  )

  // kudu parameters
  lazy val kuduMaster: String = config.getString("kudu.master")

  // input dataset
  lazy val adDataPath: String = config.getString("addata.path")
  lazy val ipsDataPath: String = config.getString("ipdata.geo.path")
  def ipToRegionFilePath: String = config.getString("qqwrydat.path")
  def installDir: String = config.getString("installDir.path")

  // output dataset
  private lazy val delimiter = "_"
  private lazy val odsPrefix: String = config.getString("ods.prefix")
  private lazy val adInfoTableName: String = config.getString("ad.data.tablename")
//  lazy val ADMainTableName = s"$odsPrefix$delimiter$adInfoTableName$delimiter${DateUtils.getTodayDate()}"

  // report
  lazy val Report1RegionStatTableName: String = config.getString("report.region.stat.tablename")
  lazy val ReportRegionTableName: String = config.getString("report.region.tablename")
  lazy val ReportAppTableName: String = config.getString("report.app.tablename")
  lazy val ReportDeviceTableName: String = config.getString("report.device.tablename")
  lazy val ReportNetworkTableName: String = config.getString("report.network.tablename")
  lazy val ReportIspTableName: String = config.getString("report.isp.tablename")
  lazy val ReportChannelTableName: String = config.getString("report.channel.tablename")

  // GaoDe API
  private lazy val gaoDeKey: String = config.getString("gaoDe.app.key")
  private lazy val gaoDeTempUrl: String = config.getString("gaoDe.url")
  lazy val gaoDeUrl: String = s"$gaoDeTempUrl&key=$gaoDeKey"

  // GeoHash
  lazy val keyLength: Int = config.getInt("geohash.key.length")

  // 商圈庫(kù)
  lazy val tradingAreaTableName: String =config.getString("trading.area.tablename")

  // tags
  lazy val idFields: String = config.getString("non.empty.field")
  lazy val filterSQL: String = idFields
    .split(",")
    .map(field => s"$field is not null ")
    .mkString(" or ")
  lazy val appNameDic: String = config.getString("appname.dic.path")
  lazy val deviceDic: String = config.getString("device.dic.path")
  lazy val tagsTableNamePrefix: String = config.getString("tags.table.name.prefix") + delimiter
  lazy val tagCoeff: Double = config.getDouble("tag.coeff")

  // 加載 elasticsearch 相關(guān)參數(shù)
  lazy val ESSparkParameters = List(
    ("cluster.name", config.getString("es.cluster.name")),
    ("es.index.auto.create", config.getString("es.index.auto.create")),
    ("es.nodes", config.getString("es.Nodes")),
    ("es.port", config.getString("es.port")),
    ("es.index.reads.missing.as.empty", config.getString("es.index.reads.missing.as.empty")),
    ("es.nodes.discovery", config.getString("es.nodes.discovery")),
    ("es.nodes.wan.only", config.getString("es.nodes.wan.only")),
    ("es.http.timeout", config.getString("es.http.timeout"))
  )

  def main(args: Array[String]): Unit = {
    println(ConfigHolder.sparkParameters)
    println(ConfigHolder.installDir)
  }
}

3.8 DmpApp主程序(使用配置文件)

import cn.itbigdata.dmp.utils.ConfigHolder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DmpApp {
  def main(args: Array[String]): Unit = {
    // 1、初始化(SparkConf俊扳、SparkSession)
    val conf = new SparkConf()
      .setAppName(ConfigHolder.sparkAppName)
      .setMaster(ConfigHolder.sparkMaster)
      .setAll(ConfigHolder.sparkParameters)

    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    spark.sparkContext.setLogLevel("warn")
    println("OK!")

    // 1途蒋、ETL

    // 2、報(bào)表

    // 3馋记、生成商圈庫(kù)

    // 4号坡、標(biāo)簽化

    // 關(guān)閉資源
    spark.close()
  }
}

4、ETL開(kāi)發(fā)

需求:

  • 將數(shù)據(jù)文件每一行中的 ip 地址梯醒,轉(zhuǎn)換為經(jīng)度宽堆、維度、省茸习、市的信息畜隶;

    ip => 經(jīng)度、維度、省籽慢、市

  • 保存轉(zhuǎn)換后的數(shù)據(jù)文件(每天一個(gè)文件)

處理步驟:

  • 讀數(shù)據(jù)
  • 數(shù)據(jù)處理
    • 找出每一行數(shù)據(jù)中的ip地址
    • 根據(jù)ip地址浸遗,算出對(duì)應(yīng)的省、市嗡综、經(jīng)度乙帮、緯度,添加到每行數(shù)據(jù)的尾部
  • 保存數(shù)據(jù)
  • 其他需求:數(shù)據(jù)每日加載一次极景,每天的數(shù)據(jù)單獨(dú)存放在一個(gè)文件中

難點(diǎn)問(wèn)題:處理數(shù)據(jù)(IP地址如何轉(zhuǎn)化為省察净、市、經(jīng)度盼樟、緯度)

4.1 搭建ETL架構(gòu)

新建trait(Processor)氢卡,為數(shù)據(jù)處理提供一個(gè)統(tǒng)一的接口類

import org.apache.spark.sql.SparkSession

// 數(shù)據(jù)處理接口
// SparkSession 用于數(shù)據(jù)的加載和處理
// KuduContext 用于數(shù)據(jù)的保存
trait Processor {
  def process(spark: SparkSession)
}

新建 ETLProcessor ,負(fù)責(zé)ETL處理

import cn.itbigdata.dmp.customtrait.Processor
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession

object ETLProcessor extends Processor{
  override def process(spark: SparkSession): Unit = {
    // 定義參數(shù)
    val sourceDataFile = ConfigHolder.adLogPath
    val sinkDataPath = ""

    // 1 讀數(shù)據(jù)
    val sourceDF: DataFrame = spark.read.json(sourceDataFile)

    // 2 處理數(shù)據(jù)
    // 2.1 找到ip
    // 2.2 將ip 轉(zhuǎn)為 省晨缴、市译秦、經(jīng)度、維度
    val rdd = sourceDF.rdd
      .map(row => {
        val ip: String = row.getAs[String]("ip")
        ip
      })

    // 2.3 將省击碗、市筑悴、經(jīng)度、維度放在原數(shù)據(jù)的最后

    // 3 保存數(shù)據(jù)
  }
}

4.2 IP地址轉(zhuǎn)換為經(jīng)緯度

  • 使用GeoIP稍途,將ip地址轉(zhuǎn)為經(jīng)緯度
  • GeoIP阁吝,是一套含IP數(shù)據(jù)庫(kù)的軟件工具
  • Geo根據(jù)來(lái)訪者的IP, 定位該IP所在經(jīng)緯度械拍、國(guó)家/地區(qū)突勇、省市、和街道等位置信息
  • GeoIP有兩個(gè)版本坷虑,一個(gè)免費(fèi)版甲馋,一個(gè)收費(fèi)版本
  • 收費(fèi)版本的準(zhǔn)確率高一些,更新頻率也更頻繁
  • 因?yàn)镚eoIP讀取的是本地的二進(jìn)制IP數(shù)據(jù)庫(kù)迄损,所以效率很高

4.3 IP地址轉(zhuǎn)換為省市

  • 純真數(shù)據(jù)庫(kù)定躏,將ip轉(zhuǎn)為省、市
  • 純真數(shù)據(jù)庫(kù)收集了包括中國(guó)電信芹敌、中國(guó)移動(dòng)共屈、中國(guó)聯(lián)通、長(zhǎng)城寬帶党窜、聚友寬帶等 ISP 的 IP 地址數(shù)據(jù)
  • 純真數(shù)據(jù)庫(kù)是二進(jìn)制文件拗引,有開(kāi)源的java代碼,簡(jiǎn)單的修改幌衣,調(diào)用就可以了
case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float)

  private def ipToLocation(ip: String): Location ={
    // 1 獲取service
    val service = new LookupService("data/geoLiteCity.dat")

    // 2 獲取Location
    val longAndLatLocation = service.getLocation(ip)

    // 3 獲取經(jīng)度矾削、維度
    val longitude = longAndLatLocation.longitude
    val latitude = longAndLatLocation.latitude

    // 4 利用純真數(shù)據(jù)庫(kù)獲取省市
    val ipService = new IPAddressUtils
    val regeinLocation: IPLocation = ipService.getregion(ip)
    val region = regeinLocation.getRegion
    val city = regeinLocation.getCity

    Location(ip, region, city, longitude, latitude)
  }

需要實(shí)現(xiàn)幫助類:

  • 計(jì)算當(dāng)天日期

    import java.util.{Calendar, Date}
    import org.apache.commons.lang.time.FastDateFormat
    
    object DateUtils {
      def getToday: String = {
        val now = new Date
        FastDateFormat.getInstance("yyyyMMdd").format(now)
    }
    
      def getYesterday: String = {
        val calendar: Calendar = Calendar.getInstance
        calendar.set(Calendar.HOUR_OF_DAY, -24)
        FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime())
      }
    
      def main(args: Array[String]): Unit = {
        println(getToday)
        println(getYesterday)
      }
    }
    

4.4 ETL完整實(shí)現(xiàn)

import java.util.Calendar
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.util.iplocation.{IPAddressUtils, IPLocation}
import com.maxmind.geoip.LookupService
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ETLProcessor extends Processor{
  // 定義參數(shù)
  private val sourceDataFile: String = "data/data.json"
  private val sinkDataPath: String = s"outputdata/maindata.${getYesterday}"
  private val geoFilePath: String = "data/geoLiteCity.dat"

  override def process(spark: SparkSession): Unit = {
    // 1 讀數(shù)據(jù)
    val sourceDF: DataFrame = spark.read.json("data/data.json")

    // 2 處理數(shù)據(jù)
    // 2.1 找到ip
    // 2.2 將ip 轉(zhuǎn)為 省壤玫、市、經(jīng)度哼凯、維度
    import spark.implicits._
    val ipDF: DataFrame = sourceDF.rdd
      .map { row =>
        val ip = row.getAs[String]("ip")
        // 將ip轉(zhuǎn)換為 省欲间、市、經(jīng)度断部、緯度
        ip2Location(ip)
      }.toDF

    // 2.2 ipDF 與 sourceDF 做join猎贴,給每一行增加省、市蝴光、經(jīng)緯度
    val sinkDF: DataFrame = sourceDF.join(ipDF, Seq("ip"), "inner")

    // 3 保存數(shù)據(jù)
    sinkDF.write.mode(SaveMode.Overwrite).json(sinkDataPath)
  }

  case class Location(ip: String, region: String, city: String, longitude: Float, latitude: Float)

  private def ip2Location(ip: String): Location ={
    // 1 獲取service
    val service = new LookupService(geoFilePath)

    // 2 獲取Location
    val longAndLatLocation = service.getLocation(ip)

    // 3 獲取經(jīng)度她渴、維度
    val longitude = longAndLatLocation.longitude
    val latitude = longAndLatLocation.latitude

    // 4 利用純真數(shù)據(jù)庫(kù)獲取省市
    val ipService = new IPAddressUtils
    val regionLocation: IPLocation = ipService.getregion(ip)
    val region = regionLocation.getRegion
    val city = regionLocation.getCity

    Location(ip, region, city, longitude, latitude)
  }

  private def getYesterday: String = {
    val calendar: Calendar = Calendar.getInstance
    calendar.set(Calendar.HOUR_OF_DAY, -24)
    FastDateFormat.getInstance("yyyyMMdd").format(calendar.getTime())
  }
}

5、報(bào)表開(kāi)發(fā)(數(shù)據(jù)分析--SparkSQL)

需要處理的報(bào)表

  • 統(tǒng)計(jì)各地域的數(shù)量分布情況(RegionStatProcessor)
  • 廣告投放的地域分布情況統(tǒng)計(jì)(RegionAnalysisProcessor)
  • APP分布情況統(tǒng)計(jì)(AppAnalysisProcessor)
  • 手機(jī)設(shè)備類型分布情況統(tǒng)計(jì)(DeviceAnalysisProcessor)
  • 網(wǎng)絡(luò)類型分布情況統(tǒng)計(jì)(NetworkAnalysisProcessor)
  • 網(wǎng)絡(luò)運(yùn)營(yíng)商分布情況統(tǒng)計(jì)(IspAnalysisProcessor)
  • 渠道分布情況統(tǒng)計(jì)(ChannelAnalysisProcessor)

5.1 數(shù)據(jù)地域分布

  • 報(bào)表處理的步驟
    • 了解業(yè)務(wù)需求:根據(jù)省蔑祟、市分組趁耗,求數(shù)據(jù)量的分布情況
    • 源數(shù)據(jù):為每天的日志數(shù)據(jù),即ETL的結(jié)果數(shù)據(jù)疆虚;
    • 目標(biāo)數(shù)據(jù):保存在本地文件中苛败,每個(gè)報(bào)表對(duì)應(yīng)目錄;
    • 編寫(xiě)SQL径簿,并測(cè)試
    • 代碼實(shí)現(xiàn)
  • 定義 RegionStatProcessor 繼承自Processor罢屈,實(shí)現(xiàn)process方法。具體實(shí)現(xiàn)步驟如下:
import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.{ConfigHolder, DateUtils}
import org.apache.spark.sql.{SaveMode, SparkSession}

object RegionStatProcessor extends Processor{
  override def process(spark: SparkSession): Unit = {
    // 定義參數(shù)
    val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}"
    val sinkDataPath = "output/regionstat"

    // 讀文件
    val sourceDF = spark.read.json(sourceDataPath)
    sourceDF.createOrReplaceTempView("adinfo")

    // 處理數(shù)據(jù)
    val RegionSQL1 =
      """
        |select to_date(now()) as statdate, region, city, count(*) as infocount
        |  from adinfo
        |group by region, city
        |""".stripMargin

    val sinkDF = spark.sql(RegionSQL1)
    sinkDF.show()

    // 寫(xiě)文件
    sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath)
  }
}

5.2 廣告投放地域分布

按照需求篇亭,完成以下模式的報(bào)表

備注:要求3個(gè)率:競(jìng)價(jià)成功率缠捌、廣告點(diǎn)擊率,媒體點(diǎn)擊率

指標(biāo)計(jì)算邏輯

指標(biāo) 說(shuō)明 adplatformproviderid requestmode processnode iseffective isbilling isbid iswin adorderid adcreativeid
原始請(qǐng)求 發(fā)來(lái)的所有原始請(qǐng)求數(shù) 1 >=1
有效請(qǐng)求 滿足有效體檢的數(shù)量 1 >=2
廣告請(qǐng)求 滿足廣告請(qǐng)求的請(qǐng)求數(shù)量 1 3
參與競(jìng)價(jià)數(shù) 參與競(jìng)價(jià)的次數(shù) >=100000 1 1 1 !=0
競(jìng)價(jià)成功數(shù) 成功競(jìng)價(jià)的次數(shù) >=100000 1 1 1
(廣告主)展示數(shù) 針對(duì)廣告主統(tǒng)計(jì):廣告最終在終端被展示的數(shù)量 2 1
(廣告主)點(diǎn)擊數(shù) 針對(duì)廣告主統(tǒng)計(jì):廣告被展示后暗赶,實(shí)際被點(diǎn)擊的數(shù)量 3 1
(媒介)展示數(shù) 針對(duì)媒介統(tǒng)計(jì):廣告在終端被展示的數(shù)量 2 1 1
(媒介)點(diǎn)擊數(shù) 針對(duì)媒介統(tǒng)計(jì):展示的廣告實(shí)際被點(diǎn)擊的數(shù)量 3 1 1
DSP廣告消費(fèi) winprice/1000 >=100000 1 1 1 >200000 >200000
DSP廣告成本 Adptment/1000 >=100000 1 1 1 >200000 >200000

DSP廣告消費(fèi) = DSP的RTB的錢(qián)

DSP廣告成本 = 廣告主付給DSP的錢(qián)

DSP的盈利 = DSP廣告成本 - DSP廣告消費(fèi)

備注:對(duì)應(yīng)字段:

OriginalRequest、ValidRequest肃叶、adRequest
bidsNum蹂随、bidsSus、bidRate
adDisplayNum因惭、adClickNum岳锁、adClickRate
MediumDisplayNum、MediumClickNum蹦魔、MediumClickRate
adconsume激率、adcost

代碼實(shí)現(xiàn)

import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.DateUtils
import org.apache.spark.sql.{SaveMode, SparkSession}

object RegionAnalysisProcessor extends Processor{
  override def process(spark: SparkSession): Unit = {
    // 定義參數(shù)
    val sourceDataPath = s"outputdata/maindata-${DateUtils.getYesterday}"
    val sinkDataPath = "outputdata/regionanalysis"

    // 讀文件
    val sourceDF = spark.read.json(sourceDataPath)
    sourceDF.createOrReplaceTempView("adinfo")

    // 處理數(shù)據(jù)
    val RegionSQL1 =
      """
        |select to_date(now()) statdate, region, city,
        |       sum(case when requestmode=1 and processnode>=1 then 1 else 0 end) as OriginalRequest,
        |       sum(case when requestmode=1 and processnode>=2 then 1 else 0 end) as ValidRequest,
        |       sum(case when requestmode=1 and  processnode=3 then 1 else 0 end) as adRequest,
        |       sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and isbid=1 and adorderid!=0
        |                then 1 else 0 end) as bidsNum,
        |       sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1 and iswin=1
        |                then 1 else 0 end) as bidsSus,
        |       sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) as adDisplayNum,
        |       sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) as adClickNum,
        |       sum(case when requestmode=2 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumDisplayNum,
        |       sum(case when requestmode=3 and iseffective=1 and isbilling=1 then 1 else 0 end) as MediumClickNum,
        |       sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1
        |                 and iswin=1 and adorderid>200000 and adcreativeid>200000
        |                then winprice/1000 else 0 end) as adconsume,
        |       sum(case when adplatformproviderid>=100000 and iseffective=1 and isbilling=1
        |                 and iswin=1 and adorderid>200000 and adcreativeid>200000
        |                then adpayment/1000 else 0 end) as adcost
        |  from adinfo
        |group by region, city
        |    """.stripMargin
    spark.sql(RegionSQL1).createOrReplaceTempView("tabtemp")

    val RegionSQL2 =
      """
        |select statdate, region, city,
        |       OriginalRequest, ValidRequest, adRequest,
        |       bidsNum, bidsSus, bidsSus/bidsNum as bidRate,
        |       adDisplayNum, adClickNum, adClickNum/adDisplayNum as adClickRate,
        |       MediumDisplayNum, MediumClickNum, MediumClickNum/MediumDisplayNum as mediumClickRate,
        |       adconsume, adcost
        |  from tabtemp
    """.stripMargin
    val sinkDF = spark.sql(RegionSQL2)

    // 寫(xiě)文件
    sinkDF.coalesce(1).write.mode(SaveMode.Append).json(sinkDataPath)
  }
}

6、數(shù)據(jù)標(biāo)簽化

6.1 什么是數(shù)據(jù)標(biāo)簽化

  • 為什么要給數(shù)據(jù)打標(biāo)簽

    • 分析數(shù)據(jù)的需求
    • 用戶對(duì)與數(shù)據(jù)搜索的需求勿决,支持定向人群的條件篩選乒躺。如:
      • 地域,甚至是商圈
      • 性別
      • 年齡
      • 興趣
      • 設(shè)備
  • 數(shù)據(jù)格式

    目標(biāo)數(shù)據(jù):(用戶id低缩, 所有標(biāo)簽)嘉冒。標(biāo)簽如下所示:

    (CH@123485 -> 1.0, KW@word -> 1.0, CT@Beijing -> 1.0, GD@女 -> 1.0, AGE@40 -> 1.0, TA@北海 -> 1.0, TA@沙灘 -> 1.0)

    • Tag 數(shù)據(jù)組織形式Map[String, Double]
    • 前綴+標(biāo)簽曹货;1.0為權(quán)重
  • 需要制作的標(biāo)簽

    • 廣告類型
    • 渠道
    • App名稱
    • 性別
    • 地理位置
    • 設(shè)備
    • 關(guān)鍵詞
    • 年齡
    • 商圈(暫時(shí)不管)
  • 日志數(shù)據(jù)的標(biāo)簽化

    • 計(jì)算標(biāo)簽(廣告類型、渠道讳推、AppName顶籽、性別 ... ...)
    • 提取用戶標(biāo)識(shí)
    • 統(tǒng)一用戶識(shí)別
    • 標(biāo)簽數(shù)據(jù)落地

6.2 搭建框架

object TagProcessor extends Processor{
  override def process(spark: SparkSession, kudu: KuduContext): Unit = {
    // 定義參數(shù)
    val sourceTableName = ConfigHolder.ADMainTableName
    val sinkTableName = ""
    val keys = ""

    // 1 讀數(shù)據(jù)
    val sourceDF = spark.read
      .option("kudu.master", ConfigHolder.kuduMaster)
      .option("kudu.table", sourceTableName)
      .kudu

    // 2 處理數(shù)據(jù)
    sourceDF.rdd
      .map(row => {
        // 廣告類型、渠道银觅、App名稱
        val adTags = AdTypeTag.make(row)

        // 性別礼饱、地理位置、設(shè)備

        // 關(guān)鍵詞究驴、年齡镊绪、商圈

      })

    // 3 保存數(shù)據(jù)

  }
}

定義接口類:

import org.apache.spark.sql.Row

trait TagMaker {
  def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double]
}

6.3 打標(biāo)簽

6.3.1 廣告類型(AdTypeTag)

字段意義 1:banner; 2:插屏纳胧; 3:全屏

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.spark.sql.Row

object AdTypeTag extends TagMaker{
  private val adPrefix = "adtype@"

  override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
    // 1 取值
    val adType: Long = row.getAs[Long]("adspacetype")
    // 1:banner镰吆; 2:插屏; 3:全屏

    // 2 計(jì)算標(biāo)簽
    adType match {
      case 1 => Map(s"${adPrefix}banner" -> 1.0)
      case 2 => Map(s"${adPrefix}插屏" -> 1.0)
      case 3 => Map(s"${adPrefix}全屏" -> 1.0)
      case _ => Map[String, Double]()
    }
  }
}

6.3.2 渠道(ChannelTag)

字段:channelid

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object ChannelTag extends TagMaker{
  private val channelPrefix = "channel@"

  override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
    // 1 取值
    val channelid = row.getAs[String]("channelid")
    // 2 計(jì)算標(biāo)簽
    if (StringUtils.isNotBlank(channelid)){
      Map(s"${channelPrefix}channelid" -> 1.0)
    }
    else
      Map[String, Double]()
  }

  def main(args: Array[String]): Unit = {
    // 判斷某字符串是否不為空跑慕,且長(zhǎng)度不為0万皿,且不由空白符(空格)構(gòu)成
    if (!StringUtils.isNotBlank(null)) println("blank1 !")
    if (!StringUtils.isNotBlank("")) println("blank2 !")
    if (!StringUtils.isNotBlank("   ")) println("blank3 !")
  }
}

6.3.3 App名稱(AppNameTag)

字段:appid;要將 appid 轉(zhuǎn)為 appname

查給定的字典表:dicapp

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object AppNameTag extends TagMaker{
  // 獲取前綴
  private val appNamePrefix = "appname@"

  override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
    // 1 獲取地段信息
    val appId = row.getAs[String]("appid")

    // 2 計(jì)算并返回標(biāo)簽
    val appName = dic.getOrElse(appId, "")
    if (StringUtils.isNotBlank(appName))
      Map(s"${appNamePrefix}$appName" -> 1.0)
    else
      Map[String, Double]()
  }
}

6.3.4 性別(SexTag)

字段:sex核行;

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object SexTag extends TagMaker{
  private val sexPrefix: String = "sex@"

  override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
    // 獲取標(biāo)簽信息
    val sexid: String = row.getAs[String]("sex")
    val sex = sexid match {
      case "1" => "男"
      case "2" => "女"
      case _   => "待填寫(xiě)"
    }

    // 計(jì)算返回標(biāo)簽
    if (StringUtils.isNotBlank(sex))
      Map(s"$sexPrefix$sex" -> 1.0)
    else
      Map[String, Double]()
  }
}

6.3.5 地理位置(GeoTag)

字段:region牢硅、city

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object GeoTag extends TagMaker{
  private val regionPrefix = "region@"
  private val cityPrefix = "city@"

  override def make(row: Row, dic: collection.Map[String, String]=null): Map[String, Double] = {
    // 獲取標(biāo)簽信息
    val region = row.getAs[String]("region")
    val city = row.getAs[String]("city")

    // 計(jì)算并返回標(biāo)簽信息
    val regionTag = if (StringUtils.isNotBlank(region))
      Map(s"$regionPrefix$region" -> 1.0)
    else
      Map[String, Double]()

    val cityTag = if (StringUtils.isNotBlank(city))
      Map(s"$cityPrefix$city" -> 1.0)
    else
      Map[String, Double]()

    regionTag ++ cityTag
  }
}

6.3.6 設(shè)備(DeviceTag)

字段:client、networkmannername芝雪、ispname减余;

數(shù)據(jù)字典:dicdevice

  • client:設(shè)備類型 (1:android 2:ios 3:wp 4:others)
  • networkmannername:聯(lián)網(wǎng)方式名稱(2G、3G惩系、4G位岔、其他)
  • ispname:運(yùn)營(yíng)商名稱(電信、移動(dòng)堡牡、聯(lián)通...)
import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object DeviceTag extends TagMaker{
  val clientPrefix = "client@"
  val networkPrefix = "network@"
  val ispPrefix = "isp@"

  override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
    // 獲取標(biāo)簽信息
    val clientName: String = row.getAs[Long]("client").toString
    val networkName: String = row.getAs[Long]("networkmannername").toString
    val ispName: String = row.getAs[Long]("ispname").toString

    // 計(jì)算并返回標(biāo)簽
    val clientId = dic.getOrElse(clientName, "D00010004")
    val networkId = dic.getOrElse(networkName, "D00020005")
    val ispId = dic.getOrElse(ispName, "D00030004")

    val clientTag = if (StringUtils.isNotBlank(clientId))
      Map(s"$clientPrefix$clientId" -> 1.0)
    else
      Map[String, Double]()

    val networkTag = if (StringUtils.isNotBlank(networkId))
      Map(s"$networkPrefix$networkId" -> 1.0)
    else
      Map[String, Double]()

    val ispTag = if (StringUtils.isNotBlank(ispId))
      Map(s"$ispPrefix$ispId" -> 1.0)
    else
      Map[String, Double]()

    clientTag ++ networkTag ++ ispTag
  }
}

6.3.7 關(guān)鍵詞(KeywordTag)

字段:keywords

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object KeywordsTag extends TagMaker{
  private val keywordPrefix = "keyword@"

  override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
    row.getAs[String]("keywords")
      .split(",")
      .filter(word => StringUtils.isNotBlank(word))
      .map(word => s"$keywordPrefix$word" -> 1.0)
      .toMap
  }
}

6.3.8 年齡(AgeTag)

字段:age

import cn.itbigdata.dmp.customtrait.TagMaker
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row

object AgeTag extends TagMaker{
  private val agePrefix = "age@"

  override def make(row: Row, dic: collection.Map[String, String]): Map[String, Double] = {
    val age = row.getAs[String]("age")

    if (StringUtils.isNotBlank(age))
      Map(s"$agePrefix$age" -> 1.0)
    else
      Map[String, Double]()
  }
}

6.3.9 主處理程序(TagProcessor)

import cn.itbigdata.dmp.customtrait.Processor
import cn.itbigdata.dmp.utils.DateUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

object TagProcessor extends Processor{
  override def process(spark: SparkSession): Unit = {
    // 定義參數(shù)
    val sourceTableName = s"outputdata/maindata-${DateUtils.getYesterday}"
    val appdicFilePath = "data/dicapp"
    val deviceFilePath = "data/dicdevice"
    val sinkTableName = ""

    // 1 讀數(shù)據(jù)
    val sourceDF = spark.read.json(sourceTableName)

    // 讀app信息(文件)抒抬,轉(zhuǎn)換為廣播變量(優(yōu)化)
    val appdicMap = spark.sparkContext.textFile(appdicFilePath)
      .map(line => {
        val arr: Array[String] = line.split("##")
        (arr(0), arr(1))
      }).collectAsMap()
    val appdicBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(appdicMap)

    // 讀字典信息(文件),轉(zhuǎn)換為廣播變量(優(yōu)化)
    val deviceMap = spark.sparkContext.textFile(deviceFilePath)
      .map(line => {
        val arr: Array[String] = line.split("##")
        (arr(0), arr(1))
      }).collectAsMap()
    val deviceBC: Broadcast[collection.Map[String, String]] = spark.sparkContext.broadcast(deviceMap)

    // 2 處理數(shù)據(jù)
    sourceDF.printSchema()
    sourceDF.rdd
      .map(row => {
        // 廣告類型晤柄、渠道擦剑、App名稱
        val adTags: Map[String, Double] = AdTypeTag.make(row)
        val channelTags: Map[String, Double] = ChannelTag.make(row)
        val appNameTags: Map[String, Double] = AppNameTag.make(row, appdicBC.value)

        // 性別、地理位置芥颈、設(shè)備類型
        val sexTags = SexTag.make(row)
        val geoTags = GeoTag.make(row)
        val deviceTags = DeviceTag.make(row, deviceBC.value)

        // 關(guān)鍵詞惠勒、年齡
        val keywordsTags = KeywordsTag.make(row)
        val ageTags = AgeTag.make(row)

        // 將所有數(shù)據(jù)組成一個(gè)大的 Map 返回
        val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordsTags ++ ageTags

        tags
      }).collect.foreach(println)

    // 3 保存數(shù)據(jù)

  }
}

6.4 提取用戶標(biāo)識(shí)

  • 日志數(shù)據(jù)針對(duì)某個(gè)用戶單次特定的瀏覽行為

  • 一個(gè)用戶一天可能存在多條數(shù)據(jù)

  • 標(biāo)簽是針對(duì)人的

  • 存在的問(wèn)題:

    • 在數(shù)據(jù)集中抽出人的概念、讓一個(gè)人能對(duì)應(yīng)一條數(shù)據(jù)
    • 在日志信息中找不到可用的用戶id爬坑,只能退而求其次纠屋,找設(shè)備的信息,用設(shè)備的信息標(biāo)識(shí)用戶:
      • IMEI:國(guó)際移動(dòng)設(shè)備識(shí)別碼(International Mobile Equipment Identity盾计,IMEI)巾遭,即通常所說(shuō)的手機(jī)序列號(hào)肉康、手機(jī)“串號(hào)”,用于在移動(dòng)電話網(wǎng)絡(luò)中識(shí)別每一部獨(dú)立的手機(jī)等移動(dòng)通信設(shè)備灼舍,相當(dāng)于移動(dòng)電話的身份證吼和。IMEI是寫(xiě)在主板上的,重裝APP不會(huì)改變IMEI骑素。Android 6.0以上系統(tǒng)需要用戶授予read_phone_state權(quán)限炫乓,如果用戶拒絕就無(wú)法獲得;
      • IDFA:于iOS 6 時(shí)面世献丑,可以監(jiān)控廣告效果末捣,同時(shí)保證用戶設(shè)備不被APP追蹤的折中方案〈撮希可能發(fā)生變化箩做,如系統(tǒng)重置、在設(shè)置里還原廣告標(biāo)識(shí)符妥畏。用戶可以在設(shè)置里打開(kāi)“限制廣告跟蹤”邦邦;
      • mac地址:硬件標(biāo)識(shí)符,包括WiFi mac地址和藍(lán)牙m(xù)ac地址醉蚁。iOS 7 之后被禁止燃辖;
      • OpenUDID:在iOS 5發(fā)布時(shí),UDID被棄用了网棍,這引起了廣大開(kāi)發(fā)者需要尋找一個(gè)可以替代UDID黔龟,并且不受蘋(píng)果控制的方案。由此OpenUDID成為了當(dāng)時(shí)使用最廣泛的開(kāi)源UDID替代方案滥玷。OpenUDID在工程中實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單氏身,并且還支持一系列的廣告提供商;
      • Android ID:在設(shè)備首次啟動(dòng)時(shí)惑畴,系統(tǒng)會(huì)隨機(jī)生成一個(gè)64位的數(shù)字蛋欣,并把這個(gè)數(shù)字以16進(jìn)制字符串的形式保存下來(lái),這個(gè)16進(jìn)制的字符串就是ANDROID_ID桨菜,當(dāng)設(shè)備被wipe后該值會(huì)被重置豁状;
    • 日志數(shù)據(jù)中可用于標(biāo)識(shí)用戶的字段包括:
      • imei捉偏、mac倒得、idfa、openudid夭禽、androidid
      • imeimd5霞掺、macmd5、idfamd5讹躯、openudidmd5菩彬、androididmd5
      • imeisha1缠劝、macsha1、idfasha1骗灶、openudidsha1惨恭、androididsha1
    • 什么是無(wú)效數(shù)據(jù):以上15個(gè)字段全部為空锣笨,那么這條數(shù)據(jù)不能與任何用戶發(fā)生關(guān)聯(lián)决采,這條數(shù)據(jù)對(duì)我們來(lái)說(shuō)沒(méi)有任何用處,它是無(wú)效數(shù)據(jù)唇辨。這些數(shù)據(jù)需要除去免都。
    // 15個(gè)字段同時(shí)為空時(shí)需要過(guò)濾
    lazy val filterSQL: String = idFields
        .split(",")
        .map(field => s"$field != ''")
        .mkString(" or ")
    
            // 抽取用戶標(biāo)識(shí)
            val userIds = getUserIds(row)
    
            // 返回標(biāo)簽
            (userIds.head, (userIds, tags))
    
      // 提取用戶標(biāo)識(shí)
      private def getUserIds(row: Row): List[String] = {
        val userIds: List[String] = idFields.split(",")
          .map(field => (field, row.getAs[String](field)))
          .filter { case (key, value) => StringUtils.isNotBlank(value) }
          .map { case (key, value) => s"$key::$value" }.toList
    
        userIds
      }
    

6.5 用戶識(shí)別

  • 使用十五個(gè)字段(非空)聯(lián)合標(biāo)識(shí)用戶
  • 數(shù)據(jù)采集過(guò)程中:
    • 每次采集的數(shù)據(jù)可能是不同的字段
    • 某些字段還可能發(fā)生變化
  • 如何識(shí)別相同用戶的數(shù)據(jù)锉罐?

6.6 用戶識(shí)別&數(shù)據(jù)聚合與合并

  // 統(tǒng)一用戶識(shí)別;數(shù)據(jù)聚合與合并
  private def graphxAnalysis(rdd: RDD[(List[String], List[(String, Double)])]): RDD[(List[String], List[(String, Double)])] ={
    // 1 定義頂點(diǎn)(數(shù)據(jù)結(jié)構(gòu):Long, ""绕娘; 算法:List中每個(gè)元素都可作為頂點(diǎn)脓规,List本身也可作為頂點(diǎn))
    val dotsRDD: RDD[(String, List[String])] = rdd.flatMap{ case (lst1, _) => lst1.map(elem => (elem, lst1)) }
    val vertexes: RDD[(Long, String)] = dotsRDD.map { case (id, ids) => (id.hashCode.toLong, "") }

    // 2 定義邊(數(shù)據(jù)結(jié)構(gòu): Edge(Long, Long, 0))
    val edges: RDD[Edge[Int]] = dotsRDD.map { case (id, ids) => Edge(id.hashCode.toLong, ids.mkString.hashCode.toLong, 0) }

    // 3 生成圖
    val graph = Graph(vertexes, edges)

    // 4 強(qiáng)連通圖
    val idRDD: VertexRDD[VertexId] = graph.connectedComponents()
      .vertices

    // 5 定義數(shù)據(jù)(ids與tags)
    val idsRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] =
      rdd.map { case (ids, tags) => (ids.mkString.hashCode.toLong, (ids, tags)) }

    // 6 步驟4的結(jié)果 與 步驟5的結(jié)果 做join,將全部的數(shù)據(jù)做了分類【一個(gè)用戶一個(gè)分類】
    val joinRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = idRDD.join(idsRDD)
      .map { case (key, value) => value }

    // 7 數(shù)據(jù)的聚合(相同用戶的數(shù)據(jù)放在一起)
    val aggRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = joinRDD.reduceByKey { case ((bufferIds, bufferTags), (ids, tags)) =>
      (bufferIds ++ ids, bufferTags ++ tags)
    }

    // 8 數(shù)據(jù)的合并(對(duì)于id,去重险领;對(duì)tags侨舆,合并權(quán)重)
    val resultRDD: RDD[(List[String], List[(String, Double)])] = aggRDD.map { case (key, (ids, tags)) =>
      val newTags = tags.groupBy(x => x._1)
        .mapValues(lst => lst.map { case (word, weight) => weight }.sum)
        .toList
      (ids.distinct, newTags)
    }

    resultRDD
  }

6.7 標(biāo)簽落地

數(shù)據(jù)保存到kudu中,請(qǐng)注意:

1舷暮、每天保存一張表(需要新建)态罪,表名:usertags_當(dāng)天日期

2、數(shù)據(jù)類型轉(zhuǎn)換 RDD [(List[String], List[(String, Double)])] => RDD[(String, String)] => DataFrame

  • 將 List[String] 轉(zhuǎn)為 String下面;分隔符的定義要注意
  • 將 List[(String, Double)] 轉(zhuǎn)為String复颈,分隔符的定義要注意
  • 分隔符:不能與數(shù)據(jù)中的符號(hào)重復(fù);分隔符保證要能加上沥割,還要能去掉耗啦。
    // 3 數(shù)據(jù)落地(kudu)
    // 將List數(shù)據(jù)類型變?yōu)镾tring
    import spark.implicits._
    val resultDF = mergeRDD.map{ case (ids, tags) =>
      (ids.mkString("||"), tags.map{case (key, value) => s"$key->$value"}.mkString("||"))
    }.toDF("ids", "tags")
    DBUtils.appendData(kudu, resultDF, sinkTableName, keys)
  }

// 獲取昨天日期

6.8 標(biāo)簽處理代碼(TagProcessor)

package cn.itcast.dmp.tags

import cn.itcast.dmp.Processor
import cn.itcast.dmp.utils.ConfigHolder
import org.apache.commons.lang3.StringUtils
import org.apache.kudu.spark.kudu.{KuduContext, KuduDataFrameReader}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object TagProcessor extends Processor{
  override def process(spark: SparkSession, kudu: KuduContext): Unit = {
    // 定義參數(shù)
    val sourceTableName = ConfigHolder.ADMainTableName
    val sinkTableName = ""
    val keys = ""
    val dicAppPath = ConfigHolder.appNameDic
    val dicDevicePath = ConfigHolder.deviceDic
    val tradingAreaTableName = ConfigHolder.tradingAreaTableName
    val filterSQL = ConfigHolder.filterSQL
    val idFields = ConfigHolder.idFields

    // 1 讀數(shù)據(jù)
    val sc = spark.sparkContext
    val sourceDF = spark.read
      .option("kudu.master", ConfigHolder.kuduMaster)
      .option("kudu.table", sourceTableName)
      .kudu

    // 讀app字典信息
    val appDic = sc.textFile(dicAppPath)
      .map(line => {
        val arr = line.split("##")
        (arr(0), arr(1))
      })
      .collect()
      .toMap
    val appDicBC = sc.broadcast(appDic)

    // 讀device字典信息
    val deviceDic = sc.textFile(dicDevicePath)
      .map(line => {
        val arr = line.split("##")
        (arr(0), arr(1))
      })
      .collect()
      .toMap
    val deviceDicBC = sc.broadcast(deviceDic)

    // 讀商圈信息(讀;過(guò)濾机杜;轉(zhuǎn)為rdd帜讲;取數(shù);收集數(shù)據(jù)到driver椒拗;轉(zhuǎn)為map)
    // 限制條件:商圈表的信息不能過(guò)大(過(guò)濾后的大小小于20M為宜)
    val tradingAreaDic: Map[String, String] = spark.read
      .option("kudu.master", ConfigHolder.kuduMaster)
      .option("kudu.table", tradingAreaTableName)
      .kudu
      .filter("areas!=''")
      .rdd
      .map { case Row(geohash: String, areas: String) => (geohash, areas) }
      .collect()
      .toMap
    val tradingAreaBC = sc.broadcast(tradingAreaDic)

    // 2 處理數(shù)據(jù)
    // 過(guò)濾15個(gè)標(biāo)識(shí)字段都為空的數(shù)據(jù)
    val userTagsRDD: RDD[(List[String], List[(String, Double)])] = sourceDF.filter(filterSQL)
      .rdd
      .map(row => {
        // 廣告類型似将、渠道、App名稱
        val adTags = AdTypeTag.make(row)
        val channelTags = ChannelTag.make(row)
        val appNameTags = AppNameTag.make(row, appDicBC.value)

        // 性別蚀苛、地理位置在验、設(shè)備
        val sexTags = SexTag.make(row, appDicBC.value)
        val geoTags = GeoTag.make(row, appDicBC.value)
        val deviceTags = DeviceTag.make(row, deviceDicBC.value)

        // 關(guān)鍵詞、年齡堵未、商圈
        val keywordTags = KeywordTag.make(row, appDicBC.value)
        val ageTags = AgeTag.make(row, appDicBC.value)
        val tradingAreaTags = tradingAreaTag.make(row, tradingAreaDic.value)

        // 標(biāo)簽合并
        val tags = adTags ++ channelTags ++ appNameTags ++ sexTags ++ geoTags ++ deviceTags ++ keywordTags ++ ageTags ++ tradingAreaTags

        // 抽取用戶標(biāo)識(shí)
        val userIds: List[String] = idFields.split(",")
          .map(field => (field, row.getAs[String](field)))
          .filter { case (key, value) => StringUtils.isNotBlank(value) }
          .map { case (key, value) => s"$key::$value" }.toList

        // 返回標(biāo)簽
        (userIds, tags)
      })
    userTagsRDD.foreach(println)

    // 3 統(tǒng)一用戶識(shí)別腋舌,合并數(shù)據(jù)
    val mergeRDD: RDD[(List[String], List[(String, Double)])] = graphxAnalysis(logTagsRDD)      
      
    // 4 數(shù)據(jù)落地(kudu)
    // 將List數(shù)據(jù)類型變?yōu)镾tring
    import spark.implicits._
    val resultDF = mergeRDD.map{ case (ids, tags) =>
      (ids.mkString("|||"), tags.map{case (key, value) => s"$key->$value"}.mkString("|||"))
    }.toDF("ids", "tags")
    DBUtils.createTableAndsaveData(kudu, resultDF, sinkTableName, keys)      
      
    // 關(guān)閉資源
    sc.stop()  
  }
}

7、Spark GraphX

7.1 圖計(jì)算基本概念

圖是用于表示對(duì)象之間模型關(guān)系的數(shù)學(xué)結(jié)構(gòu)渗蟹。圖由頂點(diǎn)和連接頂點(diǎn)的邊構(gòu)成块饺。頂點(diǎn)是對(duì)象赞辩,而邊是對(duì)象之間的關(guān)系。

1555407826579.png

有向圖是頂點(diǎn)之間的邊是有方向的授艰。有向圖的例子如 Twitter 上的關(guān)注者辨嗽。用戶 Bob 關(guān)注了用戶 Carol ,而 Carol 并沒(méi)有關(guān)注 Bob淮腾。

1555407851059.png

就是圖召庞,通過(guò)點(diǎn)(對(duì)象)和邊(路徑),構(gòu)成了不同對(duì)象之間的關(guān)系

7.2 圖計(jì)算應(yīng)用場(chǎng)景

1)最短路徑

最短路徑在社交網(wǎng)絡(luò)里面来破,有一個(gè)六度空間的理論篮灼,表示你和任何一個(gè)陌生人之間所間隔的人不會(huì)超過(guò)五個(gè),也就是說(shuō),最多通過(guò)五個(gè)中間人你就能夠認(rèn)識(shí)任何一個(gè)陌生人。這也是圖算法的一種徘禁,也就是說(shuō)诅诱,任何兩個(gè)人之間的最短路徑都是小于等于6。

2)社群發(fā)現(xiàn)

社群發(fā)現(xiàn)用來(lái)發(fā)現(xiàn)社交網(wǎng)絡(luò)中三角形的個(gè)數(shù)(圈子)送朱,可以分析出哪些圈子更穩(wěn)固娘荡,關(guān)系更緊密,用來(lái)衡量社群耦合關(guān)系的緊密程度驶沼。一個(gè)人的社交圈子里面炮沐,三角形個(gè)數(shù)越多,說(shuō)明他的社交關(guān)系越穩(wěn)固回怜、緊密大年。像Facebook、Twitter等社交網(wǎng)站玉雾,常用到的的社交分析算法就是社群發(fā)現(xiàn)翔试。

3)推薦算法(ALS)

推薦算法(ALS)ALS是一個(gè)矩陣分解算法,比如購(gòu)物網(wǎng)站要給用戶進(jìn)行商品推薦复旬,就需要知道哪些用戶對(duì)哪些商品感興趣垦缅,這時(shí),可以通過(guò)ALS構(gòu)建一個(gè)矩陣圖驹碍,在這個(gè)矩陣圖里壁涎,假如被用戶購(gòu)買(mǎi)過(guò)的商品是1,沒(méi)有被用戶購(gòu)買(mǎi)過(guò)的是0志秃,這時(shí)我們需要計(jì)算的就是有哪些0有可能會(huì)變成1

GraphX 通過(guò)彈性分布式屬性圖擴(kuò)展了 Spark RDD怔球。

通常,在圖計(jì)算中洽损,基本的數(shù)據(jù)結(jié)構(gòu)表達(dá)是:

  • Graph = (Vertex庞溜,Edge)
    • Vertex (頂點(diǎn)/節(jié)點(diǎn)) (VertexId: Long, info: Any)
    • Edge (邊)Edge(srcId: VertexId, dstId: VertexId, attr) 【attr 權(quán)重】

7.3 Spark GraphX例子一(強(qiáng)連通體)

1557105493255.png
ID 關(guān)鍵詞 AppName
1 卡羅拉 團(tuán)車(chē)
2 印度尼西亞,巴厘島 去哪兒旅游
3 善導(dǎo)大師 知乎
4 王的女人,美人無(wú)淚 優(yōu)酷
5 世界杯 搜狐
6 劉嘉玲,港臺(tái)娛樂(lè) 鳳凰網(wǎng)
7 日韓娛樂(lè) 花椒直播
9 AK47 絕地求生:刺激戰(zhàn)場(chǎng)
10 搞笑 YY直播
11 文學(xué),時(shí)政 知乎
ID IDS
1 43125
2 43125
3 43125
4 43125
5 43125
4 4567
5 4567
6 4567
7 4567
9 91011
10 91011
11 91011

Connected Components算法(連通體算法):

1革半、定義頂點(diǎn)

2碑定、定義邊

3流码、生成圖

4、用標(biāo)注圖中每個(gè)連通體延刘,將連通體中序號(hào)最小的頂點(diǎn)的id作為連通體的id

任務(wù):

  1. 定義頂點(diǎn)
  2. 定義邊
  3. 生成圖
  4. 生成強(qiáng)連通圖
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GraphXDemo1 {
  def main(args: Array[String]): Unit = {
    // 1漫试、初始化sparkcontext
    val conf = new SparkConf()
      .setAppName("GraphXDemo1")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    // 2、定義頂點(diǎn) (VertexId: Long, info: Any)
    val vertexes: RDD[(VertexId, Map[String, Double])] = sc.makeRDD(List(
      (1L, Map("keyword:卡羅拉" -> 1.0, "AppName:團(tuán)車(chē)" -> 1.0)),
      (2L, Map("keyword:印度尼西亞" -> 1.0, "keyword:巴厘島" -> 1.0, "AppName:去哪兒旅游" -> 1.0)),
      (3L, Map("keyword:善導(dǎo)大師" -> 1.0, "AppName:知乎" -> 1.0)),
      (4L, Map("keyword:王的女人" -> 1.0, "keyword:美人無(wú)淚" -> 1.0, "AppName:優(yōu)酷" -> 1.0)),
      (5L, Map("keyword:世界杯" -> 1.0, "AppName:搜狐" -> 1.0)),
      (6L, Map("keyword:劉嘉玲" -> 1.0, "keyword:港臺(tái)娛樂(lè)" -> 1.0, "AppName:鳳凰網(wǎng)" -> 1.0)),
      (7L, Map("keyword:日韓娛樂(lè)" -> 1.0, "AppName:花椒直播" -> 1.0)),
      (9L, Map("keyword:AK47" -> 1.0, "AppName:絕地求生:刺激戰(zhàn)場(chǎng)" -> 1.0)),
      (10L, Map("keyword:搞笑" -> 1.0, "AppName:YY直播" -> 1.0)),
      (11L, Map("keyword:文學(xué)" -> 1.0, "keyword:時(shí)政" -> 1.0, "AppName:知乎" -> 1.0))
    ))

    // 3碘赖、定義邊 Edge(srcId: VertexId, dstId: VertexId, attr)
    val edges: RDD[Edge[Int]] = sc.makeRDD(List(
      Edge(1L, 42125L, 0),
      Edge(2L, 42125L, 0),
      Edge(3L, 42125L, 0),
      Edge(4L, 42125L, 0),
      Edge(5L, 42125L, 0),
      Edge(4L, 4567L, 0),
      Edge(5L, 4567L, 0),
      Edge(6L, 4567L, 0),
      Edge(7L, 4567L, 0),
      Edge(9L, 91011, 0),
      Edge(10L, 91011, 0),
      Edge(11L, 91011, 0)
    ))

    // 4驾荣、生成圖;生成強(qiáng)聯(lián)通圖
    Graph(vertexes, edges)
      .connectedComponents()
      .vertices
      .sortBy(_._2)
      .collect()
      .foreach(println)

    // 5普泡、資源釋放
    sc.stop()
  }
}

7.4 Spark GraphX例子二(用戶識(shí)別&數(shù)據(jù)合并)

根據(jù)前面的例子播掷,我們已經(jīng)知道根據(jù)規(guī)則如何識(shí)別用戶,程序如何處理呢撼班?

數(shù)據(jù)的定義:

備注:

1歧匈、這里定義的數(shù)據(jù)格式與我們程序中的數(shù)據(jù)格式完全一致

2、RDD中是一個(gè)元組砰嘁,第一個(gè)元素代表用戶的各種 id 件炉;第二個(gè)元素代表用戶的標(biāo)簽

任務(wù):

1、6條數(shù)據(jù)代表多少個(gè)用戶

2矮湘、合并相同用戶的數(shù)據(jù)

    val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List(
      (List("a1", "b1", "c1"), List("keyword$北京" -> 1.0, "keyword$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List("b1", "c2", "d1"), List("keyword$上海" -> 1.0, "keyword$天津" -> 1.0, "area$回龍觀" -> 1.0)),
      (List("d1"), List("keyword$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List("a2", "b2", "c3"), List("keyword$大數(shù)據(jù)" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List("b2", "c4", "d4"), List("keyword$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List("c3", "e3"), List("keyword$hive" -> 1.0, "keyword$spark" -> 1.0, "area$西二旗" -> 1.0))
    ))

完整的處理步驟:

  1. 定義頂點(diǎn)
  2. 定義邊
  3. 生成圖
  4. 找強(qiáng)連通體
  5. 找需要合并的數(shù)據(jù)
  6. 數(shù)據(jù)聚合
  7. 數(shù)據(jù)合并

處理程序:

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GraphXDemo {
  def main(args: Array[String]): Unit = {
    // 初始化
    val conf: SparkConf = new SparkConf().setAppName("GraphXDemo").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")

    // 定義數(shù)據(jù)
    val dataRDD: RDD[(List[String], List[(String, Double)])] = sc.makeRDD(List(
      (List("a1", "b1", "c1"), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List("b1", "c2", "d1"), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龍觀" -> 1.0)),
      (List("d1"), List("kw$天津" -> 1.0, "area$中關(guān)村" -> 1.0)),
      (List("a2", "b2", "c3"), List("kw$大數(shù)據(jù)" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List("b2", "c4", "d4"), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List("c3", "e3"), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
    ))

    val value: RDD[(String, List[String], List[(String, Double)])] = dataRDD.flatMap { case (allIds: List[String], tags: List[(String, Double)]) => {
      allIds.map { case elem: String => (elem, allIds, tags) }
    }
    }

    // 1 將標(biāo)識(shí)信息中的每一個(gè)元素抽取出來(lái)斟冕,作為id
    // 備注1、這里使用了flatMap缅阳,將元素壓平磕蛇;
    // 備注2、這里丟掉了標(biāo)簽信息十办,因?yàn)檫@個(gè)RDD主要用于構(gòu)造頂點(diǎn)孤里、邊,tags信息用不
    // 備注3橘洞、頂點(diǎn)捌袜、邊的數(shù)據(jù)要求Long,所以這里做了數(shù)據(jù)類型轉(zhuǎn)換
    val dotRDD: RDD[(VertexId, VertexId)] = dataRDD.flatMap { case (allids, tags) =>
      // 方法一:好理解炸枣,不好寫(xiě)
      //      for (id <- allids) yield {
      //        (id.hashCode.toLong, allids.mkString.hashCode.toLong)
      //      }

      // 方法二:不好理解虏等,好寫(xiě)。兩方法等價(jià)
      allids.map(id => (id.hashCode.toLong, allids.mkString.hashCode.toLong))
    }

    // 2 定義頂點(diǎn)
    val vertexesRDD: RDD[(VertexId, String)] = dotRDD.map { case (id, ids) => (id, "") }

    // 3 定義邊(id: 單個(gè)的標(biāo)識(shí)信息适肠;ids: 全部的標(biāo)識(shí)信息)
    val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, ids) => Edge(id, ids, 0) }

    // 4 生成圖
    val graph = Graph(vertexesRDD, edgesRDD)

    // 5 找到強(qiáng)連通體
    val connectRDD: VertexRDD[VertexId] = graph.connectedComponents()
      .vertices

    // 6 定義中心點(diǎn)的數(shù)據(jù)
    val centerVertexRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = dataRDD.map { case (allids, tags) =>
      (allids.mkString.hashCode.toLong, (allids, tags))
    }

    // 7 步驟5霍衫、6的數(shù)據(jù)做join,獲取需要合并的數(shù)據(jù)
    val allInfoRDD = connectRDD.join(centerVertexRDD)
      .map { case (id1, (id2, (allIds, tags))) => (id2, (allIds, tags)) }

    // 8 數(shù)據(jù)聚合(即將同一個(gè)用戶的標(biāo)識(shí)侯养、標(biāo)簽放在一起)
    val mergeInfoRDD: RDD[(VertexId, (List[String], List[(String, Double)]))] = allInfoRDD.reduceByKey { case ((bufferList, bufferMap), (allIds, tags)) =>
      val newList = bufferList ++ allIds
      // map 的合并
      val newMap = bufferMap ++ tags
      (newList, newMap)
    }

    // 9 數(shù)據(jù)合并(allIds:去重敦跌;tags:合并權(quán)重)
    val resultRDD: RDD[(List[String], Map[String, Double])] = mergeInfoRDD.map { case (key, (allIds, tags)) =>
      val newIds = allIds.distinct
      // 按照key做聚合;然后對(duì)聚合得到的lst第二個(gè)元素做累加
      val newTags = tags.groupBy(x => x._1).mapValues(lst => lst.map(x => x._2).sum)
      (newIds, newTags)
    }
    resultRDD.foreach(println)

    sc.stop()
  }

  //  def main(args: Array[String]): Unit = {
  //    val lst = List(
  //      ("kw$大數(shù)據(jù)",1.0),
  //      ("kw$spark",1.0),
  //      ("area$西二旗",1.0),
  //      ("kw$spark",1.0),
  //      ("area$五道口",1.0),
  //      ("kw$hive",1.0),
  //      ("kw$spark",1.0),
  //      ("area$西二旗",1.0)
  //    )
  //
  //    lst.groupBy(x=> x._1).map{case (key, value) => (key, value.map(x=>x._2).sum)}.foreach(println)
  //    println("************************************************************")
  //
  //    lst.groupBy(x=> x._1).mapValues(lst => lst.map(x=>x._2).sum).foreach(println)
  //    println("************************************************************")
}

8、項(xiàng)目總結(jié)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末柠傍,一起剝皮案震驚了整個(gè)濱河市麸俘,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌惧笛,老刑警劉巖从媚,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異患整,居然都是意外死亡拜效,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)各谚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)紧憾,“玉大人,你說(shuō)我怎么就攤上這事昌渤〉纠” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵愈涩,是天一觀的道長(zhǎng)望抽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)履婉,這世上最難降的妖魔是什么煤篙? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮毁腿,結(jié)果婚禮上辑奈,老公的妹妹穿的比我還像新娘。我一直安慰自己已烤,他們只是感情好鸠窗,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著胯究,像睡著了一般稍计。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上裕循,一...
    開(kāi)封第一講書(shū)人閱讀 50,050評(píng)論 1 291
  • 那天臣嚣,我揣著相機(jī)與錄音,去河邊找鬼剥哑。 笑死硅则,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的株婴。 我是一名探鬼主播怎虫,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了大审?” 一聲冷哼從身側(cè)響起蘸际,我...
    開(kāi)封第一講書(shū)人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎饥努,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體八回,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡酷愧,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了缠诅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片溶浴。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖管引,靈堂內(nèi)的尸體忽然破棺而出士败,到底是詐尸還是另有隱情,我是刑警寧澤褥伴,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布谅将,位于F島的核電站,受9級(jí)特大地震影響重慢,放射性物質(zhì)發(fā)生泄漏饥臂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一似踱、第九天 我趴在偏房一處隱蔽的房頂上張望隅熙。 院中可真熱鬧,春花似錦核芽、人聲如沸囚戚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)驰坊。三九已至,卻和暖如春哮独,著一層夾襖步出監(jiān)牢的瞬間庐橙,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工借嗽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留态鳖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓恶导,卻偏偏與公主長(zhǎng)得像浆竭,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容