RecordAccumulator.java

public final class RecordAccumulator {

    // Sender線程準備關閉
    private volatile boolean closed;
    // 批次大小
    private final int batchSize;
    // 緩存了發(fā)往對應topicPartition的N個消息批次
    // Deque是非線程安全绞幌,所以出入隊列需要加鎖
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    private final BufferPool free;
    // 未發(fā)送完成的批次集合,底層通過Set<RecordBatch>實現(xiàn)
    private final IncompleteRecordBatches incomplete;
    // 使用drain方法批量導出批次,為了防止饑餓,使用drainIndex記錄上次發(fā)送停止時的位置谆棱,下次發(fā)送繼續(xù)從該位置開始
    private int drainIndex;

    // 發(fā)送追加消息
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // 通過topic-partition查找map里指定的消息隊列
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            // 嘗試往Dequeue的最后一個批次里追加消息
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null)
                // 追加成功返回
                return appendResult;
        }

        // 追加失敗, 嘗試申請緩存
        // 注意這里走出synchronized代碼塊吏口,釋放dq鎖了粥鞋,可能堵塞在申請緩存空間上壮韭,把發(fā)送消息讓給其他需要更少空間的發(fā)送消息線程
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // appendResult不為null說明第2次重試tryAppend成功了
                // 說明在synchronized釋放申請緩存的時候尚辑,有其他線程釋放了空間
                // 所以這里要把申請的緩存還回去
                free.deallocate(buffer);
                return appendResult;
            }
            // 申請到新空間后創(chuàng)建一個新批次
            // 之前從free那申請的緩存分配給MemoryRecords
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            // 新建的batch加到Dequeue和incomplete里
            dq.addLast(batch);
            incomplete.add(batch);
            // 返回result,return后面會根據(jù)條件判斷是否要喚醒Sender線程
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    }

    // 獲取集群中符合發(fā)送消息條件的節(jié)點集合
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        boolean unknownLeadersExist = false;

        // waiter隊列里有成員在排隊辑鲤,說明有其他線程在等待BufferPool釋放空間,需要發(fā)送消息釋放空間
        boolean exhausted = this.free.queued() > 0;
        // 遍歷每個topicPartition
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();

            // 獲取parition的leader節(jié)點
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // 標記需要更新Metadata
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    // 加鎖從隊列里取出第一個批次
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Dequeue中有多個批次或者第一個批次已經(jīng)滿了
                        // 如果緩存已用容量position超過writeLimit,表示批次已滿
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        // 超時
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // flushInProgress有線程正在等待flush操作完成
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        if (sendable && !backingOff) {
                            // 滿足符合發(fā)送消息的節(jié)點
                            readyNodes.add(leader);
                        } else {
                            // 下次調用ready方法的間隔, 本質是控制selector.select()堵塞時參考的等待時長要素之一
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }

    // 將partition-批次集合 轉成 節(jié)點-批次集合 映射
     public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {

        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        // 遍歷需要發(fā)送消息的節(jié)點
        for (Node node : nodes) {
            int size = 0;
            // node上的partition集合
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            // 待發(fā)送消息批次的集合
            List<RecordBatch> ready = new ArrayList<>();
            // drainIndex就是斷電續(xù)傳的書簽
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                if (!muted.contains(tp)) {
                    // 獲取該partion的消息批次集合
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            // 獲取第一條消息批次
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // 要發(fā)送的集合大小超過配置的單個請求的maxSize杠茬,跳出循環(huán)
                                        // 因為外層循環(huán)是node節(jié)點月褥,所以單個請求只是node粒度的
                                        break;
                                    } else {
                                        // 每個node的partition只取一個消息批次,后面會drainIndex+1尋找下一個partition
                                        // 防止只有一個partition在發(fā)送瓢喉,其他partition處于饑餓狀態(tài)
                                        RecordBatch batch = deque.pollFirst();
                                        // 關閉輸出流
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
    }
}

public final class BufferPool {
    // 整個pool的大小
    private final long totalMemory;
    // 指定free隊列里每個byteBuffer的大小,等于batchSize
    private final int poolableSize;
    // 控制多線程并發(fā)分配和回收ByteBuffer
    private final ReentrantLock lock;
    // 緩存byteBuffer的隊列,緩沖池
    // 池化隊列free宁赤,避免ByteBuffer.allocate創(chuàng)建開銷
    private final Deque<ByteBuffer> free;
    // 記錄因申請不到足夠的空間而阻塞的線程,實際記錄阻塞線程對應的Condition對象
    private final Deque<Condition> waiters;
    // totalMemory - free
    private long availableMemory;

    // 追加消息時候會調用
    // 從緩沖池free里申請ByteBuffer栓票,緩沖池空間不足就會阻塞調用線程
    // size是申請空間大小,maxTimeToBlockMs申請最大等待時間
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        // 加鎖
        this.lock.lock();
        try {
            if (size == poolableSize && !this.free.isEmpty())
                // 申請大小正好等于一個ByteBuffer對象的大小决左,直接從隊列里取出
                // 所以消息的長度不要超過batchSize
                return this.free.pollFirst();

            int freeListSize = this.free.size() * this.poolableSize;
            // 緩沖池+剩余可用足夠申請
            if (this.availableMemory + freeListSize >= size) {
                // 先從free里一塊塊劃走ByteBuffer緩存到availableMemory,直到滿足size
                freeUp(size);
                // 然后availableMemory里扣除size部分的緩存
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                int accumulated = 0;
                ByteBuffer buffer = null;
                // Condition阻塞等待有足夠的緩存
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                // 加入waiters等待隊列
                this.waiters.addLast(moreMemory);
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    // 阻塞Condition等待
                    boolean waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);

                    if (waitingTimeElapsed) {
                        // 表示等待超時走贪,拋出異常結束
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    // 可能有空間申請佛猛,但還不夠size,所以減去時間后繼續(xù)等待
                    remainingTimeToBlockNs -= timeNs;
                    // 正好size是一個ByteBuffer
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                // 等待隊列是先進先出的
                Condition removed = this.waiters.removeFirst();
                // 要是還有剩余空間喚醒下一個阻塞等待的其他線程
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal();
                }

                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

    // 釋放緩存歸還
    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            // 正好等于單個ByteBuffer就放回free隊列
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                // 不等于就加到availableMemory,不放回free隊列?
                this.availableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                // 喚醒正在堵塞等待可用空間的線程
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }
}

public final class RecordBatch {
    // 消息個數(shù)
    public int recordCount = 0;
    // 最大的消息的字節(jié)數(shù)
    public int maxRecordSize = 0;
    // 嘗試發(fā)送當前批次的次數(shù)
    public volatile int attempts = 0;
    // 最后一次嘗試發(fā)送的時間戳
    public long lastAttemptMs;
    // 消息最終存放的地方
    public final MemoryRecords records;
    // 當前批次發(fā)送給哪個topic的partition
    public final TopicPartition topicPartition;
    // 標志RecordBatch狀態(tài)的Future對象
    public final ProduceRequestResult produceFuture;
    // 最后一次往批次里追加消息的時間戳
    public long lastAppendTime;
    private final List<Thunk> thunks;
    // 記錄某消息在批次中的offset
    private long offsetCounter = 0L;
    // 是否正在重試
    private boolean retry;

    // 追加消息
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            // buffer沒有空間返回null
            return null;
        } else {
            // 委托給MemoryRecords追加消息到緩存
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            // 通過produceFuture的latchDown實現(xiàn)異步future的特性
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                // thunks就是批次里所有消息callback的隊列集合
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

    // 當批次成功收到正常響應坠狡、或超時挚躯、或關閉producer時,調用done方法
    public void done(long baseOffset, long timestamp, RuntimeException exception) {
        // 回調所有消息的callback
        for (int i = 0; i < this.thunks.size(); i++) {
            Thunk thunk = this.thunks.get(i);
            if (exception == null) {
                // thunk.future即服務端返回的結果擦秽,封裝成RecordMetadata
                RecordMetadata metadata = new RecordMetadata(this.topicPartition,  baseOffset, thunk.future.relativeOffset(),
                                                             timestamp == Record.NO_TIMESTAMP ? thunk.future.timestamp() : timestamp,
                                                             thunk.future.checksum(),
                                                             thunk.future.serializedKeySize(),
                                                             thunk.future.serializedValueSize());
                thunk.callback.onCompletion(metadata, null);
            } else {
                thunk.callback.onCompletion(null, exception);
            }
        }
        // 這里會調用latchDown.donw()
        this.produceFuture.done(topicPartition, baseOffset, exception);
    }

    final private static class Thunk {
        // 對應消息的callback
        final Callback callback;
        final FutureRecordMetadata future;
    }
}

// 類名字面意思就是包含RecordMetadata的Future對象
public final class FutureRecordMetadata implements Future<RecordMetadata> {

    // 類似指針码荔,指向消息所在批次RecordBatch的produceFuture
    private final ProduceRequestResult result;
    // 消息在批次中的offset
    private final long relativeOffset;

    // 實現(xiàn)future接口方法
    @Override
    public RecordMetadata get() throws InterruptedException, ExecutionException {
        // 本質是委托produceFuture的latchDown.await()
        this.result.await();
        return valueOrError();
    }

    RecordMetadata valueOrError() throws ExecutionException {
        if (this.result.error() != null)
            throw new ExecutionException(this.result.error());
        else
            return value();
    }

    // 當producer收到消息批次的響應時漩勤,get方法返回RecordMetadata對象,即消息的元數(shù)據(jù)
    RecordMetadata value() {
        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
                                  this.timestamp, this.checksum, this.serializedKeySize, this.serializedValueSize);
    }

    // 在sender處理服務端返回的響應缩搅,回調callback時會被調用
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        this.error = error;
        // 通知future有結果了
        this.latch.countDown();
    }
}

