PulsarClient 解析(一)

PulsarClient

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

讓我們看一下這個(gè)類的主要方法


image-20210104211849635.png
  • 創(chuàng)建producer/consumer/reader

  • 元數(shù)據(jù)信息相關(guān)

  • transaction相關(guān)

  • close方法


ClientBuilder

這里有一個(gè)builder方法用來傳遞一些PulsarClient的配置

支持的配置項(xiàng)

  1. 連接配置相關(guān):

    • 連接地址:serviceUrl / serviceUrlProvider / listener / proxyServiceUrl

    • operation超時(shí)時(shí)間: operationTimeout

    • tcp配置:

      • tcpNoDelay

      • keepAliveinterval

      • 建立連接超時(shí):connectionTimeout

      • 一個(gè)broker創(chuàng)建多少連接

    • 請(qǐng)求重試策略(請(qǐng)求出錯(cuò)后backOff時(shí)間是多少)

  2. lookup請(qǐng)求配置:

    • lookup請(qǐng)求并發(fā)

    • 最大重定向次數(shù)

    • 連接最大拒絕的請(qǐng)求數(shù)目

  3. 線程數(shù)目:

    • ioThreads

    • listenerThreads

  4. TLS + 鑒權(quán)相關(guān)

  5. 事務(wù)相關(guān)

  6. metric相關(guān)

這里面Builder.build就直接配置參數(shù)傳入了PulsarClientImpl的構(gòu)造函數(shù)了

我們看下這里面做了什么操作

PulsarClientImpl

package org.apache.pulsar.client.impl;

public class PulsarClientImpl implements PulsarClient {

        // 查找服務(wù)
    private LookupService lookup;
    
    // 連接池
    private final ConnectionPool cnxPool;
    
    // netty 里面的HashedWheelTimer,用來調(diào)度一些延遲操作
    private final Timer timer;
    private final ExecutorProvider externalExecutorProvider;
    private final ExecutorProvider internalExecutorService;

        // 當(dāng)前PulsarClient的狀態(tài)
    private AtomicReference<State> state = new AtomicReference<>();
    
    // 所有的業(yè)務(wù)處理單元(客戶端邏輯)
    private final Set<ProducerBase<?>> producers;
    private final Set<ConsumerBase<?>> consumers;

        // id發(fā)號(hào)器
    private final AtomicLong producerIdGenerator = new AtomicLong();
    private final AtomicLong consumerIdGenerator = new AtomicLong();
    private final AtomicLong requestIdGenerator = new AtomicLong();

      // 這里面的EventLoopGroup好像只被當(dāng)成線程池來用了
    // 0. ConnectionPool 里面初始化作為連接的io線程池(netty客戶端常規(guī)用法)
    // 1. 在Consumer里面用來定時(shí)flush PersistentAcknowledgmentsGroupingTracker
    // 2. Producer 里面用來定時(shí)生成加密的key
    // 3. 作為AsyncHttpClient的構(gòu)造參數(shù)
    private final EventLoopGroup eventLoopGroup;

        // Schema 的cache
    private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache;

    // producer 用來生成PublishTime
    private final Clock clientClock;

    @Getter
    private TransactionCoordinatorClientImpl tcClient;

這個(gè)類的構(gòu)造函數(shù)主要就是初始化這幾個(gè)關(guān)鍵變量,沒有特殊操作

LookUpService根據(jù)配置參數(shù)會(huì)選擇HttpLookupService 或者是BinaryProtoLookupService


ConnectionPool

我們先看一下ConnectionPool

package org.apache.pulsar.client.impl;

public class ConnectionPool implements Closeable {
  
    // 連接池,保存連接
    // 地址 -> 第x個(gè)連接 -> 連接
    // 如果配置maxConnectionsPerHosts=0 則把pooling關(guān)閉了
    protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
  
    // netty 相關(guān)
    // PulsarClient 傳遞過來的
    private final EventLoopGroup eventLoopGroup;
    private final Bootstrap bootstrap;
    private final PulsarChannelInitializer channelInitializerHandler;
    protected final DnsNameResolver dnsResolver;
  
    // 配置
    private final ClientConfigurationData clientConfig;
    private final int maxConnectionsPerHosts;
  
    // 是否是Server Name Indication 代理慎菲,TLS 相關(guān)豆拨,先忽略
    private final boolean isSniProxy;
 

構(gòu)造函數(shù)主要是按照netty 網(wǎng)絡(luò)客戶端方式初始化相關(guān)成員變量

