
第1章 項目需求及架構設計

1.1 需求

  1. 數據采集平臺搭建
  2. Kafka、Zookeeper中間件準備
  3. 下游Spark Streaming對接Kafka接收數據,
     1. 實現vip個數統計
     2. 欄目打標簽功能
     3. 做題正確率與掌握度的實時計算功能。

1.2 項目框架

1.2.1 技術選型


1.2.2 流程設計


1.3 思考

(1)Spark Streaming 下每個stage的耗時由什么決定
(2)Spark Streaming task發生數據傾斜如何解決
(3)Spark Streaming操作mysql時,相同維度的數據如何保證線程安全問題
(4)如何保證kill Spark Streaming任務的時候不丟失數據
(5)如何保證Spark Streaming的第一次啟動和kill后第二次啟動時不丟失數據
(6)Spark Streaming下如何正確操作mysql(如何正確使用連接)
(7)MySql建表時索引的注意事項


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Spark的依賴引入 -->
<!--      <scope>provided</scope>-->
<!--      <scope>provided</scope>-->
<!--      <scope>provided</scope>-->
    <!-- 引入Scala -->
<!--      <scope>provided</scope>-->


    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->





import java.io.InputStream;
import java.util.Properties;

// 讀取配置文件工具類
/**
 * Loads {@code comerce.properties} from the classpath once, when the class is
 * initialised, and exposes its entries through static accessors.
 */
public class ConfigurationManager {

  private static Properties prop = new Properties();

  static {
    // try-with-resources closes the stream once the properties have been read.
    try (InputStream inputStream = ConfigurationManager.class.getClassLoader()
        .getResourceAsStream("comerce.properties")) {
      if (inputStream == null) {
        // getResourceAsStream returns null when the resource is absent; report it
        // instead of failing later with an unexplained NullPointerException.
        System.err.println("comerce.properties was not found on the classpath");
      } else {
        prop.load(inputStream);
      }
    } catch (Exception e) {
      // Keep class initialisation alive (lookups will simply return null),
      // but make the failure visible.
      e.printStackTrace();
    }
  }

  /**
   * Looks up a configuration value.
   *
   * @param key property name
   * @return the configured value, or {@code null} when the key is not present
   */
  public static String getProperty(String key) {
    return prop.getProperty(key);
  }

  /**
   * Looks up a configuration value and interprets it as a boolean.
   *
   * @param key property name
   * @return {@code true} only when the value equals "true" (ignoring case);
   *         {@code false} otherwise, including when the key is missing
   */
  public static boolean getBoolean(String key) {
    String value = prop.getProperty(key);
    return Boolean.parseBoolean(value);
  }
}



package com.atguigu.qzpoint.util;

import com.alibaba.fastjson.JSONObject;

public class ParseJsonData {

