【CSDN博客遷移】Spark高級數(shù)據(jù)分析(1) ——紐約出租車軌跡的空間和時間數(shù)據(jù)分析

前言


本文在之前搭建的集群上壳猜,運行一個地理空間分析的示例阵幸,示例來自于《Spark高級數(shù)據(jù)分析》第八章箕肃。
Github項目地址:https://github.com/sryza/aas/tree/master/ch08-geotime 恋博,
這個例子是通過分析紐約市2013年1月份的出租車數(shù)據(jù)芋绸,統(tǒng)計紐約市乘客下車點落在每個行政區(qū)的個數(shù)肝劲。
在開始正文之前,需要掌握以下基礎(chǔ)知識:

  • Scala基礎(chǔ)語法
  • Spark基礎(chǔ)概念和原理(推薦《Spark快速大數(shù)據(jù)大分析》)

紐約出租車地理空間數(shù)據(jù)分析的主要流程:

  • 數(shù)據(jù)獲取
  • 數(shù)據(jù)時間和和空間處理類庫
  • 數(shù)據(jù)預(yù)處理與地理空間分析
  • 提交應(yīng)用至集群迁客,分布式計算

數(shù)據(jù)獲取


本文的數(shù)據(jù)是紐約市2013年1月份乘客打車費用數(shù)據(jù)郭宝,數(shù)據(jù)大小是914.9M,解壓后為2.5G。

數(shù)據(jù)下載地址

http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip)

數(shù)據(jù)下載方式

  • 直接在window下載掷漱,上傳至linux服務(wù)器,注意我的集群是docker容器粘室,直接傳到容器master節(jié)點。
  • 在linux直接下載,命令如下
wget http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip)

數(shù)據(jù)描述

#解壓數(shù)據(jù)集
unzip trip_data_1.csv.zip
# 查看前10行數(shù)據(jù)
head -n 10 trip_data_1.csv

結(jié)果如下圖

數(shù)據(jù)描述.png

數(shù)據(jù)字段描述:

medallion:UUID hack_license:UUID 
vendor_id:類型 rate_code:比率 store_and_fwd_flag:是否是四驅(qū)
pickup_datatime:客人上車時間 dropoff_datatime:客人下車時間
passenger_count:載客數(shù)量 trip_time_in_secs:載客時間 trip_distance:載客距離
pickup_longitude:客人上車經(jīng)度 pickup_latitude:客人上車維度
dropoff_longitude:客人下車經(jīng)度 dropoff_latitude:客人下車維度

數(shù)據(jù)處理第三方類庫


注意scala是可以直接調(diào)用java類庫的卜范。
時間處理類庫:joda-time,nscala-time_2.11.jar(2.11對應(yīng)scala版本)
本文空間關(guān)系處理庫采用Esri的esri-geometry-api衔统,當(dāng)然也可以采用GeoTools等開源庫。
自定義RichGeometry類封裝Esri矢量空間處理接口海雪;

package com.cloudera.datascience.geotime
import com.esri.core.geometry.{GeometryEngine, SpatialReference, Geometry}
import scala.language.implicitConversions
/**
 * A wrapper that provides convenience methods for using the spatial relations in the ESRI
 * GeometryEngine with a particular instance of the Geometry interface and an associated
 * SpatialReference.
 *
 * @param geometry the geometry object
 * @param spatialReference optional spatial reference; if not specified, uses WKID 4326 a.k.a.
 *                         WGS84, the standard coordinate frame for Earth.
 */
class RichGeometry(val geometry: Geometry,
    val spatialReference: SpatialReference = SpatialReference.create(4326)) extends Serializable {

  def area2D(): Double = geometry.calculateArea2D()

  def distance(other: Geometry): Double = {
    GeometryEngine.distance(geometry, other, spatialReference)
  }

  def contains(other: Geometry): Boolean = {
    GeometryEngine.contains(geometry, other, spatialReference)
  }

  def within(other: Geometry): Boolean = {
    GeometryEngine.within(geometry, other, spatialReference)
  }

  def overlaps(other: Geometry): Boolean = {
    GeometryEngine.overlaps(geometry, other, spatialReference)
  }

  def touches(other: Geometry): Boolean = {
    GeometryEngine.touches(geometry, other, spatialReference)
  }

  def crosses(other: Geometry): Boolean = {
    GeometryEngine.crosses(geometry, other, spatialReference)
  }

  def disjoint(other: Geometry): Boolean = {
    GeometryEngine.disjoint(geometry, other, spatialReference)
  }
}

/**
 * Helper object for implicitly creating RichGeometry wrappers
 * for a given Geometry instance.
 */
