Spark 第一個(gè)scala 程序

1、環(huán)境準(zhǔn)備

1淡诗、JDK配置骇塘,Scala 配置 。目前教程環(huán)境用的是 hadopp2.6-CDH5.6.0韩容、spark 2.1.0 款违、jdk 1.7u51、scala2.11.8 群凶。
2插爹、Scala 下載地址 https://www.scala-lang.org/download/all.html 迅雷下載速度更快(PS: 這不是打廣告)。
3请梢、 安裝Scala 赠尾、JDK并配置環(huán)境變量(jdk 1.8 也是可以的、scala 要和spark 保持一致毅弧,因?yàn)榭赡芪覀儠?huì)修改spark源碼的需求)
4气嫁、IDEA scala 插件下載 。如果沒有scala插件 够坐,IDEA 不能新建 scala 類 和對(duì)象 寸宵。(如果在IDEA 插件中心下載太慢了可以崖面,查看版本后到 官網(wǎng)下載,或者用迅雷下載 官網(wǎng): https://plugins.jetbrains.com/idea
5邓馒、測(cè)試數(shù)據(jù)存放百度網(wǎng)盤 https://pan.baidu.com/s/1xju3QodxC-abpWADifigyA 密碼微信聯(lián)系我

image

2嘶朱、IDEA創(chuàng)建項(xiàng)目

1蛾坯、打開IDE :file - new - project (搭建maven 項(xiàng)目并勾選 create from archetype ,并且選擇 scala IDEA 自動(dòng)幫你配置scala 大部分依賴省不少事)
image
2光酣、 設(shè)置maven 坐標(biāo) GAV
image
3)、 配置項(xiàng)目maven 版本 設(shè)置以及本地倉庫地址 (在spark 源碼編譯對(duì)maven 有要求脉课,為了避免出現(xiàn)莫名問題 所以這里指定較高mean版本)
image
image
4救军、 添加Scala 到IDEA 中
image.png
5、 第一個(gè)scala Object
image.png

3倘零、編碼階段

1唱遭、業(yè)務(wù)需求,根據(jù)用戶日志統(tǒng)計(jì)出各服務(wù)調(diào)用次數(shù)日志文件為csv 格式如下:


image.png
2呈驶、添加依賴
        <!-- Spark-SQL 依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!--開源 ip 工具類 依賴根據(jù)IP計(jì)算出城市拷泽,已經(jīng)安裝到本地了, 需要的可以到github 上面找-->
        <dependency>
            <groupId>com.ggstar</groupId>
            <artifactId>ipdatabase</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- ip 工具類 需要讀取excel 中ip信息 依賴-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- ip 工具類依賴 需要讀取excel-->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.14</version>
        </dependency>
        <!-- json工具類依賴-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
3袖瞻、程序代碼
package cn.harsons.mbd

import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  *
  * @author lyb
  * @date 2020/3/9 0009
  */
object UserLogStatApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地開發(fā)時(shí)先設(shè)置local 模式發(fā)布正式在修改對(duì)應(yīng)的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注冊(cè)函數(shù)司致,根據(jù)ip 獲取城市  注冊(cè)函數(shù)可以在Spark SQL 中使用  注意后面必須要使用 空格和下劃線" _"
    val city = session.udf.register("getCity", getCity _)
    // 注冊(cè)函數(shù) 時(shí)間轉(zhuǎn)換
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注冊(cè)函數(shù) 分割URL 得到用戶調(diào)用的模塊
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式讀取文件,cvs 分隔符為; (默認(rèn)",") 從第一個(gè)參數(shù)里面讀取 并且轉(zhuǎn)成 DataFrame聋迎。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 這里面進(jìn)行過濾脂矫, 清洗
    //  剔除第9列值不等于200 或者第四列為空
    // 查詢 第一列的值 并且命名為 url 后面語法大體與SQL 類似
    // moduleType 為上面注冊(cè)的方法主要是分割URL 得到調(diào)用的服務(wù)路徑,city  formatTime  都一樣原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    //  顯示分組統(tǒng)計(jì)后的結(jié)果霉晕,這里可以把結(jié)果集輸出到 HDFS 或者JDBC show 顯示50行結(jié)果集
    result.groupBy("server").count().orderBy("count").show(50, false)
    //這里大體意思是 coalesce 指定文件大小庭再,作用分區(qū)大小。 model  模式 指定為覆蓋 partitionBy 根據(jù)什么分區(qū)
    // 這里用的是地址 和用戶分區(qū)  輸出文件為JSON
    // 這里也可以指定目錄 在json 方法中
    result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("address", "userName")
      .json(args(1))
    //關(guān)閉session
    session.stop()
  }

  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("http://", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

}