    public static JSONObject getJsonData(String data) {
        try {
            return JSONObject.parseObject(data);  Java代碼
        } catch (Exception e) {
            return null;


import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.*;
import java.util.Properties;

// 德魯伊連接池
public class DataSourceUtil implements Serializable {
    public static DataSource dataSource = null;

    static {     //數(shù)據(jù)庫連接池放在靜態(tài)代碼塊中刊懈,保證只創(chuàng)建一份
        try {
            Properties props = new Properties();

            props.setProperty("url", ConfigurationManager.getProperty("jdbc.url"));
            props.setProperty("username", ConfigurationManager.getProperty("jdbc.user"));
            props.setProperty("password", ConfigurationManager.getProperty("jdbc.password"));
            props.setProperty("initialSize", "5"); //初始化大小
            props.setProperty("maxActive", "10"); //最大連接
            props.setProperty("minIdle", "5");  //最小連接
            props.setProperty("maxWait", "60000"); //等待時長
            props.setProperty("timeBetweenEvictionRunsMillis", "2000");//配置多久進(jìn)行一次檢測,檢測需要關(guān)閉的連接 單位毫秒
            props.setProperty("minEvictableIdleTimeMillis", "600000");//配置連接在連接池中最小生存時間 單位毫秒
            props.setProperty("maxEvictableIdleTimeMillis", "900000"); //配置連接在連接池中最大生存時間 單位毫秒
            props.setProperty("validationQuery", "select 1");
            props.setProperty("testWhileIdle", "true");
            props.setProperty("testOnBorrow", "false");
            props.setProperty("testOnReturn", "false");
            props.setProperty("keepAlive", "true");
            props.setProperty("phyMaxUseCount", "100000");
//            props.setProperty("driverClassName", "com.mysql.jdbc.Driver");

            dataSource = DruidDataSourceFactory.createDataSource(props);   連接池只有一份

        } catch (Exception e) {

    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();

    // 提供關(guān)閉資源的方法【connection是歸還到連接池】
    // 提供關(guān)閉資源的方法 【方法重載】3 dql
    public static void closeResource(ResultSet resultSet, PreparedStatement preparedStatement,
                                     Connection connection) {
        // 關(guān)閉結(jié)果集
        // ctrl+alt+m 將java語句抽取成方法


        // 關(guān)閉語句執(zhí)行者

        // 關(guān)閉連接

    private static void closeConnection(Connection connection) {
        if (connection != null) {  //防止空指針異常
            try {
            } catch (SQLException e) {

    private static void closePrepareStatement(PreparedStatement preparedStatement) {
        if (preparedStatement != null) {
            try {
            } catch (SQLException e) {

    private static void closeResultSet(ResultSet resultSet) {
        if (resultSet != null) {
            try {
            } catch (SQLException e) {


import java.sql.{Connection, PreparedStatement, ResultSet}

/** Callback that receives the `ResultSet` produced by `SqlProxy.executeQuery`. */
trait QueryCallback {
  /** Consumes the rows of `rs`. */
  def process(rs: ResultSet): Unit
}

/**
  * Small JDBC helper that runs parameterised statements on a caller-supplied connection.
  *
  * Every call closes the statement (and result set) it opened before returning, so one
  * proxy can issue many statements without leaking the earlier ones.
  */
class SqlProxy {

  /**
    * Executes an INSERT/UPDATE/DELETE style statement.
    *
    * @param conn   connection to run on
    * @param sql    statement text with `?` placeholders
    * @param params values bound to the placeholders in order; may be null or empty
    * @return the row count reported by JDBC, or 0 when the statement fails
    */
  def executeUpdate(conn: Connection, sql: String, params: Array[Any]): Int = {
    var statement: PreparedStatement = null
    try {
      statement = conn.prepareStatement(sql)
      bind(statement, params)
      statement.executeUpdate()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        0
    } finally {
      closeQuietly(statement)
    }
  }

  /**
    * Executes a query and hands its result set to `queryCallback`.
    *
    * @param conn          connection to run on
    * @param sql           query text with `?` placeholders
    * @param params        values bound to the placeholders in order; may be null or empty
    * @param queryCallback receives the result set; exceptions it throws are printed, not rethrown
    */
  def executeQuery(conn: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback): Unit = {
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    try {
      statement = conn.prepareStatement(sql)
      bind(statement, params)
      resultSet = statement.executeQuery()
      queryCallback.process(resultSet)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Callbacks may already have closed the result set; closing again is harmless.
      closeQuietly(resultSet)
      closeQuietly(statement)
    }
  }

  /** Closes `conn` through `DataSourceUtil.closeResource`; statements are already closed per call. */
  def shutdown(conn: Connection): Unit = DataSourceUtil.closeResource(null, null, conn)

  /** Binds `params` to the 1-based JDBC placeholders; a null or empty array binds nothing. */
  private def bind(statement: PreparedStatement, params: Array[Any]): Unit = {
    if (params != null) {
      for (i <- params.indices) {
        statement.setObject(i + 1, params(i))
      }
    }
  }

  /** Closes `resource` when non-null, printing rather than propagating any failure. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try resource.close()
      catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}


import java.lang
import java.sql.ResultSet
import java.util.Random

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Streaming job that reads tab-separated registration records from the Kafka topic
  * `register_topic`, maps each to a `(platform, 1)` pair, and stores the consumed Kafka
  * offsets in the MySQL table `offset_manager` after every batch.
  */
object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50") // max records pulled per second per partition
      .set("spark.streaming.stopGracefullyOnShutdown", "true") // enable graceful shutdown
    // TODO(review): the article notes backpressure should also be enabled to smooth load peaks.

    val ssc = new StreamingContext(conf, Seconds(3))

    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    // Load the offsets previously saved for this consumer group.
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3)) // topic, partition
            val offset = rs.getLong(4) // saved offset
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // Resume from the saved offsets when there are any; otherwise rely on auto.offset.reset.
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // Keep records with exactly three tab-separated fields and label them by the second field.
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).mapPartitions(partitions => {
      partitions.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val app_name = arr(1) match {
          case "1" => "PC"
          case "2" => "APP"
          case _ => "Other"
        }
        (app_name, 1)
      })
    })

    // resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()

    // Adds this batch's values to the previous running total.
    // NOTE(review): the listing does not show where updateFunc / resultDStream are consumed;
    // presumably via resultDStream.updateStateByKey(updateFunc), which would also need a
    // checkpoint directory on ssc. Confirm against the full project.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum // sum for this batch
      val previousCount = state.getOrElse(0) // historical total
      Some(currentCount + previousCount)
    }

    // Skew-mitigation sketch from the article (disabled): salt the key with a random prefix,
    // aggregate, then strip the salt and aggregate again.
    // val dsStream = stream.filter(item => item.value().split("\t").length == 3)
    //   .mapPartitions(partitions =>
    //     partitions.map(item => {
    //       val rand = new Random()
    //       val line = item.value()
    //       val arr = line.split("\t")
    //       val app_id = arr(1)
    //       (rand.nextInt(3) + "_" + app_id, 1)
    //     }))
    // dsStream.print()
    // val a = dsStream.reduceByKey(_ + _)
    // a.print()
    // a.map(item => {
    //   val appid = item._1.split("_")(1)
    //   (appid, item._2)
    // }).reduceByKey(_ + _).print()

    // After the business logic, save the batch's end offsets to MySQL.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

    // Without start() none of the operations declared above ever run.
    ssc.start()
    ssc.awaitTermination()
  }
}

groupid topic partition untiloffset
groupid01 topic01 1 12123

MySQL中replace into用法詳解

replace into t(id, update_time) values(1, now());
replace into t(id, update_time) select 1, now();
replace into 跟 insert 功能類似,不同點在于:replace into 首先嘗試插入數據到表中:
1. 如果發現表中已經有此行數據(根據主鍵或者唯一索引判斷),則先刪除此行數據,然后插入新的數據。
2. 否則,直接插入新數據。
要注意的是:插入數據的表必須有主鍵或者是唯一索引!否則的話,replace into 會直接插入數據,這將導致表中出現重復的數據。
MySQL replace into 有三種形式:
1. replace into tbl_name(col_name, ...) values(...)
2. replace into tbl_name(col_name, ...) select ...
3. replace into tbl_name set col_name=value, ...
第一種形式類似于insert into的用法。
第二種replace select的用法也類似于insert select,這種用法并不一定要求列名匹配,事實上,MYSQL甚至不關心select返回的列名,它需要的是列的位置。
     例如,replace into tb1( name, title, mood) select rname, rtitle, rmood from tb2;這個例子使用replace into從tb2中將所有數據導入tb1中。
第三種replace set用法類似于update set用法,使用一個例如“SET col_name = col_name + 1”的賦值,則對位于右側的列名稱的引用會被作為DEFAULT(col_name)處理。因此,該賦值相當于SET col_name = DEFAULT(col_name) + 1。
前兩種形式用的多些。其中 “into” 關鍵字可以省略,不過最好加上 “into”,這樣意思更加直觀。另外,對于那些沒有給予值的列,MySQL 將自動為這些列賦上默認值。


import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

// 知識點掌握度實時統計
/**
  * Streaming job that reads answer records from the Kafka topic `qz_log` and maintains, per
  * user / course / knowledge point, the answered-question set (`qz_point_history`) and the
  * correct-rate and mastery figures (`qz_point_detail`) in MySQL. Consumed Kafka offsets are
  * stored in `offset_manager` after every batch.
  */
object QzPointStreaming {

  private val groupid = "qz_point_group"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest", // so the very first run starts from the earliest data
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    // Load the offsets previously saved for this consumer group.
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // Resume from the saved offsets when there are any; otherwise rely on auto.offset.reset.
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // Drop malformed records (anything other than six tab-separated fields) and split the rest.
    val dsStream = stream.filter(item => item.value().split("\t").length == 6).
      mapPartitions(partition => partition.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val uid = arr(0) // user id
        val courseid = arr(1) // course id
        val pointid = arr(2) // knowledge point id
        val questionid = arr(3) // question id
        val istrue = arr(4) // whether the answer was correct
        val createtime = arr(5) // creation time
        (uid, courseid, pointid, questionid, istrue, createtime)
      }))

    dsStream.foreachRDD(rdd => {
      // Group by user / course / knowledge point so each key's rows are handled together
      // by a single task.
      val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
      groupRdd.foreachPartition(partition => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach { case (key, iters) =>
            qzQuestionUpdate(key, iters, sqlProxy, client) // update the question tables
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        finally {
          sqlProxy.shutdown(client)
        }
      })
    })

    // After the business logic, save the batch's end offsets to MySQL.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

    // Without start() none of the operations declared above ever run.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Merges one batch of answers for a single user / course / knowledge point into MySQL.
    *
    * @param key      "userid-courseid-pointid"; each part must parse as an Int
    * @param iters    the batch's rows for that key:
    *                 (uid, courseid, pointid, questionid, istrue, createtime)
    * @param sqlProxy proxy used to run the statements
    * @param client   connection the statements run on
    * @return the row count reported for the final `qz_point_detail` upsert
    */
  def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection): Int = {
    val keys = key.split("-")
    val userid = keys(0).toInt
    val courseid = keys(1).toInt
    val pointid = keys(2).toInt
    val array = iters.toArray
    val questionids = array.map(_._4).distinct // distinct question ids in this batch

    // Question ids already recorded for this key.
    var questionids_history: Array[String] = Array()
    sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            // A NULL or empty column must not add a phantom "" id to the set.
            questionids_history = Option(rs.getString(1))
              .map(_.split(",").filter(_.nonEmpty))
              .getOrElse(Array.empty[String])
          }
          rs.close() // close the cursor
        }
      })

    // Union of historical and current ids, de-duplicated.
    val resultQuestionid = questionids.union(questionids_history).distinct
    val countSize = resultQuestionid.length
    val resultQuestionid_str = resultQuestionid.mkString(",")
    val createtime = array.map(_._6).min // earliest creation time in the batch
    val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())

    // Record the full set of questions this user has attempted for the knowledge point.
    // NOTE(review): on first insert updatetime is set to createtime, as in the original listing.
    sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
      " on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))

    // Previously stored totals for this key.
    var qzSum_history = 0
    var istrue_history = 0
    sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            qzSum_history += rs.getInt(1)
            istrue_history += rs.getInt(2)
          }
          rs.close()
        }
      })

    val qz_sum = array.length + qzSum_history // answers in this batch plus history
    val qz_istrue = array.count(_._5.equals("1")) + istrue_history // correct answers, batch plus history
    val correct_rate = qz_istrue.toDouble / qz_sum.toDouble
    // Assumes every knowledge point has 30 questions: coverage = distinct questions attempted / 30,
    // and mastery = coverage * correct rate.
    val qz_detail_rate = countSize.toDouble / 30
    val mastery_rate = qz_detail_rate * correct_rate
    sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
      " values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
      Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))
  }
}



