Kafka Source Code Analysis: How the Server Handles a Produce Request

Reposted from: http://matt33.com/2018/03/18/kafka-server-handle-produce-request/
[TOC]

Overall flow of produce request handling

On the Producer client side, the Producer maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches variable. Using the leader information of each topic-partition, batches whose leaders sit on the same broker are packed into a single request and sent to that server, which saves a lot of network overhead and improves send efficiency.
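To make the grouping step concrete, here is a small, self-contained Scala sketch (simplified stand-in types, not the real producer classes) of collecting ready batches by the broker that leads their partition, so that one request per broker can be sent:

// Hypothetical, simplified types; the real producer keeps batches in per-partition deques.
object GroupBatchesByLeader extends App {
  case class TopicPartition(topic: String, partition: Int)
  case class RecordBatch(tp: TopicPartition, sizeBytes: Int)

  // assumed leader metadata: partition -> broker id
  val leaderOf = Map(
    TopicPartition("t", 0) -> 1,
    TopicPartition("t", 1) -> 2,
    TopicPartition("u", 0) -> 1)

  val readyBatches = Seq(
    RecordBatch(TopicPartition("t", 0), 100),
    RecordBatch(TopicPartition("t", 1), 200),
    RecordBatch(TopicPartition("u", 0), 300))

  // one produce request per destination broker, covering all partitions it leads
  val batchesByNode: Map[Int, Seq[RecordBatch]] = readyBatches.groupBy(b => leaderOf(b.tp))

  batchesByNode.foreach { case (node, batches) =>
    println(s"send one ProduceRequest to broker $node with ${batches.size} batch(es)")
  }
}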

The Producer client sends the request with the following method:

//NOTE: send the produce request
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    ProduceRequest.Builder requestBuilder =
            new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

In the produce request, the client sends a Map<TopicPartition, MemoryRecords> (produceRecordsByPartition) as the payload to the server. So how does the server handle this request? That is what this article covers. The overall server-side logic for handling the request is shown in the figure below:

[Figure: overall flow of server-side produce request handling]

Overall server-side process for handling a produce request

When a broker receives a Produce request, it is handled by KafkaApis. KafkaApis is the entry point for all request handling on the server side; it delegates the concrete processing of each request to the corresponding component. As the figure above shows, the Produce request is handed to the ReplicaManager.
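As a toy illustration of that dispatch pattern (self-contained, not the real KafkaApis code, which matches on the request's API key in its handle() method):

object ApiDispatchSketch extends App {
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch   extends ApiKey

  final case class Request(apiKey: ApiKey, payload: String)

  // stand-ins for the components that KafkaApis delegates to
  def handleProduce(r: Request): String = s"ReplicaManager.appendRecords(${r.payload})"
  def handleFetch(r: Request): String   = s"ReplicaManager.fetchMessages(${r.payload})"

  // the entry point: match on the API key and delegate to the right handler
  def handle(r: Request): String = r.apiKey match {
    case Produce => handleProduce(r)
    case Fetch   => handleFetch(r)
  }

  println(handle(Request(Produce, "t-0 records")))
}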

Server-side handling

The server-side processing is walked through piece by piece, following the flow in the figure above.

KafkaApis handles the Produce request
KafkaApis handles the produce request in its handleProducerRequest() method, implemented as follows:

/**
 * Handle a produce request
 */
def handleProducerRequest(request: RequestChannel.Request) {
  val produceRequest = request.body.asInstanceOf[ProduceRequest]
  val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf

  //note: split the partitions by whether the topic exists and the client has Describe permission
  val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition {
    case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic)
  }

  //note: check whether the client has Write permission
  val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition {
    case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic))
  }

  // the callback for sending a produce response
  //note: the callback for sending the response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {

    val mergedResponseStatus = responseStatus ++
      unauthorizedForWriteRequestInfo.mapValues(_ => new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)) ++
      nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION))

    var errorInResponse = false

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.error != Errors.NONE) {
        errorInResponse = true
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          status.error.exceptionName))
      }
    }

    def produceResponseCallback(delayTimeMs: Int) {
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        //note: with acks=0 the client already assumes the send succeeded, so if the server hits an error while processing,
        //note: it closes the socket connection to implicitly notify the client, which then refreshes its metadata and reconnects
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> status.error.exceptionName
          }.mkString(", ")
          info(
            s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
              s"from client id ${request.header.clientId} with ack=0\n" +
              s"Topic and partition to exceptions: $exceptionsSummary"
          )
          requestChannel.closeConnection(request.processor, request)
        } else {
          requestChannel.noOperation(request.processor, request)
        }
      } else {
        val respBody = request.header.apiVersion match {
          case 0 => new ProduceResponse(mergedResponseStatus.asJava)
          case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
          // This case shouldn't happen unless a new version of ProducerRequest is added without
          // updating this part of the code to handle it properly.
          case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
        }

        requestChannel.sendResponse(new RequestChannel.Response(request, respBody))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = time.milliseconds

    quotas.produce.recordAndMaybeThrottle(
      request.session.sanitizedUser,
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // call the replica manager to append messages to the replicas
    //note: append the records
    replicaManager.appendRecords(
      produceRequest.timeout.toLong,
      produceRequest.acks,
      internalTopicsAllowed,
      authorizedRequestInfo,
      sendResponseCallback)

    // if the request is put into the purgatory, it will have a held reference
    // and hence cannot be garbage collected; hence we clear its data here in
    // order to let GC re-claim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}

Overall, the processing steps are (with authorization enabled):

  1. Check whether the topic exists and whether the client has the Describe permission on it;
  2. For the topics the client may Describe, check whether it also has the Write permission;
  3. Call replicaManager.appendRecords() to append the records to the topic-partitions the client is allowed to write.

ReplicaManager
As the name suggests, the ReplicaManager is the replica manager: it manages all the replicas on this broker. In Kafka, each replica corresponds to exactly one log instance (a Log object); one replica maps to one Log.

When the Kafka server starts up it creates the ReplicaManager object, as shown below. The ReplicaManager constructor takes the LogManager as a member variable.

//kafka.server.KafkaServer
def startup() {
  try {
    info("starting")
    /* start replica manager */
    replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
    replicaManager.startup()
  }catch {
    case e: Throwable =>
    fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
    isStartingUp.set(false)
    shutdown()
    throw e
  }
}

The ReplicaManager is not itself responsible for creating logs; it only manages all the partitions on this broker (the Partition objects in the next step of the figure). When a Partition object is created, it is given the ReplicaManager's logManager, and the Partition uses that logManager to create the log for each of its replicas (a simplified sketch of this relationship follows the class snippet below).

/**
 * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
 */
class Partition(val topic: String,
                val partitionId: Int,
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
  val topicPartition = new TopicPartition(topic, partitionId)

  private val localBrokerId = replicaManager.config.brokerId
  private val logManager = replicaManager.logManager //note: the log manager
}
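A simplified, self-contained sketch of that relationship (assumed names, not the real Partition/LogManager code): the Partition asks the LogManager for the Log that backs its local replica, so every local replica ends up with exactly one Log.

object ReplicaCreationSketch extends App {
  case class TopicPartition(topic: String, partition: Int)
  class Log(val tp: TopicPartition)                 // stand-in for kafka.log.Log

  class LogManager {                                // stand-in for kafka.log.LogManager
    private var logs = Map.empty[TopicPartition, Log]
    def getOrCreateLog(tp: TopicPartition): Log =
      logs.getOrElse(tp, { val l = new Log(tp); logs += tp -> l; l })
  }

  class Partition(tp: TopicPartition, logManager: LogManager) {
    // roughly the role Partition plays when its local replica is created
    lazy val localLog: Log = logManager.getOrCreateLog(tp)
  }

  val lm = new LogManager
  val p  = new Partition(TopicPartition("t", 0), lm)
  println(p.localLog.tp)   // the Log for t-0, obtained via the LogManager
}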

ReplicaManager and LogManager compare as follows:

Manager                            Components
LogManager (log manager)           Log (log), LogSegment (log segment)
ReplicaManager (replica manager)   Partition (partition), Replica (replica)

appendRecords() implementation

Let's look at the concrete implementation of the appendRecords() method.

//note: write data to the partition leaders
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {

  if (isValidRequiredAcks(requiredAcks)) { //note: the acks setting is valid
    val sTime = time.milliseconds
    //note: append the data to the local replica logs
    val localProduceResults = appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
              ProducePartitionStatus(
                result.info.lastOffset + 1, // required offset
                new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
    }

    if (delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      //note: handle acks=-1: the final result can only be returned after the followers in the ISR have also written the data
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      //note: delayed produce operation
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

    } else {
      // we can respond immediately
      //note: return the result immediately via the callback
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    //note: return an INVALID_REQUIRED_ACKS error
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
    }
    responseCallback(responseStatus)
  }
}

From the implementation above, appendRecords() mainly does the following:

  1. First check that the acks setting is valid (only -1, 0 and 1 are accepted); if it is not, return an error immediately and do no further processing;
  2. If acks is valid, call appendToLocalLog() to append the records to the corresponding local Log objects;
  3. Once appendToLocalLog() has finished, if the client set acks=-1 (i.e. the response can only be sent after the other ISR replicas have caught up), create a DelayedProduce object and wait for the other ISR replicas to sync; otherwise return the append result right away (see the sketch after this list).
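As a rough, hedged illustration of the decision in step 3 (the real check lives in delayedRequestRequired(), used above; the parameters here are simplified stand-ins): a DelayedProduce is only worth creating when acks is -1, the request actually contains partitions, and at least one partition was appended locally without error, since otherwise the (error) response can be sent immediately.

object DelayedProduceDecision extends App {
  // simplified sketch, not the real ReplicaManager method
  def delayedProduceRequired(requiredAcks: Short,
                             partitionsInRequest: Int,
                             failedPartitions: Int): Boolean =
    requiredAcks == -1 &&
      partitionsInRequest > 0 &&
      failedPartitions < partitionsInRequest

  println(delayedProduceRequired(-1, 3, 1)) // true: wait for the ISR on the successful partitions
  println(delayedProduceRequired(1, 3, 0))  // false: acks=1 can be answered right away
}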

appendToLocalLog() implementation

The actual log append happens in appendToLocalLog(); here is its implementation:

/**
 * Append the messages to the local replica logs
 */
//note: write data to the local replicas
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace("Append [%s] to local log ".format(entriesPerPartition))
  entriesPerPartition.map { case (topicPartition, records) => //note: iterate over all topic-partitions to be written
    BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).totalProduceRequestRate.mark()
    BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    //note: appending to Kafka's internal topics is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        //note: look up the Partition and write the data to the partition's local replica log
        val partitionOpt = getPartition(topicPartition) //note: get the Partition object for this topic-partition
        val info = partitionOpt match {
          case Some(partition) =>
            partition.appendRecordsToLeader(records, requiredAcks) //note: if the Partition was found, append to its log
          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicPartition, localBrokerId)) //note: if it was not found, return an exception
        }

        //note: number of appended messages
        val numAppendedMessages =
          if (info.firstOffset == -1L || info.lastOffset == -1L)
            0
          else
            info.lastOffset - info.firstOffset + 1

        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        //note: update metrics
        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(records.sizeInBytes)
        BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)

        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
          .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
        (topicPartition, LogAppendResult(info))
      } catch { //note: handle exceptions thrown during the append
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e: KafkaStorageException =>
          fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
          Runtime.getRuntime.halt(1)
          (topicPartition, null)
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderForPartitionException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: InvalidTimestampException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        case t: Throwable =>
          BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).failedProduceRequestRate.mark()
          BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
          error("Error processing append operation on partition %s".format(topicPartition), t)
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
      }
    }
  }
}

From the code above, appendToLocalLog() works as follows:

  1. First check whether the target topic is one of Kafka's internal topics; producers are not allowed to write to internal topics;
  2. Look up the Partition object for the topic-partition; if it is found in allPartitions, call partition.appendRecordsToLeader() to append the records, otherwise an exception is returned to the client.

The Partition.appendRecordsToLeader() method
When appending records, the ReplicaManager calls Partition's appendRecordsToLeader() method, implemented as follows:

def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val log = leaderReplica.log.get //note: get the corresponding Log object
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        //note: if acks=-1 and the ISR size is below the configured min.insync.replicas, throw an exception back to the producer
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
            .format(topicPartition, inSyncSize, minIsr))
        }

        //note: append the data to the replica's log
        val info = log.append(records, assignOffsets = true)
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        //note: check whether the HW (high watermark) needs to be incremented (checked once after the append)
        (info, maybeIncrementLeaderHW(leaderReplica))

      case None =>
        //note: the leader is not on this broker
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()

  info
}

In this method, the topic's min.insync.replicas setting and the partition's current ISR are used to decide whether the write is allowed. If the condition is not met, a NotEnoughReplicasException is thrown; if it is met, log.append() is called to append the records to the replica's log.

Storage layer

Following the flow in the opening figure and the code above, we have finally reached Kafka's storage layer. This section describes in detail how Kafka writes the log at the storage layer.

The Log object
As mentioned earlier, each replica corresponds to one Log object. A Log object is the unit that manages a single partition's log: it holds all of that partition's segment files (together with their offset index and time index files) and provides methods for appending, deleting and reading.

When a Log object is initialized, three variables are particularly important:

  1. nextOffsetMetadata: the metadata for the next offset; it contains the offset of the next message in the activeSegment, the base offset of that activeSegment, and the segment's size;
  2. activeSegment: the newest segment managed by this Log (the "active" segment); a Log has only one active segment at any time, and all other segments have already been persisted to disk;
  3. logEndOffset: the offset of the next message, taken from nextOffsetMetadata; in effect it is the next offset of the active segment.

//note: nextOffsetMetadata is declared volatile: when it is modified, other threads that use it see the new value immediately; it is read on both the produce and consume paths
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

/* Calculate the offset of the next message */
//note: the next offset metadata
//note: first argument: offset of the next message; second: base offset of the segment; third: segment size
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)

/**
* The active segment that is currently taking appends
*/
//note: at any moment there is only one active log segment
def activeSegment = segments.lastEntry.getValue

/**
*  The offset of the next message that will be appended to the log
*/
//note: the offset of the next message, taken from nextOffsetMetadata
def logEndOffset: Long = nextOffsetMetadata.messageOffset

Writing the log
One of the most important methods in Log is the append method; let's look at its implementation.

/**
 * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
 *
 * This method will generally be responsible for assigning offsets to the messages,
 * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid.
 *
 * @param records The log records to append
 * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
 * @throws KafkaStorageException If the append fails due to an I/O error.
 * @return Information about the appended messages including the first and last offset.
 */
//note: append the records to the active segment, rolling over to a new segment if necessary
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
  val appendInfo = analyzeAndValidateRecords(records) //note: return summary info about this batch of messages and validate them

  // if we have any valid messages, append them to the log
  if (appendInfo.shallowCount == 0)
    return appendInfo

  // trim any invalid bytes or partial messages before appending it to the on-disk log
  //note: trim invalid messages from this batch
  var validRecords = trimInvalidBytes(records, appendInfo)

  try {
    // they are valid, insert them in the log
    lock synchronized {

      if (assignOffsets) {
        // assign offsets to the message set
        //note: compute the starting offset for this message set; the offset is updated atomically
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value //note: used as the first offset of the message set
        val now = time.milliseconds //note: the timestamp is taken as the time the server received the batch
        //note: validate the messages and assign each record its offset and timestamp
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
                                                        offset,
                                                        now,
                                                        appendInfo.sourceCodec,
                                                        appendInfo.targetCodec,
                                                        config.compact,
                                                        config.messageFormatVersion.messageFormatVersion,
                                                        config.messageTimestampType,
                                                        config.messageTimestampDifferenceMaxMs)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        //note: the returned MemoryRecords now has offsets and timestamps assigned
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1 //note: the offset of the last message
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        //note: re-validate message sizes (recording rejected bytes in the metrics)
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (logEntry <- validRecords.shallowEntries.asScala) {
            if (logEntry.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
                .format(logEntry.sizeInBytes, config.maxMessageSize))
            }
          }
        }

      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
      }

      // check messages set size may be exceed config.segmentSize
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // maybe roll the log if this segment is full
      //note: if the current segment is full, roll a new one
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)


      // now append to the log
      //note: append the messages to the current segment
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // increment the log end offset
      //note: update the log end offset (the next offset)
      updateLogEndOffset(appendInfo.lastOffset + 1)

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

      if (unflushedMessages >= config.flushInterval)//note: flush to disk if the threshold is reached
        flush()

      appendInfo
    }
  } catch {
    case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
  }
}

When the server appends a partition's messages to the log, it does so segment by segment: once a segment reaches the size threshold, a new log segment is rolled to hold further messages, and a partition's messages are always appended to the newest log segment (the activeSegment). Each log segment has a base offset (segmentBaseOffset, or baseOffset), which is an absolute, partition-level offset and is fixed for the lifetime of the segment. With this base offset, the absolute offset of every message within the partition can be computed, and the data is then written to the log file together with its absolute offsets. The append() method can be summarized as follows (a small worked example of the base-offset arithmetic follows the list):

  1. analyzeAndValidateRecords(): validate the batch of messages to be written, mainly checking message sizes and CRCs;
  2. trimInvalidBytes(): trim any invalid messages from the batch and return a MemoryRecords containing only valid messages;
  3. LogValidator.validateMessagesAndAssignOffsets(): assign each message its offset (an absolute offset) and timestamp;
  4. maybeRoll(): decide whether a new segment is needed; if the current segment cannot hold this batch, roll a new one;
  5. segment.append(): append the messages to the segment;
  6. Update logEndOffset and decide whether to flush to disk (calling flush() if needed).
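A small worked example of the base-offset arithmetic mentioned above (the numbers are made up): the offset index stores 4-byte relative offsets, which are simply the absolute offsets minus the segment's fixed base offset.

object OffsetArithmetic extends App {
  val baseOffset = 500L                      // fixed for the lifetime of the segment
  val absoluteOffsets = Seq(742L, 743L, 744L)
  // relative offset = absolute offset - base offset
  val relativeOffsets = absoluteOffsets.map(o => (o - baseOffset).toInt)
  println(relativeOffsets.mkString(", "))    // 242, 243, 244
}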

While we are on the subject of timestamps: in newer Kafka versions every message carries a timestamp, and the topic-level configuration message.timestamp.type selects which kind of timestamp is recorded. The default is the creation time, and only the following two values can be chosen (see the small producer example after this list):

  1. CreateTime, the default;
  2. LogAppendTime.
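For illustration, the producer can attach an explicit timestamp to a record through the public kafka-clients API; this sketch assumes kafka-clients is on the classpath and uses a placeholder topic name. With CreateTime the supplied timestamp is stored as-is; with LogAppendTime the broker overwrites it at append time.

import org.apache.kafka.clients.producer.ProducerRecord

object TimestampedRecord extends App {
  // constructor: (topic, partition, timestamp, key, value); the partition is left null here
  val record = new ProducerRecord[String, String](
    "my-topic", null, System.currentTimeMillis(), "key", "value")
  println(record.timestamp())
}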

Log segments
In Log's append() method, maybeRoll() is called to decide whether the log needs to roll over to a new segment; its implementation is as follows:

/**
 * Roll the log over to a new empty log segment if necessary.
 *
 * @param messagesSize The messages set size in bytes
 * @param maxTimestampInMessages The maximum timestamp in the messages.
 * logSegment will be rolled if one of the following conditions met
 * <ol>
 * <li> The logSegment is full
 * <li> The maxTime has elapsed since the timestamp of first message in the segment (or since the create time if
 * the first message does not have a timestamp)
 * <li> The index is full
 * </ol>
 * @return The currently active segment after (perhaps) rolling to a new segment
 */
//note: decide whether a new log segment is needed; if not, return the current segment, otherwise return the newly created one
private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
  val segment = activeSegment //note: look at the active segment, which is also the newest segment
  val now = time.milliseconds
  //note: whether the time since the last roll has reached the configured threshold (log.roll.hours)
  val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
  //note: five conditions: 1. the segment is too full to hold a message set of this size; 2. the segment has data and the roll time threshold has been reached; 3. the offset index is full;
  //note: 4. the time index is full; 5. the largest offset's relative offset would exceed the positive Int range
  if (segment.size > config.segmentSize - messagesSize ||
      (segment.size > 0 && reachedRollMs) ||
      segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
    debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
        s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
        s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
        s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
    roll(maxOffsetInMessages - Integer.MAX_VALUE) //note: roll a new log segment
  } else {
    segment //note: keep using the current segment
  }
}

From maybeRoll()'s implementation, a new log segment is created in any of the following cases:

  1. The current segment's size plus the size of the incoming messages exceeds the segment threshold (log.segment.bytes);
  2. The time since the segment was created has reached the roll threshold (log.roll.hours) and the segment contains data;
  3. The offset index file is full;
  4. The time index file is full;
  5. The largest offset's relative offset (to the segment's base offset) would exceed the positive Int range.

If any one of these conditions holds, a new segment file is created; see the roll() implementation:

/**
 * Roll the log over to a new active segment starting with the current logEndOffset.
 * This will trim the index to the exact size of the number of entries it currently contains.
 *
 * @return The newly rolled segment
 */
//note: roll a new log segment and add it to the segment map
def roll(expectedNextOffset: Long = 0): LogSegment = {
  val start = time.nanoseconds
  lock synchronized {
    val newOffset = Math.max(expectedNextOffset, logEndOffset) //note: use the latest offset as the base offset
    val logFile = logFilename(dir, newOffset) //note: create the data file
    val indexFile = indexFilename(dir, newOffset) //note: create the offset index file
    val timeIndexFile = timeIndexFilename(dir, newOffset) //note: create the time index file
    for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
      file.delete()
    }

    segments.lastEntry() match {
      case null =>
      case entry => {
        val seg = entry.getValue
        seg.onBecomeInactiveSegment()
        seg.index.trimToValidSize()
        seg.timeIndex.trimToValidSize()
        seg.log.trim()
      }
    }
    //note: create a new LogSegment object
    val segment = new LogSegment(dir,
                                 startOffset = newOffset,
                                 indexIntervalBytes = config.indexInterval,
                                 maxIndexSize = config.maxIndexSize,
                                 rollJitterMs = config.randomSegmentJitter,
                                 time = time,
                                 fileAlreadyExists = false,
                                 initFileSize = initFileSize,
                                 preallocate = config.preallocate)
    val prev = addSegment(segment) //note: add it to the segment map
    if(prev != null)
      throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
    // We need to update the segment base offset and append position data of the metadata when log rolls.
    // The next offset should not change.
    updateLogEndOffset(nextOffsetMetadata.messageOffset) //note: update the offset metadata
    // schedule an asynchronous flush of the old segment
    scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

    info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))

    segment
  }
}

The segment object is actually created in Log's roll() method shown above. Creating a segment involves three parts: the data file, the offset index file and the time index file (all three are named after the segment's base offset, zero-padded to 20 digits, e.g. 00000000000000000000.log / .index / .timeindex).

The offset index file
A few words about the offset index file while we are here. Kafka's index files have the following characteristics:

  1. They store offsets as absolute offset + relative offset; the first absolute offset of each segment is its base offset;
  2. An index entry is created every fixed number of bytes of the data file rather than for every message; the interval is configured with index.interval.bytes and defaults to 4096 (4KB).

The benefits of this design are clear:

  1. Because an index entry is not created for every message, the index is sparse;
  2. A relative offset takes 4 bytes while an absolute offset takes 8; together with the 4-byte physical position, using relative offsets shrinks each index entry from 12 bytes to 8;
  3. Because the offsets are ordered, a binary search can quickly locate an offset when reading data (see the lookup sketch below);
  4. Such a sparse index can be kept entirely in memory, which speeds up offset lookups.
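To illustrate point 3, here is a self-contained sketch (not Kafka's OffsetIndex, and the entry values are made up) of how a sparse, sorted index is searched: find the last entry at or below the target offset, then scan the log file forward from that physical position.

object SparseIndexLookup extends App {
  // (relativeOffset, filePosition) pairs, roughly one entry per index.interval.bytes of data
  val entries = Vector((0, 0), (57, 4201), (131, 8427), (204, 12694))

  def lookup(target: Int): (Int, Int) = {
    var lo = 0
    var hi = entries.length - 1
    while (lo < hi) {                     // binary search for the largest entry <= target
      val mid = (lo + hi + 1) / 2
      if (entries(mid)._1 <= target) lo = mid else hi = mid - 1
    }
    entries(lo)
  }

  println(lookup(150)) // (131,8427): start scanning the log file at byte position 8427
}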

LogSegment append
The actual log write is done in LogSegment's append() method, where LogSegment talks to Kafka's lowest-level file channel and mmap.

 /**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param firstOffset The first offset in the message set.
 * @param largestTimestamp The largest timestamp in the message set.
 * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
 * @param records The log entries to append.
 */
 //note: append the given messages starting at the given offset, adding an index entry if needed
@nonthreadsafe
def append(firstOffset: Long, largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords) {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
        .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    // append the messages
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    val appendedBytes = log.append(records) //note: append to the data file
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
    // Update the in memory max timestamp and corresponding offset.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }
    // append an entry to the index (if needed)
    //note: decide whether to add an index entry (data is always appended to the data file, but an index entry is only written every indexIntervalBytes bytes)
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition) //note: add the index entry
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0 //note: reset to 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}

From the analysis above, the call path of a message set (MemoryRecords) through Kafka's storage layer is shown in the figure below:

[Figure: call path of a MemoryRecords message set through the storage layer]

At the bottom it is ultimately implemented on top of Java NIO.

FAQ
One thing I don't quite understand: after the Kafka server receives a producer's request, is the data persisted immediately?
The end of the article says:
LogSegment talks to Kafka's lowest-level file channel and mmap.
Does that mean Kafka messages are persisted to disk in real time?
But the Log.append() method above also contains this:

if (unflushedMessages >= config.flushInterval) //note: flush to disk if the threshold is reached
  flush()

If persistence were real-time, why would this check still be needed?

Got it: writing via mmap (and the file channel) only puts the data into the OS page cache; it is not flushed to disk until flush() runs (see the small NIO sketch below).
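For reference, here is a minimal Java NIO sketch (illustrative only, not Kafka code) of the distinction drawn above: write() only hands data to the OS page cache, while force() is what actually syncs it to disk, and that is what a log flush ultimately boils down to.

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object PageCacheDemo extends App {
  val channel = FileChannel.open(Paths.get("demo.log"),
    StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  channel.write(ByteBuffer.wrap("hello kafka".getBytes("UTF-8"))) // lands in the page cache
  channel.force(true)                                             // fsync: now it is on disk
  channel.close()
}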
