Spark日志分析案例


通過日志分析每個移動設備對服務器訪問的總上行流量和下行流量。然後先根據上行流量倒序排序，如果相等就根據下行流量倒序排序，如果上行流量和下行流量都相等，就根據時間戳排序。

AppAccessLog.java

該類是整個Spark應用的主類，裡面主要是業務邏輯代碼，包含計算總上行和下行流量、RDD的一些操作等。

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
import scala.Tuple4;

/**
 * Spark driver for the access-log example.
 *
 * <p>Reads a tab-separated access log whose lines are
 * {@code timeStamp \t DeviceID \t upTraffic \t downTraffic}. For every device it sums the
 * upstream/downstream traffic and keeps the earliest timestamp. It then sorts devices in
 * descending order of (upTraffic, downTraffic, timeStamp) and prints the top entries. Finally
 * it overwrites the Hive table {@code test.log} with the full ranking.
 */
public class AppAccessLog {

    /** Number of top-ranked devices printed to stdout. */
    private static final int TOP_N = 10;

    /**
     * Runs the whole job. {@code args} is not used; the input path and output table are fixed.
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("AppAccessLog");

        // Create the SparkSession first and wrap its SparkContext. If a JavaSparkContext were
        // created first, getOrCreate() would reuse that context ("some configuration may not
        // take effect"), and the Hive catalog requested by enableHiveSupport() could be lost.
        SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        try {
            // create RDD: one element per log line
            JavaRDD<String> accessLogRDD = sc.textFile("hdfs:///temp/data/access.log");

            // (DeviceID, per-line traffic info)
            JavaPairRDD<String, AppAccessLogInfo> accessLogPairRDD = mapToPairRDD(accessLogRDD);

            // (DeviceID, summed traffic + earliest timestamp)
            JavaPairRDD<String, AppAccessLogInfo> aggregatePairRDD =
                    agggregateToPairRDD(accessLogPairRDD);

            // re-key by the sortable composite key: (AppAccessLogSortInfo, DeviceID)
            JavaPairRDD<AppAccessLogSortInfo, String> sortByKeyPairRDD =
                    mapToSortByKeyPairRDD(aggregatePairRDD);

            // false = descending, using AppAccessLogSortInfo's natural ordering
            JavaPairRDD<AppAccessLogSortInfo, String> resultRDD = sortByKeyPairRDD.sortByKey(false);

            // print Top N as "DeviceID upTraffic downTraffic timeStamp"
            List<Tuple2<AppAccessLogSortInfo, String>> topN = resultRDD.take(TOP_N);
            for (Tuple2<AppAccessLogSortInfo, String> t : topN) {
                System.out.println(t._2 + " " + t._1.getUpTraffic() + " " + t._1.getDownTraffic()
                        + " " + t._1.getTimpStamp());
            }

            // turn the full ranking into a DataFrame and save it to Hive
            JavaRDD<Row> rowRDD = mapToRowRDD(resultRDD);
            StructType schema = DataTypes.createStructType(getColumnName());
            Dataset<Row> logDF = spark.createDataFrame(rowRDD, schema);
            logDF.write().mode("overwrite").saveAsTable("test.log");
        } finally {
            // Stopping the session also stops the shared SparkContext wrapped by sc,
            // so sc needs no separate close(); this now runs even if the job fails.
            spark.close();
        }
    }

    /**
     * Builds the DataFrame schema matching the rows produced by {@link #mapToRowRDD}.
     * It has four nullable string columns: timestamp, DeviceID, upTraffic, downTraffic.
     */
    private static ArrayList<StructField> getColumnName() {
        ArrayList<StructField> fields = new ArrayList<StructField>();
        // NOTE(review): "timeStame" looks like a typo for "timeStamp". It is kept unchanged
        // because it is the column name of the saved Hive table and readers may depend on it.
        fields.add(DataTypes.createStructField("timeStame", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("DeviceID", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("upTraffic", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("downTraffic", DataTypes.StringType, true));
        return fields;
    }

    /**
     * Converts the sorted (sortInfo, DeviceID) pairs into Rows.
     *
     * <p>Each Row is (timeStamp, DeviceID, upTraffic, downTraffic), with every value rendered via
     * {@code String.valueOf}. This order and these types match {@link #getColumnName()}.
     */
    private static JavaRDD<Row> mapToRowRDD(JavaPairRDD<AppAccessLogSortInfo, String> resultRDD) {

        JavaRDD<Tuple4<String, String, String, String>> tempRDD = resultRDD
                .map(new Function<Tuple2<AppAccessLogSortInfo, String>, Tuple4<String, String, String, String>>() {

                    private static final long serialVersionUID = 7952741378495112332L;

                    @Override
                    public Tuple4<String, String, String, String> call(Tuple2<AppAccessLogSortInfo, String> tuple)
                            throws Exception {

                        String DeviceID = tuple._2;
                        AppAccessLogSortInfo accessLogSortInfo = tuple._1;

                        return new Tuple4<String, String, String, String>(
                                String.valueOf(accessLogSortInfo.getTimpStamp()), DeviceID,
                                String.valueOf(accessLogSortInfo.getUpTraffic()),
                                String.valueOf(accessLogSortInfo.getDownTraffic()));
                    }
                });
        return tempRDD.map(new Function<Tuple4<String, String, String, String>, Row>() {

            private static final long serialVersionUID = -1227536252899303985L;

            @Override
            public Row call(Tuple4<String, String, String, String> tuple) throws Exception {

                return RowFactory.create(tuple._1(), tuple._2(), tuple._3(), tuple._4());
            }
        });
    }

    /**
     * Aggregates per DeviceID.
     *
     * <p>Upstream and downstream traffic are summed, and the minimum (earliest) timestamp is
     * kept. That timestamp is later used as the last tie-breaker when sorting.
     */
    private static JavaPairRDD<String, AppAccessLogInfo> agggregateToPairRDD(
            JavaPairRDD<String, AppAccessLogInfo> appAccessLogPairRDD) {

        return appAccessLogPairRDD.reduceByKey(new Function2<AppAccessLogInfo, AppAccessLogInfo, AppAccessLogInfo>() {

            private static final long serialVersionUID = -8552789221394152834L;

            @Override
            public AppAccessLogInfo call(AppAccessLogInfo v1, AppAccessLogInfo v2) throws Exception {

                Long timeStamp = Math.min(v1.getTimeStamp(), v2.getTimeStamp());
                Long upTraffic = v1.getUpTraffic() + v2.getUpTraffic();
                Long downTraffic = v1.getDownTraffic() + v2.getDownTraffic();

                AppAccessLogInfo accessLogInfo = new AppAccessLogInfo();
                accessLogInfo.setUpTraffic(upTraffic);
                accessLogInfo.setDownTraffic(downTraffic);
                accessLogInfo.setTimeStamp(timeStamp);

                return accessLogInfo;
            }
        });
    }

    /**
     * Swaps key and value.
     *
     * <p>The aggregated info becomes an {@link AppAccessLogSortInfo} key, which
     * {@code sortByKey} can order, and the DeviceID becomes the value.
     */
    private static JavaPairRDD<AppAccessLogSortInfo, String> mapToSortByKeyPairRDD(
            JavaPairRDD<String, AppAccessLogInfo> AppAccessAggregatePairRDD) {

        return AppAccessAggregatePairRDD
                .mapToPair(new PairFunction<Tuple2<String, AppAccessLogInfo>, AppAccessLogSortInfo, String>() {

                    private static final long serialVersionUID = -4778843695438540948L;

                    @Override
                    public Tuple2<AppAccessLogSortInfo, String> call(Tuple2<String, AppAccessLogInfo> tuple)
                            throws Exception {

                        String DeviceID = tuple._1;
                        AppAccessLogInfo appAccessLogInfo = tuple._2;

                        AppAccessLogSortInfo accessLogSortInfo = new AppAccessLogSortInfo();
                        accessLogSortInfo.setTimpStamp(appAccessLogInfo.getTimeStamp());
                        accessLogSortInfo.setUpTraffic(appAccessLogInfo.getUpTraffic());
                        accessLogSortInfo.setDownTraffic(appAccessLogInfo.getDownTraffic());

                        return new Tuple2<AppAccessLogSortInfo, String>(accessLogSortInfo, DeviceID);
                    }
                });
    }

    /**
     * Parses each tab-separated line into {@code (DeviceID, AppAccessLogInfo)}.
     *
     * <p>Expected columns: [0] timeStamp, [1] DeviceID, [2] upTraffic, [3] downTraffic.
     * A line with fewer than four fields throws ArrayIndexOutOfBoundsException. A non-numeric
     * value throws NumberFormatException. Either one fails the task.
     */
    private static JavaPairRDD<String, AppAccessLogInfo> mapToPairRDD(JavaRDD<String> AppAccessLogRDD) {

        return AppAccessLogRDD.mapToPair(new PairFunction<String, String, AppAccessLogInfo>() {

            private static final long serialVersionUID = 5998646612001714125L;

            @Override
            public Tuple2<String, AppAccessLogInfo> call(String line) throws Exception {

                String[] lineSplitArray = line.split("\t");

                String DeviceID = lineSplitArray[1];
                Long timeStamp = Long.valueOf(lineSplitArray[0]);
                Long upTraffic = Long.valueOf(lineSplitArray[2]);
                Long downTraffic = Long.valueOf(lineSplitArray[3]);

                AppAccessLogInfo appAccessLogInfo = new AppAccessLogInfo();
                appAccessLogInfo.setTimeStamp(timeStamp);
                appAccessLogInfo.setUpTraffic(upTraffic);
                appAccessLogInfo.setDownTraffic(downTraffic);

                return new Tuple2<String, AppAccessLogInfo>(DeviceID, appAccessLogInfo);
            }
        });
    }
}

這個類中除了main方法以外，還有三個比較重要的方法：mapToPairRDD、agggregateToPairRDD、mapToSortByKeyPairRDD。

  1. 從main方法開始看，首先需要創建一個SparkContext，然後通過SparkContext來創建初始RDD。在這個過程中，Spark會完成一系列的初始化工作，包括向Master註冊Application、啟動Executor，以及Executor的反向註冊等。

  2. 接下來，調用mapToPairRDD方法把初始RDD轉換成PairRDD，以便後面做聚合操作。在這裡，用一個實體類AppAccessLogInfo把每條記錄的upTraffic、downTraffic、timeStamp進行封裝，然後使用DeviceID作為Key、AppAccessLogInfo類對象作為Value，最終得到一個PairRDD。

  3. 然後，調用agggregateToPairRDD方法對AppAccessLogPairRDD做聚合操作。在這個方法中，調用了AppAccessLogPairRDD的reduceByKey方法，按DeviceID（設備ID）計算每臺設備的總上行流量和總下行流量。由於每臺設備對應多個訪問時間戳，這裡取最小的時間戳作為後面排序的依據。

  4. 做完reduceByKey之後，就需要對總上行流量、總下行流量、時間戳進行排序了。要想使用PairRDD的sortByKey方法，需要先改變RDD的結構：調用mapToSortByKeyPairRDD方法，借助另一個實體類AppAccessLogSortInfo，把agggregateToPairRDD方法得到的AppAccessAggregatePairRDD中每個Tuple2元素的value所封裝的信息提取出來。這些信息重新用AppAccessLogSortInfo類封裝後作為Key，DeviceID則作為Value，組成一個能夠按Key排序的AppAccessSortByKeyLogPairRDD。

  5. 最后調(diào)用AppAccessLogSortByKeyPairRDD的sortByKey方法排序鹃两,并獲取訪問流量最大的前十個設備遗座。

  6. 把最後結果保存到Hive。首先需要把resultRDD轉換成JavaRDD<Row>類型的RDD：在mapToRowRDD方法中，把所有的數據都提取出來，並把每一行包裝成Tuple4類型（四個字段）。然後使用RowFactory.create()方法把每一個Tuple4轉換成Row對象（要把Tuple4的每一個元素都傳進RowFactory.create()方法），就得到了JavaRDD<Row>類型的RDD。接下來需要創建Schema：先用DataTypes.createStructField()方法為Row對象的每一個元素創建StructField對象，也就是field（列）。其中第一個參數是列名，第二個參數是列字段類型，第三個參數代表是否允許為空。然後使用DataTypes.createStructType()方法創建Schema，該方法的參數是一個存放StructField對象的集合（裡面存放著RowRDD每一列的定義），返回值為StructType。接著調用SparkSession對象的createDataFrame方法創建一個DataFrame，其中第一個參數是RowRDD，第二個參數是Schema，返回值是Dataset<Row>類型。最後把創建好的DataFrame以overwrite模式保存到Hive表test.log中。

AppAccessLogInfo.java

由於該類對象是作為PairRDD的value，需要在網絡間傳輸，所以需要實現Serializable接口，使之能夠進行序列化和反序列化。

import java.io.Serializable;

public class AppAccessLogInfo implements Serializable{

    private static final long serialVersionUID = 2298114085058810487L;

    private Long timeStamp;
    private Long upTraffic;
    private Long downTraffic;
    
    public AppAccessLogInfo() {}
    
    public AppAccessLogInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
        this.timeStamp = timeStamp;
        this.upTraffic = upTraffic;
        this.downTraffic = downTraffic;
    }
    
    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }
    public Long getUpTraffic() {
        return upTraffic;
    }

    public void setUpTraffic(Long upTraffic) {
        this.upTraffic = upTraffic;
    }

    public Long getDownTraffic() {
        return downTraffic;
    }

    public void setDownTraffic(Long downTraffic) {
        this.downTraffic = downTraffic;
    }
        
}

AppAccessLogSortInfo.java

和AppAccessLogInfo類一樣，這個類也需要實現Serializable接口。同時它還需要實現scala.math.Ordered接口，因為它要作為sortByKey的Key，需要對此類的對象進行排序。

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Composite sort key for the device ranking.
 *
 * <p>Keys are ordered by upTraffic, then downTraffic, then timeStamp, each compared
 * numerically. The class implements {@link scala.math.Ordered} (and therefore
 * {@link Comparable}) so it can be used as the key of {@code JavaPairRDD.sortByKey}. It is
 * {@link Serializable} because sort keys are shuffled between executors.
 *
 * <p>All three fields must be non-null before two instances are compared; a {@code null}
 * field makes the comparison throw {@link NullPointerException} during unboxing.
 */
public class AppAccessLogSortInfo implements Ordered<AppAccessLogSortInfo>, Serializable {

