Real-Time Online Education

Chapter 1  Project Requirements and Architecture Design

1.1 Requirements

  1. Build the data collection platform
  2. Prepare the Kafka and Zookeeper middleware
  3. Have the downstream Spark Streaming jobs consume the data from Kafka and implement:
     1. real-time VIP user counting
     2. category (column) tagging
     3. real-time calculation of answer correctness rate and knowledge-point mastery

1.2 Project Framework

1.2.1 Technology Stack

1. Data storage: Kafka, MySQL
2. Data processing: Spark
3. Other components: Zookeeper

1.2.2 Pipeline Design

(Flow diagram)

1.3 Questions to Consider

(1) What determines how long each Spark Streaming stage takes?
(2) How do you deal with data skew in Spark Streaming tasks?
(3) When Spark Streaming writes to MySQL, how do you keep updates to the same dimension thread-safe?
(4) How do you avoid losing data when killing a Spark Streaming job?
(5) How do you avoid losing data on the first start of a Spark Streaming job, and on the restart after it has been killed?
(6) How do you operate MySQL correctly from Spark Streaming (i.e., how should connections be used)?
(7) What should you pay attention to regarding indexes when creating MySQL tables?

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>education-online</artifactId>
    <groupId>com.qingfeng</groupId>
    <version>1.0-SNAPSHOT</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>com_qingfeng_warehouse</artifactId>
  <dependencies>
    <!-- Spark dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
<!--      <scope>provided</scope>-->
    </dependency>
    <!-- Scala library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
<!--      <scope>provided</scope>-->
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.46</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.1</version>
        <executions>
          <execution>
            <id>compile-scala</id>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>test-compile-scala</id>
            <goals>
              <goal>add-source</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

MySQL configuration file (comerce.properties)

jdbc.url=jdbc:mysql://hadoop102:3306/course_learn?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false
jdbc.user=root
jdbc.password=000000

Utility class for reading the configuration file

import java.io.InputStream;
import java.util.Properties;

/**
 * Utility class for reading the configuration file.
 */
public class ConfigurationManager {

  private static Properties prop = new Properties();

  static {
    try {
      InputStream inputStream = ConfigurationManager.class.getClassLoader()
          .getResourceAsStream("comerce.properties");    //自動加載配置文件
      prop.load(inputStream);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  // get a configuration value by key
  public static String getProperty(String key) {
    return prop.getProperty(key);
  }

  // get a boolean configuration value
  public static boolean getBoolean(String key) {
    String value = prop.getProperty(key);
    try {
      return Boolean.valueOf(value);                       // parse the boolean value
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }

}
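
A minimal usage sketch, assuming comerce.properties is on the classpath with the jdbc.* keys shown above and that the class lives in com.atguigu.qzpoint.util like the other utilities:

import com.atguigu.qzpoint.util.ConfigurationManager

object ConfigurationManagerExample extends App {
  // the properties file is read once in the static initializer and served from memory
  val url = ConfigurationManager.getProperty("jdbc.url")
  val user = ConfigurationManager.getProperty("jdbc.user")
  println(s"connecting to $url as $user")
}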

JSON parsing utility class

package com.atguigu.qzpoint.util;

import com.alibaba.fastjson.JSONObject;


public class ParseJsonData {

    public static JSONObject getJsonData(String data) {
        try {
            return JSONObject.parseObject(data);
        } catch (Exception e) {
            return null;
        }
    }
}
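
A quick usage sketch; the field names mirror the page_topic records consumed later in PageStreaming, but the concrete values are made up for illustration:

import com.alibaba.fastjson.JSONObject
import com.atguigu.qzpoint.util.ParseJsonData

object ParseJsonDataExample extends App {
  // hypothetical page_topic record
  val line = """{"uid":"10001","app_id":"1","device_id":"d-01","ip":"115.239.211.112","last_page_id":"1","page_id":"2","next_page_id":"3"}"""
  val obj: JSONObject = ParseJsonData.getJsonData(line)
  // getJsonData returns null for invalid JSON, so check before reading fields
  if (obj != null && obj.containsKey("page_id")) {
    println("page_id = " + obj.getString("page_id")) // prints: page_id = 2
  }
}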

Druid connection pool

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.*;
import java.util.Properties;

/**
 * Druid connection pool utility
 */
public class DataSourceUtil implements Serializable {
    public static DataSource dataSource = null;

    static {     // the pool is created in a static block so that only one instance ever exists
        try {
            Properties props = new Properties();

            props.setProperty("url", ConfigurationManager.getProperty("jdbc.url"));
            props.setProperty("username", ConfigurationManager.getProperty("jdbc.user"));
            props.setProperty("password", ConfigurationManager.getProperty("jdbc.password"));
            props.setProperty("initialSize", "5"); //初始化大小
            props.setProperty("maxActive", "10"); //最大連接
            props.setProperty("minIdle", "5");  //最小連接
            props.setProperty("maxWait", "60000"); //等待時長
            props.setProperty("timeBetweenEvictionRunsMillis", "2000");//配置多久進(jìn)行一次檢測,檢測需要關(guān)閉的連接 單位毫秒
            props.setProperty("minEvictableIdleTimeMillis", "600000");//配置連接在連接池中最小生存時間 單位毫秒
            props.setProperty("maxEvictableIdleTimeMillis", "900000"); //配置連接在連接池中最大生存時間 單位毫秒
            props.setProperty("validationQuery", "select 1");
            props.setProperty("testWhileIdle", "true");
            props.setProperty("testOnBorrow", "false");
            props.setProperty("testOnReturn", "false");
            props.setProperty("keepAlive", "true");
            props.setProperty("phyMaxUseCount", "100000");
//            props.setProperty("driverClassName", "com.mysql.jdbc.Driver");

            dataSource = DruidDataSourceFactory.createDataSource(props);   連接池只有一份

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // get a connection from the pool
    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }

    // close resources; the connection is returned to the pool rather than physically closed
    public static void closeResource(ResultSet resultSet, PreparedStatement preparedStatement,
                                     Connection connection) {
        // close the result set

        closeResultSet(resultSet);

        // close the prepared statement
        closePrepareStatement(preparedStatement);

        // close (return) the connection
        closeConnection(connection);
    }

    private static void closeConnection(Connection connection) {
        if (connection != null) {  // guard against NullPointerException
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private static void closePrepareStatement(PreparedStatement preparedStatement) {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }


    private static void closeResultSet(ResultSet resultSet) {
        if (resultSet != null) {
            try {
                resultSet.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

MySQL proxy class

import java.sql.{Connection, PreparedStatement, ResultSet}

trait QueryCallback {
  def process(rs: ResultSet)
}

class SqlProxy {
  private var rs: ResultSet = _
  private var psmt: PreparedStatement = _

  /**
    * Execute an update statement (insert / update / delete).
    *
    * @param conn
    * @param sql
    * @param params
    * @return
    */
  def executeUpdate(conn: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    try {
      psmt = conn.prepareStatement(sql)
      if (params != null && params.length > 0) {  //注意防止空指針異常
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }
      rtn = psmt.executeUpdate()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn // number of rows affected
  }

  /**
    * Execute a query statement.
    * @param conn
    * @param sql
    * @param params
    * @return
    */
  def executeQuery(conn: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback) = {
    rs = null
    try {
      psmt = conn.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }
      rs = psmt.executeQuery()
      queryCallback.process(rs)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def shutdown(conn: Connection): Unit = DataSourceUtil.closeResource(rs, psmt, conn)
}
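
A minimal sketch of the query-with-callback pattern that all the streaming jobs below rely on (it assumes the offset_manager table that appears later in this article):

import java.sql.ResultSet

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}

object SqlProxyExample extends App {
  val sqlProxy = new SqlProxy()
  val client = DataSourceUtil.getConnection
  try {
    sqlProxy.executeQuery(client, "select topic, `partition`, untilOffset from `offset_manager` where groupid=?",
      Array("register_group_test"), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            println(s"${rs.getString(1)}-${rs.getInt(2)} -> ${rs.getLong(3)}")
          }
          rs.close()
        }
      })
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    sqlProxy.shutdown(client) // returns the connection to the Druid pool
  }
}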

Metric 1: Real-time registration count (implementation)

import java.lang
import java.sql.ResultSet
import java.util.Random

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50") // cap the records pulled per partition per second
      .set("spark.streaming.stopGracefullyOnShutdown", "true") // enable graceful shutdown
    // backpressure should also be enabled here to smooth out traffic spikes

    val ssc = new StreamingContext(conf, Seconds(3))

    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // set the checkpoint path
    ssc.checkpoint("hdfs://hadoop102:9000/user/qingfeng/sparkstreaming/checkpoint")

    // check whether offsets already exist in MySQL
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3)) // topic and partition
            val offset = rs.getLong(4) // stored offset
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    // build the Kafka stream: if stored offsets exist, resume from them; otherwise consume from the beginning
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // filter the data first, then process it
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3)
      .mapPartitions(partitions => {
        partitions.map(item => {
          val line = item.value()
          val arr = line.split("\t")
          val app_name = arr(1) match {
            case "1" => "PC"
            case "2" => "APP"
            case _ => "Other"
          }
          (app_name, 1)
        })
      })
    resultDStream.cache()

    // resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum // sum of the current batch
      val previousCount = state.getOrElse(0) // historical count
      Some(currentCount + previousCount)
    }

    resultDStream.updateStateByKey(updateFunc).print()

    // val dsStream = stream.filter(item => item.value().split("\t").length == 3)
    //   .mapPartitions(partitions =>
    //     partitions.map(item => {
    //       val rand = new Random()
    //       val line = item.value()
    //       val arr = line.split("\t")
    //       val app_id = arr(1)
    //       (rand.nextInt(3) + "_" + app_id, 1) // add a random prefix to the key to avoid data skew
    //     }))
    // dsStream.print()
    // val a = dsStream.reduceByKey(_ + _)
    // a.print()
    // a.map(item => {
    //   val appid = item._1.split("_")(1)
    //   (appid, item._2)
    // }).reduceByKey(_ + _).print()

    // after the business logic is done, manually commit the offsets to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
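
The comment in the driver setup above notes that backpressure should also be enabled to smooth out traffic spikes. A minimal sketch of the relevant settings; spark.streaming.backpressure.enabled is a standard Spark configuration key, it is not set in the original job:

import org.apache.spark.SparkConf

object RegisterStreamingConfSketch {
  val conf: SparkConf = new SparkConf().setAppName("RegisterStreaming").setMaster("local[2]")
    .set("spark.streaming.kafka.maxRatePerPartition", "50")   // cap records pulled per partition per second
    .set("spark.streaming.backpressure.enabled", "true")      // adapt the ingest rate to batch processing time
    .set("spark.streaming.stopGracefullyOnShutdown", "true")  // finish in-flight batches on shutdown
}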

offset_manager table structure:

groupid     topic     partition   untilOffset
groupid01   topic01   1           12123
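
For the replace into used above to behave as an upsert, offset_manager needs a primary key or unique index covering (groupid, topic, partition). A minimal DDL sketch, issued through the project's SqlProxy; the column types and index name are assumptions, not taken from the original project:

import com.atguigu.qzpoint.util.{DataSourceUtil, SqlProxy}

object CreateOffsetManagerTable extends App {
  val sqlProxy = new SqlProxy()
  val client = DataSourceUtil.getConnection
  try {
    // the unique key on (groupid, topic, partition) is what lets `replace into` overwrite a row in place
    sqlProxy.executeUpdate(client,
      """create table if not exists `offset_manager` (
        |  groupid     varchar(50),
        |  topic       varchar(50),
        |  `partition` int,
        |  untilOffset bigint,
        |  unique index offset_unique_index (groupid, topic, `partition`)
        |)""".stripMargin, null)
  } finally {
    sqlProxy.shutdown(client)
  }
}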

How MySQL replace into works

replace into t(id, update_time) values(1, now());
-- or
replace into t(id, update_time) select 1, now();

replace into works much like insert. The difference is that replace into first tries to insert the row:
1. If the table already contains a row with the same key (judged by the primary key or a unique index), that row is deleted first and the new row is inserted.
2. Otherwise, the new row is simply inserted.
Note that the target table must have a primary key or a unique index; otherwise replace into degenerates into a plain insert and the table will accumulate duplicate rows.
MySQL replace into comes in three forms:
1. replace into tbl_name(col_name, ...) values(...)
2. replace into tbl_name(col_name, ...) select ...
3. replace into tbl_name set col_name=value, ...
The first form behaves like insert into.
The second form, replace ... select, behaves like insert ... select. The column names do not have to match; in fact MySQL ignores the names returned by the select and only looks at the column positions. For example, replace into tb1(name, title, mood) select rname, rtitle, rmood from tb2; copies all rows of tb2 into tb1.
The third form, replace ... set, behaves like update ... set. If an assignment such as SET col_name = col_name + 1 is used, the column reference on the right-hand side is treated as DEFAULT(col_name), so the assignment is equivalent to SET col_name = DEFAULT(col_name) + 1.
The first two forms are the most commonly used. The "into" keyword can be omitted, but keeping it makes the statement read more clearly. Columns that are not given a value are filled with their default values.
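
The streaming jobs in this article use both upsert styles: replace into for the offset table, and insert ... on duplicate key update for the metric tables. A side-by-side sketch through SqlProxy, assuming the offset_manager table sketched above and a chapter_learn_detail table with a unique key on chapterid:

import com.atguigu.qzpoint.util.{DataSourceUtil, SqlProxy}

object UpsertStylesExample extends App {
  val sqlProxy = new SqlProxy()
  val client = DataSourceUtil.getConnection
  try {
    // replace into: the conflicting row is deleted and re-inserted, so every column must be supplied again
    sqlProxy.executeUpdate(client,
      "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
      Array("register_group_test", "register_topic", "0", 200L))
    // insert ... on duplicate key update: the existing row is kept and only the listed columns change,
    // which allows increments such as totaltime=totaltime+?
    sqlProxy.executeUpdate(client,
      "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key update totaltime=totaltime+?",
      Array(1, 30L, 30L))
  } finally {
    sqlProxy.shutdown(client)
  }
}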

Metric 2: Real-time answer correctness rate and knowledge-point mastery per student

import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Real-time knowledge-point mastery statistics
  */
object QzPointStreaming {

  private val groupid = "qz_point_group"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")  
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("qz_log")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",  //保證第一次消費不丟數(shù)據(jù)
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // check whether offsets already exist in MySQL
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // build the Kafka stream: if stored offsets exist, resume from them; otherwise consume from the beginning
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    // filter out malformed records, then extract the fields
    val dsStream = stream.filter(item => item.value().split("\t").length == 6).
      mapPartitions(partition => partition.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val uid = arr(0) // user id
        val courseid = arr(1) // course id
        val pointid = arr(2) // knowledge point id
        val questionid = arr(3) // question id
        val istrue = arr(4) // whether the answer is correct
        val createtime = arr(5) // creation time
        (uid, courseid, pointid, questionid, istrue, createtime)
      }))
    dsStream.foreachRDD(rdd => {
      // group the data by user, course and knowledge point
      val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
      groupRdd.foreachPartition(partition => {
        // obtain a JDBC connection per partition
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach { case (key, iters) =>
            qzQuestionUpdate(key, iters, sqlProxy, client) // update the question statistics
          }
        } catch {
          case e: Exception => e.printStackTrace()
        }
        finally {
          sqlProxy.shutdown(client)
        }
      }
      )
    })
    // after the business logic is done, manually commit the offsets to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Update the per-question statistics tables.
    *
    * @param key
    * @param iters
    * @param sqlProxy
    * @param client
    * @return
    */
  def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("-")
    val userid = keys(0).toInt
    val courseid = keys(1).toInt
    val pointid = keys(2).toInt
    val array = iters.toArray
    val questionids = array.map(_._4).distinct // de-duplicate the questionids in the current batch
    // look up the historical questionids
    var questionids_history: Array[String] = Array()
    sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            questionids_history = rs.getString(1).split(",")
          }
          rs.close() // close the cursor
        }
      })
    // merge the historical questionids with the current batch and de-duplicate
    val resultQuestionid = questionids.union(questionids_history).distinct
    val countSize = resultQuestionid.length
    val resultQuestionid_str = resultQuestionid.mkString(",")
    val qz_count = questionids.length // number of distinct questions in this batch
    var qz_sum = array.length // total number of answers in this batch
    var qz_istrue = array.filter(_._5.equals("1")).size // number of correct answers in this batch
    val createtime = array.map(_._6).min // earliest creation time, used as the row's createtime
    // upsert qz_point_history, which stores the questionids this user has already attempted
    val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
    sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
      " on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))

    var qzSum_history = 0
    var istrue_history = 0
    sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
      Array(userid, courseid, pointid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            qzSum_history += rs.getInt(1)
            istrue_history += rs.getInt(2)
          }
          rs.close()
        }
      })
    qz_sum += qzSum_history
    qz_istrue += istrue_history
    val correct_rate = qz_istrue.toDouble / qz_sum.toDouble // correctness rate
    // compute the completion (coverage) rate
    // assume each knowledge point has 30 questions in total: first compute how many have been attempted, then the mastery
    val qz_detail_rate = countSize.toDouble / 30 // share of the 30 questions attempted; if all 30 were attempted, mastery equals the correctness rate
    val mastery_rate = qz_detail_rate * correct_rate
    sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
      " values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
      Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))

  }
}
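
To make the mastery formula concrete, a small worked example; the 30-questions-per-knowledge-point assumption comes from the code above, the remaining numbers are made up:

object MasteryRateExample extends App {
  val questionsPerPoint = 30          // assumption hard-coded in qzQuestionUpdate above
  val distinctAttempted = 10          // countSize: distinct questionids, history + current batch
  val totalAttempts = 12              // qz_sum: all answers, history + current batch
  val correctAttempts = 9             // qz_istrue: correct answers, history + current batch

  val correctRate = correctAttempts.toDouble / totalAttempts      // 0.75
  val coverage = distinctAttempted.toDouble / questionsPerPoint   // 10 / 30 ≈ 0.333
  val masteryRate = coverage * correctRate                        // ≈ 0.25

  println(f"correct_rate=$correctRate%.3f mastery_rate=$masteryRate%.3f")
}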

Metric 3: Real-time conversion rate from product page to order page and from order page to payment page

import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat

import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
  * Real-time page conversion rate statistics
  */
object PageStreaming {
  private val groupid = "vip_count_groupid"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
  
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // check whether offsets already exist in MySQL
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // build the Kafka stream: if stored offsets exist, resume from them; otherwise consume from the beginning
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }
    // parse the JSON data
    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)  // then check whether each key exists before reading it
        val uid = if (jsonObject.containsKey("uid")) jsonObject.getString("uid") else ""
        val app_id = if (jsonObject.containsKey("app_id")) jsonObject.getString("app_id") else ""
        val device_id = if (jsonObject.containsKey("device_id")) jsonObject.getString("device_id") else ""
        val ip = if (jsonObject.containsKey("ip")) jsonObject.getString("ip") else ""
        val last_page_id = if (jsonObject.containsKey("last_page_id")) jsonObject.getString("last_page_id") else ""
        val pageid = if (jsonObject.containsKey("page_id")) jsonObject.getString("page_id") else ""
        val next_page_id = if (jsonObject.containsKey("next_page_id")) jsonObject.getString("next_page_id") else ""
        (uid, app_id, device_id, ip, last_page_id, pageid, next_page_id)
      })
    }).filter(item => {
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    dsStream.cache()
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // obtain a JDBC connection per partition
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) // count page jumps
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })

    ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath) // ship the ip2region.db file to the executors
    val ipDStream = dsStream.mapPartitions(partitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      partitions.map { item =>
        val ip = item._4
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2) // region lookup result looks like 中國|0|上海|上海市|有線通
        (province, 1L) // count clicks per province
      }
    }).reduceByKey(_ + _)


    ipDStream.foreachRDD(rdd => {
      // load the historical totals from MySQL and turn them into an RDD
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0l) + item._2._2.getOrElse(0l)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              // upsert the latest totals into MySQL
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        sqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          sqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(ipClient)
      }
    })

    // compute the conversion rates, then manually commit the offsets to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) //計算轉(zhuǎn)換率
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Count page jumps.
    *
    * @param sqlProxy
    * @param item
    * @param client
    */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    var num: Long = item._2
    val page_id = keys(1).toInt // current page_id
    val last_page_id = keys(0).toInt // previous page_id
    val next_page_id = keys(2).toInt // next page_id
    // look up the historical num for the current page_id
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(page_id), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          num += rs.getLong(1)
        }
        rs.close()
      }

      // upsert num; page_id 1 is the entry page, so its jump rate is fixed at 100%
      if (page_id == 1) {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
          "values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
      } else {
        sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
          "values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
      }
    })
  }

  /**
    * Compute the page-to-page conversion rates.
    */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    var page1_num = 0l
    var page2_num = 0l
    var page3_num = 0l
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(1), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page1_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(2), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page2_num = rs.getLong(1)
        }
      }
    })
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(3), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          page3_num = rs.getLong(1)
        }
      }
    })
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }

}
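
calcJumRate above turns the accumulated per-page counts into percentages. A quick worked example with made-up counts:

import java.text.NumberFormat

object JumpRateExample extends App {
  val page1Num = 1000L // product page visits
  val page2Num = 250L  // order page visits
  val page3Num = 100L  // payment page visits

  val nf = NumberFormat.getPercentInstance
  val page1ToPage2 = if (page1Num == 0) "0%" else nf.format(page2Num.toDouble / page1Num)
  val page2ToPage3 = if (page2Num == 0) "0%" else nf.format(page3Num.toDouble / page2Num)
  println(s"page1->page2: $page1ToPage2, page2->page3: $page2ToPage3") // 25%, 40%
}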

Metric 6: Real-time video playback duration statistics

import java.lang
import java.sql.{Connection, ResultSet}

import com.atguigu.qzpoint.bean.LearnModel
import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object CourseLearnStreaming {
  private val groupid = "course_learn_test1"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("course_learn")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // check whether offsets already exist in MySQL
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
    // build the Kafka stream: if stored offsets exist, resume from them; otherwise consume from the beginning
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // parse the JSON data
    val dsStream = stream.mapPartitions(partitions => {
      partitions.map(item => {
        val json = item.value()
        val jsonObject = ParseJsonData.getJsonData(json)
        val userId = jsonObject.getIntValue("uid")
        val cwareid = jsonObject.getIntValue("cwareid")
        val videoId = jsonObject.getIntValue("videoid")
        val chapterId = jsonObject.getIntValue("chapterid")
        val edutypeId = jsonObject.getIntValue("edutypeid")
        val subjectId = jsonObject.getIntValue("subjectid")
        val sourceType = jsonObject.getString("sourceType")
        val speed = jsonObject.getIntValue("speed")
        val ts = jsonObject.getLong("ts")
        val te = jsonObject.getLong("te")
        val ps = jsonObject.getIntValue("ps")
        val pe = jsonObject.getIntValue("pe")
        LearnModel(userId, cwareid, videoId, chapterId, edutypeId, subjectId, sourceType, speed, ts, te, ps, pe)
      })
    })

    dsStream.foreachRDD(rdd => {
      rdd.cache()
      // per video: compute the effective, completed and total playback time
      rdd.groupBy(item => item.userId + "_" + item.cwareId + "_" + item.videoId).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach { case (key, iters) =>
            calcVideoTime(key, iters, sqlProxy, client) // compute the video playback times
          }
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
      // total playback time per chapter
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.chapterId
          (key, totaltime)
        })
      }).reduceByKey(_ + _)
        .foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              sqlProxy.executeUpdate(client, "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key" +
                " update totaltime=totaltime+?", Array(item._1, item._2, item._2))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })

      // total playback time per courseware
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.cwareId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into cwareid_learn_detail(cwareid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })

      // total playback time per education type
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.edutypeId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into edutype_learn_detail(edutypeid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })

      // total playback time per source platform
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.sourceType
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into sourcetype_learn_detail (sourcetype_learn,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
      // total playback time per subject
      rdd.mapPartitions(partitions => {
        partitions.map(item => {
          val totaltime = Math.ceil((item.te - item.ts) / 1000).toLong
          val key = item.subjectId
          (key, totaltime)
        })
      }).reduceByKey(_ + _).foreachPartition(partitions => {
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partitions.foreach(item => {
            sqlProxy.executeUpdate(client, "insert into subject_learn_detail(subjectid,totaltime) values(?,?) on duplicate key " +
              "update totaltime=totaltime+?", Array(item._1, item._2, item._2))
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })

    })
    // after the business logic is done, manually commit the offsets to MySQL
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Compute a video's effective, completed and total playback time.
    *
    * @param key
    * @param iters
    * @param sqlProxy
    * @param client
    */
  def calcVideoTime(key: String, iters: Iterable[LearnModel], sqlProxy: SqlProxy, client: Connection) = {
    val keys = key.split("_")
    val userId = keys(0).toInt
    val cwareId = keys(1).toInt
    val videoId = keys(2).toInt
    // look up the historical play intervals
    var interval_history = ""
    sqlProxy.executeQuery(client, "select play_interval from video_interval where userid=? and cwareid=? and videoid=?",
      Array(userId, cwareId, videoId), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            interval_history = rs.getString(1)
          }
          rs.close()
        }
      })
    var effective_duration_sum = 0L // total effective (newly watched) time
    var complete_duration_sum = 0L // total completed time
    var cumulative_duration_sum = 0L // total playback time
    val learnList = iters.toList.sortBy(item => item.ps) // convert to a list sorted by the start position
    learnList.foreach(item => {
      if ("".equals(interval_history)) {
        // no historical interval yet
        val play_interval = item.ps + "-" + item.pe // watched interval
        val effective_duration = Math.ceil((item.te - item.ts) / 1000) // effective time
        val complete_duration = item.pe - item.ps // completed time
        effective_duration_sum += effective_duration.toLong
        cumulative_duration_sum += effective_duration.toLong
        complete_duration_sum += complete_duration
        interval_history = play_interval
      } else {
        // compare against the historical intervals
        val interval_array = interval_history.split(",").sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tuple = getEffectiveInterval(interval_array, item.ps, item.pe)
        val complete_duration = tuple._1 // the genuinely new completed time
        val effective_duration = Math.ceil((item.te - item.ts) / 1000) / (item.pe - item.ps) * complete_duration // effective time, scaled by the new share
        val cumulative_duration = Math.ceil((item.te - item.ts) / 1000) // cumulative time
        interval_history = tuple._2
        effective_duration_sum += effective_duration.toLong
        complete_duration_sum += complete_duration
        cumulative_duration_sum += cumulative_duration.toLong
      }
      sqlProxy.executeUpdate(client, "insert into video_interval(userid,cwareid,videoid,play_interval) values(?,?,?,?) " +
        "on duplicate key update play_interval=?", Array(userId, cwareId, videoId, interval_history, interval_history))
      sqlProxy.executeUpdate(client, "insert into video_learn_detail(userid,cwareid,videoid,totaltime,effecttime,completetime) " +
        "values(?,?,?,?,?,?) on duplicate key update totaltime=totaltime+?,effecttime=effecttime+?,completetime=completetime+?",
        Array(userId, cwareId, videoId, cumulative_duration_sum, effective_duration_sum, complete_duration_sum, cumulative_duration_sum,
          effective_duration_sum, complete_duration_sum))
    })
  }

  /**
    * Compute the effective (not previously watched) play time and the merged interval string.
    *
    * @param array
    * @param start
    * @param end
    * @return
    */
  def getEffectiveInterval(array: Array[String], start: Int, end: Int) = {
    var effective_duration = end - start
    var bl = false // whether the effective time was adjusted against an existing interval
    import scala.util.control.Breaks._
    breakable {
      for (i <- 0 until array.length) {
        // iterate over the existing interval segments
        var historyStart = 0 // start of this historical segment
        var historyEnd = 0 // end of this historical segment
        val item = array(i)
        try {
          historyStart = item.split("-")(0).toInt
          historyEnd = item.split("-")(1).toInt
        } catch {
          case e: Exception => throw new Exception("error array:" + array.mkString(","))
        }
        if (start >= historyStart && historyEnd >= end) {
          // the existing interval fully covers this play, so nothing new was watched
          effective_duration = 0
          bl = true
          break()
        } else if (start <= historyStart && end > historyStart && end < historyEnd) {
          // overlaps the left side of an existing interval: subtract the overlapping part (the old data takes precedence)
          effective_duration -= end - historyStart
          array(i) = start + "-" + historyEnd
          bl = true
        } else if (start > historyStart && start < historyEnd && end >= historyEnd) {
          // overlaps the right side of an existing interval: subtract the overlapping part
          effective_duration -= historyEnd - start
          array(i) = historyStart + "-" + end
          bl = true
        } else if (start < historyStart && end > historyEnd) {
          // the new interval fully covers the old one: subtract the whole old interval
          effective_duration -= historyEnd - historyStart
          array(i) = start + "-" + end
          bl = true
        }
      }
    }
    val result = bl match {
      case false => {
        // the original array was not modified (no overlap), so append the new interval
        val distinctArray2 = ArrayBuffer[String]()
        distinctArray2.appendAll(array)
        distinctArray2.append(start + "-" + end)
        val distinctArray = distinctArray2.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tmpArray = ArrayBuffer[String]()
        tmpArray.append(distinctArray(0))
        for (i <- 1 until distinctArray.length) {
          val item = distinctArray(i).split("-")
          val tmpItem = tmpArray(tmpArray.length - 1).split("-")
          val itemStart = item(0)
          val itemEnd = item(1)
          val tmpItemStart = tmpItem(0)
          val tmpItemEnd = tmpItem(1)
          if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
            // no overlap
            tmpArray.append(itemStart + "-" + itemEnd)
          } else {
            // overlap: merge
            val resultStart = tmpItemStart
            val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
            tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
          }
        }
        val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
        play_interval
      }
      case true => {
        // the original array was modified, so re-merge the intervals
        val distinctArray = array.distinct.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
        val tmpArray = ArrayBuffer[String]()
        tmpArray.append(distinctArray(0))
        for (i <- 1 until distinctArray.length) {
          val item = distinctArray(i).split("-")
          val tmpItem = tmpArray(tmpArray.length - 1).split("-")
          val itemStart = item(0)
          val itemEnd = item(1)
          val tmpItemStart = tmpItem(0)
          val tmpItemEnd = tmpItem(1)
          if (tmpItemStart.toInt < itemStart.toInt && tmpItemEnd.toInt < itemStart.toInt) {
            // no overlap
            tmpArray.append(itemStart + "-" + itemEnd)
          } else {
            // overlap: merge
            val resultStart = tmpItemStart
            val resultEnd = if (tmpItemEnd.toInt > itemEnd.toInt) tmpItemEnd else itemEnd
            tmpArray(tmpArray.length - 1) = resultStart + "-" + resultEnd
          }
        }
        val play_interval = tmpArray.sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt)).mkString(",")
        play_interval
      }
    }
    (effective_duration, result)
  }
}
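
To see what getEffectiveInterval returns, a small worked example with made-up intervals (it assumes the CourseLearnStreaming object above is on the classpath):

object EffectiveIntervalExample extends App {
  // the user has already watched seconds 0-10 and 20-30 of this video
  val history = Array("0-10", "20-30")
  // a new play record covers seconds 5-25; only 10-20 is genuinely new
  val (effective, merged) = CourseLearnStreaming.getEffectiveInterval(history, 5, 25)
  println(s"newly watched seconds: $effective, merged play_interval: $merged") // 10, "0-30"
}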