1、環(huán)境準(zhǔn)備
1淡诗、JDK配置骇塘,Scala 配置 。目前教程環(huán)境用的是 hadopp2.6-CDH5.6.0韩容、spark 2.1.0 款违、jdk 1.7u51、scala2.11.8 群凶。
2插爹、Scala 下載地址 https://www.scala-lang.org/download/all.html 迅雷下載速度更快(PS: 這不是打廣告)。
3请梢、 安裝Scala 赠尾、JDK并配置環(huán)境變量(jdk 1.8 也是可以的、scala 要和spark 保持一致毅弧,因?yàn)榭赡芪覀儠?huì)修改spark源碼的需求)
4气嫁、IDEA scala 插件下載 。如果沒有scala插件 够坐,IDEA 不能新建 scala 類 和對(duì)象 寸宵。(如果在IDEA 插件中心下載太慢了可以崖面,查看版本后到 官網(wǎng)下載,或者用迅雷下載 官網(wǎng): https://plugins.jetbrains.com/idea)
5邓馒、測(cè)試數(shù)據(jù)存放百度網(wǎng)盤 https://pan.baidu.com/s/1xju3QodxC-abpWADifigyA 密碼微信聯(lián)系我
image
2嘶朱、IDEA創(chuàng)建項(xiàng)目
1蛾坯、打開IDE :file - new - project (搭建maven 項(xiàng)目并勾選 create from archetype ,并且選擇 scala IDEA 自動(dòng)幫你配置scala 大部分依賴省不少事)
2光酣、 設(shè)置maven 坐標(biāo) GAV
3)、 配置項(xiàng)目maven 版本 設(shè)置以及本地倉庫地址 (在spark 源碼編譯對(duì)maven 有要求脉课,為了避免出現(xiàn)莫名問題 所以這里指定較高mean版本)
4救军、 添加Scala 到IDEA 中
5、 第一個(gè)scala Object
3倘零、編碼階段
1唱遭、業(yè)務(wù)需求,根據(jù)用戶日志統(tǒng)計(jì)出各服務(wù)調(diào)用次數(shù)日志文件為csv 格式如下:
2呈驶、添加依賴
<!-- Spark-SQL 依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!--開源 ip 工具類 依賴根據(jù)IP計(jì)算出城市拷泽,已經(jīng)安裝到本地了, 需要的可以到github 上面找-->
<dependency>
<groupId>com.ggstar</groupId>
<artifactId>ipdatabase</artifactId>
<version>1.0</version>
</dependency>
<!-- ip 工具類 需要讀取excel 中ip信息 依賴-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.14</version>
</dependency>
<!-- ip 工具類依賴 需要讀取excel-->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.14</version>
</dependency>
<!-- json工具類依賴-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
3袖瞻、程序代碼
package cn.harsons.mbd
import java.util.Locale
import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
*
* @author lyb
* @date 2020/3/9 0009
*/
object UserLogStatApp {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)
def main(args: Array[String]): Unit = {
// 拿到Spark Session 在本地開發(fā)時(shí)先設(shè)置local 模式發(fā)布正式在修改對(duì)應(yīng)的模式
val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
// 注冊(cè)函數(shù)司致,根據(jù)ip 獲取城市 注冊(cè)函數(shù)可以在Spark SQL 中使用 注意后面必須要使用 空格和下劃線" _"
val city = session.udf.register("getCity", getCity _)
// 注冊(cè)函數(shù) 時(shí)間轉(zhuǎn)換
val formatTime = session.udf.register("getTime", convertDate _)
// 注冊(cè)函數(shù) 分割URL 得到用戶調(diào)用的模塊
val moduleType = session.udf.register("getModuleType", getModuleType _)
// 以cvs 方式讀取文件,cvs 分隔符為; (默認(rèn)",") 從第一個(gè)參數(shù)里面讀取 并且轉(zhuǎn)成 DataFrame聋迎。
val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
// 這里面進(jìn)行過濾脂矫, 清洗
// 剔除第9列值不等于200 或者第四列為空
// 查詢 第一列的值 并且命名為 url 后面語法大體與SQL 類似
// moduleType 為上面注冊(cè)的方法主要是分割URL 得到調(diào)用的服務(wù)路徑,city formatTime 都一樣原理
val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
.select(
frame.col("_c1").as("url")
, moduleType(frame.col("_c1")).as("server")
, frame.col("_c0").as("ip")
, city(frame.col("_c0")).as("address")
, frame.col("_c3").as("userId")
, frame.col("_c4").as("userName")
, frame.col("_c5").as("browserName")
, formatTime(frame.col("_c2")).as("time"))
// 顯示分組統(tǒng)計(jì)后的結(jié)果霉晕,這里可以把結(jié)果集輸出到 HDFS 或者JDBC show 顯示50行結(jié)果集
result.groupBy("server").count().orderBy("count").show(50, false)
//這里大體意思是 coalesce 指定文件大小庭再,作用分區(qū)大小。 model 模式 指定為覆蓋 partitionBy 根據(jù)什么分區(qū)
// 這里用的是地址 和用戶分區(qū) 輸出文件為JSON
// 這里也可以指定目錄 在json 方法中
result.coalesce(1).write.mode(SaveMode.Overwrite).partitionBy("address", "userName")
.json(args(1))
//關(guān)閉session
session.stop()
}
def convertDate(date: String) = {
format.format(TypeUtils.castToDate(date))
}
def getCity(ip: String) = {
IpHelper.findRegionByIp(ip)
}
def getModuleType(url: String) = {
if (url != null && url.contains("/")) {
val str = url.replaceAll("http://", "/")
str.substring(str.indexOf("hscp/") + 5).split("/")(0)
} else {
""
}
}
}
3牺堰、啟動(dòng)參數(shù)設(shè)置(IDEA 中可以通過program arguments 來設(shè)置 輸入文件 和輸出文件拄轻,注意參數(shù)之間用空格分開)
4、成功輸出結(jié)果
5伟葫、可能出現(xiàn)的錯(cuò)誤
1哺眯、Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.9-3
原因 調(diào)試發(fā)現(xiàn)這是由于默認(rèn)的jackson-databind版本太高導(dǎo)致。
報(bào)錯(cuò)代碼:
解決方案:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.6.6</version>
</dependency>
2扒俯、java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
由于window 下 沒有配置好hadoop 環(huán)境變量導(dǎo)致奶卓。
解決方案:http://down2.opdown.com:8019/opdown/winutilsmaster.opdown.com.zip 配置好環(huán)境變量,重啟計(jì)算機(jī)撼玄。
6夺姑、數(shù)據(jù)保存到MYSQL的代碼如下(記得添加MYSQL驅(qū)動(dòng))
package cn.harsons.mbd
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Locale
import com.alibaba.fastjson.util.TypeUtils
import com.ggstar.util.ip.IpHelper
import org.apache.commons.lang.time.FastDateFormat
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable.ListBuffer
/**
*
* @author liyabin
* @date 2020/3/9 0009
*/
object UserStatSaveApp {
private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss Z", Locale.ENGLISH)
def main(args: Array[String]): Unit = {
// 拿到Spark Session 在本地開發(fā)時(shí)先設(shè)置local 模式發(fā)布正式在修改對(duì)應(yīng)的模式
val session = SparkSession.builder().appName("UserLogStatApp").master("local[2]").getOrCreate()
// 注冊(cè)函數(shù),根據(jù)ip 獲取城市 注冊(cè)函數(shù)可以在Spark SQL 中使用 注意后面必須要使用 空格和下劃線" _"
val city = session.udf.register("getCity", getCity _)
// 注冊(cè)函數(shù) 時(shí)間轉(zhuǎn)換
val formatTime = session.udf.register("getTime", convertDate _)
// 注冊(cè)函數(shù) 分割URL 得到用戶調(diào)用的模塊
val moduleType = session.udf.register("getModuleType", getModuleType _)
// 以cvs 方式讀取文件掌猛,cvs 分隔符為; (默認(rèn)",") 取程序傳入的第一個(gè)參數(shù)當(dāng)做文件地址讀取 并且轉(zhuǎn)成 DataFrame盏浙。
val frame = session.read.option("delimiter", ";").csv(args(0)).toDF
// 這里面進(jìn)行過濾眉睹, 清洗
// 如果 剔除第9列值不等于200 或者第四列為空
// 查詢 第一列的值 并且命名為 url 后面語法大體與SQL 類似
// moduleType 為上面注冊(cè)的方法主要是分割URL 得到調(diào)用的服務(wù)路徑,city formatTime 都一樣原理
val result = frame.filter(frame.col("_c8") === 200).filter(frame.col("_c3").isNotNull)
.select(
frame.col("_c1").as("url")
, moduleType(frame.col("_c1")).as("server")
, frame.col("_c0").as("ip")
, city(frame.col("_c0")).as("address")
, frame.col("_c3").as("userId")
, frame.col("_c4").as("userName")
, frame.col("_c5").as("browserName")
, formatTime(frame.col("_c2")).as("time"))
// 保存統(tǒng)計(jì)結(jié)果
readToServerStat(result.groupBy("server").count().orderBy("count"))
//清洗后的數(shù)據(jù)寫入數(shù)據(jù)庫废膘,也可以寫入HDFS 以及任何HADOOP 支持的路徑 竹海。這次案例寫入MYSQL數(shù)據(jù)庫
readToUserLog(result)
//關(guān)閉session
session.stop()
}
def readToServerStat(value: Dataset[Row]): Unit = {
value.foreachPartition(partitionOfRecords => {
val buffer = new ListBuffer[ServerBean]
partitionOfRecords.foreach(row => {
val server = row.getAs[String]("server")
val count = row.getAs[Long]("count")
buffer.append(ServerBean(server, count))
})
saveServerStat(buffer)
})
}
def readToUserLog(value: Dataset[Row]): Unit = {
value.foreachPartition(partitionOfRecords => {
val buffer = new ListBuffer[UserLogBean]
partitionOfRecords.foreach(row => {
val url = row.getAs[String]("url")
val address = row.getAs[String]("address")
val server = row.getAs[String]("server")
val ip = row.getAs[String]("ip")
val userId = row.getAs[String]("userId")
val userName = row.getAs[String]("userName")
val browserName = row.getAs[String]("browserName")
val time = row.getAs[String]("time")
buffer.append(UserLogBean(server, url, ip, address, userId, userName, browserName, time))
})
saveUserLog(buffer)
})
}
def saveServerStat(list: ListBuffer[ServerBean]) = {
val connection = getConnection()
connection.setAutoCommit(false)
//todo
val sql = "insert into server_stat(server,count) values (?,?) "
val statement = connection.prepareStatement(sql)
for (bean <- list) {
statement.setString(1, bean.server)
statement.setLong(2, bean.count)
statement.addBatch()
}
statement.executeBatch() // 執(zhí)行批量處理
connection.commit() //手工提交
release(connection, statement)
}
def saveUserLog(list: ListBuffer[UserLogBean]) = {
val connection = getConnection()
connection.setAutoCommit(false)
val statement = connection.prepareStatement("insert into user_log(url,address,server,ip,userId,userName,browserName,time) values (?,?,?,?,?,?,?,?)")
// todo
for (bean <- list) {
statement.setString(1, bean.url)
statement.setString(2, bean.address)
statement.setString(3, bean.server)
statement.setString(4, bean.ip)
statement.setString(5, bean.userId)
statement.setString(6, bean.userName)
statement.setString(7, bean.browserName)
statement.setString(8, bean.time)
statement.addBatch()
}
statement.executeBatch() // 執(zhí)行批量處理
connection.commit() //手工提交
release(connection, statement)
}
/**
* 轉(zhuǎn)換日期
*
* @param date
* @return
*/
def convertDate(date: String) = {
format.format(TypeUtils.castToDate(date))
}
/**
* 根據(jù)ip計(jì)算出城市信息
*
* @param ip
* @return
*/
def getCity(ip: String) = {
try {
IpHelper.findRegionByIp(ip)
} catch {
case e: Exception => "未知"
}
}
/**
* 切割字符串
*
* @param url 用戶請(qǐng)求路徑
* @return
*/
def getModuleType(url: String) = {
if (url != null && url.contains("/")) {
val str = url.replaceAll("http://", "/")
str.substring(str.indexOf("hscp/") + 5).split("/")(0)
} else {
""
}
}
case class ServerBean(server: String, count: Long)
case class UserLogBean(server: String, url: String, ip: String, address: String, userId: String
, userName: String, browserName: String, time: String)
def getConnection() = {
// 這里把數(shù)據(jù)庫與地址寫死,正式程序 可以改成配置式
DriverManager.getConnection("jdbc:mysql://192.168.137.1:3306/spark?user=root&password=123456")
}
def release(connection: Connection, pstmt: PreparedStatement): Unit = {
try {
if (pstmt != null) {
pstmt.close()
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (connection != null) {
connection.close()
}
}
}
}