第1章 項目需求及架構設計
1.1 需求
- 數據采集平臺搭建
- Kafka、Zookeeper中間件準備
- 下游Spark Streaming對接Kafka接收數據,
- 實現vip個數統計
- 欄目打標簽功能
- 做題正確率與掌握度的實時計算功能。
1.2 項目框架
1.2.1 技術(shù)選型
一、數據存儲:Kafka、MySql
二、數據處理:Spark
三、其他組件:Zookeeper
1.2.2 流程設計
流程圖
1.3 思考
(1)Spark Streaming 下每個stage的耗時由什么決定
(2)Spark Streaming task發生數據傾斜如何解決
(3)Spark Streaming操作mysql時,相同維度的數據如何保證線程安全問題
(4)如何保證kill Spark Streaming任務的時候不丟失數據
(5)如何保證Spark Streaming的第一次啟動和kill后第二次啟動時不丟失數據
(6)Spark Streaming下如何正確操作mysql(如何正確使用連接)
(7)MySql建表時 索引注意
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Module POM for com_qingfeng_warehouse, a child of the education-online parent project.
Declares Spark (core/sql/hive, Scala 2.11 builds), the Scala library, fastjson and the MySQL
JDBC driver, and builds Scala sources plus a jar-with-dependencies assembly.
NOTE(review): ${spark.version} and the scala-library version are not defined here; presumably
they come from the parent POM - confirm.
NOTE(review): the streaming code shown alongside this POM imports spark-streaming,
spark-streaming-kafka-0-10, Druid and ip2region, none of which are declared in this file;
verify they are provided by the parent or by another module.
-->
<parent>
<artifactId>education-online</artifactId>
<groupId>com.qingfeng</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>com_qingfeng_warehouse</artifactId>
<dependencies>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- Scala standard library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compiles main and test Scala sources as part of the Maven build. -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.1</version>
<executions>
<execution>
<id>compile-scala</id>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Builds a single jar that bundles all dependencies; the manifest section is left empty. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
MySQL配置文件
# JDBC settings; the keys jdbc.url / jdbc.user / jdbc.password are the ones DataSourceUtil reads
# through ConfigurationManager when it builds the Druid pool.
# NOTE(review): ConfigurationManager loads a classpath resource named "comerce.properties" -
# presumably this file; confirm the file name matches.
jdbc.url=jdbc:mysql://hadoop102:3306/course_learn?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false
jdbc.user=root
# NOTE(review): plain-text root credentials - acceptable for a local demo only.
jdbc.password=000000
讀取配置文件工具類
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
 * Loads {@code comerce.properties} from the classpath once, when the class is initialised,
 * and exposes its entries through static accessors.
 *
 * <p>Loading is best-effort: when the resource is missing or cannot be read, a diagnostic is
 * written to standard error and every lookup behaves as if the key were absent.
 */
public class ConfigurationManager {
    private static final Properties prop = new Properties();

    static {
        // try-with-resources guarantees the stream is closed, even when load() fails.
        try (InputStream inputStream = ConfigurationManager.class.getClassLoader()
                .getResourceAsStream("comerce.properties")) {
            if (inputStream == null) {
                // getResourceAsStream signals "not found" with null; report it explicitly
                // instead of letting Properties.load(null) raise a NullPointerException.
                System.err.println("ConfigurationManager: comerce.properties not found on the classpath");
            } else {
                prop.load(inputStream);
            }
        } catch (IOException | IllegalArgumentException e) {
            // IllegalArgumentException covers malformed \\uXXXX escapes in the file.
            e.printStackTrace();
        }
    }

    /**
     * Returns the raw value of a configuration entry.
     *
     * @param key property name
     * @return the configured value, or {@code null} when the key is not defined
     */
    public static String getProperty(String key) {
        return prop.getProperty(key);
    }

