通過日誌分析每個移動設備對服務器訪問的總上行流量和總下行流量。然後先根據上行流量倒序排序,如果相等就根據下行流量倒序排序,如果上行流量和下行流量都相等,就根據時間戳(同樣倒序)排序。
AppAccessLog.java
該類是整個Spark應用的主類,主要包含業務邏輯代碼,例如計算總上行和下行流量,以及RDD的一些轉換操作等。
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.Tuple4;
public class AppAccessLog {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("AppAccessLog");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();
// create RDD
JavaRDD<String> AppAccessLogRDD = sc.textFile("hdfs:///temp/data/access.log");
// transform into PairRDD
JavaPairRDD<String, AppAccessLogInfo> AppAccessLogPairRDD =
mapToPairRDD(AppAccessLogRDD);
// aggregate DeviceID
JavaPairRDD<String, AppAccessLogInfo> AppAccessAggregatePairRDD =
agggregateToPairRDD(AppAccessLogPairRDD);
// transform into sortByKeyPairRDD
JavaPairRDD<AppAccessLogSortInfo, String> AppAccessSortByKeyLogPairRDD =
mapToSortByKeyPairRDD(AppAccessAggregatePairRDD);
// transformation sortBykey
JavaPairRDD<AppAccessLogSortInfo, String> resultRDD =
AppAccessSortByKeyLogPairRDD.sortByKey(false);
// get Top 10
List<Tuple2<AppAccessLogSortInfo, String>> top10 = resultRDD.take(10);
// print Top 10
for(Tuple2<AppAccessLogSortInfo, String> t : top10) {
System.out.println(t._2+" " + t._1.getUpTraffic() + " " + t._1.getDownTraffic() + " " + t._1.getTimpStamp());
}
// JDK 1.8
// top10.forEach(t -> System.out.println(t._2+" " + t._1.getUpTraffic() + " " + t._1.getDownTraffic() + " " + t._1.getTimpStamp()));
// create RowRDD
JavaRDD<Row> rowRDD = mapToRowRDD(resultRDD);
// create schema
ArrayList<StructField> fields = getColumnName();
StructType schema = DataTypes.createStructType(fields);
// create DataFrame
Dataset<Row> logDF = spark.createDataFrame(rowRDD, schema);
// save to Hive
logDF.write().mode("overwrite").saveAsTable("test.log");
spark.close();
sc.close();
}
//
private static ArrayList<StructField> getColumnName() {
ArrayList<StructField> fields = new ArrayList<StructField>();
StructField field = null;
field = DataTypes.createStructField("timeStame", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("DeviceID", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("upTraffic", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("downTraffic", DataTypes.StringType, true);
fields.add(field);
return fields;
}
// transform resultRDD into RowRDD
private static JavaRDD<Row> mapToRowRDD(JavaPairRDD<AppAccessLogSortInfo, String> resultRDD) {
JavaRDD<Tuple4<String, String, String, String>> tempRDD = resultRDD
.map(new Function<Tuple2<AppAccessLogSortInfo, String>, Tuple4<String, String, String, String>>() {
private static final long serialVersionUID = 7952741378495112332L;
@Override
public Tuple4<String, String, String, String> call(Tuple2<AppAccessLogSortInfo, String> tuple)
throws Exception {
String DeviceID = tuple._2;
AppAccessLogSortInfo accessLogSortInfo = tuple._1;
return new Tuple4<String, String, String, String>(
String.valueOf(accessLogSortInfo.getTimpStamp()), DeviceID,
String.valueOf(accessLogSortInfo.getUpTraffic()),
String.valueOf(accessLogSortInfo.getDownTraffic()));
}
});
return tempRDD.map(new Function<Tuple4<String, String, String, String>, Row>() {
private static final long serialVersionUID = -1227536252899303985L;
@Override
public Row call(Tuple4<String, String, String, String> tuple) throws Exception {
return RowFactory.create(tuple._1(), tuple._2(), tuple._3(), tuple._4());
}
});
}
// aggregate DeviceID calculate total of upTraffic/downTraffic and select
// minimum timeStamp
private static JavaPairRDD<String, AppAccessLogInfo> agggregateToPairRDD(
JavaPairRDD<String, AppAccessLogInfo> appAccessLogPairRDD) {
// transformation reduceByKey
return appAccessLogPairRDD.reduceByKey(new Function2<AppAccessLogInfo, AppAccessLogInfo, AppAccessLogInfo>() {
private static final long serialVersionUID = -8552789221394152834L;
@Override
public AppAccessLogInfo call(AppAccessLogInfo v1, AppAccessLogInfo v2) throws Exception {
Long timeStamp = v1.getTimeStamp() > v2.getTimeStamp() ? v2.getTimeStamp() : v1.getTimeStamp();
Long upTraffic = v1.getUpTraffic() + v2.getUpTraffic();
Long downTraffic = v1.getDownTraffic() + v2.getDownTraffic();
AppAccessLogInfo accessLogInfo = new AppAccessLogInfo();
accessLogInfo.setUpTraffic(upTraffic);
accessLogInfo.setDownTraffic(downTraffic);
accessLogInfo.setTimeStamp(timeStamp);
return accessLogInfo;
}
});
}
// transform AppAccessLogPairRDD into AppAccessSortByKeyLogPairRDD
private static JavaPairRDD<AppAccessLogSortInfo, String> mapToSortByKeyPairRDD(
JavaPairRDD<String, AppAccessLogInfo> AppAccessAggregatePairRDD) {
// transformation mapToPair
return AppAccessAggregatePairRDD
.mapToPair(new PairFunction<Tuple2<String, AppAccessLogInfo>, AppAccessLogSortInfo, String>() {
private static final long serialVersionUID = -4778843695438540948L;
@Override
public Tuple2<AppAccessLogSortInfo, String> call(Tuple2<String, AppAccessLogInfo> tuple)
throws Exception {
String DeviceID = tuple._1;
AppAccessLogInfo appAccessLogInfo = tuple._2;
AppAccessLogSortInfo accessLogSortInfo = new AppAccessLogSortInfo();
accessLogSortInfo.setTimpStamp(appAccessLogInfo.getTimeStamp());
accessLogSortInfo.setUpTraffic(appAccessLogInfo.getUpTraffic());
accessLogSortInfo.setDownTraffic(appAccessLogInfo.getDownTraffic());
return new Tuple2<AppAccessLogSortInfo, String>(accessLogSortInfo, DeviceID);
}
});
}
// transform AppAccessLogRDD into AppAccessLogPairRDD
private static JavaPairRDD<String, AppAccessLogInfo> mapToPairRDD(JavaRDD<String> AppAccessLogRDD) {
// transformation mapToPair
return AppAccessLogRDD.mapToPair(new PairFunction<String, String, AppAccessLogInfo>() {
private static final long serialVersionUID = 5998646612001714125L;
@Override
public Tuple2<String, AppAccessLogInfo> call(String line) throws Exception {
String[] lineSplitArray = line.split("\t");
String DeviceID = lineSplitArray[1];
Long timeStamp = Long.valueOf(lineSplitArray[0]);
Long upTraffic = Long.valueOf(lineSplitArray[2]);
Long downTraffic = Long.valueOf(lineSplitArray[3]);
AppAccessLogInfo appAccessLogInfo = new AppAccessLogInfo();
appAccessLogInfo.setTimeStamp(timeStamp);
appAccessLogInfo.setUpTraffic(upTraffic);
appAccessLogInfo.setDownTraffic(downTraffic);
return new Tuple2<String, AppAccessLogInfo>(DeviceID, appAccessLogInfo);
}
});
}
}
這個類中除了main方法以外,還有三個比較重要的方法:mapToPairRDD、agggregateToPairRDD和mapToSortByKeyPairRDD。
從main方法開始看,首先需要創建一個SparkContext,然後通過SparkContext來創建初始RDD。在這個過程中,Spark會完成一系列的初始化工作,包括向Master註冊Application、啟動Executor,以及Executor的反向註冊等。
接下來,調用mapToPairRDD方法(內部使用mapToPair算子)把初始RDD轉換成PairRDD,為後面的聚合操作做準備。在這裡,用一個實體類AppAccessLogInfo把每條記錄的upTraffic、downTraffic、timeStamp進行封裝,然後使用DeviceID作為Key、AppAccessLogInfo類對象作為Value,最終得到一個PairRDD。然後,調用agggregateToPairRDD方法對AppAccessLogPairRDD做聚合操作:在這個方法中,調用了AppAccessLogPairRDD的reduceByKey方法,按DeviceID(設備ID)計算每台設備的總上行流量和總下行流量;由於每台設備對應多個訪問時間戳,這裡取最小的時間戳作為後面排序的依據。做完reduceByKey之後,就需要按總上行流量、總下行流量、時間戳進行排序了。要想使用PairRDD的sortByKey方法,需要先改變RDD的結構,這裡調用mapToSortByKeyPairRDD方法:該方法使用另一個實體類AppAccessLogSortInfo,把agggregateToPairRDD方法得到的AppAccessAggregatePairRDD中每個Tuple2元素的value所封裝的信息提取出來,重新用AppAccessLogSortInfo類封裝並作為Key(DeviceID作為Value),組成一個能夠按Key排序的AppAccessSortByKeyLogPairRDD。最後調用AppAccessSortByKeyLogPairRDD的sortByKey(false)方法進行倒序排序,並獲取訪問流量最大的前十個設備。
把最后結(jié)果保存到Hive,首先需要把resultRDD轉(zhuǎn)換成JavaRDD<Row> 類型的RDD俊扳,在
mapToRowRDD
這個方法中途蒋,把所有的數(shù)據(jù)都提取出來,并把每一行包裝成Tuple4類型(四個字段)馋记,然后使用RowFactory.create()方法号坡,把每一個Tuple4轉(zhuǎn)換成Row對象(要把Tuple4的每一個元素都傳進RowFactory.create()方法),就得到了JavaRDD<Row>類型的RDD抗果。接下來需要創(chuàng)建Schema筋帖,首先要對Row對象的每一個元素創(chuàng)建StructField對象也就是field(列名),使用DataTypes.createStructField()方法冤馏,其中第一個參數(shù)是列名日麸,第二個參數(shù)是列字段類型,第三個參數(shù)代表是否允許為空逮光。最后使用DataTypes.createStructTypes()方法創(chuàng)建Schema代箭,該方法的參數(shù)是一個存放StructField對象的集合(里面存放著RowRDD每一列的列名),返回值為StructType涕刚。最后調(diào)用SparkSession對象的createDataFrame方法嗡综,創(chuàng)建一個DataFrame,其中第一個參數(shù)是RowRDD杜漠,第二個參數(shù)是Schema极景,返回值是Dataset<Row>類型。然后把創(chuàng)建好的DataFrame保存到Hive表
AppAccessLogInfo.java
由於該類的對象是作為PairRDD的value,需要在網絡間傳輸(例如reduceByKey的shuffle階段),所以需要實現Serializable接口,使之能夠進行序列化和反序列化。
import java.io.Serializable;
public class AppAccessLogInfo implements Serializable{
private static final long serialVersionUID = 2298114085058810487L;
private Long timeStamp;
private Long upTraffic;
private Long downTraffic;
public AppAccessLogInfo() {}
public AppAccessLogInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
this.timeStamp = timeStamp;
this.upTraffic = upTraffic;
this.downTraffic = downTraffic;
}
public Long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}
public Long getUpTraffic() {
return upTraffic;
}
public void setUpTraffic(Long upTraffic) {
this.upTraffic = upTraffic;
}
public Long getDownTraffic() {
return downTraffic;
}
public void setDownTraffic(Long downTraffic) {
this.downTraffic = downTraffic;
}
}
AppAccessLogSortInfo.java
和AppAccessLogInfo類一樣,這個類也需要實現Serializable接口;同時還需要實現scala.math.Ordered接口,因為該類的對象要作為sortByKey的Key進行排序。
import java.io.Serializable;
import scala.math.Ordered;
/**
 * Sort key for the per-device traffic ranking.
 *
 * <p>Natural (ascending) order compares {@code upTraffic} first, then {@code downTraffic}, then
 * {@code timeStamp}. A descending ranking is obtained by sorting in reverse, e.g. with
 * {@code sortByKey(false)}.
 *
 * <p>Implements {@link Ordered} so instances can be used as sort keys, and {@link Serializable}
 * because instances are stored in Spark RDDs. All three fields must be non-null before two
 * instances are compared; a {@code null} field throws {@link NullPointerException} on unboxing.
 */
