All computation on a DStream is triggered by its output operations, such as print().
Without at least one output operation, none of the defined computation logic will ever execute.
Note: when the output operation is foreachRDD, you must invoke an action on the RDD inside it to trigger the computation for each batch; foreachRDD by itself, with no action on the RDD, triggers nothing.
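A minimal Scala sketch of the difference, where dstream stands for any DStream:

dstream.foreachRDD { rdd =>
  // no action is invoked on rdd, so no job is ever submitted
  // and this batch's computation never runs
}

dstream.foreachRDD { rdd =>
  // count() is an action, so this does trigger the batch's computation
  println("records in this batch: " + rdd.count())
}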
Using foreachRDD
Typically, foreachRDD is where you create a Connection, such as a JDBC Connection, and then write the data to external storage through it.
Common mistakes
Mistake 1: creating the Connection outside the RDD's foreach operation
This is wrong: it would cause the Connection object to be serialized and shipped to every task, but Connection objects generally do not support serialization and therefore cannot be transferred at all.
Mistake 2: creating the Connection inside the RDD's foreach operation
This works, but it is very inefficient, because a separate Connection object is created for every single record in the RDD, and creating a Connection is typically expensive. Both anti-patterns are sketched below.
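Written in the same style as the snippets below, with the same hypothetical createNewConnection() and send() helpers, the two anti-patterns look roughly like this:

dstream.foreachRDD { rdd =>
  // Mistake 1: created on the driver; Spark would have to serialize it
  // and ship it to every task, which fails for a live connection
  val connection = createNewConnection()
  rdd.foreach { record =>
    connection.send(record)
  }
}

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // Mistake 2: runs once per record, so every record pays the
    // full cost of opening and closing a connection
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}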
Proper usage
Approach 1: use the RDD's foreachPartition operation and create the Connection object inside it. That amounts to one Connection per RDD partition instead of one per record, which saves far more resources.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // one connection per partition, shared by every record in it
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
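This still opens and closes a fresh connection for every partition of every batch; the second approach amortizes that cost by reusing pooled connections.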
Approach 2: hand-roll a static connection pool, use the RDD's foreachPartition operation, and inside it obtain a connection from the pool through a static method, returning it once you are done. That way connections can be reused even across the partitions of multiple RDDs. The pool can also create connections lazily, and release connections that have been idle for a while.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // borrow a pooled connection instead of creating a new one
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    // return it so later partitions and batches can reuse it
    ConnectionPool.returnConnection(connection)
  }
}
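Because ConnectionPool is a static, per-JVM object, each executor builds its pool the first time one of its tasks asks for a connection, and later partitions and batches running in the same executor reuse those connections. The Demo below includes one possible Java implementation of such a pool.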
Demo
The demo rewrites the earlier UpdateStateByKeyWordCount program so that the global word counts computed for each batch are written out to a MySQL database. (The inserts assume a wordcount(word, count) table already exists in the target database.)
package cn.spark.study.streaming;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;

import scala.Tuple2;

/**
 * Real-time wordcount that persists its counts to MySQL.
 */
public class PersistWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setMaster("local[2]")
                .setAppName("PersistWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        // updateStateByKey requires a checkpoint directory
        jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("spark1", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }

        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }

                });

        JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Optional<Integer> call(List<Integer> values,
                            Optional<Integer> state) throws Exception {
                        // start from the previous count for this word, if any
                        Integer newValue = 0;
                        if(state.isPresent()) {
                            newValue = state.get();
                        }
                        // add this batch's occurrences
                        for(Integer value : values) {
                            newValue += value;
                        }
                        return Optional.of(newValue);
                    }

                });

        // After every batch, write the current global counts to MySQL,
        // so that e.g. a J2EE application can display them.
        wordCounts.foreachRDD(new Function<JavaPairRDD<String,Integer>, Void>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {
                // use foreachPartition() so each partition shares one connection
                wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> wordCounts) throws Exception {
                        // borrow one connection for the whole partition
                        Connection conn = ConnectionPool.getConnection();

                        // insert every record through a single reused
                        // PreparedStatement, which also escapes the values
                        PreparedStatement pstmt = conn.prepareStatement(
                                "insert into wordcount(word,count) values(?,?)");
                        while(wordCounts.hasNext()) {
                            Tuple2<String, Integer> wordCount = wordCounts.next();
                            pstmt.setString(1, wordCount._1);
                            pstmt.setInt(2, wordCount._2);
                            pstmt.executeUpdate();
                        }
                        pstmt.close();

                        // return the connection to the pool for reuse
                        ConnectionPool.returnConnection(conn);
                    }

                });
                return null;
            }

        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }

}
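To feed the demo, start a socket server on the spark1 host for the receiver to attach to, for example with nc -lk 9999, and type some words; after every 5-second batch the accumulated counts are appended to the wordcount table.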
package cn.spark.study.streaming;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * A simplistic connection pool.
 */
public class ConnectionPool {

    // static queue of pooled connections
    private static LinkedList<Connection> connectionQueue;

    /**
     * Load the JDBC driver once, when the class is first used.
     */
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection; synchronized, since concurrent tasks may call it.
     */
    public synchronized static Connection getConnection() {
        try {
            // create the pool lazily, on first use
            if(connectionQueue == null) {
                connectionQueue = new LinkedList<Connection>();
                for(int i = 0; i < 10; i++) {
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://hadoop1:3306/testdb",
                            "",
                            "");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    /**
     * Return a connection to the pool; synchronized for the same reason.
     */
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }

}
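Note that this pool is deliberately minimal: getConnection() returns null once all ten connections are checked out, nothing verifies that a pooled connection is still alive, and the idle-release behavior mentioned above is not implemented. For anything beyond a demo, a mature pool such as HikariCP or DBCP would be a safer choice.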