Project Overview
- Page views of the hands-on courses accumulated so far today
- Page views of the hands-on courses referred from search engines accumulated so far today
Project Workflow
Requirements analysis ==> data generation ==> data collection ==> data cleaning ==> statistical analysis ==> persisting the results ==> data visualization
Distributed Log Collection Framework: Flume (detailed notes in Evernote)
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
Distributed Streaming Platform: Kafka
One configuration file per broker
Single node, single broker: the configuration file can be used as-is
Single node, multiple brokers: copy the configuration file once per broker; broker.id, the listeners port, and the log.dirs path must be unique for each broker
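For reference, a minimal sketch of the per-broker overrides (file names, port, and path below are example values, not taken from these notes):
# server-1.properties (similarly server-2.properties, ...)
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/home/kang/app/tmp/kafka-logs-1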
Development with IDEA + Maven
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
package com.test.kafka;
public class KafkaProperties {
public static final String ZK = "192.168.247.100:2181";
public static final String TOPIC = "test_repliation_3";
public static final String BROKER_LIST = "192.168.247.100:9092,192.168.247.100:9093,192.168.247.100:9094";
public static final String GROUP_ID = "testGroup";
}
package com.test.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer extends Thread {
private String topic;
public KafkaConsumer(String topic){
this.topic = topic;
}
private ConsumerConnector createConnector(){
Properties properties = new Properties();
properties.put("zookeeper.connect",KafkaProperties.ZK);
properties.put("group.id",KafkaProperties.GROUP_ID);
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run() {
ConsumerConnector consumer = createConnector();
Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
topicCountMap.put(topic,1);
//Map key: topic name
//Map value: the list of data streams for that topic
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("rec: "+ message);
}
}
}
package com.test.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
public class KafkaProducer extends Thread{
private String topic;
private Producer<Integer, String> producer;
public KafkaProducer(String topic){
this.topic = topic;
Properties properties = new Properties();
properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks","1");
producer = new Producer<Integer, String>(new ProducerConfig(properties));
}
@Override
public void run() {
int messageNo = 1;
while(true){
String message = "message "+messageNo;
producer.send(new KeyedMessage<Integer, String>(topic, message));
System.out.println("Sent: "+message);
messageNo ++;
try {
Thread.sleep(2000);
} catch (Exception e){
e.printStackTrace();
}
}
}
}
package com.test.kafka;
public class KafkaClientApp {
public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}
Flume to Kafka Integration
kafka sink
type=org.apache.flume.sink.kafka.KafkaSink
brokerList
topic
batchSize
requiredAcks
Environment Setup
- JDK installation
- Scala installation
- Maven installation
- Hadoop installation
- ZooKeeper installation
- HBase installation
- Spark installation
- IDEA + Maven + Spark Streaming
Add dependencies
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
Example
NetworkWordCount (real-time word counting over a network socket)
nc -lk 9999
# send messages on port 9999
- Submitting the job with spark-submit (production)
spark-submit \
  --class org.apache.spark.examples.streaming.NetworkWordCount \
  --name NetworkWordCount \
  --master local[2] \
  /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
  localhost 9999
- Submitting with spark-shell (testing)
spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("192.168.247.100", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
How It Works
- Coarse-grained view
Spark Streaming receives the real-time data stream, slices it into small blocks of data at the configured batch interval, and hands each block to the Spark engine for processing.
Core Concepts
StreamingContext
Two commonly used constructors:
The batch interval must be set based on the latency requirements of your application and available cluster resources.
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
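A short usage sketch of these two constructors (master, app name, and batch interval are illustrative values):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingContextDemo")
// 1) from a SparkConf: the StreamingContext creates the SparkContext internally
val ssc = new StreamingContext(conf, Seconds(5))
// 2) from an existing SparkContext (e.g. the sc provided by spark-shell):
// val ssc = new StreamingContext(sc, Seconds(5))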
After a context is defined, you have to do the following.
- Define the input sources by creating input DStreams.
- Define the streaming computations by applying transformation and output operations to DStreams.
- Start receiving data and processing it using streamingContext.start().
- Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
- The processing can be manually stopped using streamingContext.stop().
Points to remember:
- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
DStreams
Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset.
Any operation applied on a DStream translates to operations on the underlying RDDs
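For example (a sketch, assuming lines is a DStream[String] such as the one returned by socketTextStream), each DStream operation is applied to every per-batch RDD, and transform exposes those RDDs directly:
val upper = lines.map(_.toUpperCase)   // applied to each underlying RDD of the DStream
val sampled = lines.transform(rdd => rdd.sample(withReplacement = false, 0.1)) // arbitrary RDD-to-RDD function per batch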
Input DStreams and Receivers
Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.
Transformations on DStreams
Output Operations on DStreams
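A small sketch of common output operations (assuming counts is a DStream of (word, count) pairs; the HDFS path prefix is an example):
counts.print()                                                           // prints the first 10 elements of each batch
counts.saveAsTextFiles("hdfs://192.168.247.100:9000/sparkStreaming/wc")  // writes one directory per batch
counts.foreachRDD(rdd => println("batch size: " + rdd.count()))          // arbitrary action per batch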
Hands-on Examples
Socket data
Additional dependencies required:
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming processing socket data
 */
object NetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val line = ssc.socketTextStream("192.168.247.100",6789)
val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
Send socket data:
nc -lk 6789
File system data
File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.)
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Processing data from a file system
 */
object FileWordCount {
def main(args: Array[String]): Unit = {
//A file stream does not need a receiver, so a single thread is enough
val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
//Files must be moved into the monitored directory as whole files (atomic move)
val line = ssc.textFileStream("hdfs://192.168.247.100:9000/sparkStreaming/")
val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
updateStateByKey
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
//Stateful operators require a checkpoint directory to be set
ssc.checkpoint("hdfs://192.168.247.100:9000/sparkStreaming/")
val line = ssc.socketTextStream("192.168.247.100",6789)
val result = line.flatMap(_.split(" ")).map((_,1))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
ssc.start()
ssc.awaitTermination()
}
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum + runningCount.getOrElse(0)
Some(newCount)
}
}
Saving data to a database
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
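A concrete sketch of this pattern with plain JDBC, assuming wordCounts is a DStream[(String, Int)] like the one produced in the earlier word-count examples (the MySQL URL, credentials, and table are assumptions; a production job would use a real connection pool instead of opening a connection per partition):
import java.sql.DriverManager
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = DriverManager.getConnection("jdbc:mysql://192.168.247.100:3306/spark", "root", "root") // assumed connection info
    val statement = connection.prepareStatement("insert into wordcount(word, cnt) values (?, ?)")           // assumed table
    partitionOfRecords.foreach { case (word, count) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.executeUpdate()
    }
    statement.close()
    connection.close()
  }
}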
Using transform: blacklist filtering
zhangshan,123
lisi,123
wangwu,123
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object TransformApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")
val ssc = new StreamingContext(sparkConf,Seconds(5))
/**
 * Build the blacklist
 */
val blacks = List("zhangshan","lisi")
val blacksRDD = ssc.sparkContext.parallelize(blacks).map((_,true))
val line = ssc.socketTextStream("192.168.247.100",6789)
val clicklog = line.map(x => (x.split(",")(0),x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD)
.filter(x => x._2._2.getOrElse(false) != true)
.map(_._2._1)
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}
Integrating Spark Streaming with Spark SQL
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
object sqlNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val line = ssc.socketTextStream("192.168.247.100",6789)
val words = line.flatMap(_.split(" "))
words.foreachRDD { (rdd,time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
}
case class Record(word: String)
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
Integrating Spark Streaming with Flume
Push-based approach
Add the required dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
Flume agent configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/kang/flume.log
# Describe the sink (push mode: an avro sink pointing at the host/port where the
# Spark Streaming receiver created by FlumeUtils.createStream listens; example values)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.247.1
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
IDEA code
package com.test
import org.apache.spark.streaming.flume._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming + Flume integration, approach 1 (push)
 */
object FlumePushWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2){
System.err.println("Usage FlumePushWordCount <hostname> <port>")
System.exit(1)
}
val Array(hostname, port) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val flumeStream = FlumeUtils.createStream(ssc,hostname,port.toInt)
flumeStream.map(item => new String(item.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
1) Start the Spark Streaming job
2) Start the Flume agent
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro.conf --name a1 -Dflume.root.logger=INFO,console
- Feed in some data and watch the output in the IDEA console
- Running on a server
./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 ...
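A fuller form of that command might look like the sketch below (the application class, jar path, and host/port arguments are assumptions matching the FlumePushWordCount code above):
./bin/spark-submit \
--packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
--class com.test.FlumePushWordCount \
--master local[2] \
/home/kang/lib/sparktrain-1.0.jar \
192.168.247.1 41414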
Pull-based approach
- Uses a reliable receiver
A reliable receiver correctly sends an acknowledgement to a reliable source once the data has been received and stored in Spark.
Add dependencies
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
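Note: for the pull approach the custom SparkSink must also be available to Flume itself; a sketch of the jars to drop into the Flume agent's classpath (version numbers are examples matching the notes):
cp spark-streaming-flume-sink_2.11-2.1.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.8.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/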
Flume agent configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/infos.txt
# Describe the sink (the SparkSink runs inside the Flume agent and binds on the agent's own host)
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.247.100
a1.sinks.k1.port = 4444
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
IDEA code
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming + Flume integration, approach 2 (pull)
 */
object FlumePullWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2){
System.err.println("Usage FlumePullWordCount <hostname> <port>")
System.exit(1)
}
val Array(hostname, port) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)
flumeStream.map(item => new String(item.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
Start the Flume agent first, then start the Spark Streaming job.
Integrating Spark Streaming with Kafka
Receiver-based
Start Kafka:
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
Create the topic: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
Start a console producer: kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
IDEA code
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
object KafkaReceiverWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 4){
System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topic> <numThread>")
System.exit(1)
}
val Array(zkQuorum, group, topic, numThread) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val topicMap = topic.split(",").map((_, numThread.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
//the second element of each (key, message) tuple is the actual payload
message.map(_._2).flatMap(_.split(" ")).map((_,1))
.reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
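Example program arguments for KafkaReceiverWordCount (set in the IDEA run configuration or appended to spark-submit; the group id here is an arbitrary example):
192.168.247.100:2181 test_group kafka_streaming_topic 1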
Direct approach
package com.test
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaDirectWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2){
System.err.println("Usage KafkaReceiverWordCount <brokers> <topic>")
System.exit(1)
}
val Array(brokers, topic) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val topicSet = topic.split(",").toSet
val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)
val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
//the second element of each (key, message) tuple is the actual payload
message.map(_._2).flatMap(_.split(" ")).map((_,1))
.reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
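With the direct approach the consumed offsets can also be inspected per batch via HasOffsetRanges (a sketch; printing the ranges on the driver is just for illustration):
import org.apache.spark.streaming.kafka.HasOffsetRanges
message.transform { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
  rdd
}.map(_._2).print()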
log4j + Flume + Kafka + Spark Streaming
- Write log4j.properties
log4j.rootLogger = INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%C] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.247.100
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
- Add the dependency
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.6.0</version>
</dependency>
- Logger generator
import org.apache.log4j.Logger;
public class LoggerGenerator {
private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());
public static void main(String[] args) throws Exception {
int index = 0;
while(true){
Thread.sleep(2000);
logger.info("value : " + index++ );
}
}
}
- Flume configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.247.100
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Project Implementation
- Why record user access behaviour logs
Page views of the site's pages
Site stickiness (user retention)
Recommendations
Python log generator
- Python code
#coding=utf-8
import random
import time
url_path = [
"class/112.html",
"class/128.html",
"class/145.html",
"class/146.html",
"class/131.html",
"class/130.html",
"learn/821",
"course/list",
]
ip_slices = [123,132,10,98,43,55,72,89,31,192,168,247,99]
http_reference = [
"http://www.baidu.com/s?wd={query}",
"http://www.sogou.com/web?query={query}",
"http://cn.bing.com/search?q={query}",
"http://search.yahoo.com/search?p={query}"
]
search_keyword = [
"Spark SQL實戰(zhàn)"
"Hadoop基礎(chǔ)",
"Storm實戰(zhàn)",
"Spark Streaming實戰(zhàn)",
"大數(shù)據(jù)",
"java"
]
status_codes = ["200", "404", "500"]
def sample_ip():
slice = random.sample(ip_slices, 4)
return ".".join([str(item) for item in slice])
def sample_url():
return random.sample(url_path, 1)[0]
def sample_referer():
if random.uniform(0, 1) > 0.2:
return "-"
refer_str = random.sample(http_reference,1)
query_str = random.sample(search_keyword, 1)
return refer_str[0].format(query=query_str[0])
def sample_status_code():
return random.sample(status_codes, 1)[0]
def generate_log(count = 10):
time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
f = open("/home/kang/project/SparkStreaming/logs/access.log","w+")
while count >= 1:
query_log = "{ip}\t{localtime}\t\"GET /{url} HTTP/1.1\"\t{status}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(), status=sample_status_code(), localtime=time_str)
print query_log
f.write(query_log + "\n")
count = count - 1
if __name__ == '__main__':
generate_log(100)
- Scheduling with crontab
Run once every minute:
crontab -e
*/1 * * * * /home/kang/project/SparkStreaming/shell/generator_log.sh
Save with :x and the scheduled job takes effect
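The generator_log.sh referenced above is not shown in the notes; a plausible one-line wrapper (the Python script path is an assumption) would be:
#!/bin/bash
python /home/kang/project/SparkStreaming/python/generate_log.py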
Connecting Flume + Kafka + Spark Streaming and cleaning the data
- flume
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/project/SparkStreaming/logs/access.log
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Spark
package com.test.project
import com.test.project.domain.ClickLog
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object ImoocStatStreamingApp {
def main(args: Array[String]): Unit = {
if(args.length != 4){
System.err.println("Usage ImoocStatStreamingApp <zkQuorum> <group> <topic> <numThread>")
System.exit(1)
}
val Array(zkQuorum, group, topic, numThread) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImoocStatStreamingApp")
val ssc = new StreamingContext(sparkConf,Seconds(60))
val topicMap = topic.split(",").map((_, numThread.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
//test that data is being received
// message.map(_._2).print()
val logs = message.map(_._2)
val cleanData = logs.map(line => {
val infos = line.split("\t")
val url = infos(2).split(" ")(1)
var courseId = 0
if(url.startsWith("/class")){
val courseIdHTML = url.split("/")(2)
courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
}
ClickLog(infos(0), infos(1), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0)
cleanData.print()
ssc.start()
ssc.awaitTermination()
}
}
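The ClickLog case class used above is not listed in the notes; a minimal definition consistent with how it is constructed would be:
package com.test.project.domain
/**
 * ip: client IP, time: access time, courseId: parsed course id,
 * statusCode: HTTP status code, referer: HTTP referer
 */
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)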
Page views of the hands-on courses so far today
- HBase table design
Create the table: create 'imooc_course_clickcount', 'info'
RowKey design: day_courseid
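The CourseClickCount entity saved by the DAO below is not listed in the notes either; an assumed minimal definition (day_course holds the "day_courseid" rowkey, click_count the increment amount):
package com.test.project.domain
case class CourseClickCount(day_course: String, click_count: Long)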
- HBase DAO
package com.test.project.dao
import com.test.project.domain.CourseClickCount
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil
import scala.collection.mutable.ListBuffer
/**
 * Data access layer for course click counts
 */
object CourseClickCountDAO {
val tableName = "imooc_course_clickcount"
val cf = "info"
val qualifer = "click_count"
def save(list: ListBuffer[CourseClickCount]) = {
val table = HbaseUtil.getInstance().getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifer),
ele.click_count)
}
}
def count(day_course:String):Long = {
val table = HbaseUtil.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_course))
val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
if(value == null){
0l
} else{
Bytes.toLong(value)
}
}
def main(args: Array[String]): Unit = {
val list = new ListBuffer[CourseClickCount]
list+=(CourseClickCount("20190314_7",18),CourseClickCount("20190313_6",66))
save(list)
println(count("20190314_7") + count("20190313_6"))
}
}
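The HbaseUtil helper used by the DAOs is also not included in the notes; a minimal sketch using the legacy HBase client API (the ZooKeeper host/port are assumptions matching the rest of the notes):
package utils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
class HbaseUtil private() {
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.247.100")      // assumed ZooKeeper host
  conf.set("hbase.zookeeper.property.clientPort", "2181")    // assumed ZooKeeper port
  def getTable(tableName: String): HTable = new HTable(conf, tableName) // legacy (pre-Connection) API
}
object HbaseUtil {
  private lazy val instance = new HbaseUtil()
  def getInstance(): HbaseUtil = instance
}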
Page views of the hands-on courses referred from search engines so far today
- HBase table design
Create the table: create 'imooc_course_search_clickcount', 'info'
RowKey design: day_referer_courseid
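Similarly, an assumed minimal definition of the CourseSearchClickCount entity used by the DAO below:
package com.test.project.domain
case class CourseSearchClickCount(day_search_course: String, click_count: Long)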
- HBase DAO
package com.test.project.dao
import com.test.project.domain.{CourseClickCount, CourseSearchClickCount}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil
import scala.collection.mutable.ListBuffer
/**
 * Data access layer for search-engine-referred course click counts
 */
object CourseSearchClickCountDAO {
val tableName = "imooc_course_search_clickcount"
val cf = "info"
val qualifer = "click_count"
def save(list: ListBuffer[CourseSearchClickCount]) = {
val table = HbaseUtil.getInstance().getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifer),
ele.click_count)
}
}
def count(day_course:String):Long = {
val table = HbaseUtil.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_course))
val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
if(value == null){
0l
} else{
Bytes.toLong(value)
}
}
def main(args: Array[String]): Unit = {
println(count("20190315_www.baidu.com_112"))
}
}
package com.test.project
import com.test.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.test.project.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.test.project.util.DataUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.ListBuffer
object ImoocStatStreamingApp {
def main(args: Array[String]): Unit = {
if(args.length != 4){
System.err.println("Usage ImoocStatStreamingApp <zkQuorum> <group> <topic> <numThread>")
System.exit(1)
}
val Array(zkQuorum, group, topic, numThread) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImoocStatStreamingApp")
val ssc = new StreamingContext(sparkConf,Seconds(60))
val topicMap = topic.split(",").map((_, numThread.toInt)).toMap
val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
//test that data is being received
// message.map(_._2).print()
val logs = message.map(_._2)
val cleanData = logs.map(line => {
val infos = line.split("\t")
val url = infos(2).split(" ")(1)
var courseId = 0
if(url.startsWith("/class")){
val courseIdHTML = url.split("/")(2)
courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
}
ClickLog(infos(0), DataUtils.parse(infos(1)), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0)
// cleanData.print()
/**
 * Count page views of the hands-on courses so far today
 */
cleanData.map(x => {
(x.time+"_"+x.courseId, 1)
}).reduceByKey(_+_).foreachRDD(rdd => {
rdd.foreachPartition(partitionRecodes => {
val list = new ListBuffer[CourseClickCount]
partitionRecodes.foreach(pair => {
list += (CourseClickCount(pair._1, pair._2))
})
CourseClickCountDAO.save(list)
})
})
/**
 * Count course page views referred from search engines
 */
cleanData.map(x => {
val referer = x.referer.replaceAll("http://","/")
val splits = referer.split("/")
var host = ""
if(splits.length > 2){
host = splits(1)
}
(host, x.courseId, x.time)
}).filter(_._1!="").map(x => {
(x._3 + "_" + x._1+ "_" + x._2, 1)
}).reduceByKey(_+_).foreachRDD(rdd => {
rdd.foreachPartition(partitionRecodes => {
val list = new ListBuffer[CourseSearchClickCount]
partitionRecodes.foreach(pair => {
list += (CourseSearchClickCount(pair._1, pair._2))
})
CourseSearchClickCountDAO.save(list)
})
})
ssc.start()
ssc.awaitTermination()
}
}
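The DataUtils helper imported above is not shown in the notes; a sketch under the assumption that it converts the log timestamp ("yyyy-MM-dd HH:mm:ss") into the "yyyyMMdd" day string used in the rowkeys:
package com.test.project.util
import org.apache.commons.lang3.time.FastDateFormat
object DataUtils {
  private val INPUT_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  private val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMdd")
  def parse(time: String): String = TARGET_FORMAT.format(INPUT_FORMAT.parse(time).getTime)
}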