Apache Flink notes. Original (Chinese) link: http://timeyang.com/articles/28/2018/10/30/Apache%20Flink%20%E7%AC%94%E8%AE%B0
Apache Flink - Stateful Computations over Data Streams
Document
Dataflow Model
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
State
Paper
Chandy-Lamport algorithm: Distributed Snapshots: Determining Global States of Distributed Systems
-
Lightweight Asynchronous Snapshots for Distributed Dataflows
an asynchronous snapshotting algorithm that achieves minimal snapshots on acyclic execution graphs
a generalisation of our algorithm that works on cyclic execution graphs
-
State Management in Apache Flink
a complete end-to-end design for continuous stateful processing, from the conceptual view of state in the programming model to its physical counterpart implemented in various backends.
how to naturally pipeline consistent snapshots in weakly connected dataflow graphs and capture minimal state, skipping in-transit records when possible, without impacting general system progress.
how snapshots can be utilized for a large variety of operational needs beyond failure recovery such as software patches, testing, system upgrades and exactly-once delivery.
encapsulate different processing guarantees and isolation levels for externally accessing partitioned operator state and output, using snapshots.
large-scale pipeline deployments that operate 24/7 in production and rely heavily on stateful processing coupled with runtime metrics and performance insights.
Article
Note
-
Key-Groups
Key-Groups are the atomic unit in which Flink redistributes state.
Flink uses a consistent-hashing scheme to reduce seeks and increase sequential disk reads.
Implementation: hash the key, then take the hash modulo the maximum parallelism. The hash ring is divided evenly by the job's parallelism, so task instance i handles the key-groups in
[ i * max_parallelism/parallelism, (i + 1) * max_parallelism/parallelism)
and manages the keyed state for the keys falling into that range. When the parallelism changes, each instance loads the state for its new range, and incoming records are hashed to the task instance that holds the state for their key.
Why consistent hashing: without it (say, hashing keys modulo the parallelism), a parallelism change would force each task either to scan the entire snapshot to find the state of every key assigned to it, causing a large amount of unnecessary I/O, or to keep a reference to each key's state in the snapshot, i.e. an index, so that each task can selectively read the state assigned to it, which adds an indexing cost (proportional to the number of keys, which can be very large) plus communication overhead. With consistent hashing, each task instance reads only the data it needs, and a Key-Group is usually large enough to permit coarse-grained sequential disk reads.
Note that the hash ring, i.e. the maximum parallelism, must not be set too large. As defined above, the number of Key-Groups equals the maximum parallelism; if it is set too large, each Key-Group may hold the state of only a few keys, defeating coarse-grained disk I/O and degrading performance.
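The assignment described above can be sketched as follows. This is a simplified illustration, not Flink's actual implementation (which lives in `KeyGroupRangeAssignment` and uses murmur hashing; plain `hash()` stands in for it here). The ranges use a ceiling split so that every key-group maps back to exactly one task instance:

```python
def key_group_for_key(key, max_parallelism: int) -> int:
    # Hash the key, then take it modulo the maximum parallelism
    # (= the number of key-groups on the hash ring).
    return hash(key) % max_parallelism

def operator_for_key_group(key_group: int, parallelism: int, max_parallelism: int) -> int:
    # The task instance responsible for a key-group.
    return key_group * parallelism // max_parallelism

def key_group_range(operator_index: int, parallelism: int, max_parallelism: int) -> range:
    # Task instance i owns key-groups in [ceil(i*maxP/P), ceil((i+1)*maxP/P)),
    # the half-open range [i * maxP/P, (i+1) * maxP/P) from the note,
    # rounded so the ranges partition 0..maxP-1 exactly.
    start = (operator_index * max_parallelism + parallelism - 1) // parallelism
    end = ((operator_index + 1) * max_parallelism + parallelism - 1) // parallelism
    return range(start, end)
```

On rescaling, instance i simply loads the snapshot data for `key_group_range(i, new_parallelism, max_parallelism)`; since key-groups are stored contiguously, this is a coarse-grained sequential read rather than a per-key lookup.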
-
Rollback
During a full restart or rescaling, all tasks are being redeployed, while after a failure only the tasks belonging to the affected connected component (of the execution graph) are reconfigured.
In essence, known incremental recovery techniques from micro-batch processing are orthogonal to this approach and can also be employed. A snapshot epoch acts as synchronization point, similarly to a micro-batch or an input-split. On recovery, new task instances are being scheduled and, upon initialization, retrieve their allocated shard of state. In the case of IterationHead recovery, all records logged during the snapshot are recovered and flushed to output channels prior to their regular record forwarding logic.
-
Asynchronous and Incremental Snapshots
The pipelined snapshotting protocol only governs “when” but not “how” snapshots are internally executed.
The out-of-core state backend based on RocksDB [13] exploits the LSM (log-structured merge) tree internal representation of data in RocksDB. Updates are not made in-place, but are appended and compacted asynchronously. Upon taking a snapshot, the synchronous triggerSnapshot() call simply marks the current version, which prevents all state as of that version from being overwritten during compactions. The operator can then continue processing and make modifications to the state. An asynchronous thread iterates over the marked version, materializes it to the snapshot store, and finally releases the snapshot so that future compactions can overwrite that state. Furthermore, the LSM-based data structure also lends itself to incremental snapshots, which write only the parts that changed since the previous snapshot to the snapshot store.
Flink’s in-memory local state backend implementation is based on hash tables that employ chain hashing. During a snapshot, it copies the current table array synchronously and then starts the external materialization of the snapshot, in a background thread. The operator’s regular stream processing thread lazily copies the state entries and overflow chains upon modification, if the materialization thread still holds onto the snapshot. Incremental snapshots for the in-memory local backend are possible and conceptually trivial (using delta maps), yet not implemented at the current point.
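The in-memory backend's scheme can be sketched with a toy chained hash table (a sketch of the idea, not Flink's actual `CopyOnWriteStateTable`): the bucket array is copied synchronously, the chains stay shared, and the writer copies a chain lazily only when it modifies state while the snapshot is still being materialized.

```python
import threading

class CowHashTable:
    """Toy chained hash table with copy-on-write snapshots."""
    NUM_BUCKETS = 16

    def __init__(self):
        self._buckets = [[] for _ in range(self.NUM_BUCKETS)]
        self._snapshot_active = False

    def put(self, key, value):
        idx = hash(key) % self.NUM_BUCKETS
        chain = self._buckets[idx]
        if self._snapshot_active:
            # Lazy copy-on-write: give the writer its own chain so the
            # chain shared with the snapshot view stays untouched.
            chain = list(chain)
            self._buckets[idx] = chain
        for i, (k, _) in enumerate(chain):
            if k == key:
                chain[i] = (key, value)
                return
        chain.append((key, value))

    def get(self, key, default=None):
        for k, v in self._buckets[hash(key) % self.NUM_BUCKETS]:
            if k == key:
                return v
        return default

    def snapshot(self, snapshot_store: dict) -> threading.Thread:
        # Synchronous part: shallow-copy the bucket array; chains are shared.
        view = list(self._buckets)
        self._snapshot_active = True

        def materialize():
            for chain in view:
                for k, v in chain:
                    snapshot_store[k] = v
            self._snapshot_active = False

        t = threading.Thread(target=materialize)
        t.start()
        return t
```

The synchronous cost is one array copy regardless of state size; chain copies happen only for entries actually modified while the background thread runs, which is the lazy behavior the paragraph above describes.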
-
Exactly-Once Delivery Sinks
Idempotent Sinks
Transactional Sinks
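The two sink styles can be contrasted with a minimal sketch (hypothetical interfaces; Flink's real transactional support is `TwoPhaseCommitSinkFunction`, which this only gestures at): an idempotent sink makes replayed writes harmless because re-writing a deterministic (key, value) pair is a no-op, while a transactional sink buffers output per snapshot epoch and publishes it only when the checkpoint completes, discarding it on failure.

```python
class IdempotentSink:
    """Replays are harmless: writing the same key twice with a
    deterministic value leaves the store unchanged."""

    def __init__(self, store: dict):
        self.store = store

    def write(self, key, value):
        self.store[key] = value  # upsert; a replayed record overwrites identically

class TransactionalSink:
    """Output is buffered per checkpoint and made visible only on commit
    (the two-phase pattern, sketched)."""

    def __init__(self, store: dict):
        self.store = store
        self._txn = []  # pre-commit buffer for the current epoch

    def write(self, key, value):
        self._txn.append((key, value))  # not yet visible to readers

    def on_checkpoint_complete(self):
        for k, v in self._txn:  # commit: publish the epoch's output
            self.store[k] = v
        self._txn = []

    def on_failure(self):
        self._txn = []  # abort: uncommitted output is discarded
```

Tying the commit to checkpoint completion is what aligns the sink's visible output with the snapshot epoch, so a rollback never exposes output from records that will be replayed.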