    /**
     * Returns a configuration entry interpreted as a boolean.
     *
     * @param key property name
     * @return {@code true} only when the value equals "true" ignoring case; {@code false}
     *         otherwise, including when the key is not defined
     */
    public static boolean getBoolean(String key) {
        return Boolean.parseBoolean(prop.getProperty(key));
    }
}
Json解析工具類
package com.atguigu.qzpoint.util;
import com.alibaba.fastjson.JSONObject;
public class ParseJsonData {
public static JSONObject getJsonData(String data) {
try {
return JSONObject.parseObject(data); Java代碼
} catch (Exception e) {
return null;
}
}
}
Druid連接池
import com.alibaba.druid.pool.DruidDataSourceFactory;
import javax.sql.DataSource;
import java.io.Serializable;
import java.sql.*;
import java.util.Properties;
/**
* 德魯伊連接池
*/
public class DataSourceUtil implements Serializable {
public static DataSource dataSource = null;
static { //數(shù)據(jù)庫連接池放在靜態(tài)代碼塊中刊懈,保證只創(chuàng)建一份
try {
Properties props = new Properties();
props.setProperty("url", ConfigurationManager.getProperty("jdbc.url"));
props.setProperty("username", ConfigurationManager.getProperty("jdbc.user"));
props.setProperty("password", ConfigurationManager.getProperty("jdbc.password"));
props.setProperty("initialSize", "5"); //初始化大小
props.setProperty("maxActive", "10"); //最大連接
props.setProperty("minIdle", "5"); //最小連接
props.setProperty("maxWait", "60000"); //等待時長
props.setProperty("timeBetweenEvictionRunsMillis", "2000");//配置多久進(jìn)行一次檢測,檢測需要關(guān)閉的連接 單位毫秒
props.setProperty("minEvictableIdleTimeMillis", "600000");//配置連接在連接池中最小生存時間 單位毫秒
props.setProperty("maxEvictableIdleTimeMillis", "900000"); //配置連接在連接池中最大生存時間 單位毫秒
props.setProperty("validationQuery", "select 1");
props.setProperty("testWhileIdle", "true");
props.setProperty("testOnBorrow", "false");
props.setProperty("testOnReturn", "false");
props.setProperty("keepAlive", "true");
props.setProperty("phyMaxUseCount", "100000");
// props.setProperty("driverClassName", "com.mysql.jdbc.Driver");
dataSource = DruidDataSourceFactory.createDataSource(props); 連接池只有一份
} catch (Exception e) {
e.printStackTrace();
}
}
//提供獲取連接的方法
public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
// 提供關(guān)閉資源的方法【connection是歸還到連接池】
// 提供關(guān)閉資源的方法 【方法重載】3 dql
public static void closeResource(ResultSet resultSet, PreparedStatement preparedStatement,
Connection connection) {
// 關(guān)閉結(jié)果集
// ctrl+alt+m 將java語句抽取成方法
closeResultSet(resultSet);
// 關(guān)閉語句執(zhí)行者
closePrepareStatement(preparedStatement);
// 關(guān)閉連接
closeConnection(connection);
}
private static void closeConnection(Connection connection) {
if (connection != null) { //防止空指針異常
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private static void closePrepareStatement(PreparedStatement preparedStatement) {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
private static void closeResultSet(ResultSet resultSet) {
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
創建MySQL代理類
import java.sql.{Connection, PreparedStatement, ResultSet}
/**
 * Callback invoked by SqlProxy.executeQuery with the result set of the executed query.
 */
trait QueryCallback {
  /**
   * Consumes the rows of a query result.
   *
   * @param rs result set positioned before the first row
   */
  def process(rs: ResultSet): Unit
}
/**
 * Small JDBC helper that runs parameterised statements on a caller-supplied connection.
 *
 * Each call closes the statement (and result set) it opened before returning, so a proxy can
 * be reused for any number of statements without accumulating open JDBC handles. SQL errors
 * are printed and swallowed: executeUpdate then reports 0 affected rows and executeQuery
 * returns without the callback having completed.
 */
class SqlProxy {

  /**
   * Executes an INSERT/UPDATE/DELETE style statement.
   *
   * @param conn   connection to run on; not closed by this method
   * @param sql    statement text with `?` placeholders
   * @param params values bound to the placeholders in order; may be null or empty
   * @return the affected-row count reported by JDBC, or 0 when the statement failed
   */
  def executeUpdate(conn: Connection, sql: String, params: Array[Any]): Int = {
    var statement: PreparedStatement = null
    try {
      statement = conn.prepareStatement(sql)
      bindParams(statement, params)
      statement.executeUpdate()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        0
    } finally {
      closeQuietly(statement)
    }
  }

  /**
   * Executes a query and hands its result set to `queryCallback`.
   *
   * The result set is only valid during the callback; it is closed when this method returns.
   *
   * @param conn          connection to run on; not closed by this method
   * @param sql           query text with `?` placeholders
   * @param params        values bound to the placeholders in order; may be null or empty
   * @param queryCallback receives the result set
   */
  def executeQuery(conn: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback): Unit = {
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    try {
      statement = conn.prepareStatement(sql)
      bindParams(statement, params)
      resultSet = statement.executeQuery()
      queryCallback.process(resultSet)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      closeQuietly(resultSet)
      closeQuietly(statement)
    }
  }

  /**
   * Releases the connection through DataSourceUtil.closeResource. Statements and result sets
   * are already closed by the execute methods, so only the connection is passed on.
   *
   * @param conn connection to release
   */
  def shutdown(conn: Connection): Unit = DataSourceUtil.closeResource(null, null, conn)

  /** Binds `params` to the statement's placeholders, 1-based; a null array binds nothing. */
  private def bindParams(statement: PreparedStatement, params: Array[Any]): Unit = {
    if (params != null) {
      for (i <- params.indices) {
        statement.setObject(i + 1, params(i))
      }
    }
  }

  /** Closes a JDBC resource if it is non-null, printing (not propagating) any failure. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
}
指標一:實時統計注冊人數代碼實現
import java.lang
import java.sql.ResultSet
import java.util.Random
import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
 * Streaming job that keeps a running count of registrations per client type, read from the
 * Kafka topic `register_topic`.
 *
 * Consumer offsets are stored in the MySQL table `offset_manager`: they are read once at
 * start-up to decide where to resume, and written back after every batch.
 */
object RegisterStreaming {
  private val groupid = "register_group_test"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      .set("spark.streaming.kafka.maxRatePerPartition", "50") // max records pulled per partition per second
      .set("spark.streaming.stopGracefullyOnShutdown", "true") // enable graceful shutdown
    // Back-pressure should also be enabled to smooth out traffic peaks.
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("register_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )
    // Checkpoint directory; updateStateByKey below keeps its state there.
    ssc.checkpoint("hdfs://hadoop102:9000/user/qingfeng/sparkstreaming/checkpoint")

    // Look up previously stored offsets for this consumer group in MySQL.
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            // Columns are read by position: 2 = topic, 3 = partition, 4 = offset.
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close() // close the cursor
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // Resume from the stored offsets when there are any; otherwise start according to
    // auto.offset.reset.
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // Keep only well-formed records (exactly three tab-separated fields), then map each one
    // to (client type, 1).
    val resultDStream = stream.filter(item => item.value().split("\t").length == 3).mapPartitions(partitions => {
      partitions.map(item => {
        val line = item.value()
        val arr = line.split("\t")
        val app_name = arr(1) match {
          case "1" => "PC"
          case "2" => "APP"
          case _ => "Other"
        }
        (app_name, 1)
      })
    })
    resultDStream.cache()
    // Windowed alternative:
    // resultDStream.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(6)).print()

    // Running total per key: this batch's sum plus the previously accumulated state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum // sum of the current batch
      val previousCount = state.getOrElse(0) // accumulated history
      Some(currentCount + previousCount)
    }
    resultDStream.updateStateByKey(updateFunc).print()

    // Two-stage aggregation example: prefix the key with a random number to spread a skewed
    // key over several reducers, aggregate, strip the prefix, aggregate again.
    // val dsStream = stream.filter(item => item.value().split("\t").length == 3)
    //   .mapPartitions(partitions =>
    //     partitions.map(item => {
    //       val rand = new Random()
    //       val line = item.value()
    //       val arr = line.split("\t")
    //       val app_id = arr(1)
    //       (rand.nextInt(3) + "_" + app_id, 1)
    //     }))
    // dsStream.print()
    // val a = dsStream.reduceByKey(_ + _)
    // a.print()
    // a.map(item => {
    //   val appid = item._1.split("_")(1)
    //   (appid, item._2)
    // }).reduceByKey(_ + _).print()

    // After the business logic, store this batch's end offsets in MySQL.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)", Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
offset表結構:
groupid | topic | partition | untiloffset |
---|---|---|---|
groupid01 | topic01 | 1 | 12123 |
MySQL中replace into用法詳解
replace into t(id, update_time) values(1, now());
或
replace into t(id, update_time) select 1, now();
replace into 跟 insert 功能類似,不同點在于:replace into 首先嘗試插入數據到表中,
1. 如果發現表中已經有此行數據(根據主鍵或者唯一索引判斷)則先刪除此行數據,然后插入新的數據。
2. 否則,直接插入新數據。
要注意的是:插入數據的表必須有主鍵或者是唯一索引!否則的話,replace into 會直接插入數據,這將導致表中出現重復的數據。
MySQL replace into 有三種形式:
1. replace into tbl_name(col_name, ...) values(...)
2. replace into tbl_name(col_name, ...) select ...
3. replace into tbl_name set col_name=value, ...
第一種形式類似于insert into的用法,
第二種replace select的用法也類似于insert select,這種用法并不一定要求列名匹配,事實上,MYSQL甚至不關心select返回的列名,它需要的是列的位置。
例如,replace into tb1( name, title, mood) select rname, rtitle, rmood from tb2;這個例子使用replace into從tb2中將所有數據導入tb1中。
第三種replace set用法類似于update set用法,使用一個例如“SET col_name = col_name + 1”的賦值,則對位于右側的列名稱的引用會被作為DEFAULT(col_name)處理。因此,該賦值相當于SET col_name = DEFAULT(col_name) + 1。
前兩種形式用的多些。其中 “into” 關鍵字可以省略,不過最好加上 “into”,這樣意思更加直觀。另外,對于那些沒有給予值的列,MySQL 將自動為這些列賦上默認值。
指標二:實時統計學員做題正確率與知識點掌握度
import java.lang
import java.sql.{Connection, ResultSet}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import com.atguigu.qzpoint.util.{DataSourceUtil, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* Real-time statistics of answer accuracy and knowledge-point mastery.
*
* Reads tab-separated answer records from the Kafka topic `qz_log`, groups each batch by
* user, course and knowledge point, and merges the batch into the MySQL tables
* `qz_point_history` and `qz_point_detail`. Consumer offsets are stored in `offset_manager`.
*/
object QzPointStreaming {
private val groupid = "qz_point_group"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
.set("spark.streaming.kafka.maxRatePerPartition", "50")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(3))
val topics = Array("qz_log")
val kafkaMap: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupid,
"auto.offset.reset" -> "earliest", // so that the very first run does not miss data
"enable.auto.commit" -> (false: lang.Boolean)
)
// Look up previously stored offsets for this consumer group in MySQL.
val sqlProxy = new SqlProxy()
val offsetMap = new mutable.HashMap[TopicPartition, Long]()
val client = DataSourceUtil.getConnection
try {
sqlProxy.executeQuery(client, "select * from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
// Columns are read by position: 2 and 3 build the TopicPartition, 4 is the offset.
val model = new TopicPartition(rs.getString(2), rs.getInt(3))
val offset = rs.getLong(4)
offsetMap.put(model, offset)
}
rs.close() // close the cursor
}
})
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
// Resume from the stored offsets when there are any; otherwise subscribe without explicit
// offsets.
val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
} else {
KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
}
// Drop malformed records (anything other than six tab-separated fields) and extract the fields.
val dsStream = stream.filter(item => item.value().split("\t").length == 6).
mapPartitions(partition => partition.map(item => {
val line = item.value()
val arr = line.split("\t")
val uid = arr(0) // user id
val courseid = arr(1) // course id
val pointid = arr(2) // knowledge-point id
val questionid = arr(3) // question id
val istrue = arr(4) // whether the answer was correct
val createtime = arr(5) // creation time
(uid, courseid, pointid, questionid, istrue, createtime)
}))
dsStream.foreachRDD(rdd => {
// Group records by user, course and knowledge point.
val groupRdd = rdd.groupBy(item => item._1 + "-" + item._2 + "-" + item._3)
groupRdd.foreachPartition(partition => {
// One JDBC connection per partition.
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
partition.foreach { case (key, iters) =>
qzQuestionUpdate(key, iters, sqlProxy, client) // merge this group into the question tables
}
} catch {
// NOTE(review): an exception thrown for one group ends the loop, so the remaining groups
// of this partition are not processed in this batch.
case e: Exception => e.printStackTrace()
}
finally {
sqlProxy.shutdown(client)
}
}
)
})
// After the business logic, store this batch's end offsets in MySQL.
stream.foreachRDD(rdd => {
val sqlProxy = new SqlProxy()
val client = DataSourceUtil.getConnection
try {
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (or <- offsetRanges) {
sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
Array(groupid, or.topic, or.partition.toString, or.untilOffset))
}
} catch {
case e: Exception => e.printStackTrace()
} finally {
sqlProxy.shutdown(client)
}
})
ssc.start()
ssc.awaitTermination()
}
/**
* Merges one (user, course, knowledge point) group of the current batch into
* `qz_point_history` and `qz_point_detail`.
*
* @param key      group key in the form "userid-courseid-pointid"; each part is parsed with toInt
* @param iters    the batch's records for this key as
*                 (uid, courseid, pointid, questionid, istrue, createtime)
* @param sqlProxy helper used to run the statements
* @param client   JDBC connection to run on; not closed here
* @return the result of the final executeUpdate call
*/
def qzQuestionUpdate(key: String, iters: Iterable[(String, String, String, String, String, String)], sqlProxy: SqlProxy, client: Connection) = {
val keys = key.split("-")
val userid = keys(0).toInt
val courseid = keys(1).toInt
val pointid = keys(2).toInt
val array = iters.toArray
val questionids = array.map(_._4).distinct // distinct question ids in the current batch
// Load the question ids already recorded for this key.
var questionids_history: Array[String] = Array()
sqlProxy.executeQuery(client, "select questionids from qz_point_history where userid=? and courseid=? and pointid=?",
Array(userid, courseid, pointid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
questionids_history = rs.getString(1).split(",")
}
rs.close() // close the cursor
}
})
// Combine historical and current question ids and de-duplicate.
val resultQuestionid = questionids.union(questionids_history).distinct
val countSize = resultQuestionid.length
val resultQuestionid_str = resultQuestionid.mkString(",")
val qz_count = questionids.length // number of distinct questions in this batch; NOTE(review): not used below
var qz_sum = array.length // number of answers in the current batch
var qz_istrue = array.filter(_._5.equals("1")).size // number of correct answers in the current batch
val createtime = array.map(_._6).min // smallest createtime string of the batch, used as the row's creation time
// Upsert qz_point_history, which stores the question ids this user has attempted.
val updatetime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())
// NOTE(review): on insert, createtime is bound to both the createtime and updatetime
// columns; only the duplicate-key branch uses updatetime - confirm this is intended.
sqlProxy.executeUpdate(client, "insert into qz_point_history(userid,courseid,pointid,questionids,createtime,updatetime) values(?,?,?,?,?,?) " +
" on duplicate key update questionids=?,updatetime=?", Array(userid, courseid, pointid, resultQuestionid_str, createtime, createtime, resultQuestionid_str, updatetime))
// Add the totals already stored for this key to the batch totals.
var qzSum_history = 0
var istrue_history = 0
sqlProxy.executeQuery(client, "select qz_sum,qz_istrue from qz_point_detail where userid=? and courseid=? and pointid=?",
Array(userid, courseid, pointid), new QueryCallback {
override def process(rs: ResultSet): Unit = {
while (rs.next()) {
qzSum_history += rs.getInt(1)
istrue_history += rs.getInt(2)
}
rs.close()
}
})
qz_sum += qzSum_history
qz_istrue += istrue_history
val correct_rate = qz_istrue.toDouble / qz_sum.toDouble // accuracy
// Completion rate.
// Assumes every knowledge point has 30 questions: first the share of questions attempted,
// then the mastery.
val qz_detail_rate = countSize.toDouble / 30 // share attempted; multiplied by accuracy below, so when all 30 are attempted mastery equals accuracy
// NOTE(review): nothing caps countSize at 30, so this ratio can exceed 1 - confirm.
val mastery_rate = qz_detail_rate * correct_rate
sqlProxy.executeUpdate(client, "insert into qz_point_detail(userid,courseid,pointid,qz_sum,qz_count,qz_istrue,correct_rate,mastery_rate,createtime,updatetime)" +
" values(?,?,?,?,?,?,?,?,?,?) on duplicate key update qz_sum=?,qz_count=?,qz_istrue=?,correct_rate=?,mastery_rate=?,updatetime=?",
Array(userid, courseid, pointid, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, createtime, updatetime, qz_sum, countSize, qz_istrue, correct_rate, mastery_rate, updatetime))
}
}
指標三:實時統計商品頁到訂單頁,訂單頁到支付頁轉換率
import java.lang
import java.sql.{Connection, ResultSet}
import java.text.NumberFormat
import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkFiles}
import org.lionsoul.ip2region.{DbConfig, DbSearcher}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
/**
 * Real-time page conversion-rate statistics.
 *
 * Reads JSON page-view records from the Kafka topic `page_topic` and, per batch:
 *  - accumulates jump counts per (last page, page, next page) into `page_jump_rate`,
 *  - accumulates click counts per province (resolved from the IP with ip2region) into
 *    `tmp_city_num_detail` and rewrites the top three into `top_city_num`,
 *  - recomputes the page 1 -> 2 and 2 -> 3 conversion rates,
 *  - stores the batch's end offsets in `offset_manager`.
 */