        bootstrap = new Bootstrap();
        // 綁定io線程池
        bootstrap.group(eventLoopGroup);
        // 配置了channel類型某饰,如果支持Epoll的話會(huì)變成Epoll的channel
        bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
        // 設(shè)置tcp的連接超時(shí)時(shí)間
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getConnectionTimeoutMs());
        // 設(shè)置tcp no delay
        bootstrap.option(ChannelOption.TCP_NODELAY, conf.isUseTcpNoDelay());
        // 配置allocator
        bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);
        // 綁定channelInitializer
            channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier);
            bootstrap.handler(channelInitializerHandler);
        // 這個(gè)類是netty提供的些膨,用來解析DNS基显,后面專門會(huì)說
        this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).traceEnabled(true)
                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup)).build();
    }

這里面?zhèn)魅氲腂ufferPool是一個(gè)自定義的

這個(gè)連接池的主要功能

  1. 創(chuàng)建并cache連接

  2. 歸還連接

  3. 按照配置的maxConnectionsPerHosts限制連接數(shù)目

具體使用方式可以參照org.apache.pulsar.client.impl.ConnectionPoolTest 這個(gè)類

ConnectionPool pool;
InetSocketAddress brokerAddress = ....;

// 獲取連接脱衙,如果之前沒有的話侥猬,會(huì)創(chuàng)建一個(gè)
CompletableFuture<ClientCnx> conn = pool.getConnection(brokerAddress);
ClientCnx cnx = conn.get();

// 使用連接做事情
...
  
// 歸還給連接池
pool.releaseConnection(cnx);
          
pool.closeAllConnections();
pool.close();

我們先看一下這個(gè)類PulsarChannelInitializer用來初始化和pulsar broker 端的連接。

public void initChannel(SocketChannel ch) throws Exception {

    // tls相關(guān)
    ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);

    // 定長(zhǎng)解碼器
    ch.pipeline().addLast("frameDecoder", 
                          new LengthFieldBasedFrameDecoder(
            Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
  
    // 到這里可以拿到了RPC協(xié)議反序列化后的對(duì)象捐韩,進(jìn)行客戶端邏輯處理
    // 實(shí)際在這個(gè)類ClientCnx里面處理所有邏輯
    ch.pipeline().addLast("handler", clientCnxSupplier.get());
}
創(chuàng)建連接邏輯 (connectToAddress)

netty 的bootstrap.connect(忽略tls)


ClientCnx

我們看一下這個(gè)類的層次結(jié)構(gòu)

public class ClientCnx extends PulsarHandler;
public abstract class PulsarHandler extends PulsarDecoder;
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter;

PulsarDecoder

PulsarDecoder 這個(gè)類前面在初始化連接的時(shí)候還加入了一個(gè)LengthFieldBasedFrameDecoder.

所以到這里的channelRead就可以直接反序列化RPC就可以退唠,之后會(huì)調(diào)用相應(yīng)的RPC處理方法(handleXXXXXX)

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ...
          
        // Get a buffer that contains the full frame
        ByteBuf buffer = (ByteBuf) msg;
        BaseCommand cmd = null;
        BaseCommand.Builder cmdBuilder = null;
        try {
            // De-serialize the command
            int cmdSize = (int) buffer.readUnsignedInt();
            int writerIndex = buffer.writerIndex();
            buffer.writerIndex(buffer.readerIndex() + cmdSize);
          
            // 從對(duì)象池里拿到一個(gè)ByteBufCodedInputStream
            ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer);
            cmdBuilder = BaseCommand.newBuilder();
            // 反序列化
            cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build();
            buffer.writerIndex(writerIndex);

            cmdInputStream.recycle();

            ...
            // 下面按照不同的RPC類型調(diào)用不用的方法進(jìn)行處理
            switch (cmd.getType()) {
            case PARTITIONED_METADATA:
                checkArgument(cmd.hasPartitionMetadata());
                try {
                    interceptCommand(cmd);
                    handlePartitionMetadataRequest(cmd.getPartitionMetadata());
                } catch (InterceptException e) {
                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(getServerError(e.getErrorCode()),
                            e.getMessage(), cmd.getPartitionMetadata().getRequestId()));
                } finally {
                    cmd.getPartitionMetadata().recycle();
                }
                break;
            ...
              // 省略其他RPC方法,都是正常handleXXXXX
        } finally {
               // 清理方法
            if (cmdBuilder != null) {
                cmdBuilder.recycle();
            }

            if (cmd != null) {
                cmd.recycle();
            }

            buffer.release();
        }
    }

PulsarHandler

這個(gè)類實(shí)際里面主要增加了KeepAlive邏輯的實(shí)現(xiàn)。

具體查看相應(yīng)方法即可,比較容易


ClientCnx