    private static final long serialVersionUID = 7006437160384780829L;

    private Long timeStamp;
    private Long upTraffic;
    private Long downTraffic;

    public AppAccessLogSortInfo() {}

    public AppAccessLogSortInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
        super();
        this.timeStamp = timeStamp;
        this.upTraffic = upTraffic;
        this.downTraffic = downTraffic;
    }

    // The relational operators all delegate to compare(). The previous versions tested
    // equality with `==` on boxed Longs, which compares references: equal values outside
    // the Long cache (-128..127) were treated as different, breaking the tie-breaks.

    /** Returns {@code true} if this key orders strictly after {@code other}. */
    @Override
    public boolean $greater(AppAccessLogSortInfo other) {
        return compare(other) > 0;
    }

    /** Returns {@code true} if this key orders after or equal to {@code other}. */
    @Override
    public boolean $greater$eq(AppAccessLogSortInfo other) {
        return compare(other) >= 0;
    }

    /** Returns {@code true} if this key orders strictly before {@code other}. */
    @Override
    public boolean $less(AppAccessLogSortInfo other) {
        return compare(other) < 0;
    }

    /** Returns {@code true} if this key orders before or equal to {@code other}. */
    @Override
    public boolean $less$eq(AppAccessLogSortInfo other) {
        return compare(other) <= 0;
    }

