繼續(xù)解答問題:
Kafka怎么樣才能不丟消息?
考慮一種比較極端的情況彪置,整個Kafka集群用的是同一路電源拄踪,在掉電的情況下蝇恶,消息是有可能丟失的拳魁,即便消息已經(jīng)被復(fù)制所有的ISR上。默認(rèn)情況下撮弧,Kafka的刷盤是異步刷盤潘懊,也就是說姚糊,把消息寫進(jìn)OS的Page Cache后,已經(jīng)別當(dāng)成持久化成功了授舟,但是此時的消息沒有被sync到磁盤救恨,如果所有ISR的消息都在Page Cache上而不在磁盤中,整體掉電重啟后释树,消息就再也無法被消費(fèi)者消費(fèi)到肠槽,那么消息也就丟失。
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
// code
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// code
// unflushedMessages方法定義
// def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
// 當(dāng)沒有刷盤的消息累積到flushInterval時奢啥,做一次flush
if (unflushedMessages >= config.flushInterval)
flush()
// code
}
}
那萬一最近沒有新的消息秸仙,但是累積的消息的量又達(dá)不到,就需要依靠下面這個定時任務(wù)來做時間維度的定期flush
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
如果將flush.messages設(shè)置為1桩盲,那么每一條消息都會刷盤寂纪,配合前面整理的acks、min.insync.replicas赌结,會使消息可靠性得到大幅度得提升捞蛋,但是flush.messages=1會嚴(yán)重影響性能,可以在部分可靠性要求高的Topic級別進(jìn)行配置柬姚。