Kafka+Spark Streaming進行網(wǎng)站黑名單實時過濾


開發(fā)環(huán)境:

  • spark 2.3
  • kafka 1.1.1

黑名單數(shù)據(jù)是從mysql中獲取的锈至。源數(shù)據(jù)是從kafka中獲取的晨缴,數(shù)據(jù)格式就是簡單的姓名,為了與黑名單數(shù)據(jù)做join峡捡,源數(shù)據(jù)和黑名單數(shù)據(jù)都需要轉換成鍵值對的形式击碗。

Java代碼:

package cn.spark.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * use transform filter balcklist
 * base on kafka message queue 
 *
 */
public class BlackListFilter {

    public static void main(String[] args) throws Exception{
        
        SparkConf conf = new SparkConf().setAppName("BlackListFilter");
        
        // create context
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        
        // open checkpoint mechanism 
        jssc.checkpoint(args[0]);
        
        // properties map 
        Map<String, String> KafkaParams = new HashMap<String, String>();
        KafkaParams.put("bootstrap.servers", "hserver-1:9092,hserver-2:9092,hserver-3:9092");
        KafkaParams.put("gruop.id", "BlackListFilter");
        KafkaParams.put("auto.offest.reset", "smallest");
        
        // topic set
        Set<String> topics = new HashSet<String>();
        topics.add(args[1]);
        
        // create DStream
        JavaPairInputDStream<String, String> InputPairDstream = 
                KafkaUtils.createDirectStream(
                        jssc, 
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        KafkaParams,
                        topics
                        );
        
        // get blocklist from mysql
        SparkSession spark = SparkSession
                    .builder()
                    .enableHiveSupport()
                    .getOrCreate();
        
        // read data --> return Dataset
        Dataset<Row> BlackList = spark
                    .read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://hserver-1:3306/retail_db")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .option("dbtable", "blacklist")
                    .option("username", "root")
                    .option("password", "root")
                    .load();
        
        // transform Dataset into JavaRDD
        JavaRDD<Row> BlackListRDD = BlackList.toJavaRDD();
        
        // transform JavaRDD into JavaPairRDD --> the second element type is Boolean
        final JavaPairRDD<String, Boolean> BlackListPairRDD = 
                BlackListRDD.mapToPair(
                
                    new PairFunction<Row, String, Boolean>() {
    
                        private static final long serialVersionUID = -6634120981007776151L;
    
                        @Override
                        public Tuple2<String, Boolean> call(Row name) throws Exception {
                        
                            return new Tuple2<String, Boolean>(name.getString(0), true);
                        }
                    });
        
        // transform kafka data flow
        JavaDStream<String> VaildListDStream = InputPairDstream.transform(
                
                new Function<JavaPairRDD<String,String>, JavaRDD<String>>() {


                    private static final long serialVersionUID = -7488950207291980402L;

                    @Override
                    public JavaRDD<String> call(JavaPairRDD<String, String> KafkaDataRDD) throws Exception {
                        
                        // create source RDD --> UserRDD: access log
                        JavaPairRDD<String, String> UserRDD = 
                                KafkaDataRDD.mapToPair(
                                
                                    new PairFunction<Tuple2<String,String>, String, String>() {
    
                                        private static final long serialVersionUID = 1L;
    
                                        @Override
                                        public Tuple2<String, String> call(Tuple2<String, String> tuple) throws Exception {
                                            
                                            return new Tuple2<String, String>(tuple._2, "........");
                                        }
                                    });
                        
                        // leftOutJoin
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> JoinRDD = 
                                UserRDD.leftOuterJoin(BlackListPairRDD);
                        
                        // do blacklist filtering
                        JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> FilterRDD = 
                                JoinRDD.filter(
                                    
                                        new Function<Tuple2<String,Tuple2<String,Optional<Boolean>>>, Boolean>() {
                                
                                        private static final long serialVersionUID = 791090533213057710L;
    
                                        @Override
                                        public Boolean call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple) throws Exception {
                                            
                                            if(tuple._2._2.isPresent() && tuple._2._2.get()){
                                                
                                                return false;
                                            } else {
                                                return true;
                                            }
                                        }
                                    });

                        // mapToPair 
                        JavaRDD<String> resultRDD = 
                                FilterRDD.map(
                                    new Function<
                                    Tuple2<String,Tuple2<String,Optional<Boolean>>>, String>() {

                                        private static final long serialVersionUID = -54290472445703194L;

                                        @Override
                                        public String call(Tuple2<String, Tuple2<String, Optional<Boolean>>> tuple)
                                                throws Exception {

                                            return tuple._1 + "--->" + tuple._2._1;
                                        }
                                });
                        
                        return resultRDD;
                    }
                });
        
        // print result
        VaildListDStream.print();
        
        jssc.start();
        
        jssc.awaitTermination();
        
        jssc.close();
        
        spark.close();
        
    }
    
}

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市们拙,隨后出現(xiàn)的幾起案子稍途,更是在濱河造成了極大的恐慌,老刑警劉巖砚婆,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件械拍,死亡現(xiàn)場離奇詭異,居然都是意外死亡装盯,警方通過查閱死者的電腦和手機坷虑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來验夯,“玉大人猖吴,你說我怎么就攤上這事』幼” “怎么了海蔽?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绑谣。 經常有香客問我党窜,道長,這世上最難降的妖魔是什么借宵? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任幌衣,我火速辦了婚禮,結果婚禮上,老公的妹妹穿的比我還像新娘豁护。我一直安慰自己哼凯,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布楚里。 她就那樣靜靜地躺著断部,像睡著了一般。 火紅的嫁衣襯著肌膚如雪班缎。 梳的紋絲不亂的頭發(fā)上蝴光,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機與錄音达址,去河邊找鬼蔑祟。 笑死,一個胖子當著我的面吹牛沉唠,可吹牛的內容都是我干的疆虚。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼右冻,長吁一口氣:“原來是場噩夢啊……” “哼装蓬!你這毒婦竟也來了?” 一聲冷哼從身側響起纱扭,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤牍帚,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后乳蛾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體暗赶,經...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年肃叶,在試婚紗的時候發(fā)現(xiàn)自己被綠了蹂随。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡因惭,死狀恐怖岳锁,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情蹦魔,我是刑警寧澤激率,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站勿决,受9級特大地震影響乒躺,放射性物質發(fā)生泄漏。R本人自食惡果不足惜低缩,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一嘉冒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦讳推、人聲如沸顶籽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜕衡。三九已至,卻和暖如春设拟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背久脯。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工纳胧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人帘撰。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓跑慕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親摧找。 傳聞我的和親對象是個殘疾皇子核行,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內容