Spark Streaming Real-Time Stream Processing Project

Project Overview
  • Visits to hands-on courses so far today
  • Visits to hands-on courses referred from search engines so far today
Project Workflow

Requirement analysis ==> Data generation ==> Data collection ==> Data cleansing ==> Statistical analysis ==> Persisting results ==> Data visualization

Distributed Log Collection Framework: Flume (notes in Evernote)

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Distributed Streaming Platform: Kafka

One configuration file corresponds to one broker.

Single node, single broker: the default configuration file works as-is.
Single node, multiple brokers: copy the configuration file once per broker; each copy needs a unique broker.id, listeners port, and log.dirs path.

Development with IDEA + Maven

<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
</dependency>
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
</dependency>
package com.test.kafka;

public class KafkaProperties {

    public static final String ZK = "192.168.247.100:2181";

    public static final String TOPIC = "test_repliation_3";

    public static final String BROKER_LIST = "192.168.247.100:9092,192.168.247.100:9093,192.168.247.100:9094";

    public static final String GROUP_ID = "testGroup";
}
package com.test.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer extends Thread {

    private String topic;

    public KafkaConsumer(String topic){
        this.topic = topic;
    }

    private ConsumerConnector createConnector(){

        Properties properties = new Properties();
        properties.put("zookeeper.connect",KafkaProperties.ZK);
        properties.put("group.id",KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();
        Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
        topicCountMap.put(topic,1);

        // key: topic name
        // value: list of streams (one per consumer thread) for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("rec: "+ message);
        }

    }
}
package com.test.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer extends Thread{

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic){
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;

        while(true){
            String message = "message "+messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent: "+message);

            messageNo ++;

            try {
                Thread.sleep(2000);
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}
package com.test.kafka;

public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();

        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}
Integrating Flume with Kafka

kafka sink

type=org.apache.flume.sink.kafka.KafkaSink
brokerList
topic
batchSize
requiredAcks

Environment Setup
  • JDK installation
  • Scala installation
  • Maven installation
  • Hadoop installation
  • ZooKeeper installation
  • HBase installation
  • Spark installation
  • IDEA + Maven + Spark Streaming
    Add the dependencies below
<repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
Example
NetworkWordCount (real-time word count over a network socket)

nc -lk 9999   # send messages on port 9999

  • Submitting the job with spark-submit (production)

spark-submit \
  --class org.apache.spark.examples.streaming.NetworkWordCount \
  --name NetworkWordCount \
  --master local[2] \
  /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
  localhost 9999

  • Submitting with spark-shell (testing)

spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("192.168.247.100", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
How It Works
  • Coarse-grained view

Spark Streaming receives the real-time data stream, chops it into small blocks according to the configured batch interval, and hands each block to the Spark Engine for processing.
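To make the micro-batch model concrete, here is a minimal sketch (not part of the original notes; it assumes a local socket source on port 9999) that prints, for every batch interval, the batch timestamp and the number of records in the RDD produced for that interval:

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Minimal illustration of the micro-batch model: every 5 seconds the data
  * received from the socket becomes one RDD, which the Spark Engine processes.
  */
object MicroBatchDemo {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("MicroBatchDemo")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)

    // One RDD per batch interval; `time` is the batch boundary timestamp
    lines.foreachRDD { (rdd, time) =>
      println(s"batch $time -> ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}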

Core Concepts
StreamingContext

The two commonly used constructors:
The batch interval must be set based on the latency requirements of your application and available cluster resources.

def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
After a context is defined, you have to do the following.
  • Define the input sources by creating input DStreams.
  • Define the streaming computations by applying transformation and output operations to DStreams.
  • Start receiving data and processing it using streamingContext.start().
  • Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  • The processing can be manually stopped using streamingContext.stop().
Points to remember:
  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
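The last two points can be illustrated with a short sketch (the object and variable names here are made up for illustration): stop the StreamingContext without stopping the SparkContext, then build a new StreamingContext on the same SparkContext.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch of the stop()/re-create semantics described above
object ContextLifecycleDemo {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("ContextLifecycleDemo"))

    // First StreamingContext built on top of the existing SparkContext
    val ssc1 = new StreamingContext(sc, Seconds(5))
    // ... define input DStreams and computations, then ssc1.start() ...

    // Stop only the StreamingContext; the SparkContext stays alive
    ssc1.stop(stopSparkContext = false)

    // A stopped StreamingContext cannot be restarted, but the same SparkContext
    // can back a brand-new StreamingContext
    val ssc2 = new StreamingContext(sc, Seconds(10))
    // ... define new computations, then ssc2.start() and ssc2.awaitTermination() ...

    sc.stop()
  }
}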
DStreams

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset.

Any operation applied on a DStream translates to operations on the underlying RDDs

Input DStreams and Receivers

Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

Transformations on DStreams
Output Operations on DStreams
Hands-On Examples
Socket data

Additional dependencies required:

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming processing socket data
  */
object NetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Send socket data with: nc -lk 6789

File systems

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.)

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Processing data from a file system
  */
object FileWordCount {

  def main(args: Array[String]): Unit = {
    // File streams do not need a receiver, so a single thread is enough
    val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    // Files must be moved into the monitored directory atomically (as a whole)
    val line = ssc.textFileStream("hdfs://192.168.247.100:9000/sparkStreaming/")
    val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
updateStateByKey
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    // Stateful operators require a checkpoint directory to be set
    ssc.checkpoint("hdfs://192.168.247.100:9000/sparkStreaming/")

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val result = line.flatMap(_.split(" ")).map((_,1))

    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
  }
}
Saving Data to a Database
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
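ConnectionPool above is pseudocode from the Spark documentation. A more concrete sketch of the same pattern with a plain JDBC connection per partition might look like the following; the MySQL URL, table name, and credentials are placeholders for illustration, not values from this project, and the MySQL driver jar must be on the classpath.

import java.sql.DriverManager

import org.apache.spark.streaming.dstream.DStream

// Hedged sketch: write (word, count) pairs to MySQL, one connection per partition
object MySQLSink {

  def save(wordCounts: DStream[(String, Int)]): Unit = {
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // One connection per partition, created on the executor side
        val connection = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/spark_test", "root", "root")
        val stmt = connection.createStatement()
        partitionOfRecords.foreach { case (word, count) =>
          // Inserts a new row per batch for simplicity; a real job would upsert
          stmt.executeUpdate(s"insert into wordcount(word, cnt) values('$word', $count)")
        }
        stmt.close()
        connection.close()
      }
    }
  }
}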
Blacklist filtering with the transform operation

zhangshan,123
lisi,123
wangwu,123

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformApp {

  def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("TransformApp")

      val ssc = new StreamingContext(sparkConf,Seconds(5))

      /**
        * Build the blacklist
        */
      val blacks = List("zhangshan","lisi")
      val blacksRDD = ssc.sparkContext.parallelize(blacks).map((_,true))

      val line = ssc.socketTextStream("192.168.247.100",6789)

      val clicklog = line.map(x => (x.split(",")(0),x)).transform(rdd => {
        rdd.leftOuterJoin(blacksRDD)
          .filter(x => x._2._2.getOrElse(false) != true)
          .map(_._2._1)
      })

      clicklog.print()

      ssc.start()
      ssc.awaitTermination()
  }
}
Integrating Spark Streaming with Spark SQL
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val words = line.flatMap(_.split(" "))

    words.foreachRDD { (rdd,time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

case class Record(word: String)

object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
Integrating Spark Streaming with Flume
Push approach

Add the required dependency:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

Flume agent configuration


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/kang/flume.log

# Describe the sink: the push approach uses an avro sink that pushes events to the
# Spark Streaming receiver; hostname/port must match the <hostname> <port> arguments
# passed to FlumePushWordCount below
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.247.1
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

IDEA code

package com.test

import org.apache.spark.streaming.flume._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Integrating Spark Streaming with Flume: approach 1 (push)
  */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val flumeStream = FlumeUtils.createStream(ssc,hostname,port.toInt)
    flumeStream.map(item => new String(item.event.getBody.array()).trim)
        .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1) Start the Spark Streaming job
2) Start the Flume agent
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro.conf --name a1 -Dflume.root.logger=INFO,console

3) Feed in data and watch the output in the IDEA console
  • To run on the server instead (spark-submit needs the Flume connector package):
    ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 ...
Pull approach
  • Uses a reliable receiver

A reliable receiver sends an acknowledgment to the (reliable) source only after the data has been received and safely stored in Spark.

Add the dependencies:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

Flume agent configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/infos.txt

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.247.1
a1.sinks.k1.port = 4444

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel   
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

IDEA code

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Integrating Spark Streaming with Flume: approach 2 (pull)
  */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)
    flumeStream.map(item => new String(item.event.getBody.array()).trim)
        .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Start Flume first, then start the Spark Streaming job.

Integrating Spark Streaming with Kafka
Receiver-based approach

Start Kafka: kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
Create the topic: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
Start a console producer: kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic

IDEA code

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // The second element of each (key, message) tuple is the actual payload
    message.map(_._2).flatMap(_.split(" ")).map((_,1))
        .reduceByKey(_+_).print()


    ssc.start()
    ssc.awaitTermination()
  }
}
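With the receiver-based approach, data that has already been received but not yet processed can be lost if the driver fails. A common hardening step is to enable the write-ahead log together with checkpointing; the sketch below is only an illustration (the checkpoint path is an assumption), not part of the original code.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: harden the receiver-based approach with a write-ahead log (WAL)
object ReliableReceiverConfig {

  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafkaReceiverWordCount")
      // Persist received blocks to the WAL before they are acknowledged
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // The WAL is written under the checkpoint directory, so one must be configured
    ssc.checkpoint("hdfs://192.168.247.100:9000/sparkStreaming/kafka_wal")
    ssc
  }
}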
Direct approach
package com.test

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topic>")
      System.exit(1)
    }

    val Array(brokers, topic) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))


    val topicSet = topic.split(",").toSet
    val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)

    val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

    // The second element of each (key, message) tuple is the actual payload
    message.map(_._2).flatMap(_.split(" ")).map((_,1))
        .reduceByKey(_+_).print()


    ssc.start()
    ssc.awaitTermination()
  }
} 
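A practical advantage of the direct approach is that every batch carries its Kafka offset ranges, which can be read and persisted for recovery. A minimal sketch against the same 0.8 connector API used above (where the offsets are stored is left open):

import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Sketch: log the offset range consumed by every partition in every batch.
// `message` is the DStream returned by KafkaUtils.createDirectStream above.
object OffsetTracking {

  def logOffsets(message: InputDStream[(String, String)]): Unit = {
    message.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        // A real job would persist these offsets (e.g. to ZooKeeper or a database)
        println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
      }
    }
  }
}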
log4j + Flume + Kafka + Spark Streaming
  • Write log4j.properties
log4j.rootLogger = INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%C] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.247.100
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
  • Add the dependency
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
  • Logger generator
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {

        int index = 0;
        while(true){
            Thread.sleep(2000);
            logger.info("value : " + index++ );
        }
    }
}
  • Flume configuration file
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.247.100
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Project Practice
  • Why record user access behavior logs?
    Page view counts for the site
    Site stickiness
    Recommendations
Python Log Generator
  • Python code
#coding=utf-8
import random
import time
url_path = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list",
]

ip_slices = [123,132,10,98,43,55,72,89,31,192,168,247,99]

http_reference = [
    "http://www.baidu.com/s?wd={query}",
    "http://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "http://search.yahoo.com/search?p={query}"
]

search_keyword = [
    "Spark SQL實戰",
    "Hadoop基礎",
    "Storm實戰",
    "Spark Streaming實戰",
    "大數據",
    "java"
]

status_codes = ["200", "404", "500"]

def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])

def sample_url():
    return random.sample(url_path, 1)[0]

def sample_referer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    
    refer_str = random.sample(http_reference,1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])
    
def sample_status_code():
    return random.sample(status_codes, 1)[0]
    
def generate_log(count = 10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/home/kang/project/SparkStreaming/logs/access.log","w+")
    while count >= 1:
        query_log = "{ip}\t{localtime}\t\"GET /{url} HTTP/1.1\"\t{status}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(), status=sample_status_code(), localtime=time_str)
        print query_log
        f.write(query_log + "\n")
        count = count - 1
    f.close()

if __name__ == '__main__':
    generate_log(100)
  • Scheduling with crontab

Run once every minute:
crontab -e
*/1 * * * * /home/kang/project/SparkStreaming/shell/generator_log.sh
Save and exit with :x to activate the schedule.

Connecting Flume + Kafka + Spark Streaming and cleansing the data
  • Flume
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/project/SparkStreaming/logs/access.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • Spark
package com.test.project

import com.test.project.domain.ClickLog
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ImoocStatStreamingApp {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage: ImoocStatStreamingApp <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImoocStatStreamingApp")
    val ssc = new StreamingContext(sparkConf,Seconds(60))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Sanity check: print the received messages
//    message.map(_._2).print()

    val logs = message.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)

      var courseId = 0

      if(url.startsWith("/class")){
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), infos(1), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    cleanData.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
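The domain classes imported here (ClickLog, plus CourseClickCount and CourseSearchClickCount used by the DAOs below) are not reproduced in these notes. Judging from how they are constructed and read, they are presumably simple case classes along these lines (field names are inferred, not confirmed):

package com.test.project.domain

// One cleansed click-log record: ip, time, course id, HTTP status code, referer
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

// Click count per course per day; rowkey format: day_courseid
case class CourseClickCount(day_course: String, click_count: Long)

// Click count per course per day per search-engine referer; rowkey format: day_referer_courseid
case class CourseSearchClickCount(day_search_course: String, click_count: Long)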
Visits to hands-on courses so far today
  • HBase table design

Create the table: create 'imooc_course_clickcount', 'info'
RowKey design: day_courseid

  • HBase DAO
package com.test.project.dao

import com.test.project.domain.CourseClickCount
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil

import scala.collection.mutable.ListBuffer

/**
  * Data access layer
  */
object CourseClickCountDAO {

  val tableName = "imooc_course_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  def save(list: ListBuffer[CourseClickCount]) = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  def count(day_course:String):Long = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if(value == null){
      0L
    } else{
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[CourseClickCount]

    list+=(CourseClickCount("20190314_7",18),CourseClickCount("20190313_6",66))

    save(list)

    println(count("20190314_7") + count("20190313_6"))
  }
}
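The HbaseUtil helper imported by the DAO is not shown in these notes either. A minimal sketch of what it might look like, assuming the HBase 1.x client API and the ZooKeeper/HDFS addresses used elsewhere in this project (the class shape simply mirrors how the DAO calls it):

package utils

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

// Hypothetical sketch of the helper the DAOs rely on; config values are assumptions
class HbaseUtil private {

  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "192.168.247.100:2181")
  conf.set("hbase.rootdir", "hdfs://192.168.247.100:9000/hbase")

  // The DAO only needs a table handle it can read from and increment against
  def getTable(tableName: String): HTable = new HTable(conf, tableName)
}

object HbaseUtil {
  private lazy val instance = new HbaseUtil
  def getInstance(): HbaseUtil = instance
}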
Visits to hands-on courses referred from search engines so far today
  • HBase table design

Create the table: create 'imooc_course_search_clickcount', 'info'
RowKey design: day_referer_courseid

  • HBase DAO
package com.test.project.dao

import com.test.project.domain.{CourseClickCount, CourseSearchClickCount}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil

import scala.collection.mutable.ListBuffer

/**
  * Data access layer
  */
object CourseSearchClickCountDAO {

  val tableName = "imooc_course_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  def save(list: ListBuffer[CourseSearchClickCount]) = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  def count(day_course:String):Long = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if(value == null){
      0L
    } else{
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {

    println(count("20190315_www.baidu.com_112"))
  }
}
package com.test.project

import com.test.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.test.project.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.test.project.util.DataUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object ImoocStatStreamingApp {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage: ImoocStatStreamingApp <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ImoocStatStreamingApp")
    val ssc = new StreamingContext(sparkConf,Seconds(60))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // Sanity check: print the received messages
//    message.map(_._2).print()

    val logs = message.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)

      var courseId = 0

      if(url.startsWith("/class")){
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DataUtils.parse(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

//    cleanData.print()

    /**
      * Count visits to hands-on courses so far today
      */
    cleanData.map(x => {
      (x.time+"_"+x.courseId, 1)
    }).reduceByKey(_+_).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecodes => {
        val list = new ListBuffer[CourseClickCount]
        partitionRecodes.foreach(pair => {
          list += (CourseClickCount(pair._1, pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })

    /**
      * Count course visits referred from search engines
      */

    cleanData.map(x => {
      val referer = x.referer.replaceAll("http://","/")
      val splits = referer.split("/")
      var host = ""
      if(splits.length > 2){
        host = splits(1)
      }

      (host, x.courseId, x.time)
    }).filter(_._1!="").map(x => {
      (x._3 + "_" + x._1+ "_" + x._2, 1)
    }).reduceByKey(_+_).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecodes => {
        val list = new ListBuffer[CourseSearchClickCount]
        partitionRecodes.foreach(pair => {
          list += (CourseSearchClickCount(pair._1, pair._2))
        })
        CourseSearchClickCountDAO.save(list)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
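Finally, DataUtils.parse, used above to turn the raw log timestamp into the day portion of the HBase row key, is not shown either. Below is a plausible sketch based on the log generator's timestamp format and the day_courseid row-key design (the format strings are assumptions):

package com.test.project.util

import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat

// Hypothetical sketch of the date-parsing helper used by ImoocStatStreamingApp
object DataUtils {

  // Format written by the Python log generator, e.g. "2019-03-15 10:20:30"
  private val SOURCE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // Day portion used in row keys such as "20190315_112"
  private val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMdd")

  def parse(time: String): String =
    TARGET_FORMAT.format(new Date(SOURCE_FORMAT.parse(time).getTime))
}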