public final class ProduceRequestResult {
    // 沒有實現(xiàn)Future接口越败,通過CountDownLatch實現(xiàn)了類似Future的功能
    private final CountDownLatch latch = new CountDownLatch(1);
    // 服務端為批次RecordBatch中第一條消息分配的offset
    // 這個批次下每個消息根據(jù)baseOffset能夠計算出自己再服務端分區(qū)中的偏移量
    private volatile long baseOffset = -1L;
    private volatile RuntimeException error;

    // RecordBatch會調用該方法,通知消息被正常響應
    public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
        // 區(qū)分消息是正常響應還是異常響應
        this.error = error;
        // 喚醒阻塞在latch.await()的thread
        this.latch.countDown();
    }
}

public class MemoryRecords implements Records {

    // 壓縮后的消息輸出到buffer(壓縮場景主要是瓶頸在網(wǎng)絡帶寬)
    private final Compressor compressor;
    // buffer最多可以寫入多少個字節(jié)的數(shù)據(jù)
    private final int writeLimit;
    // 保存消息數(shù)據(jù)
    private ByteBuffer buffer;
    // 在MemoryRecords發(fā)送前會設置成false-只讀
    private boolean writable;

    // 私有構造方法
    private MemoryRecords(ByteBuffer buffer, CompressionType type, boolean writable, int writeLimit) {
        ...
    }