這里主要負(fù)責(zé)和服務(wù)端交互的邏輯腻菇。

package org.apache.pulsar.client.impl;


public class ClientCnx extends PulsarHandler {


    // 連接狀態(tài)
    enum State {
        None, SentConnectFrame, Ready, Failed, Connecting
    }
    private State state;

   //----------------------------------------------------------------------
  
    // 臨時(shí)的請(qǐng)求隊(duì)列
    // requestId -> 請(qǐng)求
    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
        new ConcurrentLongHashMap<>(16, 1);
  
    // Lookup 請(qǐng)求隊(duì)列
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;

   //----------------------------------------------------------------------
  
    // 一些業(yè)務(wù)邏輯單元
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> transactionMetaStoreHandlers = new ConcurrentLongHashMap<>(16, 1);
  
   //----------------------------------------------------------------------
  
    // 異步新建連接的handle
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
  
   //----------------------------------------------------------------------
   
    // PulsarClient 構(gòu)造時(shí)傳遞進(jìn)來的線程池
    private final EventLoopGroup eventLoopGroup;

   //----------------------------------------------------------------------
  
    // 限流(和lookup有關(guān))
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;
  
    // 連接拒絕相關(guān)的成員(和lookup有關(guān))
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    // 被拒絕的請(qǐng)求數(shù)目(和lookup有關(guān))
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(ClientCnx.class, "numberOfRejectRequests");
    @SuppressWarnings("unused")
    private volatile int numberOfRejectRequests = 0;

    //----------------------------------------------------------------------
    // 用來檢查請(qǐng)求是否超時(shí)的數(shù)據(jù)結(jié)構(gòu)
    private static class RequestTime {
        final long creationTimeMs;
        final long requestId;
        final RequestType requestType;

        RequestTime(long creationTime, long requestId, RequestType requestType) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
            this.requestType = requestType;
        }
    }
  
    // 超時(shí)的請(qǐng)求隊(duì)列
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
    
   //----------------------------------------------------------------------
  
    // 消息的最大大小
    @Getter
    private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

    // RPC協(xié)議版本
    private final int protocolVersion;
  
    // operation超時(shí)時(shí)間
    private final long operationTimeoutMs;
    // 用來檢查operation超時(shí)時(shí)間的handle
    private ScheduledFuture<?> timeoutTask;
  
   //----------------------------------------------------------------------
  
    // 一些記錄是否從proxy連接的信息
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;
  
    // TLS 相關(guān)
    private boolean isTlsHostnameVerificationEnable;
    private static final TlsHostnameVerifier HOSTNAME_VERIFIER = new TlsHostnameVerifier();
    protected final Authentication authentication;
    protected AuthenticationDataProvider authenticationDataProvider;
  
   //----------------------------------------------------------------------
  
    // 事務(wù)相關(guān)
    private TransactionBufferHandler transactionBufferHandler;
    

    private enum RequestType {
        Command,
        GetLastMessageId,
        GetTopics,
        GetSchema,
        GetOrCreateSchema;

        String getDescription() {
            if (this == Command) {
                return "request";
            } else {
                return name() + " request";
            }
        }
    }

這里臨時(shí)回到ConnectionPool的邏輯中苛骨,之前創(chuàng)建連接的時(shí)候?qū)嶋H調(diào)用Bootstrap.connect這里返回的實(shí)際是一個(gè)Netty的Channel對(duì)象挖胃,但是ConnectionPool里面返回的ClientCnx對(duì)象臊恋。

ConnectionPool