public class AppAccessLogSortInfo implements Ordered<AppAccessLogSortInfo>, Serializable {
    private static final long serialVersionUID = 7006437160384780829L;

    private Long timeStamp;
    private Long upTraffic;
    private Long downTraffic;

    /** Creates an instance whose three fields are all {@code null}. */
    public AppAccessLogSortInfo() {}

    /**
     * Creates a fully populated sort key.
     *
     * @param timeStamp   timestamp, the last tie-breaker
     * @param upTraffic   upstream traffic, the primary criterion
     * @param downTraffic downstream traffic, the secondary criterion
     */
    public AppAccessLogSortInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
        this.timeStamp = timeStamp;
        this.upTraffic = upTraffic;
        this.downTraffic = downTraffic;
    }

    // All relational operators derive from compare(), so they can never disagree with it.
    // (The previous hand-written versions compared boxed Longs with ==, an identity check that
    // fails for values outside the small-integer cache.)

    @Override
    public boolean $greater(AppAccessLogSortInfo other) {
        return compare(other) > 0;
    }

    @Override
    public boolean $greater$eq(AppAccessLogSortInfo other) {
        return compare(other) >= 0;
    }

    @Override
    public boolean $less(AppAccessLogSortInfo other) {
        return compare(other) < 0;
    }

    @Override
    public boolean $less$eq(AppAccessLogSortInfo other) {
        return compare(other) <= 0;
    }

    /**
     * Compares by upTraffic, then downTraffic, then timeStamp, all ascending.
     *
     * @param other the key to compare with
     * @return negative, zero or positive as this key is less than, equal to or greater than
     *     {@code other}
     */
    @Override
    public int compare(AppAccessLogSortInfo other) {
        // Long.compare instead of (int) (a - b): casting a long difference to int overflows
        // (millisecond timestamps easily differ by more than 2^31), yielding a wrong sign or 0.
        int result = Long.compare(upTraffic, other.upTraffic);
        if (result != 0) {
            return result;
        }
        result = Long.compare(downTraffic, other.downTraffic);
        if (result != 0) {
            return result;
        }
        return Long.compare(timeStamp, other.timeStamp);
    }

    /** Same ordering as {@link #compare(AppAccessLogSortInfo)}. */
    @Override
    public int compareTo(AppAccessLogSortInfo other) {
        return compare(other);
    }

    // NOTE: the "Timp" spelling of the two accessors below is kept for backward compatibility
    // with existing callers.

    /** @return the timestamp */
    public Long getTimpStamp() {
        return timeStamp;
    }

    /** @param timpStamp the new timestamp */
    public void setTimpStamp(Long timpStamp) {
        this.timeStamp = timpStamp;
    }

    /** @return the upstream traffic */
    public Long getUpTraffic() {
        return upTraffic;
    }

    /** @param upTraffic the new upstream traffic */
    public void setUpTraffic(Long upTraffic) {
        this.upTraffic = upTraffic;
    }

    /** @return the downstream traffic */
    public Long getDownTraffic() {
        return downTraffic;
    }

    /** @param downTraffic the new downstream traffic */
    public void setDownTraffic(Long downTraffic) {
        this.downTraffic = downTraffic;
    }

    /** Null-safe hash over all three fields, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(downTraffic, timeStamp, upTraffic);
    }

    /** Two keys are equal when all three fields are equal (null-safe). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        AppAccessLogSortInfo other = (AppAccessLogSortInfo) obj;
        return java.util.Objects.equals(downTraffic, other.downTraffic)
                && java.util.Objects.equals(timeStamp, other.timeStamp)
                && java.util.Objects.equals(upTraffic, other.upTraffic);
    }
}