前言
本文在之前搭建的集群上壳猜,運行一個地理空間分析的示例阵幸,示例來自于《Spark高級數(shù)據(jù)分析》第八章箕肃。
Github項目地址:https://github.com/sryza/aas/tree/master/ch08-geotime 恋博,
這個例子是通過分析紐約市2013年1月份的出租車數(shù)據(jù)芋绸,統(tǒng)計紐約市乘客下車點落在每個行政區(qū)的個數(shù)肝劲。
在開始正文之前,需要掌握以下基礎(chǔ)知識:
- Scala基礎(chǔ)語法
- Spark基礎(chǔ)概念和原理(推薦《Spark快速大數(shù)據(jù)大分析》)
紐約出租車地理空間數(shù)據(jù)分析的主要流程:
- 數(shù)據(jù)獲取
- 數(shù)據(jù)時間和和空間處理類庫
- 數(shù)據(jù)預(yù)處理與地理空間分析
- 提交應(yīng)用至集群迁客,分布式計算
數(shù)據(jù)獲取
本文的數(shù)據(jù)是紐約市2013年1月份乘客打車費用數(shù)據(jù)郭宝,數(shù)據(jù)大小是914.9M,解壓后為2.5G。
數(shù)據(jù)下載地址
http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip)
數(shù)據(jù)下載方式
- 直接在window下載掷漱,上傳至linux服務(wù)器,注意我的集群是docker容器粘室,直接傳到容器master節(jié)點。
- 在linux直接下載,命令如下
wget http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip)
數(shù)據(jù)描述
#解壓數(shù)據(jù)集
unzip trip_data_1.csv.zip
# 查看前10行數(shù)據(jù)
head -n 10 trip_data_1.csv
結(jié)果如下圖
數(shù)據(jù)字段描述:
medallion:UUID hack_license:UUID
vendor_id:類型 rate_code:比率 store_and_fwd_flag:是否是四驅(qū)
pickup_datatime:客人上車時間 dropoff_datatime:客人下車時間
passenger_count:載客數(shù)量 trip_time_in_secs:載客時間 trip_distance:載客距離
pickup_longitude:客人上車經(jīng)度 pickup_latitude:客人上車維度
dropoff_longitude:客人下車經(jīng)度 dropoff_latitude:客人下車維度
數(shù)據(jù)處理第三方類庫
注意scala是可以直接調(diào)用java類庫的卜范。
時間處理類庫:joda-time,nscala-time_2.11.jar(2.11對應(yīng)scala版本)
本文空間關(guān)系處理庫采用Esri的esri-geometry-api衔统,當(dāng)然也可以采用GeoTools等開源庫。
自定義RichGeometry類封裝Esri矢量空間處理接口海雪;
package com.cloudera.datascience.geotime
import com.esri.core.geometry.{GeometryEngine, SpatialReference, Geometry}
import scala.language.implicitConversions
/**
* A wrapper that provides convenience methods for using the spatial relations in the ESRI
* GeometryEngine with a particular instance of the Geometry interface and an associated
* SpatialReference.
*
* @param geometry the geometry object
* @param spatialReference optional spatial reference; if not specified, uses WKID 4326 a.k.a.
* WGS84, the standard coordinate frame for Earth.
*/
class RichGeometry(val geometry: Geometry,
val spatialReference: SpatialReference = SpatialReference.create(4326)) extends Serializable {
def area2D(): Double = geometry.calculateArea2D()
def distance(other: Geometry): Double = {
GeometryEngine.distance(geometry, other, spatialReference)
}
def contains(other: Geometry): Boolean = {
GeometryEngine.contains(geometry, other, spatialReference)
}
def within(other: Geometry): Boolean = {
GeometryEngine.within(geometry, other, spatialReference)
}
def overlaps(other: Geometry): Boolean = {
GeometryEngine.overlaps(geometry, other, spatialReference)
}
def touches(other: Geometry): Boolean = {
GeometryEngine.touches(geometry, other, spatialReference)
}
def crosses(other: Geometry): Boolean = {
GeometryEngine.crosses(geometry, other, spatialReference)
}
def disjoint(other: Geometry): Boolean = {
GeometryEngine.disjoint(geometry, other, spatialReference)
}
}
/**
* Helper object for implicitly creating RichGeometry wrappers
* for a given Geometry instance.
*/
object RichGeometry extends Serializable {
implicit def createRichGeometry(g: Geometry): RichGeometry = new RichGeometry(g)
}
數(shù)據(jù)預(yù)處理與地理空間分析
上傳原始數(shù)據(jù)到HDFS集群
#在Hdfs集群下創(chuàng)建taxidata目錄锦爵,注意必須帶/
hadoop fs -mkdir /taxidata
#上傳本地物理機數(shù)據(jù)至HDFS集群
hadoop fs -put trip_data_1.csv /taxidata/trip_data_1.csv
自定義safe函數(shù)處理格式不正確的數(shù)據(jù)
詳細請看代碼注釋第三部分
地理空間分析
獲取紐約行政區(qū)劃數(shù)據(jù),利用esri gerometry類庫判斷各行政區(qū)下車點的記錄數(shù)(詳細請看代碼注釋第四部分)奥裸。
/**
* 打車信息類
* **/
case class Trip(
pickupTime: DateTime,
dropoffTime: DateTime,
pickupLoc: Point,
dropoffLoc: Point)
/**
* 出租車數(shù)據(jù)地理空間分析
*/
object RunGeoTime extends Serializable {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)
def main(args: Array[String]): Unit = {
/*--------------1.初始化SparkContext-------------------*/
val sc = new SparkContext(new SparkConf().setAppName("SpaceGeo"))
/*--------------2.讀取HDFS數(shù)據(jù)-------------------*/
val taxiRaw = sc.textFile("hdfs://master:9000/taxidata")
/*--------------3.出租車數(shù)據(jù)預(yù)處理------------------*/
//3.1 利用自定義的safe函數(shù)處理原始數(shù)據(jù)
val safeParse = safe(parse)
val taxiParsed = taxiRaw.map(safeParse)
//taxiParsed數(shù)據(jù)持久化
taxiParsed.cache()
//查看非法數(shù)據(jù)
/* val taxiBad = taxiParsed.collect({
case t if t.isRight => t.right.get
})*/
//collect返回到驅(qū)動器险掀,為了單機開發(fā)和測試使用,不建議集群使用
//taxiBad.collect().foreach(println)
/*val taxiGood = taxiParsed.collect({
case t if t.isLeft => t.left.get
})
taxiGood.cache()*/
//3.2 剔除非法數(shù)據(jù)結(jié)果湾宙,獲得正確格式的數(shù)據(jù)
val taxiGood=taxiParsed.filter(_.isLeft).map(_.left.get)
taxiGood.cache()
//自定義一次打車的乘坐時間函數(shù)
def hours(trip: Trip): Long = {
val d = new Duration(trip.pickupTime, trip.dropoffTime)
d.getStandardHours
}
//3.3 打印統(tǒng)計乘客上下車時間的記錄樟氢,打印結(jié)果如執(zhí)行分析結(jié)果圖中的1
taxiGood.values.map(hours).countByValue().toList.sorted.foreach(println)
taxiParsed.unpersist()
//根據(jù)上面的輸出結(jié)果,統(tǒng)計一次乘車時間大于0小于3小時的記錄
val taxiClean = taxiGood.filter {
case (lic, trip) => {
val hrs = hours(trip)
0 <= hrs && hrs < 3
}
}
/*--------------4.出租車數(shù)據(jù)空間分析------------------*/
//4.1 獲取紐約行政區(qū)劃數(shù)據(jù)
val geojson = scala.io.Source.fromURL(getClass.getResource("/nyc-boroughs.geojson")).mkString
//轉(zhuǎn)換為地理要素
val features = geojson.parseJson.convertTo[FeatureCollection]
val areaSortedFeatures = features.sortBy(f => {
val borough = f("boroughCode").convertTo[Int]
(borough, -f.geometry.area2D())
})
val bFeatures = sc.broadcast(areaSortedFeatures)
//4.2 判斷乘客下車點落在那個行政區(qū)
def borough(trip: Trip): Option[String] = {
val feature: Option[Feature] = bFeatures.value.find(f => {
f.geometry.contains(trip.dropoffLoc)
})
feature.map(f => {
f("borough").convertTo[String]
})
}
//4.3 第一次統(tǒng)計打印各行政區(qū)下車點的記錄侠鳄,打印結(jié)果如執(zhí)行分析結(jié)果圖中的2
taxiClean.values.map(borough).countByValue().foreach(println)
//4.4 剔除起點和終點數(shù)據(jù)缺失的數(shù)據(jù)
def hasZero(trip: Trip): Boolean = {
val zero = new Point(0.0, 0.0)
(zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
}
val taxiDone = taxiClean.filter {
case (lic, trip) => !hasZero(trip)
}.cache()
//4.5 踢出零點數(shù)據(jù)后統(tǒng)計打印各行政區(qū)下車點的記錄埠啃,打印結(jié)果如執(zhí)行分析結(jié)果圖中的3
taxiDone.values.map(borough).countByValue().foreach(println)
taxiGood.unpersist()
//輸出地理空間分析結(jié)果到HDFS
//taxiDone.saveAsTextFile("hdfs://master:9000/GeoResult")
}
//字符串轉(zhuǎn)double
def point(longitude: String, latitude: String): Point = {
new Point(longitude.toDouble, latitude.toDouble)
}
//獲取taxiraw RDD記錄中的出租車司機駕照和Trip對象
def parse(line: String): (String, Trip) = {
val fields = line.split(',')
val license = fields(1)
// Not thread-safe:
val formatterCopy = formatter.clone().asInstanceOf[SimpleDateFormat]
val pickupTime = new DateTime(formatterCopy.parse(fields(5)))
val dropoffTime = new DateTime(formatterCopy.parse(fields(6)))
val pickupLoc = point(fields(10), fields(11))
val dropoffLoc = point(fields(12), fields(13))
val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
(license, trip)
}
//非法記錄數(shù)據(jù)處理函數(shù)
def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
new Function[S, Either[T, (S, Exception)]] with Serializable {
def apply(s: S): Either[T, (S, Exception)] = {
try {
Left(f(s))
} catch {
case e: Exception => Right((s, e))
}
}
}
}
}
分布式計算
打包應(yīng)用
Windows下環(huán)境spark項目環(huán)境配置
在Windows上安裝maven scala2.11.8(我的版本),intelij 及inteli的scala插件伟恶,導(dǎo)入ch08-geotime項目碴开,如下圖
配置pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloudera.datascience.geotime</groupId>
<artifactId>ch08-geotime</artifactId>
<packaging>jar</packaging>
<name>Temporal and Geospatial Analysis</name>
<version>2.0.0</version>
<dependencies>
<!--注意 scala版本對應(yīng)spark集群中scala的版本,provided屬性要加上 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<scope>provided</scope>
</dependency>
<!--注意 hadoop版本對應(yīng)spark集群中hadoop的版本博秫,provided屬性要加上 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
<scope>provided</scope>
</dependency>
<!--注意 spark版本對應(yīng)spark集群中spark的版本潦牛,2.11是對應(yīng)的scala版本 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<!--nscala-time時間處理庫,2.11是對應(yīng)的scala版本 -->
<dependency>
<groupId>com.github.nscala-time</groupId>
<artifactId>nscala-time_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<!--esri空間關(guān)系庫台盯,2.11是對應(yīng)的scala版本 -->
<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>io.spray</groupId>
<artifactId>spray-json_2.11</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--scala-maven插件必須加上罢绽,否則打包后無主程序 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<scalaVersion>2.11.8</scalaVersion>
<scalaCompatVersion>2.11.8</scalaCompatVersion>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>1.8.0</javacArg>
<javacArg>-target</javacArg>
<javacArg>1.8.0</javacArg>
</javacArgs>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--maven-assembly插件可以打包應(yīng)用的依賴包 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<mainClass>com.cloudera.datascience.geotime.RunGeoTime</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<recompressZippedFiles>false</recompressZippedFiles>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- 用于maven繼承項目的聚合 -->
<phase>package</phase> <!-- 綁定到package階段 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Maven打包
在ch08-geotime項目下Terminal命令行
# maven打包,打包結(jié)果輸入到target目錄下
名稱為ch08-geotime-2.0.0-jar-with-dependencies.jar(包含依賴包)
mvn clean
mvn package
提交應(yīng)用到集群
上傳jar包至master節(jié)點静盅,確保集群已啟動良价,提交應(yīng)用至集群,主要過程如下:
- 用戶通過 spark-submit 腳本提交應(yīng)用蒿叠。
- spark-submit 腳本啟動驅(qū)動器程序明垢,調(diào)用用戶定義的 main() 方法。
- 驅(qū)動器程序與集群管理器通信市咽,申請資源以啟動執(zhí)行器節(jié)點痊银。
- 集群管理器為驅(qū)動器程序啟動執(zhí)行器節(jié)點。
- 驅(qū)動器進程執(zhí)行用戶應(yīng)用中的操作施绎。 根據(jù)程序中所定義的對RDD的轉(zhuǎn)化操作和行動操
作溯革,驅(qū)動器節(jié)點把工作以任務(wù)的形式發(fā)送到執(zhí)行器進程贞绳。- 任務(wù)在執(zhí)行器程序中進行計算并保存結(jié)果。
- 如果驅(qū)動器程序的 main() 方法退出致稀,或者調(diào)用了 SparkContext.stop()
驅(qū)動器程序會終止執(zhí)行器進程冈闭,并且通過集群管理器釋放資源。
————————《Spark快速大數(shù)據(jù)分析》
- 利用yarn集群提交應(yīng)用
# --class 運行 Java 或 Scala 程序時應(yīng)用的主類
# --master 表示要連接的集群管理器
# --deploy-mode 選擇在本地(客戶端“ client”)啟動驅(qū)動器程序抖单,還是在集群中的一臺工作節(jié)點機
器(集群“ cluster”)上啟動萎攒。在客戶端模式下, spark-submit 會將驅(qū)動器程序運行
在 spark-submit 被調(diào)用的這臺機器上矛绘。在集群模式下耍休,驅(qū)動器程序會被傳輸并執(zhí)行
于集群的一個工作節(jié)點上。默認是本地模式
# --name 應(yīng)用的顯示名货矮,會顯示在 Spark 的網(wǎng)頁用戶界面中
# 最后是應(yīng)用入口的 JAR 包或 Python 腳本
spark-submit --class com.cloudera.datascience.geotime.RunGeoTime
--master yarn --deploy-mode cluster
--executor-memory 2g --executor-cores 2
--name "taxiGeoSpace"
/home/ch08-geotime/ch08-geotime-space-2.0.0.jar
- 利用spark自帶的管理器提交應(yīng)用
# 注意集群模式地址是 spark://master:6066羊精,客戶端模式地址是spark://master:7077
spark-submit --class com.cloudera.datascience.geotime.RunGeoTime
--master spark://master:6066 --deploy-mode cluster
--executor-memory 2g --executor-cores 2 --name "taxiGeoSpace1"
/home/ch08-geotime/ch08-geotime-space--2.0.0.jar
執(zhí)行結(jié)果如下圖
總結(jié)
執(zhí)行時間是3min,后期要了解spark集群的運行參數(shù)配置
參考文獻
- 《Spark快速大數(shù)據(jù)分析》
- 《Spark高級數(shù)據(jù)分析》
- http://spark.apache.org/docs/latest/running-on-yarn.html Running Spark on YARN