    /**
     * Compares by upTraffic, then downTraffic, then timeStamp.
     *
     * <p>Uses {@link Long#compare} instead of the former {@code (int) (a - b)}. That cast
     * truncates the long difference and can flip its sign or collapse it to 0 when two
     * values are more than {@code Integer.MAX_VALUE} apart, for example epoch-millisecond
     * timestamps about 25 days apart.
     *
     * @return a negative number, zero, or a positive number as this key orders before,
     *     equal to, or after {@code other}
     */
    @Override
    public int compare(AppAccessLogSortInfo other) {
        int result = Long.compare(upTraffic, other.upTraffic);
        if (result == 0) {
            result = Long.compare(downTraffic, other.downTraffic);
        }
        if (result == 0) {
            result = Long.compare(timeStamp, other.timeStamp);
        }
        return result;
    }

    /** Natural ordering used by {@code sortByKey}; identical to {@link #compare}. */
    @Override
    public int compareTo(AppAccessLogSortInfo other) {
        return compare(other);
    }

    public Long getTimpStamp() {
        return timeStamp;
    }

    public void setTimpStamp(Long timpStamp) {
        this.timeStamp = timpStamp;
    }

    public Long getUpTraffic() {
        return upTraffic;
    }

    public void setUpTraffic(Long upTraffic) {
        this.upTraffic = upTraffic;
    }

