I was handed an interesting assignment: analyze the Douban groups a user follows and build a profile of that user from the groups' tags.
The task has three main parts:
1. Build a graph from the crawled data with Spark GraphX
2. Visualize the graph
3. Profile the user and find his interest tags
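Environment setup
First, set up Spark. If you also want YARN for managing jobs through a web UI, you need to install Hadoop as well. Here I installed Hadoop 2.7.4 + Spark 2.2.0.
Installing Hadoop 2.7.4 on CentOS 7
1. Install JDK 1.8
Uninstall the preinstalled OpenJDK, download the JDK rpm package and install it, set environment variables such as JAVA_HOME and PATH, and check that Java is installed correctly (e.g. with java -version).
2. Install Hadoop 2.7.4
Configure passwordless SSH login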
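ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/authorized_keys
(An RSA key is used because OpenSSH 7.0 and later disable DSA keys by default; sshd also rejects an authorized_keys file that others can write to.)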
Create the directory /usr/hadoop and, under it, the following four directories:
/usr/hadoop/hdfs/data
/usr/hadoop/hdfs/name
/usr/hadoop/hdfs/namesecondary
/usr/hadoop/tmp
Download Hadoop 2.7.4, put it in /usr/hadoop/ and extract it.
Set the environment variables in /etc/profile as follows, then make them take effect with source /etc/profile:
JAVA_HOME=/usr/java/jdk1.8.0_144/
JRE_HOME=/usr/java/jdk1.8.0_144/jre/
SCALA_HOME=/usr/lib/scala
HADOOP_HOME=/usr/hadoop/hadoop-2.7.4
SPARK_HOME=/usr/spark-2.2.0-bin-hadoop2.7
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/bin
CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export JAVA_HOME
export JRE_HOME
export PATH
export CLASSPATH
export SCALA_HOME
export HADOOP_HOME
export SPARK_HOME
Go to the $HADOOP_HOME/etc/hadoop directory and edit hadoop-env.sh and the other configuration files. The files involved are:
hadoop-2.7.4/etc/hadoop/hadoop-env.sh
hadoop-2.7.4/etc/hadoop/yarn-env.sh
hadoop-2.7.4/etc/hadoop/core-site.xml
hadoop-2.7.4/etc/hadoop/hdfs-site.xml
hadoop-2.7.4/etc/hadoop/mapred-site.xml
hadoop-2.7.4/etc/hadoop/yarn-site.xml
(Note: some of these files exist only as templates and must be renamed first, e.g. mv mapred-site.xml.template mapred-site.xml)
Configure hadoop-env.sh
# The java implementation to use.
#export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/usr/java/jdk1.8.0_144
Configure yarn-env.sh
#export JAVA_HOME
export JAVA_HOME=/usr/java/jdk1.8.0_144
Configure core-site.xml
Add the following configuration.
It is best not to write the description text in Chinese.
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
<description>HDFS URI, in the form filesystem://namenode-host:port</description>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/hadoop/tmp</value>
<description>Local base directory for Hadoop temporary files</description>
</property>
</configuration>
Configure hdfs-site.xml
Add the following configuration:
<configuration>
<!-- hdfs-site.xml -->
<property>
<name>dfs.name.dir</name>
<value>/usr/hadoop/hdfs/name</value>
<description>Where the NameNode stores the HDFS namespace metadata</description>
</property>
<property>
<name>dfs.data.dir</name>
<value>/usr/hadoop/hdfs/data</value>
<description>Physical storage location of data blocks on the DataNode</description>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
<description>Number of replicas; the default is 3, and it should not exceed the number of DataNodes</description>
</property>
</configuration>
Configure mapred-site.xml
Add the following configuration:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
Configure yarn-site.xml
Add the following configuration:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>${yarn.resourcemanager.hostname}:8999</value>
</property>
</configuration>
(Note: the YARN web UI port is changed to 8999 here, so the management page must also be opened on that port.)
Starting Hadoop
1) Format the NameNode
$ bin/hdfs namenode -format
If you format more than once, you will be asked whether to re-format; answer no. Answering yes makes the clusterID in /usr/hadoop/hdfs/data/current/VERSION (DataNode) differ from the one in /usr/hadoop/hdfs/name/current/VERSION (NameNode), so when you run sbin/start-all.sh some DataNode processes fail to start (check with jps). If this happens, copy the clusterID from name/current/VERSION into data/current/VERSION, overwriting the old clusterID so the two match, then restart and run jps to check the processes. Reference: (https://my.oschina.net/u/189445/blog/509385)
2) Start the NameNode and DataNode daemons
$ sbin/start-dfs.sh
3) Start the ResourceManager and NodeManager daemons
$ sbin/start-yarn.sh
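Alternatively, run $ sbin/start-all.sh to start all of the above at once (Hadoop 2.x marks this script as deprecated in favour of start-dfs.sh and start-yarn.sh, but it still works).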
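The clusterID fix from step 1 is plain text editing, so it can be sketched as a small Scala helper. This is a minimal sketch and not part of the original setup: clusterIdOf and syncClusterId are hypothetical names, the VERSION contents below are made up, and it assumes both VERSION files have been read as text with one key=value pair per line. Reading and writing the real files (with HDFS stopped) is left out.

```scala
// Hypothetical helper: return the value of the clusterID=... line, if any.
def clusterIdOf(versionText: String): Option[String] =
  versionText.split("\n").map(_.trim).collectFirst {
    case line if line.startsWith("clusterID=") => line.stripPrefix("clusterID=")
  }

// Hypothetical helper: copy the NameNode's clusterID into the DataNode's
// VERSION text, leaving every other line untouched.
def syncClusterId(nameVersion: String, dataVersion: String): String = {
  val id = clusterIdOf(nameVersion).getOrElse(
    throw new IllegalArgumentException("no clusterID line in the NameNode VERSION text"))
  dataVersion.split("\n")
    .map(line => if (line.trim.startsWith("clusterID=")) s"clusterID=$id" else line)
    .mkString("\n")
}

// Made-up VERSION contents for illustration
val nameVersion = "#Mon Oct 30 10:00:00 CST 2017\nnamespaceID=123\nclusterID=CID-new\ncTime=0\nstorageType=NAME_NODE"
val dataVersion = "#Mon Oct 30 09:00:00 CST 2017\nstorageID=DS-1\nclusterID=CID-old\ncTime=0\nstorageType=DATA_NODE"
println(syncClusterId(nameVersion, dataVersion))
```

Only the clusterID line is rewritten, so the DataNode-specific fields (storageID, storageType) survive, which is the same result as the manual copy described above.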
Verifying the startup
1) Run jps; if the following processes are listed, Hadoop has started normally:
# jps
54679 NameNode
54774 DataNode
15741 Jps
55214 NodeManager
55118 ResourceManager
54965 SecondaryNameNode
Open http://<HadoopMaster IP>:8999/ in a browser to see the YARN ResourceManager UI. Note: the default port is 8088; here I set yarn.resourcemanager.webapp.address to ${yarn.resourcemanager.hostname}:8999.
Or open http://<HadoopMaster IP>:50070/ to check the NameNode status.
Installing Spark
Download spark-2.2.0-bin-hadoop2.7, extract it, set the SPARK_HOME environment variable, and run spark-shell to check that Spark starts normally.
At this point the production environment is set up!
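Development environment
I spent two days writing, running and debugging code, and running and debugging was by far the most tedious part. Every debugging round meant packaging the program into a jar with Maven, uploading it to the server with Hadoop and Spark installed (simulated by a virtual machine) and running it there in a sandbox, which, as you can imagine, is very slow. Is there a more convenient way, where you finish the code and just right-click to run it? There is.
Developing Spark applications with IDEA on Windows 7 64-bit
Download a precompiled Hadoop bin folder (it contains winutils.exe, hadoop.dll and other files)
Set the HADOOP_HOME environment variable and add %HADOOP_HOME%/bin to the Path variable
Download the Spark build precompiled for the matching Hadoop version
Set the SPARK_HOME environment variable and add %SPARK_HOME%/bin to the Path variable
Test the installation in a cmd window, e.g. by running spark-shell
(This version may report a Hive error, which can be ignored.)
IDEA configuration
When running a Scala application, add the following VM option to its run configuration: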
-Dspark.master=local[2]
If it still complains during development that the Hadoop directory cannot be found, set this property in the code:
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.4\\")
The main task
First, look at the data. There are two data sets, one of users and one of groups, both exported from MongoDB.
Users (persons.json)
{"_id":{"$oid":"59f3de6b0b6e9a0b9ca7bf4e"},"name":"person1","no":"168812667","group1":"HZhome","group2":"145219","group3":"276209","group4":"hzhouse","group5":"467221"}
...
Groups (groups.json)
{"_id":{"$oid":"59f3de5f0b6e9a0b9ca7bf49"},"name":"杭州租房","no":"HZhome","tag1":"杭州","tag2":"租房","tag3":"合租","tag4":"求租","tag5":"杭州租房"}
...
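As the sample data shows, each line of persons.json holds one user's information plus the numbers of the groups that user has joined (group1, group2, ...), while each line of groups.json holds one group's information. The two data sets are linked through the group number: the groupx values in persons.json match the no field in groups.json (both stored as groupno after parsing). Note that a group number is not necessarily a numeric string, e.g. HZhome.
Next, process the data.
Because each line is one JSON record, it can be parsed with fastjson, so pom.xml is as follows: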
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dhl</groupId>
<artifactId>DoubanGraphx</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.tools.version>2.11</scala.tools.version>
<scala.version>2.11.8</scala.version>
<spark.version>2.2.0</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.32</version>
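</dependency>
<!-- GraphStream, used later to visualize the graph -->
<dependency>
<groupId>org.graphstream</groupId>
<artifactId>gs-core</artifactId>
<version>1.3</version>
</dependency>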
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
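Parse persons.json to build the relationships between users and groups. GraphX needs a Long as its VertexId; here it is computed from the oid field of the exported data (as the row id of the MongoDB collection, this field should be unique). Note that an ObjectId is 12 bytes (24 hex digits), so new BigInteger(oid, 16).longValue() keeps only its low 64 bits; the result is very likely, though not strictly guaranteed, to stay unique.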
import java.math.BigInteger
import com.alibaba.fastjson.JSON
import org.apache.spark.graphx.VertexId
import scala.collection.JavaConverters._

// One row per (user, joined group) pair; vertextype "p" marks a person vertex
case class Person(poidhex: VertexId, oid: String, name: String, no: String, groupno: String, vertextype: String)

def parsePerson(str: String): List[Person] = {
  val json = JSON.parseObject(str)
  val oid = json.getJSONObject("_id").getString("$oid")
  // Low 64 bits of the ObjectId, used as the GraphX vertex id
  val oidhex = new BigInteger(oid, 16).longValue()
  val name = json.getString("name")
  val no = json.getString("no")
  // Each key starting with "group" (group1, group2, ...) holds one joined group number
  json.keySet().asScala.toList
    .filter(_.startsWith("group"))
    .map(key => Person(oidhex, oid, name, no, json.getString(key), "p"))
}
groups.json is processed in the same way:
import scala.collection.JavaConverters._

// vertextype "g" marks a group vertex; tags collects the values of tag1, tag2, ...
case class Group(goidhex: VertexId, oid: String, name: String, groupno: String, tags: List[String], vertextype: String)

def parseGroup(str: String): Group = {
  val json = JSON.parseObject(str)
  val oid = json.getJSONObject("_id").getString("$oid")
  val oidhex = new BigInteger(oid, 16).longValue()
  val name = json.getString("name")
  val groupno = json.getString("no")
  val tags = json.keySet().asScala.toList
    .filter(key => key.startsWith("tag") && key.length > 3)
    .map(key => json.getString(key))
  Group(oidhex, oid, name, groupno, tags, "g")
}
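Both parsers turn the 24-hex-digit ObjectId into a VertexId with new BigInteger(oid, 16).longValue(). The sketch below, which is not from the original project, shows what that keeps: longValue() returns the low-order 64 bits, i.e. the last 16 hex digits, so the 4-byte timestamp prefix of the ObjectId is dropped. The helper names toVertexId, lowBitsOnly and assertUniqueIds are hypothetical; the last one is an optional sanity check, assuming the ids to check fit in memory.

```scala
import java.math.BigInteger

// Same conversion as parsePerson/parseGroup: keep the low-order 64 bits.
def toVertexId(oid: String): Long = new BigInteger(oid, 16).longValue()

// Equivalent view: only the last 16 hex digits survive (parsed as an unsigned
// 64-bit value, so the bit pattern matches even when the result is negative).
def lowBitsOnly(oid: String): Long = java.lang.Long.parseUnsignedLong(oid.takeRight(16), 16)

// Hypothetical sanity check: fail if two distinct ObjectIds collapse to one VertexId.
def assertUniqueIds(oids: Seq[String]): Unit = {
  val distinct = oids.distinct
  require(distinct.map(toVertexId).distinct.size == distinct.size,
    "VertexId collision after truncating ObjectIds")
}

val personOid = "59f3de6b0b6e9a0b9ca7bf4e" // _id of the persons.json sample
val groupOid  = "59f3de5f0b6e9a0b9ca7bf49" // _id of the groups.json sample
println(toVertexId(personOid))
assertUniqueIds(Seq(personOid, groupOid))
```

Two ObjectIds that differ only in their timestamp prefix would map to the same VertexId, which is why running such a check once over the exported ids can be worthwhile.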
While inspecting the data I found records in groups.json that share the same no, so the groups need to be deduplicated:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.4\\")
val conf = new SparkConf().setAppName("Douban User Relationship")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val personsData = sc.textFile("C:\\Users\\daihl\\Desktop\\persons2.json")
val groupsData = sc.textFile("C:\\Users\\daihl\\Desktop\\groups2.json")
// flatMap: one user line expands into one Person row per joined group
val personsRDD: RDD[Person] = personsData.flatMap(parsePerson).cache()
val groupsRDD: RDD[Group] = groupsData.map(parseGroup).cache()
// Convert the RDDs to DataFrames
val personsdf = sqlContext.createDataFrame(personsRDD)
val groupsdf = sqlContext.createDataFrame(groupsRDD)
// Deduplicate the groups by groupno
val groupsds = groupsdf.dropDuplicates("groupno")
Next, join the two data sets on the groupno field and generate the GraphX edges:
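import org.apache.spark.graphx.{Edge, EdgeRDD}

// Inner join: every (person, joined group) row yields one person -> group edge without an attribute
val relation = personsdf.join(groupsds, personsdf("groupno") === groupsds("groupno"))
val edges = EdgeRDD.fromEdges(relation.rdd.map(row => Edge(row.getAs[Long]("poidhex"), row.getAs[Long]("goidhex"), ())))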
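To make the semantics of the dedup and join steps concrete, here is a small plain-Scala model of them with made-up ids. It is only an illustration, not Spark code: PersonRow, GroupRow and buildEdges are hypothetical names, and where dropDuplicates keeps an unspecified one of the duplicate rows, this model simply keeps the first.

```scala
// Hypothetical row types carrying only the fields the edge step needs
final case class PersonRow(poidhex: Long, groupno: String)
final case class GroupRow(goidhex: Long, groupno: String)

def buildEdges(persons: Seq[PersonRow], groups: Seq[GroupRow]): Seq[(Long, Long)] = {
  // Like dropDuplicates("groupno"): one group per key (here: the first in input order)
  val groupByNo: Map[String, GroupRow] =
    groups.groupBy(_.groupno).map { case (no, gs) => no -> gs.head }
  // Inner join on groupno: a person row without a matching group yields no edge
  for {
    p <- persons
    g <- groupByNo.get(p.groupno).toSeq
  } yield (p.poidhex, g.goidhex)
}

// Made-up ids: "HZhome" appears twice among the groups, "missing" has no group
val persons = Seq(PersonRow(1L, "HZhome"), PersonRow(1L, "145219"), PersonRow(1L, "missing"))
val groups  = Seq(GroupRow(10L, "HZhome"), GroupRow(11L, "HZhome"), GroupRow(20L, "145219"))
println(buildEdges(persons, groups))
```

The model makes two consequences visible: duplicate group numbers would otherwise produce one edge per duplicate, and a group number missing from groups.json silently drops that membership from the graph.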
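Then merge the persons and groups into one set to serve as the graph's vertices.
Merging the two requires a common schema, so both are projected onto the same columns. In Spark 2.2, union matches columns by position rather than by name, so the columns are selected in the same order on both sides and then renamed with toDF: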
val newNames=Seq("oidhex", "oid", "name","no","vertextype")
val personsselect = personsdf.select("poidhex","oid", "name","no","vertextype").dropDuplicates("no").toDF(newNames:_*)
val groupsselect = groupsds.select("goidhex","oid", "name","groupno","vertextype").toDF(newNames:_*)
Finally, build the graph:
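import org.apache.spark.graphx.{Graph, VertexId}

// Vertex attribute: (name, no, vertextype); column 0 (oidhex) already holds the Long id computed from oid
val vertexnode: RDD[(VertexId, (String, String, String))] = personsselect.union(groupsselect).rdd
  .map(row => (row.getLong(0), (row.getString(2), row.getString(3), row.getString(4))))
// Attribute for vertices that appear in edges but not in vertexnode
val defaultvertexnode = ("null", "null", "null")
val graph = Graph(vertexnode, edges, defaultvertexnode)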
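The third argument of Graph(...) matters because GraphX assigns it to every vertex id that occurs in an edge but not in the vertex RDD (and picks arbitrarily among duplicate vertex ids). In this pipeline that can happen, for example, when dropDuplicates("no") removes a person vertex whose edges were still built from the undeduplicated personsdf. The sketch below models only that rule in plain Scala, with a hypothetical function name and made-up data; it is not the GraphX implementation.

```scala
// Vertex attribute shape used in the article: (name, no, vertextype)
type Attr = (String, String, String)

// Every vertex id or edge endpoint gets an attribute; ids known only from edges
// fall back to `default`. For duplicate vertex ids, toMap keeps the last one,
// whereas GraphX picks one arbitrarily.
def vertexAttributes(vertices: Seq[(Long, Attr)],
                     edges: Seq[(Long, Long)],
                     default: Attr): Map[Long, Attr] = {
  val known: Map[Long, Attr] = vertices.toMap
  val endpoints: Seq[Long] = edges.flatMap { case (src, dst) => Seq(src, dst) }
  (known.keySet ++ endpoints).map(id => id -> known.getOrElse(id, default)).toMap
}

val defaultAttr: Attr = ("null", "null", "null")
val attrs = vertexAttributes(
  Seq((1L, ("person1", "168812667", "p")), (10L, ("group-HZhome", "HZhome", "g"))),
  Seq((1L, 10L), (2L, 10L)), // person 2 has an edge but no vertex row
  defaultAttr)
println(attrs)
```

Vertices labelled "null" in the visualization are therefore a quick hint that some edge endpoints have no matching vertex row.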
Visualizing the GraphX graph
The simplest option is to visualize it with GraphStream (linkurious.js is also worth exploring):
import org.apache.spark.graphx.Edge
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}

// Create the GraphStream graph used for display
val graphStream: SingleGraph = new SingleGraph("GraphStream")
// Global rendering attributes for the visualization
graphStream.addAttribute("ui.quality")
graphStream.addAttribute("ui.antialias")
// Add every vertex, labelled with its id and name
for ((id, (name, _, _)) <- graph.vertices.collect()) {
  val node = graphStream.addNode(id.toString).asInstanceOf[SingleNode]
  node.addAttribute("ui.label", id + "\n" + name)
}
// Add every edge; the "->" separator keeps edge ids unique ("1"+"23" and "12"+"3" would clash)
for (Edge(x, y, _) <- graph.edges.collect()) {
  graphStream.addEdge(x.toString + "->" + y.toString, x.toString, y.toString, true)
    .asInstanceOf[AbstractEdge]
}
// Open the display window
graphStream.display()
Summary
1. Gained a first understanding of Spark and Spark GraphX
2. My understanding of RDD, DataFrame and Dataset operations still needs to go deeper
Next steps
1. Try building the graph with GraphFrames
2. Try visualizing the graph with linkurious.js
3. Profile the users and find their interest tags