Project Overview
- Count the number of visits for the Top N most popular courses on the main site
- Top N most popular courses on the main site, by city
- Top N most popular courses on the main site, by traffic
Environment Setup
Setting Up Spark
- Building Spark from source (spark-2.1.0 as the example)
Official Spark build documentation
1. Download the source package for the matching version from the official site
Reference build process
2. Pitfalls when building Spark from source
Add the CDH repository to pom.xml: <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository>
Increase the Maven build memory
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
Select the Scala version
./dev/change-scala-version.sh 2.10
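With the repository, memory, and Scala version in place, a typical build command for this setup could look like the following (a sketch assuming the CDH 5.7.0 Hadoop version used by the rest of these notes; adjust the profiles to your own environment):
./dev/make-distribution.sh --name 2.6.0-cdh5.7.0 --tgz -Pyarn -Phive -Phive-thriftserver -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.7.0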
- Environment setup
Set the environment variables
Local mode: spark-shell --master local[2]
Standalone mode:
- Edit the spark-env.sh configuration file
SPARK_MASTER_HOST=master
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
- Start
sbin/start-all.sh: starts a worker on every node listed in the slaves file
spark-shell --master spark://master:7077: start spark-shell against the standalone cluster
spark-shell --help lists the available launch options
--total-executor-cores 1 sets the total number of cores
Spark on Yarn
Add the Hadoop conf directory to spark-env.sh
- [Client]
The Driver runs on the Client side
The Client communicates with the allocated Containers to schedule and run the job, so the Client must not exit
Log output goes to the console, which is convenient for debugging
- [Cluster]
The Driver runs inside the ApplicationMaster
The Client can exit as soon as the job has been submitted
Logs are not visible in the terminal; view them with yarn logs -applicationId <app ID>
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
[--deploy-mode cluster \]  (client mode is the default)
--executor-memory 1G \
--num-executors 1 \
/home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
4
Spark SQL Framework Overview
Spark SQL is Apache Spark's module for working with structured data.
- Integrated
Seamlessly mix SQL queries with Spark programs.
Spark SQL lets you query structured data inside Spark programs, using either SQL or a familiar DataFrame API. Usable in Java, Scala, Python and R.
- Uniform Data Access
Connect to any data source the same way.
DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.
- Hive Integration
Run SQL or HiveQL queries on existing warehouses.
Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses.
- Standard Connectivity
Connect through JDBC or ODBC.
A server mode provides industry standard JDBC and ODBC connectivity for business intelligence tools.
A Smooth Transition from Hive to Spark SQL
- Entry point for Spark SQL in Spark 1.x: SQLContext
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Can be run directly on the local machine
package com.test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
* Using SQLContext
*/
object SQLContextApp {
def main(args: Array[String]): Unit = {
//Create the SparkContext
val sparkConf = new SparkConf()
//In test or production these parameters are usually set via the submit script
sparkConf.setAppName("SQLContext").setMaster("spark://192.168.247.100:7077")//for local testing use "local[2]"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
//Processing: read the JSON file
val path = args(0)
val people = sqlContext.read.format("json").load(path)
people.printSchema()
people.show()
//Release resources
sc.stop()
}
}
Package the jar and run it on the server
spark-submit \
--name SQLContext \
--class com.test.SQLContextApp \
--master spark://192.168.247.100:7077 \
/home/kang/lib/SparkTest-1.0.jar \
/home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
- Entry point for Spark SQL in Spark 1.x: HiveContext
To read Hive metadata, copy the hive-site.xml configuration file into Spark's conf/ directory
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
package com.test
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Using HiveContext
*/
object HiveContextApp {
def main(args: Array[String]): Unit = {
//Create the SparkContext
val sparkConf = new SparkConf()
//In test or production these parameters are usually set via the submit script
sparkConf.setAppName("HiveContext").setMaster("spark://192.168.247.100:7077")//for local testing use "local[2]"
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
//Query a Hive table
hiveContext.table("test").show
//Release resources
sc.stop()
}
}
Add the MySQL connector jar when packaging and submitting
spark-submit \
--name HiveContext \
--class com.test.HiveContextApp \
--master spark://master:7077\
--jars /home/kang/lib/mysql-connector-java-5.1.34.jar \
/home/kang/lib/HiveContext.jar \
- Entry point for Spark SQL in Spark 2.x: SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
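For comparison with the 1.x examples above, a minimal local sketch of the same JSON processing using SparkSession (the class and object names here are illustrative, not from the project code):
package com.test
import org.apache.spark.sql.SparkSession
/**
 * Using SparkSession (Spark 2.x)
 */
object SparkSessionApp {
  def main(args: Array[String]): Unit = {
    //Create the SparkSession; in production the master is normally set by the submit script
    val spark = SparkSession.builder()
      .appName("SparkSessionApp")
      .master("local[2]")
      .getOrCreate()
    //Read and inspect the JSON file passed as the first argument
    val people = spark.read.format("json").load(args(0))
    people.printSchema()
    people.show()
    //Release resources
    spark.stop()
  }
}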
Using spark-shell / spark-sql
- Add the hive-site.xml configuration file
- Pass the MySQL driver jar with --jars
- spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
spark.sql("show tables").show
- spark-sql --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
SQL statements can be entered directly
explain extended + SQL shows the detailed execution plan (see the example below)
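For example, inside spark-sql (assuming the test table from the HiveContext example above exists):
explain extended select * from test;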
Using thriftserver / beeline
- Start the thriftserver
The default port is 10000 and can be changed
./sbin/start-thriftserver.sh --master local[2] \
--jars ~/lib/mysql-connector-java-5.1.34.jar \
--hiveconf hive.server2.thrift.port=10040
- Start beeline
./bin/beeline -u jdbc:hive2://localhost:10040 -n kang
- How does thriftserver differ from plain spark-shell / spark-sql?
Each spark-shell or spark-sql session is a separate Spark application
With thriftserver, no matter how many clients (beeline / code) connect, there is only ever one Spark application, and multiple clients can share cached data.
- Connecting to thriftserver from code
Add the required dependency
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
package com.test
import java.sql.DriverManager
object SparkSQLThriftServerApp {
def main(args: Array[String]): Unit = {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://192.168.247.100:10040","kang","")
val pstmt = conn.prepareStatement("select * from test")
val rs = pstmt.executeQuery()
while (rs.next()){
println("context:" + rs.getString("context"))
}
rs.close()
pstmt.close()
conn.close()
}
}
User Behavior Logs Overview
User behavior log: all of the behavior data generated each time a user visits the site (visits, page views, searches, clicks)
Contents of the log data
System attributes of the visit: operating system, browser, etc.
Visit characteristics: the URL clicked, the referring URL, time spent on the page, etc.
Visit information: session_id, visitor IP (which maps to a city), etc.
Significance of the analysis
Offline Data Processing Architecture (Workflow)
- Data collection
nginx records the log entries
Flume: writes the web logs into HDFS
- Data cleaning
Spark, Hive, MapReduce, etc.
- Use Spark SQL to parse the access logs
- Extract the course id and type
- Resolve city information from the IP address
- Use Spark SQL to write the output partitioned by day of the access time
Input: access time, requested URL, traffic consumed, visitor IP
Output: URL, cmsType (video/article), cmsId, traffic, ip, city, access time, day (see the record sketch below)
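As an illustration only (field names below are hypothetical, not taken from the project code), one cleaned record could be modeled like this:
//Hypothetical shape of one cleaned log record; names and types are illustrative
case class CleanedLog(
  url: String,       //requested URL
  cmsType: String,   //video / article
  cmsId: Long,       //course or article id parsed from the URL
  traffic: Long,     //traffic consumed by the request
  ip: String,        //visitor IP
  city: String,      //city resolved from the IP
  time: String,      //access time
  day: String        //partition column derived from the access time
)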
IP address resolution
Download: https://github.com/kangapp/ipdatabase
Build: mvn clean package -DskipTests
Install the jar into the local Maven repository: mvn install:install-file -Dfile=F:\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
Add the dependency to the pom file and copy the two spreadsheet files into the resources directory
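A minimal usage sketch, assuming the library exposes IpHelper.findRegionByIp as in the upstream ipdatabase project (check the package and class names against the version you actually installed):
import com.ggstar.util.ip.IpHelper

object IpUtils {
  //Resolve the region/city name for an IP string via ipdatabase
  def getCity(ip: String): String = IpHelper.findRegionByIp(ip)
}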
- Data processing
Spark, Hive, MapReduce for business statistics and analysis
Job scheduling: Oozie, Azkaban
Tuning points:
1) Control the size of the output files: coalesce
2) Adjust the data type of the partition column
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
- Store the processed results
RDBMS, NoSQL
Requirement 1
create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day,cms_id)
)
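A sketch of the corresponding statistic over the cleaned data (column names follow the hypothetical record above; the resulting DataFrame would then be written into day_video_access_topn_stat):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

//Requirement 1: visits per (day, cmsId) for video courses, most visited first
def videoAccessTopNStat(cleanDF: DataFrame, day: String): DataFrame = {
  cleanDF.filter(col("day") === day && col("cmsType") === "video")
    .groupBy("day", "cmsId")
    .agg(count("cmsId").as("times"))
    .orderBy(col("times").desc)
}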
Requirement 2
create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day,city,cms_id)
)
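This is where the window function mentioned in the project requirements comes in; a sketch of ranking courses within each city (the Top 3 cut-off is arbitrary, for illustration only):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//Requirement 2: rank video courses by visit count within each city for one day
def cityAccessTopNStat(cleanDF: DataFrame, day: String): DataFrame = {
  val cityCounts = cleanDF
    .filter(col("day") === day && col("cmsType") === "video")
    .groupBy("day", "city", "cmsId")
    .agg(count("cmsId").as("times"))

  cityCounts
    .withColumn("times_rank",
      row_number().over(Window.partitionBy("city").orderBy(col("times").desc)))
    .filter(col("times_rank") <= 3)
}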
Requirement 3
create table day_video_traffics_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
)
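One simple way to land any of these result DataFrames in the MySQL tables above is the built-in JDBC writer (a sketch; the database name and credentials are placeholders, and the project may instead insert row by row via foreachPartition):
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

def saveToMySQL(statDF: DataFrame, table: String): Unit = {
  val props = new Properties()
  props.put("user", "root")                      //placeholder credentials
  props.put("password", "root")
  props.put("driver", "com.mysql.jdbc.Driver")

  //Append into the pre-created table; the DDL above defines the primary keys
  statDF.write
    .mode(SaveMode.Append)
    .jdbc("jdbc:mysql://localhost:3306/spark_project", table, props)
}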
- Data visualization
ECharts, HUE, Zeppelin
Project Requirements
- Count visits for the Top N most popular courses/articles on the imooc main site
- Top N most popular courses on the imooc main site, by city
Extract city information from the IP
Use window functions in Spark SQL
- Top N most popular courses on the imooc main site, by traffic
Project Packaging
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
mvn assembly:assembly
spark-submit \
--class com.test.SparkStatCleanJobYARN \
--name SparkStatCleanJobYARN \
--master yarn \
--executor-memory 1G \
--num-executors 1 \
--files /home/kang/project/SprakSQL/resource/ipDatabase.csv,/home/kang/project/SprakSQL/resource/ipRegion.xlsx \
/home/kang/project/SprakSQL/lib/sparksql.jar \
hdfs://192.168.247.100:9000/data/spark/output/* hdfs://192.168.247.100:9000/data/spark/partitionByDay
Project Performance Tuning
https://segmentfault.com/a/1190000014876069
- Choice of storage format
- Choice of compression format
Code optimization
- Use high-performance operators
- Reuse already-computed data
Parameter tuning
Parallelism: spark.sql.shuffle.partitions
Partition column type inference: spark.sql.sources.partitionColumnTypeInference.enabled
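Both settings are usually passed at submit time rather than hard-coded, e.g. (the values here are illustrative):
spark-submit \
--conf spark.sql.shuffle.partitions=100 \
--conf spark.sql.sources.partitionColumnTypeInference.enabled=false \
... (remaining options and the application jar as in the submit command above)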