object RichGeometry extends Serializable {
  implicit def createRichGeometry(g: Geometry): RichGeometry = new RichGeometry(g)
}

數(shù)據(jù)預(yù)處理與地理空間分析


上傳原始數(shù)據(jù)到HDFS集群

#在Hdfs集群下創(chuàng)建taxidata目錄锦爵,注意必須帶/
hadoop fs -mkdir /taxidata
#上傳本地物理機數(shù)據(jù)至HDFS集群
hadoop fs -put trip_data_1.csv /taxidata/trip_data_1.csv

自定義safe函數(shù)處理格式不正確的數(shù)據(jù)

詳細請看代碼注釋第三部分

地理空間分析

獲取紐約行政區(qū)劃數(shù)據(jù),利用esri gerometry類庫判斷各行政區(qū)下車點的記錄數(shù)(詳細請看代碼注釋第四部分)奥裸。

/**
  * 打車信息類
  * **/
case class Trip(
  pickupTime: DateTime,
  dropoffTime: DateTime,
  pickupLoc: Point,
  dropoffLoc: Point)

/**
  * 出租車數(shù)據(jù)地理空間分析
  */
object RunGeoTime extends Serializable {

  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {

    /*--------------1.初始化SparkContext-------------------*/
    val sc = new SparkContext(new SparkConf().setAppName("SpaceGeo"))

    /*--------------2.讀取HDFS數(shù)據(jù)-------------------*/
    val taxiRaw = sc.textFile("hdfs://master:9000/taxidata")

    /*--------------3.出租車數(shù)據(jù)預(yù)處理------------------*/
    //3.1 利用自定義的safe函數(shù)處理原始數(shù)據(jù)
    val safeParse = safe(parse)
    val taxiParsed = taxiRaw.map(safeParse)
    //taxiParsed數(shù)據(jù)持久化
    taxiParsed.cache()

    //查看非法數(shù)據(jù)
   /* val taxiBad = taxiParsed.collect({
      case t if t.isRight => t.right.get
    })*/

    //collect返回到驅(qū)動器险掀,為了單機開發(fā)和測試使用,不建議集群使用
    //taxiBad.collect().foreach(println)


    /*val taxiGood = taxiParsed.collect({
      case t if t.isLeft => t.left.get
    })
    taxiGood.cache()*/

    //3.2 剔除非法數(shù)據(jù)結(jié)果湾宙,獲得正確格式的數(shù)據(jù)
    val taxiGood=taxiParsed.filter(_.isLeft).map(_.left.get)
    taxiGood.cache()

    //自定義一次打車的乘坐時間函數(shù)
    def hours(trip: Trip): Long = {
      val d = new Duration(trip.pickupTime, trip.dropoffTime)
      d.getStandardHours
    }
    //3.3 打印統(tǒng)計乘客上下車時間的記錄樟氢,打印結(jié)果如執(zhí)行分析結(jié)果圖中的1
    taxiGood.values.map(hours).countByValue().toList.sorted.foreach(println)
    taxiParsed.unpersist()

    //根據(jù)上面的輸出結(jié)果,統(tǒng)計一次乘車時間大于0小于3小時的記錄
    val taxiClean = taxiGood.filter {
      case (lic, trip) => {
        val hrs = hours(trip)
        0 <= hrs && hrs < 3
      }
    }

    /*--------------4.出租車數(shù)據(jù)空間分析------------------*/
    //4.1 獲取紐約行政區(qū)劃數(shù)據(jù)
    val geojson = scala.io.Source.fromURL(getClass.getResource("/nyc-boroughs.geojson")).mkString
    //轉(zhuǎn)換為地理要素
    val features = geojson.parseJson.convertTo[FeatureCollection]

    val areaSortedFeatures = features.sortBy(f => {
      val borough = f("boroughCode").convertTo[Int]
      (borough, -f.geometry.area2D())
    })

    val bFeatures = sc.broadcast(areaSortedFeatures)
    //4.2 判斷乘客下車點落在那個行政區(qū)
    def borough(trip: Trip): Option[String] = {
      val feature: Option[Feature] = bFeatures.value.find(f => {
        f.geometry.contains(trip.dropoffLoc)
      })
      feature.map(f => {
        f("borough").convertTo[String]
      })
    }
    //4.3 第一次統(tǒng)計打印各行政區(qū)下車點的記錄侠鳄,打印結(jié)果如執(zhí)行分析結(jié)果圖中的2
    taxiClean.values.map(borough).countByValue().foreach(println)

    
    //4.4 剔除起點和終點數(shù)據(jù)缺失的數(shù)據(jù)
    def hasZero(trip: Trip): Boolean = {
      val zero = new Point(0.0, 0.0)
      (zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
    }

    val taxiDone = taxiClean.filter {
      case (lic, trip) => !hasZero(trip)
    }.cache()

    //4.5 踢出零點數(shù)據(jù)后統(tǒng)計打印各行政區(qū)下車點的記錄埠啃,打印結(jié)果如執(zhí)行分析結(jié)果圖中的3
    taxiDone.values.map(borough).countByValue().foreach(println)
    taxiGood.unpersist()

    //輸出地理空間分析結(jié)果到HDFS
    //taxiDone.saveAsTextFile("hdfs://master:9000/GeoResult")

  }

  //字符串轉(zhuǎn)double
  def point(longitude: String, latitude: String): Point = {
    new Point(longitude.toDouble, latitude.toDouble)
  }

  //獲取taxiraw RDD記錄中的出租車司機駕照和Trip對象
  def parse(line: String): (String, Trip) = {
    val fields = line.split(',')
    val license = fields(1)
    // Not thread-safe:
    val formatterCopy = formatter.clone().asInstanceOf[SimpleDateFormat]
    val pickupTime = new DateTime(formatterCopy.parse(fields(5)))
    val dropoffTime = new DateTime(formatterCopy.parse(fields(6)))
    val pickupLoc = point(fields(10), fields(11))
    val dropoffLoc = point(fields(12), fields(13))

    val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
    (license, trip)
  }

  //非法記錄數(shù)據(jù)處理函數(shù)
  def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
    new Function[S, Either[T, (S, Exception)]] with Serializable {
      def apply(s: S): Either[T, (S, Exception)] = {
        try {
          Left(f(s))
        } catch {
          case e: Exception => Right((s, e))
        }
      }
    }
  }

}