3牺堰、啟動(dòng)參數(shù)設(shè)置(IDEA 中可以通過program arguments 來設(shè)置 輸入文件 和輸出文件拄轻,注意參數(shù)之間用空格分開)
image.png
4、成功輸出結(jié)果
image.png
5伟葫、可能出現(xiàn)的錯(cuò)誤

1哺眯、Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.9-3
原因 調(diào)試發(fā)現(xiàn)這是由于默認(rèn)的jackson-databind版本太高導(dǎo)致。

報(bào)錯(cuò)代碼:


image.png

解決方案:

       <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.6</version>
        </dependency>

2扒俯、java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
由于window 下 沒有配置好hadoop 環(huán)境變量導(dǎo)致奶卓。
解決方案:http://down2.opdown.com:8019/opdown/winutilsmaster.opdown.com.zip 配置好環(huán)境變量,重啟計(jì)算機(jī)撼玄。

6夺姑、數(shù)據(jù)保存到MYSQL的代碼如下(記得添加MYSQL驅(qū)動(dòng))
package cn.harsons.mbd

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Locale

import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.mutable.ListBuffer

/**
  *
  * @author liyabin
  * @date 2020/3/9 0009
  */
object UserStatSaveApp {

  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {
    // 拿到Spark Session  在本地開發(fā)時(shí)先設(shè)置local 模式發(fā)布正式在修改對(duì)應(yīng)的模式
    val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
    // 注冊(cè)函數(shù),根據(jù)ip 獲取城市  注冊(cè)函數(shù)可以在Spark SQL 中使用  注意后面必須要使用 空格和下劃線" _"
    val city = session.udf.register("getCity", getCity _)
    // 注冊(cè)函數(shù) 時(shí)間轉(zhuǎn)換
    val formatTime = session.udf.register("getTime", convertDate _)
    // 注冊(cè)函數(shù) 分割URL 得到用戶調(diào)用的模塊
    val moduleType = session.udf.register("getModuleType", getModuleType _)
    // 以cvs 方式讀取文件掌猛,cvs 分隔符為; (默認(rèn)",") 取程序傳入的第一個(gè)參數(shù)當(dāng)做文件地址讀取 并且轉(zhuǎn)成 DataFrame盏浙。
    val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
    // 這里面進(jìn)行過濾眉睹, 清洗
    // 如果 剔除第9列值不等于200 或者第四列為空
    // 查詢 第一列的值 并且命名為 url 后面語法大體與SQL 類似
    // moduleType 為上面注冊(cè)的方法主要是分割URL 得到調(diào)用的服務(wù)路徑,city  formatTime  都一樣原理
    val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
      .select(
        frame.col("_c1").as("url")
        , moduleType(frame.col("_c1")).as("server")
        , frame.col("_c0").as("ip")
        , city(frame.col("_c0")).as("address")
        , frame.col("_c3").as("userId")
        , frame.col("_c4").as("userName")
        , frame.col("_c5").as("browserName")
        , formatTime(frame.col("_c2")).as("time"))
    // 保存統(tǒng)計(jì)結(jié)果
    readToServerStat(result.groupBy("server").count().orderBy("count"))
    //清洗后的數(shù)據(jù)寫入數(shù)據(jù)庫废膘,也可以寫入HDFS 以及任何HADOOP 支持的路徑 竹海。這次案例寫入MYSQL數(shù)據(jù)庫
    readToUserLog(result)
    //關(guān)閉session
    session.stop()
  }