private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalAddress,
            InetSocketAddress physicalAddress, int connectionKey) {
     
        final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();

        // Trigger async connect to broker
        createConnection(physicalAddress).thenAccept(channel -> {
            ....
            // 這里面ClientCnx對(duì)象實(shí)際是從這個(gè)已經(jīng)成功連接的Channel的pipeline里拿到的
            final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
            ....

            if (!logicalAddress.equals(physicalAddress)) {
                // We are connecting through a proxy. We need to set the target broker in the ClientCnx object so that
                // it can be specified when sending the CommandConnect.
                // That phase will happen in the ClientCnx.connectionActive() which will be invoked immediately after
                // this method.
                cnx.setTargetBroker(logicalAddress);
            }
            
            // 保存了遠(yuǎn)端連接的地址
            cnx.setRemoteHostName(physicalAddress.getHostName());

            cnx.connectionFuture().thenRun(() -> {
                ... 
                // 連接成功則返回
                cnxFuture.complete(cnx);
            }).exceptionally(exception -> {
               
                cnxFuture.completeExceptionally(exception);
                cleanupConnection(logicalAddress, connectionKey, cnxFuture);
                cnx.ctx().close();
                return null;
            });
              
           ...

ClientCnx的主要方法(功能)
  • 連接生命周期管理(netty Handler里面的方法)

    • channelActive

    • channelInActive

    • exceptionCaught

    • ......

  • 發(fā)送request:主動(dòng)發(fā)送RPC的方法冈钦,并按照業(yè)務(wù)邏輯處理

    • Lookup請(qǐng)求

    • getLastMessageId

    • getSchema

    • .....

  • 處理response:繼承自PulsarDecoder 的handleXXXXX RPC 處理邏輯

  • 主動(dòng)發(fā)送RPC方法獲得原始的response

CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage,long requestId,RequestType requestType)
  • 檢查請(qǐng)求是否超時(shí)checkRequestTimeout

  • 注冊(cè)/ 刪除業(yè)務(wù)邏輯對(duì)象(業(yè)務(wù)邏輯對(duì)象后面單出文章說)

    • consumer

    • producer

    • transactionMetaStoreHandler

    • transactionBufferHandler


sendRequestAndHandleTimeout方法
private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
  
        // 放入到pending請(qǐng)求隊(duì)列里面居凶,用來等待response
        CompletableFuture<T> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
  
        // 直接發(fā)送RPC body
        ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send {} to broker: {}", ctx.channel(), requestType.getDescription(), writeFuture.cause().getMessage());
                pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        });
        // 在超時(shí)隊(duì)列里面增加一個(gè)數(shù)據(jù)結(jié)構(gòu)用來記錄超時(shí)
        requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId, requestType));
  
        return future;
    }

chanelActive 方法

這個(gè)方法邏輯比較簡(jiǎn)單

  • PulsarHandler.channelActive方法里面開啟了KeepAlive邏輯的調(diào)度任務(wù)

  • ClientCnx.channelActive 方法里面開啟了requestTimeout邏輯的調(diào)度任務(wù)

  • 發(fā)送一個(gè)ConnectCommand請(qǐng)求給服務(wù)端(服務(wù)端處理邏輯到后面會(huì)說)


請(qǐng)求超時(shí)的處理

這個(gè)邏輯也比較容易滩愁。

使用了EventLoopGroup調(diào)度了一個(gè)定時(shí)任務(wù)躯喇,每次去查看requestTimeoutQueue里面的請(qǐng)求是否有超時(shí)的

有的話就把這個(gè)請(qǐng)求的response設(shè)置成TimeoutException

這里的請(qǐng)求超時(shí)檢查時(shí)間間隔是operationTimeoutMs決定的


PulsarClient 功能回顧

這樣讓我們回顧一下PulsarClient的總體功能

  • 包含了一個(gè)連接池用來創(chuàng)建ClientCnx和服務(wù)端進(jìn)行溝通

  • 保存了一些自定義業(yè)務(wù)處理單元(consumer,producer, tcClient)

  • LookupService

  • 一些周期check的動(dòng)作

  • Schema 的LoadingCache

業(yè)務(wù)單元通過注冊(cè)到ClientCnx上面硝枉,可以使用這個(gè)連接發(fā)送RPC廉丽,獲得response,這樣傳遞回業(yè)務(wù)邏輯單元里面

PulsarClient這個(gè)類對(duì)使用者來說提供了一個(gè)RPC層面的抽象妻味,其他類使用RPC完成自己的邏輯

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末正压,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子责球,更是在濱河造成了極大的恐慌焦履,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雏逾,死亡現(xiàn)場(chǎng)離奇詭異嘉裤,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)校套,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門价脾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人笛匙,你說我怎么就攤上這事侨把。” “怎么了妹孙?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵秋柄,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我蠢正,道長(zhǎng)骇笔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任嚣崭,我火速辦了婚禮笨触,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘雹舀。我一直安慰自己芦劣,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布说榆。 她就那樣靜靜地躺著虚吟,像睡著了一般寸认。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上串慰,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天偏塞,我揣著相機(jī)與錄音,去河邊找鬼邦鲫。 笑死灸叼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的掂碱。 我是一名探鬼主播怜姿,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼疼燥!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蚁堤,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤醉者,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后披诗,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撬即,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年呈队,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了剥槐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宪摧,死狀恐怖粒竖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情几于,我是刑警寧澤蕊苗,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站沿彭,受9級(jí)特大地震影響朽砰,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜喉刘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一瞧柔、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧睦裳,春花似錦造锅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)券坞。三九已至,卻和暖如春肺素,著一層夾襖步出監(jiān)牢的瞬間恨锚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工倍靡, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留猴伶,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓塌西,卻偏偏與公主長(zhǎng)得像他挎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子捡需,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容