Flink HA
Flink's HighAvailabilityMode class defines an enum with three high-availability modes:
- NONE: non-HA mode.
- ZOOKEEPER: HA implemented on top of ZooKeeper.
- FACTORY_CLASS: a custom HA factory class that implements the HighAvailabilityServicesFactory interface.
ZooKeeperHaServices mainly provides methods for creating the LeaderRetrievalService and LeaderElectionService, and defines the ZK node names used by each service component.
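For reference, a minimal flink-conf.yaml fragment that enables ZooKeeper HA might look like this (the quorum, paths, and cluster id are placeholders):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /my_cluster
high-availability.storageDir: hdfs:///flink/ha/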
How Flink Implements Exactly-Once Semantics
Stream processing engines generally offer three data-processing semantics: at-most-once, at-least-once, and exactly-once.
- At-most-once: a record is processed at most once; whether processing succeeds or fails, there is no retry and no retransmission.
- At-least-once: the system guarantees that every record or event is processed at least once; if an error or loss occurs, it is retransmitted or retried.
- Exactly-once: every record is processed exactly once, no more and no less.
Flink's snapshots work at operator granularity and can also capture global state.
One of the core elements of Flink's distributed snapshots is the barrier. Barriers are strictly ordered and flow downstream along with the records.
When barrier n arrives at different times on different input streams, Flink makes the fast streams wait for the slow ones (barrier alignment).
When persisting state, Flink can write snapshots asynchronously, and checkpoints need not copy the full state every time: with incremental checkpoints, each checkpoint only records the changes since the previous one.
The snapshot mechanism guarantees that after a failover the job can recover from the latest snapshot; in other words, distributed snapshots provide exactly-once processing inside the Flink system.
For end-to-end exactly-once, a two-phase-commit sink extends TwoPhaseCommitSinkFunction and implements the beginTransaction, preCommit, commit, and abort methods (see the sketch after this list):
- beginTransaction: before the transaction starts, create a temporary file in a temp directory of the target file system; records processed afterwards are written to this file.
- preCommit: in the pre-commit phase, flush and close the file so nothing more can be written to it, and start a new transaction for any subsequent writes belonging to the next checkpoint.
- commit: in the commit phase, atomically move the pre-committed file to the real target directory; this adds some latency to the visibility of the output data.
- abort: in the abort phase, delete the temporary file.
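To make the four callbacks concrete, here is a minimal sketch of a file sink built on TwoPhaseCommitSinkFunction. The Txn class, the /tmp/sink paths, and the serializer choice are illustrative assumptions, not part of Flink's API:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

public class FileTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, FileTwoPhaseCommitSink.Txn, Void> {

    /** Transaction handle: a temp file plus its final destination (illustrative). */
    public static class Txn implements Serializable {
        public String tempPath;
        public String targetPath;
    }

    public FileTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        // create a fresh temp-file name in the target file system's temp directory
        Txn txn = new Txn();
        String id = UUID.randomUUID().toString();
        txn.tempPath = "/tmp/sink/.in-progress-" + id;
        txn.targetPath = "/tmp/sink/part-" + id;
        return txn;
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) throws Exception {
        // between checkpoints, records are appended to the transaction's temp file
        Files.write(Paths.get(txn.tempPath),
                (value + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(Txn txn) {
        // flush and stop writing; later records go to the next transaction's file
    }

    @Override
    protected void commit(Txn txn) {
        // the atomic move makes the data visible; it must be idempotent because
        // commit can be retried after a failover
        try {
            if (Files.exists(Paths.get(txn.tempPath))) {
                Files.move(Paths.get(txn.tempPath), Paths.get(txn.targetPath),
                        StandardCopyOption.ATOMIC_MOVE);
            }
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Txn txn) {
        // roll back: delete the temp file
        try {
            Files.deleteIfExists(Paths.get(txn.tempPath));
        } catch (java.io.IOException ignored) {
        }
    }
}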
The Kafka-Flink-Kafka process (a wiring sketch follows the list):
- Flink starts a checkpoint and enters the pre-commit phase; the Flink JobManager injects the checkpoint barrier into the data stream.
- Once the barrier has been passed through every operator and each operator has finished its snapshot, the pre-commit phase is complete.
- When all operators have completed their pre-commit, a commit is initiated; if any single pre-commit fails, Flink rolls back to the most recent checkpoint.
- Once pre-commit has completed, the commit must be guaranteed to succeed eventually.
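A hedged sketch of wiring up such a pipeline with the universal Kafka connector (topic names, addresses, and the checkpoint interval are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExactlyOnceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoints drive the two-phase commit; exactly-once mode is required
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        // must not exceed the broker's transaction.max.timeout.ms
        props.setProperty("transaction.timeout.ms", "600000");

        env.addSource(new FlinkKafkaConsumer<>("source-topic", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer<>(
                   "sink-topic",
                   new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                   props,
                   FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("kafka exactly-once");
    }
}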
How to Troubleshoot Backpressure in Production
How different frameworks handle backpressure:
- Storm: backpressure was introduced in version 1.0. Storm actively monitors its worker nodes; when the data received by a worker exceeds a threshold, the backpressure signal is sent to ZooKeeper, ZooKeeper notifies all workers to enter the backpressure state, and finally the sources at the head of the topology reduce their send rate.
- Spark Streaming: the RateController component uses the classic PID algorithm to compute a rate from metrics such as message count, scheduling delay, and processing time, and then throttles ingestion accordingly.
- Flink: relies on network transfer and dynamic rate limiting. As records are computed and transformed by operators, they are placed into distributed blocking queues; when a consumer's queue fills up, the producer's send rate drops.
Ratio values shown on the Back Pressure tab of the Flink Web UI:
- OK: 0 <= Ratio <= 0.10, normal;
- LOW: 0.10 < Ratio <= 0.50, moderate;
- HIGH: 0.5 < Ratio <= 1, severe.
Metric | Meaning | Interpretation |
---|---|---|
outPoolUsage | Usage of the send-side buffer pool | Reflects the current Task's send rate; if this value is low, the node may be the one being backpressured |
inPoolUsage | Usage of the receive-side buffer pool | Reflects the Task's receive rate; if inPoolUsage is high while outPoolUsage is low, this node is likely the source of the backpressure |
floatingBuffersUsage | Usage of the receiving side's floating buffer pool | |
exclusiveBuffersUsage | Usage of the input channels' exclusive buffer pool | |
How to handle backpressure:
- Data skew: usually caused by grouped aggregations such as KeyBy; pre-process the hot keys to reduce or eliminate their impact.
- GC: inspect the GC logs, enabled for example with the -XX:+PrintGCDetails JVM option (see the config fragment below).
- The code itself: check the machine's CPU and memory usage.
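For the GC case, one way to get those logs on a Flink cluster is the env.java.opts entry in flink-conf.yaml (the exact flags depend on your JVM version; these are the classic Java 8 ones, and the log path is a placeholder):

env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/path/to/gc.log"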
How to Handle Data Skew in Production
Two-phase aggregation for KeyBy hot keys
When keying by type, an uneven distribution of type values assigns a large share of the records to a single task, causing skew. The fix works as follows (a sketch appears after this list):
- First scatter the grouping key, for example by appending a random suffix;
- Aggregate the scattered data;
- Restore the scattered keys to the original key;
- KeyBy a second time to compute the final result, then emit it.
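A minimal sketch of the two phases (the field layout, window size, and suffix fan-out of 8 are assumptions for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.ThreadLocalRandom;

public class TwoPhaseAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("hot_type", 1), Tuple2.of("hot_type", 1), Tuple2.of("cold_type", 1))
           // phase 1: scatter the hot key with a random suffix, then pre-aggregate
           .map(t -> Tuple2.of(t.f0 + "#" + ThreadLocalRandom.current().nextInt(8), t.f1))
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .sum(1)
           // phase 2: strip the suffix to restore the original key, combine partial sums
           .map(t -> Tuple2.of(t.f0.substring(0, t.f0.lastIndexOf('#')), t.f1))
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(t -> t.f0)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .sum(1)
           .print();

        env.execute("two-phase aggregation");
    }
}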
When Flink consumes Kafka data, the number of Kafka partitions should equal the parallelism of the Flink consumer. If they differ, apply one of Flink's redistributing (repartitioning) strategies, sketched below:
Rebalance repartitions the records round-robin across all downstream instances, achieving global load balancing.
Rescale, based on the upstream and downstream parallelism, cycles records round-robin over only a local subset of the downstream instances.
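A brief sketch of applying a redistribution strategy (the source, data, and parallelism values are illustrative; swap .rebalance() for .rescale() to get the local variant):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RedistributionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // e.g. a source with parallelism 2 feeding a map with parallelism 4
        env.fromSequence(0, 999).setParallelism(2)
           // rebalance: round-robin over ALL downstream subtasks (global balancing);
           // .rescale() instead would round-robin only within a local subtask group
           .rebalance()
           .map(v -> v * 2).returns(Types.LONG).setParallelism(4)
           .print();

        env.execute("redistribution strategies");
    }
}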
Parallelism and Resource Settings in Production
In a Flink cluster, each TaskManager is a JVM process and runs its tasks in separate threads; slots isolate only memory and do nothing for CPU.
By default, Flink also allows subtasks of the same job to share slots.
Flink chains the tasks of different operators together into a new task. Because a chained task runs in a single thread, it effectively reduces thread context switches and the overhead of serialization/deserialization, improving job throughput. Chaining can also be controlled explicitly, as sketched below.
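A small sketch of controlling chaining (the operators and data are placeholders):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingControl {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining() would switch chaining off for the whole job

        env.fromSequence(0, 99)
           .map(v -> v + 1).returns(Types.LONG)
           .startNewChain()              // begin a fresh chain at this operator
           .filter(v -> v % 2 == 0)
           .disableChaining()            // keep this operator out of any chain
           .print();

        env.execute("chaining control");
    }
}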
Parallelism can be set at four levels: operator, environment, client, and cluster configuration.
In production it is recommended to set the parallelism of each operator explicitly, which allows explicit and precise resource control.
Environment level: all operators in the job use the specified value; not recommended in production.
The precedence is: operator level > environment level > client level > cluster configuration. For example:
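A minimal sketch of the environment and operator levels (the values are illustrative); the client level is set at submission, e.g. flink run -p 4, and the cluster default is parallelism.default in flink-conf.yaml:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismLevels {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // environment level: default for every operator of this job
        env.setParallelism(4);

        env.fromSequence(0, 99)
           // operator level: overrides the environment default (highest priority)
           .map(v -> v * 2).returns(Types.LONG).setParallelism(8)
           .print();

        env.execute("parallelism levels");
    }
}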
How Flink Joins Dimension Tables
Depending on how fresh the business requires the dimension data to be, there are several options:
- Real-time lookup: the Flink operator queries the external database directly; this is synchronous, and the data is guaranteed to be current.
- Preloading the full table: load the entire dimension table into memory at startup.
- LRU cache: cache the dimension data and evict the least recently used entries.
Real-time lookup
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class DimSync extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
private Connection conn = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
}
@Override
public Order map(String in) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(in);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//look up city_name by city_id
PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
pst.setInt(1,cityId);
ResultSet resultSet = pst.executeQuery();
String cityName = null;
while (resultSet.next()){
cityName = resultSet.getString(1);
}
resultSet.close();
pst.close();
return new Order(cityId,userName,items,cityName);
}
@Override
public void close() throws Exception {
super.close();
conn.close();
}
}
Make sure the connection is closed promptly (handled in close() above).
public class Order {
private Integer cityId;
private String userName;
private String items;
private String cityName;
public Order(Integer cityId, String userName, String items, String cityName) {
this.cityId = cityId;
this.userName = userName;
this.items = items;
this.cityName = cityName;
}
public Order() {
}
public Integer getCityId() {
return cityId;
}
public void setCityId(Integer cityId) {
this.cityId = cityId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getItems() {
return items;
}
public void setItems(String items) {
this.items = items;
}
public String getCityName() {
return cityName;
}
public void setCityName(String cityName) {
this.cityName = cityName;
}
@Override
public String toString() {
return "Order{" +
"cityId=" + cityId +
", userName='" + userName + '\'' +
", items='" + items + '\'' +
", cityName='" + cityName + '\'' +
'}';
}
}
Preloading the full table
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class WholeLoad extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
ScheduledExecutorService executor = null;
private Map<String,String> cache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//initialize the cache and load once at startup so the first lookups do not miss
cache = new ConcurrentHashMap<>();
load();
//then refresh the full dimension table every 5 minutes
executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
LOGGER.error("failed to refresh dimension data", e);
}
}
},5,5, TimeUnit.MINUTES);
}
@Override
public Order map(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
String cityName = cache.get(String.valueOf(cityId));
return new Order(cityId,userName,items,cityName);
}
public void load() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
ResultSet rs = statement.executeQuery();
//reload the full dimension table into memory
while (rs.next()) {
String cityId = rs.getString("city_id");
String cityName = rs.getString("city_name");
cache.put(cityId, cityName);
}
rs.close();
statement.close();
con.close();
}
@Override
public void close() throws Exception {
super.close();
//stop the refresh thread when the function shuts down
if (executor != null) {
executor.shutdown();
}
}
}
LRU cache
import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class LRU extends RichAsyncFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
String table = "info";
Cache<String, String> cache = null;
private HBaseClient client = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//create the asynchbase HBase client
client = new HBaseClient("127.0.0.1","7071");
cache = CacheBuilder.newBuilder()
//store at most 10000 entries
.maximumSize(10000)
//entries expire 1 minute after being written
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(input);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//check the cache first
String cacheCityName = cache.getIfPresent(String.valueOf(cityId));
//on a cache miss, fall back to HBase for the dimension data
if(cacheCityName != null){
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(cacheCityName);
resultFuture.complete(Collections.singleton(order));
}else {
client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
for (KeyValue kv : arg) {
String value = new String(kv.value());
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(value);
resultFuture.complete(Collections.singleton(order));
cache.put(String.valueOf(cityId), value);
}
return null;
});
}
}
}
Deduplicating Massive Data
Options for real-time deduplication in Flink:
- Based on the state backend
- Based on HyperLogLog
- Based on a Bloom filter
- Based on BitMap
- Based on an external database
Based on the state backend
One of the available state backends is RocksDBStateBackend, which keeps in-flight state in a RocksDB database; by default the data is stored under the data directory of the node where the TaskManager runs.
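A minimal sketch of enabling it (assumes the flink-statebackend-rocksdb dependency is on the classpath; the checkpoint URI and interval are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // the second argument enables incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
    }
}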
Counting the daily number of visits per product:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//set a 24-hour TTL on the ValueState; expired state is cleared automatically
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//declare the ValueState that holds the per-key count
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String f0 = value.f0;
//first occurrence of this key: initialize the count
if(counts.value() == null){
counts.update(1);
}else{
//otherwise increment by 1
counts.update(counts.value()+1);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
Based on HyperLogLog
HyperLogLog is an approximate counting algorithm used to estimate the number of distinct elements in a set.
import net.agkn.hll.HLL;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value is a purchase record: <product sku, user id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
Add the corresponding POM dependency:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
If the elements are not numeric, they must be hashed before being inserted, for example:
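A string key can be hashed with a 64-bit murmur hash before calling addRaw (a sketch assuming Guava is available; the id is a placeholder):

import com.google.common.hash.Hashing;
import net.agkn.hll.HLL;
import java.nio.charset.StandardCharsets;

public class HllHashingExample {
    public static void main(String[] args) {
        HLL hll = new HLL(14, 5);
        // hash a non-numeric id to a 64-bit value before inserting it
        long hashed = Hashing.murmur3_128()
                .hashString("user_42", StandardCharsets.UTF_8)
                .asLong();
        hll.addRaw(hashed);
        System.out.println(hll.cardinality());
    }
}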
Based on Bloom filters
A BloomFilter works like a HashSet for fast membership tests; the typical use case is quickly checking whether a key exists in a container and returning immediately when it does not.
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
@Override
public void open(Configuration parameters) throws Exception {
//initialize the keyed state handles; without this both state fields stay null
bloomState = getRuntimeContext().getState(new ValueStateDescriptor<>("bloom-filter", BloomFilter.class));
countState = getRuntimeContext().getState(new ValueStateDescriptor<>("sku-count", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
BloomFilter bloomFilter = bloomState.value();
Long skuCount = countState.value();
if(bloomFilter == null){
//first element for this key: create a filter sized for ten million entries
bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
}
if(skuCount == null){
skuCount = 0L;
}
if(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + 1;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
Based on BitMap
Although HyperLogLog and BloomFilter reduce storage, they sacrifice accuracy.
The basic idea of a BitMap is to mark the value corresponding to an element with a single bit, the element itself serving as the key.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}
@Override
public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public Long getResult(Roaring64NavigableMap accumulator) {
return accumulator.getLongCardinality();
}
@Override
public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
//combine partial results with a bitwise OR; returning null would break merging
a.or(b);
return a;
}
}
Add the dependency:
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.21</version>
</dependency>