    // 只能通過emptyRecords創(chuàng)建MemoryRecords實例
    public static MemoryRecords emptyRecords(ByteBuffer buffer, CompressionType type, int writeLimit) {
        return new MemoryRecords(buffer, type, true, writeLimit);
    }

    // 追加消息
    public void append(long offset, Record record) {
        // 判斷是否只讀
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        // 調用compressor將消息寫入ByteBuffer
        int size = record.size();
        compressor.putLong(offset);
        compressor.putInt(size);
        compressor.put(record.buffer());
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        record.buffer().rewind();
    }

    // 估算MemoryRecords剩余空間是否足夠寫入消息硼瓣,主要是壓縮后的消息
    // 如果估算不準究飞,會導致底層ByteBuffer擴容
    public boolean hasRoomFor(byte[] key, byte[] value) {
        if (!this.writable)
            return false;

        return this.compressor.numRecordsWritten() == 0 ?
            this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
            this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
    }

}

public class Compressor {
    // 添加了壓縮功能
    private final DataOutputStream appendStream;
    // 封裝了ByteBuffer,當寫入數(shù)據(jù)超出ByteBuffer容量時會ByteBufferOutputStream會自動擴容
    private final ByteBufferOutputStream bufferStream;

    public Compressor(ByteBuffer buffer, CompressionType type) {
        // KafkaProducer傳入的壓縮類型
        this.type = type;
        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
        // 根據(jù)壓縮類型創(chuàng)建合適的壓縮流,默認是NONE不壓縮
        // 裝飾器模式堂鲤,在ByteBufferOutputStream上裝飾一層DataOutputStream
        // 最終壓縮流: compressor.put -> appendStream -> bufferStream -> buffer
        // 最后數(shù)據(jù)都寫到MemoryRecords的buffer緩存里
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
    }