object PageStreaming {
  private val groupid = "vip_count_groupid"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
      .set("spark.streaming.kafka.maxRatePerPartition", "30")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("page_topic")
    val kafkaMap: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: lang.Boolean)
    )

    // Look up previously stored offsets for this consumer group in MySQL.
    val sqlProxy = new SqlProxy()
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    val client = DataSourceUtil.getConnection
    try {
      sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
        override def process(rs: ResultSet): Unit = {
          while (rs.next()) {
            // Columns are read by position: 2 and 3 build the TopicPartition, 4 is the offset.
            val model = new TopicPartition(rs.getString(2), rs.getInt(3))
            val offset = rs.getLong(4)
            offsetMap.put(model, offset)
          }
          rs.close()
        }
      })
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }

    // Resume from the stored offsets when there are any; otherwise subscribe without explicit
    // offsets.
    val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
    } else {
      KafkaUtils.createDirectStream(
        ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
    }

    // Parse the JSON payload. ParseJsonData.getJsonData returns null for input it cannot
    // parse, so such records are skipped here instead of failing the task with an NPE.
    val dsStream = stream.map(item => item.value()).mapPartitions(partition => {
      partition.flatMap(item => {
        Option(ParseJsonData.getJsonData(item)).map(jsonObject => {
          // A missing key or a null value both become the empty string.
          def field(key: String): String = Option(jsonObject.getString(key)).getOrElse("")

          (field("uid"), field("app_id"), field("device_id"), field("ip"),
            field("last_page_id"), field("page_id"), field("next_page_id"))
        }).iterator
      })
    }).filter(item => {
      // Keep only records that carry all three page ids.
      !item._5.equals("") && !item._6.equals("") && !item._7.equals("")
    })
    dsStream.cache()

    // Count jumps per "last_page_id_page_id_next_page_id" key within the batch.
    val pageValueDStream = dsStream.map(item => (item._5 + "_" + item._6 + "_" + item._7, 1))
    val resultDStream = pageValueDStream.reduceByKey(_ + _)
    resultDStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // One JDBC connection per partition.
        val sqlProxy = new SqlProxy()
        val client = DataSourceUtil.getConnection
        try {
          partition.foreach(item => {
            calcPageJumpCount(sqlProxy, item, client) // accumulate the jump count
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          sqlProxy.shutdown(client)
        }
      })
    })

    // Ship the ip2region database file to the executors.
    ssc.sparkContext.addFile(this.getClass.getResource("/ip2region.db").getPath)
    val ipDStream = dsStream.mapPartitions(patitions => {
      val dbFile = SparkFiles.get("ip2region.db")
      val ipsearch = new DbSearcher(new DbConfig(), dbFile)
      patitions.map { item =>
        val ip = item._4
        // Region format example: 中国|0|上海|上海市|有线通 - field 2 is used as the province.
        // NOTE(review): an empty or malformed ip is not guarded against here.
        val province = ipsearch.memorySearch(ip).getRegion().split("\\|")(2)
        (province, 1L) // one click for this province
      }
    }).reduceByKey(_ + _)

    ipDStream.foreachRDD(rdd => {
      // Load the stored per-province totals and turn them into an RDD.
      val ipSqlProxy = new SqlProxy()
      val ipClient = DataSourceUtil.getConnection
      try {
        val history_data = new ArrayBuffer[(String, Long)]()
        ipSqlProxy.executeQuery(ipClient, "select province,num from tmp_city_num_detail", null, new QueryCallback {
          override def process(rs: ResultSet): Unit = {
            while (rs.next()) {
              val tuple = (rs.getString(1), rs.getLong(2))
              history_data += tuple
            }
          }
        })
        val history_rdd = ssc.sparkContext.makeRDD(history_data)
        // New total per province = stored total + this batch's count.
        val resultRdd = history_rdd.fullOuterJoin(rdd).map(item => {
          val province = item._1
          val nums = item._2._1.getOrElse(0L) + item._2._2.getOrElse(0L)
          (province, nums)
        })
        resultRdd.foreachPartition(partitions => {
          val sqlProxy = new SqlProxy()
          val client = DataSourceUtil.getConnection
          try {
            partitions.foreach(item => {
              val province = item._1
              val num = item._2
              // Write the new total back.
              sqlProxy.executeUpdate(client, "insert into tmp_city_num_detail(province,num)values(?,?) on duplicate key update num=?",
                Array(province, num, num))
            })
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            sqlProxy.shutdown(client)
          }
        })
        // Replace the top-three table. All statements on ipClient go through ipSqlProxy, the
        // proxy that owns this connection, rather than the start-up proxy captured from main.
        val top3Rdd = resultRdd.sortBy[Long](_._2, false).take(3)
        ipSqlProxy.executeUpdate(ipClient, "truncate table top_city_num", null)
        top3Rdd.foreach(item => {
          ipSqlProxy.executeUpdate(ipClient, "insert into top_city_num (province,num) values(?,?)", Array(item._1, item._2))
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        ipSqlProxy.shutdown(ipClient)
      }
    })

    // Recompute the conversion rates, then store this batch's end offsets in MySQL.
    stream.foreachRDD(rdd => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        calcJumRate(sqlProxy, client) // conversion rates
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (or <- offsetRanges) {
          sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
            Array(groupid, or.topic, or.partition.toString, or.untilOffset))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Adds one batch's jump count for a page to `page_jump_rate`.
   *
   * The statement is an upsert whose duplicate-key branch is `num=num+?`, so the database
   * does the accumulation and only the batch count is bound. (The previous version also ran
   * a `select num` whose result was never used, because the upsert sat in the callback's
   * initializer and therefore ran before the query; that query is dropped.)
   *
   * @param sqlProxy helper used to run the statement
   * @param item     ("lastPageId_pageId_nextPageId", count in this batch); each id is parsed
   *                 with toInt
   * @param client   JDBC connection to run on; not closed here
   */
  def calcPageJumpCount(sqlProxy: SqlProxy, item: (String, Int), client: Connection): Unit = {
    val keys = item._1.split("_")
    val num: Long = item._2
    val page_id = keys(1).toInt // current page id
    val last_page_id = keys(0).toInt // previous page id
    val next_page_id = keys(2).toInt // next page id
    // Page 1 is the entry page, so a newly inserted row gets a fixed jump_rate of 100%.
    if (page_id == 1) {
      sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num,jump_rate)" +
        "values(?,?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, "100%", num))
    } else {
      sqlProxy.executeUpdate(client, "insert into page_jump_rate(last_page_id,page_id,next_page_id,num)" +
        "values(?,?,?,?) on duplicate key update num=num+?", Array(last_page_id, page_id, next_page_id, num, num))
    }
  }

  /**
   * Recomputes the page 1 -> page 2 and page 2 -> page 3 conversion rates from the stored
   * counts and writes them to the `jump_rate` column of pages 2 and 3. A zero denominator
   * yields "0%".
   *
   * @param sqlProxy helper used to run the statements
   * @param client   JDBC connection to run on; not closed here
   */
  def calcJumRate(sqlProxy: SqlProxy, client: Connection): Unit = {
    val page1_num = queryPageNum(sqlProxy, client, 1)
    val page2_num = queryPageNum(sqlProxy, client, 2)
    val page3_num = queryPageNum(sqlProxy, client, 3)
    val nf = NumberFormat.getPercentInstance
    val page1ToPage2Rate = if (page1_num == 0) "0%" else nf.format(page2_num.toDouble / page1_num.toDouble)
    val page2ToPage3Rate = if (page2_num == 0) "0%" else nf.format(page3_num.toDouble / page2_num.toDouble)
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page1ToPage2Rate, 2))
    sqlProxy.executeUpdate(client, "update page_jump_rate set jump_rate=? where page_id=?", Array(page2ToPage3Rate, 3))
  }

  /** Returns the stored `num` for a page id (the last row if several match), or 0 if none. */
  private def queryPageNum(sqlProxy: SqlProxy, client: Connection, pageId: Int): Long = {
    var pageNum = 0L
    sqlProxy.executeQuery(client, "select num from page_jump_rate where page_id=?", Array(pageId), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          pageNum = rs.getLong(1)
        }
      }
    })
    pageNum
  }
}
指標六:實時統計視頻播放各時長
import java.lang
import java.sql.{Connection, ResultSet}
import com.atguigu.qzpoint.bean.LearnModel
import com.atguigu.qzpoint.util.{DataSourceUtil, ParseJsonData, QueryCallback, SqlProxy}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
object CourseLearnStreaming {
private val groupid = "course_learn_test1"
/**
 * Entry point of the course-learning streaming job.
 *
 * Reads play events from the Kafka topic `course_learn`, resuming from the offsets stored in the
 * MySQL table `offset_manager` for this consumer group when any exist. For every micro-batch it
 * updates the per-video statistics (see [[calcVideoTime]]) and the accumulated play time per
 * chapter, courseware, education type, source type and subject, then stores the batch's end
 * offsets back into `offset_manager`.
 *
 * NOTE(review): write failures inside the batch are only printed, and the offsets are stored
 * afterwards regardless, so a failed write is not retried. Results and offsets are also not
 * written in one transaction, so a restart between the two steps would re-apply a batch.
 *
 * @param args command line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    .set("spark.streaming.kafka.maxRatePerPartition", "30")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
  val ssc = new StreamingContext(conf, Seconds(3))
  val topics = Array("course_learn")
  val kafkaMap: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupid,
    "auto.offset.reset" -> "earliest",
    // Offsets are maintained manually in MySQL, so Kafka auto-commit is switched off.
    "enable.auto.commit" -> (false: lang.Boolean)
  )

  // Load the offsets previously stored in MySQL for this consumer group, if any.
  val sqlProxy = new SqlProxy()
  val offsetMap = new mutable.HashMap[TopicPartition, Long]()
  val client = DataSourceUtil.getConnection
  try {
    sqlProxy.executeQuery(client, "select *from `offset_manager` where groupid=?", Array(groupid), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        // Columns are read by position: 2 = topic, 3 = partition, 4 = offset.
        while (rs.next()) {
          val model = new TopicPartition(rs.getString(2), rs.getInt(3))
          val offset = rs.getLong(4)
          offsetMap.put(model, offset)
        }
        rs.close()
      }
    })
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    sqlProxy.shutdown(client)
  }

  // Resume from the stored offsets when there are any; otherwise fall back to auto.offset.reset.
  val stream: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.isEmpty) {
    KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap))
  } else {
    KafkaUtils.createDirectStream(
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaMap, offsetMap))
  }

  // Parse every JSON record into a LearnModel.
  // NOTE(review): assumes every record is valid JSON carrying "ts" and "te" — confirm how
  // ParseJsonData.getJsonData reports malformed input.
  val dsStream = stream.mapPartitions(partitions => {
    partitions.map(item => {
      val json = item.value()
      val jsonObject = ParseJsonData.getJsonData(json)
      val userId = jsonObject.getIntValue("uid")
      val cwareid = jsonObject.getIntValue("cwareid")
      val videoId = jsonObject.getIntValue("videoid")
      val chapterId = jsonObject.getIntValue("chapterid")
      val edutypeId = jsonObject.getIntValue("edutypeid")
      val subjectId = jsonObject.getIntValue("subjectid")
      val sourceType = jsonObject.getString("sourceType")
      val speed = jsonObject.getIntValue("speed")
      val ts = jsonObject.getLong("ts")
      val te = jsonObject.getLong("te")
      val ps = jsonObject.getIntValue("ps")
      val pe = jsonObject.getIntValue("pe")
      LearnModel(userId, cwareid, videoId, chapterId, edutypeId, subjectId, sourceType, speed, ts, te, ps, pe)
    })
  })

  dsStream.foreachRDD(rdd => {
    // The batch is scanned six times below, so keep it cached.
    rdd.cache()

    // Per (user, courseware, video): effective, completed and total play time.
    rdd.groupBy(item => item.userId + "_" + item.cwareId + "_" + item.videoId).foreachPartition(groups => {
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        groups.foreach { case (key, iters) =>
          calcVideoTime(key, iters, sqlProxy, client)
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })

    // Total play time per chapter.
    accumulateTotalTime(rdd, "insert into chapter_learn_detail(chapterid,totaltime) values(?,?) on duplicate key" +
      " update totaltime=totaltime+?")(_.chapterId)
    // Total play time per courseware.
    accumulateTotalTime(rdd, "insert into cwareid_learn_detail(cwareid,totaltime) values(?,?) on duplicate key " +
      "update totaltime=totaltime+?")(_.cwareId)
    // Total play time per education type.
    accumulateTotalTime(rdd, "insert into edutype_learn_detail(edutypeid,totaltime) values(?,?) on duplicate key " +
      "update totaltime=totaltime+?")(_.edutypeId)
    // Total play time per source platform.
    accumulateTotalTime(rdd, "insert into sourcetype_learn_detail (sourcetype_learn,totaltime) values(?,?) on duplicate key " +
      "update totaltime=totaltime+?")(_.sourceType)
    // Total play time per subject.
    accumulateTotalTime(rdd, "insert into subject_learn_detail(subjectid,totaltime) values(?,?) on duplicate key " +
      "update totaltime=totaltime+?")(_.subjectId)
  })

  // After the batch has been processed, store its end offsets in MySQL.
  stream.foreachRDD(rdd => {
    val sqlProxy = new SqlProxy()
    val client = DataSourceUtil.getConnection
    try {
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (or <- offsetRanges) {
        sqlProxy.executeUpdate(client, "replace into `offset_manager` (groupid,topic,`partition`,untilOffset) values(?,?,?,?)",
          Array(groupid, or.topic, or.partition.toString, or.untilOffset))
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      sqlProxy.shutdown(client)
    }
  })

  ssc.start()
  ssc.awaitTermination()
}

/**
 * Sums the play time of one batch per key and adds each sum to the matching MySQL row.
 *
 * The play time of a record is `te - ts` milliseconds rounded UP to whole seconds. The division
 * is done in floating point on purpose: an integer division would truncate before `Math.ceil`
 * runs and make the rounding a no-op.
 *
 * @param rdd       the batch of parsed play events
 * @param upsertSql upsert statement with three placeholders: key, total, total
 * @param keyOf     extracts the grouping key from a record
 */
private def accumulateTotalTime[K: scala.reflect.ClassTag](rdd: org.apache.spark.rdd.RDD[LearnModel], upsertSql: String)
                                                          (keyOf: LearnModel => K): Unit = {
  rdd.map(item => (keyOf(item), Math.ceil((item.te - item.ts) / 1000.0).toLong))
    .reduceByKey(_ + _)
    .foreachPartition(partitions => {
      // One connection per partition, created where the partition is processed.
      val sqlProxy = new SqlProxy()
      val client = DataSourceUtil.getConnection
      try {
        partitions.foreach { case (key, totaltime) =>
          sqlProxy.executeUpdate(client, upsertSql, Array(key, totaltime, totaltime))
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        sqlProxy.shutdown(client)
      }
    })
}
/**
 * Computes the total, effective and completed play time of one (user, courseware, video) group
 * of the current batch and adds it to MySQL.
 *
 * The already watched position ranges are read from `video_interval` (a comma separated list of
 * "start-end" items). Each record's range [ps, pe] is compared against them: only the part not
 * watched before counts as completed time, and the record's play time is scaled by that share to
 * get the effective time. The merged ranges and the three sums are written ONCE after all records
 * were processed; writing the running sums after every record would add the earlier records'
 * time again on each iteration.
 *
 * @param key      group key in the form "userId_cwareId_videoId"
 * @param iters    the records of this group in the current batch
 * @param sqlProxy helper used to run the statements
 * @param client   open connection; not closed here
 */
def calcVideoTime(key: String, iters: Iterable[LearnModel], sqlProxy: SqlProxy, client: Connection): Unit = {
  val keys = key.split("_")
  val userId = keys(0).toInt
  val cwareId = keys(1).toInt
  val videoId = keys(2).toInt

  // Load the ranges watched so far; stays "" when there is no row yet.
  var interval_history = ""
  sqlProxy.executeQuery(client, "select play_interval from video_interval where userid=? and cwareid=? and videoid=?",
    Array(userId, cwareId, videoId), new QueryCallback {
      override def process(rs: ResultSet): Unit = {
        while (rs.next()) {
          val stored = rs.getString(1)
          // A NULL column is treated like "no history" instead of failing later on split.
          interval_history = if (stored == null) "" else stored
        }
        rs.close()
      }
    })

  var effective_duration_sum = 0L  // effective play time of this batch, seconds
  var complete_duration_sum = 0L   // newly completed video time of this batch
  var cumulative_duration_sum = 0L // total play time of this batch, seconds

  // Process the records in ascending order of their start position.
  val learnList = iters.toList.sortBy(item => item.ps)
  learnList.foreach(item => {
    // Milliseconds rounded up to whole seconds; divide as Double so the ceil has an effect.
    val played = Math.ceil((item.te - item.ts) / 1000.0).toLong
    cumulative_duration_sum += played
    if ("".equals(interval_history)) {
      // No history yet: the whole range is new.
      effective_duration_sum += played
      complete_duration_sum += item.pe - item.ps
      interval_history = s"${item.ps}-${item.pe}"
    } else {
      // Compare against the watched ranges and keep only the new part.
      val interval_arry = interval_history.split(",").sortBy(a => (a.split("-")(0).toInt, a.split("-")(1).toInt))
      val tuple = getEffectiveInterval(interval_arry, item.ps, item.pe)
      val complete_duration = tuple._1
      val span = item.pe - item.ps
      // Scale the play time by the share of the range that is new; an empty range adds nothing.
      val effective_duration = if (span > 0) (played.toDouble / span * complete_duration).toLong else 0L
      interval_history = tuple._2
      effective_duration_sum += effective_duration
      complete_duration_sum += complete_duration
    }
  })

  if (learnList.nonEmpty) {
    sqlProxy.executeUpdate(client, "insert into video_interval(userid,cwareid,videoid,play_interval) values(?,?,?,?) " +
      "on duplicate key update play_interval=?", Array(userId, cwareId, videoId, interval_history, interval_history))
    sqlProxy.executeUpdate(client, "insert into video_learn_detail(userid,cwareid,videoid,totaltime,effecttime,completetime) " +
      "values(?,?,?,?,?,?) on duplicate key update totaltime=totaltime+?,effecttime=effecttime+?,completetime=completetime+?",
      Array(userId, cwareId, videoId, cumulative_duration_sum, effective_duration_sum, complete_duration_sum, cumulative_duration_sum,
        effective_duration_sum, complete_duration_sum))
  }
}
/**
 * Works out how much of the range [start, end] has not been watched before and merges the range
 * into the watched ranges.
 *
 * The not-yet-watched length is `end - start` minus the overlap with every history range. This
 * relies on the history ranges not overlapping each other, which holds for lists produced by
 * this method. The input array is not modified.
 *
 * @param array watched ranges, each formatted as "start-end"
 * @param start start position of the new range
 * @param end   end position of the new range
 * @return a pair of (length of the new range not covered by the history, merged ranges as a comma
 *         separated "start-end" list in ascending order)
 * @throws Exception with message "error array:..." when an item cannot be parsed
 */
def getEffectiveInterval(array: Array[String], start: Int, end: Int): (Int, String) = {
  // Parse every "start-end" item once.
  val history: Array[(Int, Int)] = array.map { item =>
    try {
      val bounds = item.split("-")
      (bounds(0).toInt, bounds(1).toInt)
    } catch {
      case e: Exception => throw new Exception("error array:" + array.mkString(","), e)
    }
  }

  // Overlap of the new range with each history range; 0 when they do not intersect.
  val overlapped = history.map { case (historyStart, historyEnd) =>
    math.max(0, math.min(end, historyEnd) - math.max(start, historyStart))
  }.sum
  val effective_duration = (end - start) - overlapped

  // Merge history and the new range; ranges that overlap or touch become one.
  val merged = ArrayBuffer[(Int, Int)]()
  (history :+ ((start, end))).sorted.foreach { case (itemStart, itemEnd) =>
    if (merged.nonEmpty && merged.last._2 >= itemStart) {
      val last = merged.last
      merged(merged.length - 1) = (last._1, math.max(last._2, itemEnd))
    } else {
      merged.append((itemStart, itemEnd))
    }
  }
  val play_interval = merged.map { case (s, e) => s"$s-$e" }.mkString(",")
  (effective_duration, play_interval)
}
}