import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat

import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// 頁面轉換率實時統計
/**
  * Streaming job that reads JSON page-view records from the Kafka topic `page_topic` and
  * maintains in MySQL: page-jump counts and jump rates (`page_jump_rate`), per-province click
  * totals (`tmp_city_num_detail`) and the top three provinces (`top_city_num`). Consumed Kafka
  * offsets are stored in `offset_manager` after every batch.
  */
object PageStreaming {
  private val groupid = "vip_count_groupid"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    // Load the offsets previously saved for this consumer group.
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // Resume from the saved offsets when there are any; otherwise rely on auto.offset.reset.
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.flatMap(item => {
        // getJsonData returns null for unparseable input; skip those records instead of
        // failing the task with a NullPointerException.
        Option(ParseJsonData.getJsonData(item)).map(jsonObject => {
          // Missing keys and null values both become "".
          def field(name: String): String =
            if (jsonObject.containsKey(name)) Option(jsonObject.getString(name)).getOrElse("") else ""

          val uid = field("uid")
          val app_id = field("app_id")
          val device_id = field("device_id")
          val ip = field("ip")
          val last_page_id = field("last_page_id")
          val pageid = field("page_id")
          val next_page_id = field("next_page_id")
          (uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
        })
      })
    }).filter(item => {
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    // dsStream feeds two independent pipelines below; cache it so each batch is parsed once.
    dsStream.cache()

    // Count occurrences of each (last page, page, next page) combination in the batch.
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) // accumulate the page-jump count
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })

    ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath) // ship the IP database to executors
    val ipDStream = dsStream.mapPartitions(patitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      patitions.map { item =>
        val ip = item._4
        // Region string looks like 中國|0|上海|上海市|有線通; index 2 is taken as the province.
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2)
        (province, 1L) // one click for that province
      }
    }).reduceByKey(_ + _)

    ipDStream.foreachRDD(rdd => {
      // Read the stored per-province totals and merge them with this batch.
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
            rs.close()
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0L) + item._2._2.getOrElse(0L)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              // Write the merged total back.
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        // Rebuild the top-three table. Use this closure's own proxy: the outer sqlProxy
        // belongs to the start-up offset lookup and has already been shut down.
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        ipSqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          ipSqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        ipSqlProxy.shutdown(ipClient)
      }
    })

    // After the business logic, refresh the jump rates and save the batch's end offsets to MySQL.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) // compute the conversion rates
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

    // Without start() none of the operations declared above ever run.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Adds one batch's count for a (last page, page, next page) combination to `page_jump_rate`.
    *
    * The statement inserts the batch count for a new row and otherwise increments the stored
    * `num` by it. The stored value is deliberately not read and folded into the parameter
    * first: combined with `num=num+?` that would count the history twice.
    *
    * @param sqlProxy proxy used to run the statement
    * @param item     ("lastPageId_pageId_nextPageId", count in this batch); ids must parse as Int
    * @param client   connection the statement runs on
    */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    val num: Long = item._2
    val last_page_id = keys(0).toInt // previous page id
    val page_id = keys(1).toInt // current page id
    val next_page_id = keys(2).toInt // next page id
    // Page 1 is the entry page, so its jump rate is fixed at 100%.
    if (page_id == 1) {
      sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
        "values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
    } else {
      sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
        "values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
    }
  }

  /**
    * Recomputes the page 1 -> 2 and page 2 -> 3 conversion rates from the stored counts and
    * writes them to `page_jump_rate.jump_rate`. A zero denominator yields "0%".
    *
    * @param sqlProxy proxy used to run the statements
    * @param client   connection the statements run on
    */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    // Stored count for one page id; 0 when there is no row. With several rows the last wins,
    // as in the original listing.
    def pageNum(pageId: Int): Long = {
      var num = 0L
      sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(pageId), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            num = rs.getLong(1)
          }
        }
      })
      num
    }

    val page1_num = pageNum(1)
    val page2_num = pageNum(2)
    val page3_num = pageNum(3)
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }
}