    // 構造壓縮流
    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer);
            case GZIP:
                // GZIPOutputStream是JDK自帶的亿傅,不需要反射
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                // snappy需要引入額外的依賴包,缺省不適用snappy壓縮的時候瘟栖,不引入依賴包葵擎,為了編譯通過這里就用反射動態(tài)創(chuàng)建
                OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                return new DataOutputStream(stream);
            case LZ4:
                OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                return new DataOutputStream(stream);
        }
    }

    // 寫消息到緩存里,實質調用Record消息靜態(tài)類的write放大
    private void putRecord(final long crc, final byte attributes, final long timestamp, final byte[] key, final byte[] value, final int valueOffset, final int valueSize) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        Record.write(this, crc, attributes, timestamp, key, value, valueOffset, valueSize);
    }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末半哟,一起剝皮案震驚了整個濱河市酬滤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌寓涨,老刑警劉巖盯串,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異戒良,居然都是意外死亡体捏,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門糯崎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來译打,“玉大人,你說我怎么就攤上這事拇颅。” “怎么了乔询?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵樟插,是天一觀的道長。 經(jīng)常有香客問我竿刁,道長黄锤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任食拜,我火速辦了婚禮鸵熟,結果婚禮上,老公的妹妹穿的比我還像新娘负甸。我一直安慰自己流强,他們只是感情好痹届,可當我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著打月,像睡著了一般队腐。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上奏篙,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天柴淘,我揣著相機與錄音,去河邊找鬼秘通。 笑死为严,一個胖子當著我的面吹牛,可吹牛的內容都是我干的肺稀。 我是一名探鬼主播第股,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼盹靴!你這毒婦竟也來了炸茧?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤稿静,失蹤者是張志新(化名)和其女友劉穎梭冠,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體改备,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡控漠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了悬钳。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盐捷。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖默勾,靈堂內的尸體忽然破棺而出碉渡,到底是詐尸還是另有隱情,我是刑警寧澤母剥,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布滞诺,位于F島的核電站,受9級特大地震影響环疼,放射性物質發(fā)生泄漏习霹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一炫隶、第九天 我趴在偏房一處隱蔽的房頂上張望淋叶。 院中可真熱鬧,春花似錦伪阶、人聲如沸煞檩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽形娇。三九已至锰霜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間桐早,已是汗流浹背癣缅。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哄酝,地道東北人友存。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像陶衅,于是被迫代替她去往敵國和親屡立。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內容