分布式計算


打包應(yīng)用

Windows下環(huán)境spark項目環(huán)境配置

在Windows上安裝maven scala2.11.8(我的版本),intelij 及inteli的scala插件伟恶,導(dǎo)入ch08-geotime項目碴开,如下圖

ch08項目程序.png

配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cloudera.datascience.geotime</groupId>
  <artifactId>ch08-geotime</artifactId>
  <packaging>jar</packaging>
  <name>Temporal and Geospatial Analysis</name>
  <version>2.0.0</version>

  <dependencies>
   <!--注意 scala版本對應(yīng)spark集群中scala的版本,provided屬性要加上 -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 hadoop版本對應(yīng)spark集群中hadoop的版本博秫,provided屬性要加上 -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 spark版本對應(yīng)spark集群中spark的版本潦牛,2.11是對應(yīng)的scala版本 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
      <scope>provided</scope>
    </dependency>
    <!--nscala-time時間處理庫,2.11是對應(yīng)的scala版本 -->
    <dependency>
      <groupId>com.github.nscala-time</groupId>
      <artifactId>nscala-time_2.11</artifactId>
      <version>1.8.0</version>
    </dependency>
    <!--esri空間關(guān)系庫台盯,2.11是對應(yīng)的scala版本 -->
    <dependency>
      <groupId>com.esri.geometry</groupId>
      <artifactId>esri-geometry-api</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>io.spray</groupId>
      <artifactId>spray-json_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
     <!--scala-maven插件必須加上罢绽,否則打包后無主程序 -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <configuration>
          <scalaVersion>2.11.8</scalaVersion>
          <scalaCompatVersion>2.11.8</scalaCompatVersion>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>1.8.0</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>1.8.0</javacArg>
          </javacArgs>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
       <!--maven-assembly插件可以打包應(yīng)用的依賴包 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.cloudera.datascience.geotime.RunGeoTime</mainClass>
            </manifest>
          </archive>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          <recompressZippedFiles>false</recompressZippedFiles>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- 用于maven繼承項目的聚合 -->
            <phase>package</phase> <!-- 綁定到package階段 -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

Maven打包

在ch08-geotime項目下Terminal命令行

# maven打包,打包結(jié)果輸入到target目錄下
名稱為ch08-geotime-2.0.0-jar-with-dependencies.jar(包含依賴包)
mvn clean
mvn package

提交應(yīng)用到集群