    public Long getDownTraffic() {
        return downTraffic;
    }

    public void setDownTraffic(Long downTraffic) {
        this.downTraffic = downTraffic;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((downTraffic == null) ? 0 : downTraffic.hashCode());
        result = prime * result + ((timeStamp == null) ? 0 : timeStamp.hashCode());
        result = prime * result + ((upTraffic == null) ? 0 : upTraffic.hashCode());
        return result;
    }

    /**
     * Two keys are equal when all three fields are equal (null-safe). For non-null fields
     * this is consistent with {@link #compare}.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AppAccessLogSortInfo other = (AppAccessLogSortInfo) obj;
        if (downTraffic == null) {
            if (other.downTraffic != null)
                return false;
        } else if (!downTraffic.equals(other.downTraffic))
            return false;
        if (timeStamp == null) {
            if (other.timeStamp != null)
                return false;
        } else if (!timeStamp.equals(other.timeStamp))
            return false;
        if (upTraffic == null) {
            if (other.upTraffic != null)
                return false;
        } else if (!upTraffic.equals(other.upTraffic))
            return false;
        return true;
    }

}

最后編輯于
©著作權歸作者所有，轉載或內容合作請聯繫作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市们拙,隨后出現(xiàn)的幾起案子稍途,更是在濱河造成了極大的恐慌,老刑警劉巖砚婆,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件械拍,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機殊者,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來验夯,“玉大人猖吴,你說我怎么就攤上這事』幼” “怎么了海蔽?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绑谣。 經(jīng)常有香客問我党窜,道長,這世上最難降的妖魔是什么借宵? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任幌衣,我火速辦了婚禮,結(jié)果婚禮上壤玫,老公的妹妹穿的比我還像新娘豁护。我一直安慰自己,他們只是感情好欲间,可當我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布楚里。 她就那樣靜靜地躺著,像睡著了一般猎贴。 火紅的嫁衣襯著肌膚如雪班缎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天她渴,我揣著相機與錄音达址,去河邊找鬼。 笑死惹骂,一個胖子當著我的面吹牛苏携,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播对粪,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼右冻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了著拭?” 一聲冷哼從身側(cè)響起纱扭,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎儡遮,沒想到半個月后乳蛾,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年肃叶,在試婚紗的時候發(fā)現(xiàn)自己被綠了蹂随。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡因惭,死狀恐怖岳锁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蹦魔,我是刑警寧澤激率,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站勿决,受9級特大地震影響乒躺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜低缩,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一嘉冒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧表制,春花似錦健爬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至壤短,卻和暖如春设拟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背久脯。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工纳胧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人帘撰。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓跑慕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親摧找。 傳聞我的和親對象是個殘疾皇子核行,可洞房花燭夜當晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內容