import java.lang
import java.sql.{Connection, ResultSet}

import com.atguigu.qzpoint.bean.LearnModel
import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object CourseLearnStreaming {
  private val groupid = "course_learn_test1"

  /**
    * Entry point of the course-learning streaming job.
    *
    * Consumes the `course_learn` Kafka topic in 3-second batches, resuming from the offsets
    * stored in the MySQL table `offset_manager` when there are any. Every batch updates the
    * per-video statistics (via `calcVideoTime`) and the total play time per chapter,
    * courseware, education type, source type and subject, then stores the batch's end
    * offsets back into `offset_manager`.
    *
    * NOTE(review): connections are released with `Connection.close()`; if `SqlProxy` offers
    * its own cleanup helper, prefer calling that in the `finally` blocks — TODO confirm.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("course_learn")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    // Load the offsets previously stored for this consumer group.
    // Columns are read by position: 2 = topic, 3 = partition, 4 = offset.
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val offsetProxy = new SqlProxy()
    val offsetClient = DataSourceUtil.getConnection
    try {
      offsetProxy.executeQuery(offsetClient, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      offsetClient.close()
    }

    // Resume from the stored offsets when there are any; otherwise let Kafka decide
    // where to start (auto.offset.reset).
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    val dsStream = stream.mapPartitions(partitions => {
      partitions.flatMap(item => {
        // getJsonData returns null for text it cannot parse; skip such records instead of
        // failing the whole batch with a NullPointerException.
        Option(ParseJsonData.getJsonData(item.value())).map(jsonObject => {
          val userId = jsonObject.getIntValue("uid")
          val cwareid = jsonObject.getIntValue("cwareid")
          val videoId = jsonObject.getIntValue("videoid")
          val chapterId = jsonObject.getIntValue("chapterid")
          val edutypeId = jsonObject.getIntValue("edutypeid")
          val subjectId = jsonObject.getIntValue("subjectid")
          val sourceType = jsonObject.getString("sourceType")
          val speed = jsonObject.getIntValue("speed")
          val ts = jsonObject.getLong("ts")
          val te = jsonObject.getLong("te")
          val ps = jsonObject.getIntValue("ps")
          val pe = jsonObject.getIntValue("pe")
          LearnModel(userId, cwareid, videoId, chapterId, edutypeId, subjectId, sourceType, speed, ts, te, ps, pe)
        })
      })
    })

    dsStream.foreachRDD(rdd => {
      // The batch feeds six independent jobs below; cache it so it is computed only once.
      rdd.cache()

      // Per user and video: effective, completed and total play time.
      rdd.groupBy(item => item.userId + "_" + item.cwareId + "_" + item.videoId).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach { case (key, iters) =>
            calcVideoTime(key, iters, sqlProxy, client)
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          client.close()
        }
      })

      // Total play time per chapter. The division is done in Double so that ceil really
      // rounds partial seconds up (integer division would truncate first).
      rdd.map(item => (item.chapterId, Math.ceil((item.te - item.ts) / 1000.0).toLong))
        .reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key" +
                " update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            client.close()
          }
        })

      // Total play time per courseware.
      rdd.map(item => (item.cwareId, Math.ceil((item.te - item.ts) / 1000.0).toLong))
        .reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into cwareid_learn_detail(cwareid,totaltime) values(?,?) on duplicate key " +
                "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            client.close()
          }
        })

      // Total play time per education type.
      rdd.map(item => (item.edutypeId, Math.ceil((item.te - item.ts) / 1000.0).toLong))
        .reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into edutype_learn_detail(edutypeid,totaltime) values(?,?) on duplicate key " +
                "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            client.close()
          }
        })

      // Total play time per source type.
      rdd.map(item => (item.sourceType, Math.ceil((item.te - item.ts) / 1000.0).toLong))
        .reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into sourcetype_learn_detail (sourcetype_learn,totaltime) values(?,?) on duplicate key " +
                "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            client.close()
          }
        })

      // Total play time per subject.
      rdd.map(item => (item.subjectId, Math.ceil((item.te - item.ts) / 1000.0).toLong))
        .reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into subject_learn_detail(subjectid,totaltime) values(?,?) on duplicate key " +
                "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            client.close()
          }
        })

      rdd.unpersist()
    })

    // After the business logic of a batch has run, store the batch's end offsets in MySQL.
    // NOTE(review): failures in the jobs above are only printed, so the offsets are stored
    // even when a write failed.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        client.close()
      }
    })

    // Nothing above runs until the context is started.
    ssc.start()
    ssc.awaitTermination()
  }

  /*
   * Compute a video's effective duration, completed duration and total duration.
   * @param key
   * @param iters
   * @param sqlProxy
   * @param client
   */
  /**
    * Updates one user's watch statistics for one video from the records of a batch.
    *
    * Loads the stored watched intervals for the (user, courseware, video) triple, folds
    * every record into them in ascending `ps` order, and then — once per key — writes the
    * merged intervals to `video_interval` and adds the batch totals to `video_learn_detail`.
    * Writing once after the loop matters: the sums are running totals, so writing them on
    * every iteration would add earlier records again.
    *
    * @param key      grouping key of the form "userId_cwareId_videoId"
    * @param iters    the batch's records for that key
    * @param sqlProxy helper used to run the query and the updates
    * @param client   JDBC connection the statements run on; it is not closed here
    */
  def calcVideoTime(key: String, iters: Iterable[LearnModel], sqlProxy: SqlProxy, client: Connection): Unit = {
    val keys = key.split("_")
    val userId = keys(0).toInt
    val cwareId = keys(1).toInt
    val videoId = keys(2).toInt

    // Watched intervals stored so far, encoded as "start-end,start-end,..."; "" if none.
    var interval_history = ""
    sqlProxy.executeQuery(client, "select play_interval from video_interval where userid=? and cwareid=? and videoid=?",
      Array(userId, cwareId, videoId), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            // A NULL column is treated like "no history".
            interval_history = Option(rs.getString(1)).getOrElse("")
          }
        }
      })

    var effective_duration_sum = 0L // effective play time (seconds spent on new content)
    var complete_duration_sum = 0L // newly completed length of the video
    var cumulative_duration_sum = 0L // total play time in seconds
    val learnList = iters.toList.sortBy(item => item.ps) // ascending by range start

    learnList.foreach(item => {
      // Seconds this record was played. Divide as Double so that ceil rounds partial
      // seconds up instead of operating on an already truncated integer.
      val playedSeconds = Math.ceil((item.te - item.ts) / 1000.0)
      if ("".equals(interval_history)) {
        // No history yet: the whole range is new.
        effective_duration_sum += playedSeconds.toLong
        cumulative_duration_sum += playedSeconds.toLong
        complete_duration_sum += item.pe - item.ps
        interval_history = item.ps + "-" + item.pe
      } else {
        val interval_arry = interval_history.split(",").sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val (complete_duration, merged_history) = getEffectiveInterval(interval_arry, item.ps, item.pe)
        // Scale the play time by the share of the range that was actually new.
        val rangeLength = item.pe - item.ps
        val effective_duration = if (rangeLength == 0) 0L else (playedSeconds / rangeLength * complete_duration).toLong
        interval_history = merged_history
        effective_duration_sum += effective_duration
        complete_duration_sum += complete_duration
        cumulative_duration_sum += playedSeconds.toLong
      }
    })

    if (learnList.nonEmpty) {
      sqlProxy.executeUpdate(client, "insert into video_interval(userid,cwareid,videoid,play_interval) values(?,?,?,?) " +
        "on duplicate key update play_interval=?", Array(userId, cwareId, videoId, interval_history, interval_history))
      sqlProxy.executeUpdate(client, "insert into video_learn_detail(userid,cwareid,videoid,totaltime,effecttime,completetime) " +
        "values(?,?,?,?,?,?) on duplicate key update totaltime=totaltime+?,effecttime=effecttime+?,completetime=completetime+?",
        Array(userId, cwareId, videoId, cumulative_duration_sum, effective_duration_sum, complete_duration_sum, cumulative_duration_sum,
          effective_duration_sum, complete_duration_sum))
    }
  }

  /*
   * Compute the effective interval.
   * @param array
   * @param start
   * @param end
   * @return
   */
  /**
    * Works out how much of the play range `[start, end]` has not been watched before and
    * folds that range into the already watched intervals.
    *
    * The input array is not modified. Intervals that overlap or touch are merged.
    * NOTE(review): `start <= end` is assumed; an inverted range is not validated here.
    *
    * @param array previously watched intervals, each encoded as "start-end"
    * @param start start position of the newly played range
    * @param end   end position of the newly played range
    * @return a pair of (length of the new range that `array` did not cover yet,
    *         all intervals including the new one, merged, sorted ascending and joined with ",")
    * @throws Exception with message "error array:" followed by the history when an entry is malformed
    */
  def getEffectiveInterval(array: Array[String], start: Int, end: Int): (Int, String) = {
    // Decodes one "start-end" entry; on failure the whole history is reported so the
    // offending row can be traced.
    def parse(interval: String): (Int, Int) =
      try {
        val bounds = interval.split("-")
        (bounds(0).toInt, bounds(1).toInt)
      } catch {
        case _: Exception => throw new Exception("error array:" + array.mkString(","))
      }

    // Sorts by (start, end) and fuses every interval that begins at or before the end of
    // the previous one.
    def merge(intervals: List[(Int, Int)]): List[(Int, Int)] =
      intervals.sorted.foldLeft(List.empty[(Int, Int)]) {
        case ((lastStart, lastEnd) :: earlier, (nextStart, nextEnd)) if nextStart <= lastEnd =>
          (lastStart, math.max(lastEnd, nextEnd)) :: earlier
        case (merged, next) => next :: merged
      }.reverse

    // Merge the history first so its intervals are disjoint and overlaps are counted once.
    val watched = merge(array.map(parse).toList)
    val alreadyWatched = watched.map { case (historyStart, historyEnd) =>
      math.max(0, math.min(end, historyEnd) - math.max(start, historyStart))
    }.sum
    val effective_duration = (end - start) - alreadyWatched

    val play_interval = merge(watched :+ ((start, end)))
      .map { case (from, to) => s"$from-$to" }
      .mkString(",")
    (effective_duration, play_interval)
  }