上傳jar包至master節(jié)點静盅,確保集群已啟動良价,提交應(yīng)用至集群,主要過程如下:

  1. 用戶通過 spark-submit 腳本提交應(yīng)用蒿叠。
  2. spark-submit 腳本啟動驅(qū)動器程序明垢,調(diào)用用戶定義的 main() 方法。
  3. 驅(qū)動器程序與集群管理器通信市咽,申請資源以啟動執(zhí)行器節(jié)點痊银。
  4. 集群管理器為驅(qū)動器程序啟動執(zhí)行器節(jié)點。
  5. 驅(qū)動器進程執(zhí)行用戶應(yīng)用中的操作施绎。 根據(jù)程序中所定義的對RDD的轉(zhuǎn)化操作和行動操
    作溯革,驅(qū)動器節(jié)點把工作以任務(wù)的形式發(fā)送到執(zhí)行器進程贞绳。
  6. 任務(wù)在執(zhí)行器程序中進行計算并保存結(jié)果。
  7. 如果驅(qū)動器程序的 main() 方法退出致稀,或者調(diào)用了 SparkContext.stop()
    驅(qū)動器程序會終止執(zhí)行器進程冈闭,并且通過集群管理器釋放資源。
    ————————《Spark快速大數(shù)據(jù)分析》
  • 利用yarn集群提交應(yīng)用
# --class 運行 Java 或 Scala 程序時應(yīng)用的主類
# --master 表示要連接的集群管理器
# --deploy-mode 選擇在本地(客戶端“ client”)啟動驅(qū)動器程序抖单,還是在集群中的一臺工作節(jié)點機
器(集群“ cluster”)上啟動萎攒。在客戶端模式下, spark-submit 會將驅(qū)動器程序運行
在 spark-submit 被調(diào)用的這臺機器上矛绘。在集群模式下耍休,驅(qū)動器程序會被傳輸并執(zhí)行
于集群的一個工作節(jié)點上。默認是本地模式
# --name 應(yīng)用的顯示名货矮,會顯示在 Spark 的網(wǎng)頁用戶界面中
# 最后是應(yīng)用入口的 JAR 包或 Python 腳本
spark-submit  --class com.cloudera.datascience.geotime.RunGeoTime 
--master yarn --deploy-mode cluster  
--executor-memory 2g --executor-cores 2  
--name "taxiGeoSpace"  
/home/ch08-geotime/ch08-geotime-space-2.0.0.jar 
  • 利用spark自帶的管理器提交應(yīng)用
# 注意集群模式地址是 spark://master:6066羊精,客戶端模式地址是spark://master:7077
spark-submit  --class com.cloudera.datascience.geotime.RunGeoTime 
--master spark://master:6066 --deploy-mode cluster  
--executor-memory 2g --executor-cores 2  --name "taxiGeoSpace1" 
 /home/ch08-geotime/ch08-geotime-space--2.0.0.jar

執(zhí)行結(jié)果如下圖

執(zhí)行結(jié)果.png

總結(jié)


執(zhí)行時間是3min,后期要了解spark集群的運行參數(shù)配置

參考文獻

  1. 《Spark快速大數(shù)據(jù)分析》
  2. 《Spark高級數(shù)據(jù)分析》
  3. http://spark.apache.org/docs/latest/running-on-yarn.html Running Spark on YARN
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末囚玫,一起剝皮案震驚了整個濱河市园匹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌劫灶,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掖桦,死亡現(xiàn)場離奇詭異本昏,居然都是意外死亡,警方通過查閱死者的電腦和手機枪汪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門涌穆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人雀久,你說我怎么就攤上這事宿稀。” “怎么了赖捌?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵祝沸,是天一觀的道長。 經(jīng)常有香客問我越庇,道長罩锐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任卤唉,我火速辦了婚禮涩惑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘桑驱。我一直安慰自己竭恬,他們只是感情好跛蛋,可當(dāng)我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著痊硕,像睡著了一般赊级。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上寿桨,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天此衅,我揣著相機與錄音,去河邊找鬼亭螟。 笑死挡鞍,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的预烙。 我是一名探鬼主播墨微,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扁掸!你這毒婦竟也來了翘县?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤谴分,失蹤者是張志新(化名)和其女友劉穎锈麸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體牺蹄,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡忘伞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了沙兰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片氓奈。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖鼎天,靈堂內(nèi)的尸體忽然破棺而出舀奶,到底是詐尸還是另有隱情,我是刑警寧澤斋射,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布育勺,位于F島的核電站,受9級特大地震影響罗岖,放射性物質(zhì)發(fā)生泄漏怀大。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一呀闻、第九天 我趴在偏房一處隱蔽的房頂上張望化借。 院中可真熱鬧,春花似錦捡多、人聲如沸蓖康。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蒜焊。三九已至倒信,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間泳梆,已是汗流浹背鳖悠。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留优妙,地道東北人乘综。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像套硼,于是被迫代替她去往敵國和親卡辰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容