In batch processing, a file is written once and then potentially read by multiple jobs. Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients) [3]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.
[3] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “The Many Faces of Publish/Subscribe,” ACM Computing Surveys, volume 35, number 2, pages 114–131, June 2003. doi:10.1145/857076.857078
A direct communication channel like a Unix pipe or TCP connection between producer and consumer would be a simple way of implementing a messaging system. However, most messaging systems expand on this basic model. In particular, Unix pipes and TCP connections connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
This is the publish/subscribe model.
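A minimal sketch of that fan-out behavior, using a toy in-memory broker (all names here are made up for illustration):

```python
from collections import defaultdict

class InMemoryBroker:
    """Toy pub/sub broker: many producers publish to a topic, many consumers receive."""
    def __init__(self):
        self.subscribers = defaultdict(list)   # topic -> list of consumer callbacks

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, event):
        # every subscriber of the topic gets its own copy of the event
        for callback in self.subscribers[topic]:
            callback(event)

broker = InMemoryBroker()
broker.subscribe("page_views", lambda e: print("analytics got", e))
broker.subscribe("page_views", lambda e: print("search index got", e))
broker.publish("page_views", {"user": "alice", "url": "/home"})
```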
If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request (see “Dataflow Through Services: REST and RPC” on page 131) to push messages to the consumer. This is the idea behind webhooks [12], a pattern in which a callback URL of one service is registered with another service, and it makes a request to that URL whenever an event occurs.
Webhooks use a callback URL, so is OAuth also built on webhooks?
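A minimal sketch of the producer side of a webhook, assuming the consumer has already registered a (hypothetical) callback URL with us; standard library only:

```python
import json
import urllib.request

# Hypothetical callback URL that the consumer registered with our service.
CALLBACK_URL = "https://consumer.example.com/hooks/order-created"

def notify_webhook(event: dict) -> None:
    """POST the event to the consumer's callback URL whenever it occurs."""
    req = urllib.request.Request(
        CALLBACK_URL,
        data=json.dumps(event).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        resp.read()   # a 2xx response means the consumer accepted the event

# notify_webhook({"type": "order_created", "order_id": 42})
```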
Message broker
If the producer and consumer do not pass messages to each other directly over the network (to avoid messages being lost when either the producer or the consumer crashes),
a common approach is to send them via a message broker.
A message broker (also known as a message queue) is essentially a kind of database that is optimized for handling message streams [13].
It is simply a server that connects producers and consumers.
It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
Acknowledgments and redelivery
As in networking, an acknowledgment must be returned and there is a redelivery mechanism. Then again, distributed systems fundamentally transmit data over the network anyway, so it is not surprising to find similar mechanisms here.
It is like DNS, which is itself distributed storage and uses locality (local DNS servers) to increase speed (i.e., reduce latency).
Consumers may crash at any time, so it could happen that a broker delivers a message to a consumer but the consumer never processes it, or only partially processes it before crashing. In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
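A toy sketch of the ack/redelivery cycle (not any real broker's API, just the idea): a message is only forgotten once the consumer acks it; if the ack never arrives, the message is redelivered.

```python
import time

class AckingBroker:
    """Toy broker: messages stay pending until explicitly acknowledged."""
    def __init__(self, redelivery_timeout=30.0):
        self.queue = []                 # messages waiting to be delivered
        self.unacked = {}               # msg_id -> (message, delivery_time)
        self.redelivery_timeout = redelivery_timeout

    def deliver(self):
        now = time.time()
        # re-queue messages whose consumer never acked in time (e.g., it crashed)
        for msg_id, (msg, delivered_at) in list(self.unacked.items()):
            if now - delivered_at > self.redelivery_timeout:
                del self.unacked[msg_id]
                self.queue.append((msg_id, msg))
        if not self.queue:
            return None
        msg_id, msg = self.queue.pop(0)
        self.unacked[msg_id] = (msg, now)
        return msg_id, msg

    def ack(self, msg_id):
        # only now may the broker remove the message for good
        self.unacked.pop(msg_id, None)
```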
Using logs for message storage
For permanent storage, the messages are kept in the form of a log.
To increase throughput, the log is also partitioned across multiple machines.
Within each partition, the broker assigns a monotonically increasing sequence number, or offset, to every message (in Figure 11-3, the numbers in boxes are message offsets). Such a sequence number makes sense because a partition is append-only, so the messages within a partition are totally ordered. There is no ordering guarantee across different partitions.
![[DDIA-Figure-11-3-分布式log 以及應(yīng)用.png]]
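A tiny sketch of one append-only partition handing out monotonically increasing offsets (toy code, not a real broker):

```python
class Partition:
    """Append-only log: each message gets the next monotonically increasing offset."""
    def __init__(self):
        self.messages = []

    def append(self, message) -> int:
        offset = len(self.messages)     # offsets are 0, 1, 2, ... within this partition
        self.messages.append(message)
        return offset

    def read(self, from_offset: int):
        # total order within the partition; no ordering guarantee across partitions
        return list(enumerate(self.messages[from_offset:], start=from_offset))

p = Partition()
for event in ["a", "b", "c"]:
    p.append(event)
print(p.read(1))   # [(1, 'b'), (2, 'c')]
```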
Logs vs traditional messaging
Thus, in situations where messages may be expensive to process and you want to parallelize processing on a message-by-message basis, and where message ordering is not so important, the JMS/AMQP style of message broker is preferable. On the other hand, in situations with high message throughput, where each message is fast to process and where message ordering is important, the log-based approach works very well.
The key point of batch processing is that it never modifies the original data: it reads from a file, then copies and processes the data into a new location.
A log-based message queue is similar: the consumer reads from the message broker without affecting the broker's own log; only the consumer's offset advances. If you want to read yesterday's messages again, you simply start from yesterday's offset.
This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization [24].
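A sketch of rewinding to an old offset with kafka-python (the topic name, partition, broker address, and saved offset are all hypothetical):

```python
from kafka import KafkaConsumer, TopicPartition   # pip install kafka-python

def process(value: bytes) -> None:
    print(value)                      # stand-in for the real processing logic

# Hypothetical: the offset this consumer had reached by the end of yesterday.
yesterdays_offset = 41_250

consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         enable_auto_commit=False)
tp = TopicPartition("page_views", 0)
consumer.assign([tp])

# Rewinding does not change the broker's log; it only moves this consumer's position.
consumer.seek(tp, yesterdays_offset)
for record in consumer:
    process(record.value)             # reprocess yesterday's messages
```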
[24] Jay Kreps: “The Log: What Every Software Engineer Should Know About Real-Time Data’s Unifying Abstraction,” engineering.linkedin.com, December 16, 2013.
Thinking in the reverse direction is always a useful way to approach a problem.
We can also go in reverse: take ideas from messaging and streams, and apply them to databases.
DDIA's way of drawing these diagrams is well worth learning, especially when reasoning about race conditions.
![[DDIA-Figure-11-4.png]]
I should re-read state machine replication.
So in any system there is one DB that stores the original data (the source, or system of record), and everything else is derived data storage that reads every change from that source to keep its own data in sync.
We can call the log consumers derived data systems, as discussed in the introduction to Part III: the data stored in the search index and the data warehouse is just another view onto the data in the system of record. Change data capture is a mechanism for ensuring that all changes made to the system of record are also reflected in the derived data systems so that the derived systems have an accurate copy of the data.
Ha! My understanding matches Martin's: CDC (Change Data Capture) treats one DB as the source/leader, then uses a message queue to let the other storage systems follow the changes from that source.
Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database, since it preserves the ordering of messages (avoiding the reordering issue of Figure 11-2).
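A toy sketch of the follower side of CDC: a derived store (here a dict standing in for a search index) applies the leader's changelog in log order to stay in sync.

```python
search_index = {}   # derived store, e.g., a search index kept in sync with the leader DB

def apply_change(change: dict) -> None:
    """Apply one change event from the leader's changelog to the derived store."""
    if change["op"] == "upsert":
        search_index[change["key"]] = change["value"]
    elif change["op"] == "delete":
        search_index.pop(change["key"], None)

changelog = [
    {"op": "upsert", "key": "user:1", "value": {"name": "Alice"}},
    {"op": "upsert", "key": "user:1", "value": {"name": "Alice B."}},
    {"op": "delete", "key": "user:2"},
]
for change in changelog:    # order matters: the follower replays writes in log order
    apply_change(change)

print(search_index)         # {'user:1': {'name': 'Alice B.'}}
```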
DDIA's references are incredibly good…
LinkedIn’s Databus [25], Facebook’s Wormhole [26], and Yahoo!’s Sherpa [27] use this idea at large scale. Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [28], Maxwell and Debezium do something similar for MySQL by parsing the binlog [29, 30, 31], Mongoriver reads the MongoDB oplog [32, 33], and GoldenGate provides similar facilities for Oracle [34, 35].
I think this point is also important: based on our needs we abstract a model at a particular level, and that model can then be applied in many different places.
The biggest difference is that event sourcing applies the idea at a different level of abstraction:
In change data capture, the application uses the database in a mutable way, updating and deleting records at will. The log of changes is extracted from the database at a low level (e.g., by parsing the replication log), which ensures that the order of writes extracted from the database matches the order in which they were actually written, avoiding the race condition in Figure 11-4. The application writing to the database does not need to be aware that CDC is occurring.
In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log. In this case, the event store is append-only, and updates or deletes are discouraged or prohibited. Events are designed to reflect things that happened at the application level, rather than low-level state changes.
So CDC is like a checksum implemented at the link layer of the network stack, while event sourcing is like a checksum implemented at the transport layer: one is a lower-level implementation, the other is implemented at a higher level of abstraction.
Event sourcing is a powerful technique for data modeling: from an application point of view it is more meaningful to record the user’s actions as immutable events, rather than recording the effect of those actions on a mutable database. Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs (see “Advantages of immutable events” on page 460).
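A minimal event sourcing sketch: immutable, application-level events are appended to a log, and the current state is derived by folding over them (event names and fields are made up):

```python
events = []   # append-only event store: the system of record

def record(event_type: str, **data) -> None:
    events.append({"type": event_type, **data})   # events are never updated or deleted

def current_cart(user: str) -> dict:
    """Derive the user's current shopping cart by replaying the event history."""
    cart = {}
    for e in events:
        if e.get("user") != user:
            continue
        if e["type"] == "item_added":
            cart[e["item"]] = cart.get(e["item"], 0) + e["qty"]
        elif e["type"] == "item_removed":
            cart.pop(e["item"], None)
    return cart

record("item_added", user="alice", item="book", qty=2)
record("item_removed", user="alice", item="book")
record("item_added", user="alice", item="pen", qty=1)
print(current_cart("alice"))   # {'pen': 1}
```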
This is very similar to networking, where this kind of logic is usually better implemented at a higher layer: you don't want the core network to contain all sorts of complex logic, because the edge is much easier to update. The core should just transport data, and the edge should implement the logic, so it can be updated easily.
OOD is the same. The open/closed principle works this way: closed for modification guarantees that the lower layer is not changed arbitrarily, while open for extension essentially means making changes at a higher level of abstraction.
So when designing a system, storing events directly in a DB or message queue is a better choice than relying on CDC to do the synchronization for you.
Specialized databases such as Event Store [46] have been developed to support applications using event sourcing, but in general the approach is independent of any particular tool. A conventional database or a log-based message broker can also be used to build applications in this style.
http://web.archive.org/web/20210827180824/http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper16.pdf
If you store the changelog durably, that simply has the effect of making the state reproducible. If you consider the log of events to be your system of record, and any mutable state as being derived from it, it becomes easier to reason about the flow of data through a system. As Pat Helland puts it [52]:
- Transaction logs record all the changes made to the database. High-speed appends are the only way to change the log. From this perspective, the contents of the database hold a caching of the latest record values in the logs. The truth is the log. The database is a cache of a subset of the log. That cached subset happens to be the latest value of each record and index value from the log.
Log compaction, as discussed in “Log compaction” on page 456, is one way of bridging the distinction between log and database state: it retains only the latest version of each record, and discards overwritten versions.
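A small sketch of that idea: compaction keeps only the latest value written for each key (here a `None` value stands in for a deletion tombstone):

```python
def compact(log):
    """Keep only the latest value for each key; discard overwritten versions."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)          # a later write overwrites an earlier one
    # emit the surviving entries in their original log order
    return [(key, value)
            for key, (offset, value) in sorted(latest.items(), key=lambda kv: kv[1][0])]

log = [("k1", "a"), ("k2", "b"), ("k1", "c"), ("k2", None)]
print(compact(log))   # [('k1', 'c'), ('k2', None)]
```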
Profound!
Streams are not a new field; many systems have long used streams to monitor their own health.
[68] Julian Hyde: “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch,” ACM Queue, volume 7, number 11, December 2009. doi:10.1145/1661785.1667562
probabilistic algorithm for optimization
https://www.oreilly.com/radar/questioning-the-lambda-architecture/
Is a Bloom filter a probabilistic algorithm?
Will these open-source stream processors all come up in Ian's course?
Many open source distributed stream processing frameworks are designed with analytics in mind: for example, Apache Storm, Spark Streaming, Flink, Concord, Samza, and Kafka Streams [74]. Hosted services include Google Cloud Dataflow and Azure Stream Analytics.
Flink seems to be mentioned in a lot of places; should I consider the book David recommended?
Idempotence
Setting a key in a key-value store is an idempotent operation.
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once. For example, setting a key in a key-value store to some fixed value is idempotent (writing the value again simply overwrites the value with an identical value), whereas incrementing a counter is not idempotent (performing the increment again means the value is incremented twice).
With a bit of extra metadata, we can make an operation idempotent.
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
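A sketch of that offset trick against a toy external store: each write records the offset of the message that produced it, so a redelivered message is detected and skipped (the store and key names are made up):

```python
db = {}   # key -> {"value": ..., "offset": ...}; stand-in for an external database

def apply_update(key, value, offset):
    """Write the value only if this message's offset is newer than what is stored."""
    existing = db.get(key)
    if existing is not None and existing["offset"] >= offset:
        return                      # already applied: reprocessing has no extra effect
    db[key] = {"value": value, "offset": offset}

apply_update("user:1", "Alice", offset=7)
apply_update("user:1", "Alice", offset=7)   # redelivered message: no double effect
print(db["user:1"])
```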
Summary
This chapter mainly discussed event streams; the fundamental difference from batch processing is that the input is unbounded.
From the perspective of unbounded input, message brokers and event logs play the role that the filesystem plays in batch processing.
From this perspective, message brokers and event logs serve as the streaming equivalent of a filesystem.
We spent some time comparing two types of message brokers:
AMQP/JMS-style message broker
- The broker assigns individual messages to consumers, and consumers acknowledge individual messages when they have been successfully processed. Messages are deleted from the broker once they have been acknowledged. This approach is appropriate as an asynchronous form of RPC (see also “Message-Passing Dataflow” on page 136), for example in a task queue, where the exact order of message processing is not important and where there is no need to go back and read old messages again after they have been processed.
Log-based message broker
- The broker assigns all messages in a partition to the same consumer node, and always delivers messages in the same order. Parallelism is achieved through partitioning, and consumers track their progress by checkpointing the offset of the last message they have processed. The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
We distinguished three types of joins that may appear in stream processes:
Stream-stream joins
- Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time. For example, it may match two actions taken by the same user within 30 minutes of each other. The two join inputs may in fact be the same stream (a self-join) if you want to find related events within that one stream.
Stream-stream joins are probably mainly used for stream analytics (see the sketch below).
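A toy sketch of a windowed stream-stream join: click events are matched with search events by the same user within a 30-minute window (event shapes are made up):

```python
from collections import defaultdict

WINDOW = 30 * 60   # seconds: only join events that occur within 30 minutes of each other

searches = defaultdict(list)   # recent search events per user (a window index)

def on_search(user, query, ts):
    searches[user].append((query, ts))

def on_click(user, url, ts):
    """Emit a joined record for each search by the same user within the window."""
    return [{"user": user, "query": query, "url": url}
            for query, search_ts in searches[user]
            if 0 <= ts - search_ts <= WINDOW]

on_search("alice", "stream processing", ts=1000)
print(on_click("alice", "/ddia", ts=1600))   # joined: click follows search within window
```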
Stream-table joins
- One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.
Stream enrichment (adding more information to the event, e.g., user profile data?); see the sketch below.
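A toy sketch of a stream-table join: a local copy of a users table is maintained from the changelog, and each activity event is enriched from it (field names are made up):

```python
users = {}   # local copy of the users table, kept up to date from the changelog

def on_changelog(change):
    users[change["user_id"]] = change["profile"]

def enrich(event):
    """Join the activity event with the latest profile for its user."""
    return {**event, "profile": users.get(event["user_id"])}

on_changelog({"user_id": 1, "profile": {"name": "Alice", "country": "NZ"}})
print(enrich({"user_id": 1, "action": "page_view", "url": "/home"}))
```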
Table-table joins
- Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
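A toy sketch of a table-table join: two changelogs are consumed, and every change refreshes a materialized view of the join (table and field names are made up):

```python
users, orders_by_user, view = {}, {}, {}

def refresh(user_id):
    # join the latest state of both sides for this key; this is a change to the view
    view[user_id] = {"profile": users.get(user_id),
                     "orders": orders_by_user.get(user_id, [])}

def on_user_change(user_id, profile):
    users[user_id] = profile
    refresh(user_id)

def on_order_change(user_id, order):
    orders_by_user.setdefault(user_id, []).append(order)
    refresh(user_id)

on_user_change(1, {"name": "Alice"})
on_order_change(1, {"order_id": 99, "total": 20})
print(view[1])   # the materialized view of the join, kept up to date incrementally
```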
materialized view (derived data?)
from [[DDIA Ch12]]
Materialized views, which are a kind of precomputed cache of query results (see "Aggregation: Data Cubes and Materialized Views" on page 101)
[[DDIA Ch10#Materialization]]: writing existing data out to a new file is materialization.