  def readToServerStat(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[ServerBean]
      partitionOfRecords.foreach(row => {
        val server = row.getAs[String]("server")
        val count = row.getAs[Long]("count")
        buffer.append(ServerBean(server, count))
      })
      saveServerStat(buffer)
    })
  }

  def readToUserLog(value: Dataset[Row]): Unit = {
    value.foreachPartition(partitionOfRecords => {
      val buffer = new ListBuffer[UserLogBean]
      partitionOfRecords.foreach(row => {
        val url = row.getAs[String]("url")
        val address = row.getAs[String]("address")
        val server = row.getAs[String]("server")
        val ip = row.getAs[String]("ip")
        val userId = row.getAs[String]("userId")
        val userName = row.getAs[String]("userName")
        val browserName = row.getAs[String]("browserName")
        val time = row.getAs[String]("time")
        buffer.append(UserLogBean(server, url, ip, address, userId, userName, browserName, time))
      })
      saveUserLog(buffer)
    })
  }


  def saveServerStat(list: ListBuffer[ServerBean]) = {

    val connection = getConnection()
    connection.setAutoCommit(false)
    //todo
    val sql = "insert into server_stat(server,count) values (?,?) "
    val statement = connection.prepareStatement(sql)

    for (bean <- list) {
      statement.setString(1, bean.server)
      statement.setLong(2, bean.count)
      statement.addBatch()
    }
    statement.executeBatch() // 執(zhí)行批量處理
    connection.commit() //手工提交
    release(connection, statement)
  }

  def saveUserLog(list: ListBuffer[UserLogBean]) = {
    val connection = getConnection()
    connection.setAutoCommit(false)
    val statement = connection.prepareStatement("insert into user_log(url,address,server,ip,userId,userName,browserName,time) values (?,?,?,?,?,?,?,?)")
    // todo
    for (bean <- list) {
      statement.setString(1, bean.url)
      statement.setString(2, bean.address)
      statement.setString(3, bean.server)
      statement.setString(4, bean.ip)
      statement.setString(5, bean.userId)
      statement.setString(6, bean.userName)
      statement.setString(7, bean.browserName)
      statement.setString(8, bean.time)
      statement.addBatch()
    }
    statement.executeBatch() // 執(zhí)行批量處理
    connection.commit() //手工提交
    release(connection, statement)

  }

  /**
    * 轉(zhuǎn)換日期
    *
    * @param date
    * @return
    */
  def convertDate(date: String) = {
    format.format(TypeUtils.castToDate(date))
  }

  /**
    * 根據(jù)ip計(jì)算出城市信息
    *
    * @param ip
    * @return
    */
  def getCity(ip: String) = {
    try {
      IpHelper.findRegionByIp(ip)
    } catch {
      case e: Exception => "未知"
    }
  }

  /**
    * 切割字符串
    *
    * @param url 用戶請(qǐng)求路徑
    * @return
    */
  def getModuleType(url: String) = {
    if (url != null && url.contains("/")) {
      val str = url.replaceAll("http://", "/")
      str.substring(str.indexOf("hscp/") + 5).split("/")(0)
    } else {
      ""
    }
  }

  case class ServerBean(server: String, count: Long)

  case class UserLogBean(server: String, url: String, ip: String, address: String, userId: String
                         , userName: String, browserName: String, time: String)

  def getConnection() = {
    // 這里把數(shù)據(jù)庫與地址寫死,正式程序 可以改成配置式
    DriverManager.getConnection("jdbc:mysql://192.168.137.1:3306/spark?user=root&password=123456")
  }

  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

}

下一章-程序打包

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末丐黄,一起剝皮案震驚了整個(gè)濱河市斋配,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌灌闺,老刑警劉巖艰争,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異桂对,居然都是意外死亡甩卓,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門蕉斜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逾柿,“玉大人,你說我怎么就攤上這事宅此』恚” “怎么了?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵诽凌,是天一觀的道長毡熏。 經(jīng)常有香客問我,道長侣诵,這世上最難降的妖魔是什么痢法? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮杜顺,結(jié)果婚禮上财搁,老公的妹妹穿的比我還像新娘。我一直安慰自己躬络,他們只是感情好尖奔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著穷当,像睡著了一般提茁。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上馁菜,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天茴扁,我揣著相機(jī)與錄音,去河邊找鬼汪疮。 笑死峭火,一個(gè)胖子當(dāng)著我的面吹牛毁习,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卖丸,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼纺且,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了稍浆?” 一聲冷哼從身側(cè)響起载碌,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粹湃,沒想到半個(gè)月后恐仑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泉坐,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡为鳄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了腕让。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片孤钦。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖纯丸,靈堂內(nèi)的尸體忽然破棺而出偏形,到底是詐尸還是另有隱情,我是刑警寧澤觉鼻,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布俊扭,位于F島的核電站,受9級(jí)特大地震影響坠陈,放射性物質(zhì)發(fā)生泄漏萨惑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一仇矾、第九天 我趴在偏房一處隱蔽的房頂上張望庸蔼。 院中可真熱鬧,春花似錦贮匕、人聲如沸姐仅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽掏膏。三九已至,卻和暖如春敦锌,著一層夾襖步出監(jiān)牢的瞬間馒疹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國打工供屉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留行冰,地道東北人溺蕉。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像悼做,于是被迫代替她去往敵國和親疯特。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容