SkyWalking之Tracing少态,Metrics城侧,Logging

從Server端談Tracing上報

從上篇文章我們知道,Tracing數據是使用TraceSegmentServiceClient通過GRPC發(fā)送到服務端彼妻,那么服務端是如何接收請求處理的呢?

從Server說起

前面我們說到服務端基于微內核架構初始化了各種Module嫌佑,每個Module又有很多Service,服務啟動時會找到Module進行初始化侨歉,初始化模塊時先初始化依賴模塊再初始化非依賴模塊屋摇,模塊的具體承載對象是ModuleProvider,其中CoreModule顧名思意是核心模塊幽邓,其對象的Provider是CoreModuleProvider炮温,在之前的文章我們已經介紹過CoreModuleProvider初始化過程,其中其實就包括了Server的初始化過程牵舵,其實在Server框架中Server有兩種

  • GPRCServer 用于接收 SkyWalking Agent 發(fā)送的 gRPC 請求柒啤。正如前面課時介紹的那樣, SkyWalking 6.x 中的 Trace 上報畸颅、JVM 監(jiān)控上報担巩、服務以及服務實例注冊請求、心跳請求都是通過 gRPC 請求實現的没炒。
  • HTTPServer 用于接收 SkyWalking Agent 以及用戶的 Http 請求涛癌。

在 GRPCServer 實現中首先會根據配置初始化 NettyServerBuilder 對象(其中會指定服務監(jiān)聽的地址和端口,以及每個連接上的并發(fā)請求數等參數)送火,然后創(chuàng)建并啟動 io.grpc.Server 接收gRPC 請求拳话。gRPC 的 Java 實現底層是依靠 Netty 實現的,Netty 是一個高性能的開源網絡庫种吸,由于 Java 本身的 NIO API 使用起來比較麻煩弃衍,而 Netty 底層封裝了 Java NIO 并對外提供了簡單易用的 API,所以很多開源軟件的網絡模塊都是使用 Netty 實現的骨稿。

NettyServerBuilder借助于ServerImplBuilder 構建ServerImpl,ServerImpl通過NettyClientTransportServersBuilder構建Server氯质,最終構造出來的Server就是io.grpc.netty.NettyServer败玉,在CoreModuleProvider初始化GRPCServer的代碼如下:


 if (moduleConfig.isGRPCSslEnabled()) {
        grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
        moduleConfig.getGRPCSslCertChainPath(),
        moduleConfig.getGRPCSslKeyPath(),
        null
        );
} else {
    grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}

我們縱觀一下CoreModuleProvider的源代碼谢鹊,發(fā)現還有幾處:

prepare方法

this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));


start 方法

grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());

GRPCHandlerRegisterImpl 用于實現gRPC的擴展性彻磁,和Serverlet的Filter類似,用于權限驗證或限流等橫向擴展功能的接口注冊辙浑。

這里簡單介紹一下上面start方法里面的兩個 GRPCHandler 的功能:

  • HealthCheckServiceHandler:在 Cluster 模塊中提供了支持多種第三方服務發(fā)展組件的實現激涤,前文介紹的 cluster-zookeeper-plugin 模塊使用的 curator-x-discovery 擴展庫底層是依賴 Zookeeper 的 Watcher 來監(jiān)聽一個 OAP 服務實例是否可用。但是有的第三方服務發(fā)現組件(例如 Consul)會依靠定期健康檢查(Health Check)來檢查一個 OAP 服務實例是否可用,此時 OAP 就需要保留一個接口來處理健康檢查請求倦踢,這就是 HealthCheckServiceHandler 的核心功能送滞。對 Consul 感興趣的同學,可以參考下面兩篇文檔:
    https://www.consul.io/docs/agent/checks.html
    https://github.com/grpc/grpc/blob/master/doc/health-checking.md

  • RemoteServiceHandler:OAP 集群中各個 OAP 實例節(jié)點之間通信的接口辱挥,在后面會詳細介紹該 GRPCHandler 的實現以及通信方式犁嗅。

其實真正的上報Tracing數據的處理是在SharingServerModule模塊提供的SharingServerModuleProvider處理的,SkyWalking OAP 需要接收外部請求的地方還是挺多的晤碘,例如 Agent 上報的監(jiān)控數據褂微、 SkyWalking Rocketbot 的查詢請求、OAP 集群中節(jié)點之間的相互通信园爷,等等宠蚂。除了 CoreModuleProvider 中會啟動 Server 組件之外,sharing-server-plugin 模塊中也可以啟動單獨的一套 Server 實例童社,并監(jiān)聽獨立的端口求厕,這套 Server 實例主要用來接收 Agent 的注冊請求、上報的 JVM 監(jiān)控數據以及 Trace 數據扰楼。

SharingServerModuleProvider初始化GRPCServer的過程和CoreModuleProvider類似呀癣,唯一的區(qū)別是SharingServerModuleProvider可以復用CoreModuleProvider創(chuàng)建的GRPCServer。

trace-receiver-plugin

trace-receiver-plugin其實是真正處理上報數據的項目灭抑,負責的模塊是TraceModule十艾,Provider是TraceModuleProvider,TraceModule依賴了5個模塊腾节,其中兩個分別是CoreModule和SharingServerModule,首先我們看一下start方法的代碼:


@Override
public void start() {
        GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
            .provider()
            .getService(GRPCHandlerRegister.class);
        HTTPHandlerRegister httpHandlerRegister = getManager().find(SharingServerModule.NAME)
            .provider()
            .getService(HTTPHandlerRegister.class);

        TraceSegmentReportServiceHandler traceSegmentReportServiceHandler = new TraceSegmentReportServiceHandler(getManager());
        grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler);
        grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler));

        httpHandlerRegister.addHandler(new TraceSegmentReportHandler(getManager()),
        Collections.singletonList(HttpMethod.POST)
    );
}

可以看到start方法使用SharingServerModule的GRPCHandlerRegister服務注冊了兩個自己的處理器分別是TraceSegmentReportServiceHandler荤牍,TraceSegmentReportServiceHandlerCompat案腺,TraceSegmentReportServiceHandler 負責接收 Agent 發(fā)送來的 SegmentObject 數據,并調用 SegmentParserServiceImpl.send() 方法進行解析康吵,然后再將SegmentObject交給TraceAnalyzer.doAnalysis()方法處理劈榨。


public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
            return;
    }

    createSpanListeners();

    notifySegmentListener(segmentObject);

    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            notifyFirstListener(spanObject, segmentObject);
        }

        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            notifyEntryListener(spanObject, segmentObject);
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
        }
    });

    notifyListenerToBuild();
}

上面的其實分為三步

  • 首先注冊監(jiān)聽
  • 將數據將給第一步注冊的監(jiān)聽處理
  • 最后調用AnalysisListener將數據將給聚合流程處理
注冊監(jiān)聽

private void createSpanListeners() {
    listenerManager.getSpanListenerFactories()
        .forEach(
                spanListenerFactory -> analysisListeners.add(
                    spanListenerFactory.create(moduleManager, config)));
}

所有監(jiān)聽接口通過SegmentParserListenerManager進行管理,由剛才的代碼我們看到監(jiān)聽分為6種

  • FirstAnalysisListener
  • ExitAnalysisListener
  • EntryAnalysisListener
  • LocalAnalysisListener
  • SegmentListener
  • AnalysisListener

剛才我們說到SharingServerModule依賴于5個模塊晦嵌,其中一個就是AnalyzerModule的AnalyzerModuleProvider同辣,那么AnalyzerModuleProvider就是負責SegmentParserListenerManager對象創(chuàng)建和監(jiān)聽器初始化的入口,具體方法是listenerManager


private SegmentParserListenerManager listenerManager() {
    SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
    if (moduleConfig.isTraceAnalysis()) {
        listenerManager.add(new RPCAnalysisListener.Factory(getManager()));
        listenerManager.add(new EndpointDepFromCrossThreadAnalysisListener.Factory(getManager()));
        listenerManager.add(new NetworkAddressAliasMappingListener.Factory(getManager()));
    }
    listenerManager.add(new SegmentAnalysisListener.Factory(getManager(), moduleConfig));

    return listenerManager;
}

上面注冊的監(jiān)聽實例囊括了我們剛才提到的6種監(jiān)聽接口惭载,就是說在極端情況下TraceAnalyzer.doAnalysis()方法會使用上面所有的實例進行數據加工處理旱函,其中RPCAnalysisListener,EndpointDepFromCrossThreadAnalysisListener描滔,NetworkAddressAliasMappingListener為可選流程棒妨,通過moduleConfig.isTraceAnalysis()開關開啟,限于篇幅含长,所以下面我們重點介紹SegmentAnalysisListener券腔。

FirstAnalysisListener

FirstSpanListener 的 parseFirst() 方法處理 TraceSegment 中的第一個 Span伏穆,這里只有 SegmentSpanListener 實現了該方法,具體實現分為三步:

  • 檢測當前 TraceSegment 是否被成功采樣纷纫。
  • 將 segmentCoreInfo 中記錄的 TraceSegment 數據拷貝到 segment 字段中枕扫。
  • 將 endpointNameId 記錄到 firstEndpointId 字段,通過前面的分析我們知道辱魁,endpointNameId 在 Spring MVC 里面對應的是 URL铡原,在 Dubbo 里面對應的是 RPC 請求 path 與方法名稱的拼接。

上面說的具體實現是在SegmentAnalysisListener的parseFirst方法


@Override
public void parseFirst(SpanObject span, SegmentObject segmentObject) {
        // 檢測是否采樣成功
        if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
            return;
        }

        if (StringUtil.isEmpty(serviceId)) {
            serviceName = namingControl.formatServiceName(segmentObject.getService());
            serviceId = IDManager.ServiceID.buildId(
                serviceName,
                true
            );
        }

        long timeBucket = TimeBucket.getRecordTimeBucket(startTimestamp);

        // 將 segmentCoreInfo 中記錄的 TraceSegment 數據拷貝到 segment 字段中商叹。
        segment.setSegmentId(segmentObject.getTraceSegmentId());
        segment.setServiceId(serviceId);
        segment.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
            serviceId,
            namingControl.formatInstanceName(segmentObject.getServiceInstance())
        ));
        segment.setLatency(duration);
        segment.setStartTime(startTimestamp);
        segment.setTimeBucket(timeBucket);
        segment.setIsError(BooleanUtils.booleanToValue(isError));
        segment.setDataBinary(segmentObject.toByteArray());

        // 將 endpointNameId 記錄到 firstEndpointId 字段燕刻,通過前面的分析我們知道,/endpointNameId 在 Spring MVC 里面對應的是 URL剖笙,在 Dubbo 里面對應的是 RPC 請求 path 與方法名稱的拼接卵洗。
        endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        );
}
EntryAnalysisListener

EntrySpanListener 實現的 parseEntry() 方法對于 Entry 類型的 Span 進行處理。SegmentAnalysisListener.parseEntry() 方法只做了一件事弥咪,就是將 serviceName 記錄到 endpointId 字段中:


@Override
public void parseEntry(SpanObject span, SegmentObject segmentObject) {
        if (StringUtil.isEmpty(serviceId)) {
            serviceName = namingControl.formatServiceName(segmentObject.getService());
            serviceId = IDManager.ServiceID.buildId(
                serviceName, true);
        }

        endpointName = namingControl.formatEndpointName(serviceName, span.getOperationName());
        endpointId = IDManager.EndpointID.buildId(
            serviceId,
            endpointName
        );
}
LocalAnalysisListener

主要是為了解決localspan和exitspan丟失元數據的問題过蹂,SegmentAnalysisListener并不會實現LocalAnalysisListener,但是RPCAnalysisListener和EndpointDepFromCrossThreadAnalysisListener確有實現聚至,拿EndpointDepFromCrossThreadAnalysisListener為例酷勺,其主要的實現是將SegmentObject里面的SpanObject信息緩存到RPCTrafficSourceBuilder對象,然后在build環(huán)節(jié)交給聚合流程處理扳躬,限于篇幅脆诉,這里不詳細介紹。

ExitAnalysisListener

同樣SegmentAnalysisListener并不會實現ExitAnalysisListener贷币,但是RPCAnalysisListener和EndpointDepFromCrossThreadAnalysisListener確有實現击胜,以RPCAnalysisListener為例,其主要的實現是將將SegmentObject里面的SpanObject信息緩存到List<RPCTrafficSourceBuilder> callingOutTraffic,List<DatabaseSlowStatementBuilder> dbSlowStatementBuilders兩個變量役纹,然后在build環(huán)節(jié)交給聚合流程處理偶摔,限于篇幅,這里不詳細介紹促脉。

SegmentListener

主要實現在SegmentAnalysisListener的parseSegment方法辰斋,主要用來給segment對象設置TraceId,根據Span的起始時間設置抽樣狀態(tài)sampleStatus

AnalysisListener

SegmentListener繼承了AnalysisListener瘸味,調用的是build方法


  @Override
public void build() {
    if (sampleStatus.equals(SAMPLE_STATUS.IGNORE)) {
        if (log.isDebugEnabled()) {
            log.debug("segment ignored, trace id: {}", segment.getTraceId());
            }
            return;
        }

    if (log.isDebugEnabled()) {
        log.debug("segment listener build, segment id: {}", segment.getSegmentId());
    }

    segment.setEndpointId(endpointId);

    sourceReceiver.receive(segment);
    addAutocompleteTags();
}

上面的方法其實就是做了一件事件宫仗,將segment對象的信息,交給sourceReceiver處理硫戈。

sourceReceiver

sourceReceiver來自于CoreModule锰什,我們在CoreModuleProvider的構造方法可以發(fā)現其實它是一個SourceReceiverImpl實例,SourceReceiver 底層封裝的 DispatcherManager 會根據 Segment 選擇相應的 SourceDispatcher 實現 —— SegmentDispatcher 進行分發(fā),DispatcherManager的scan方法會掃描所有SourceDispatcher接口實現類汁胆,并將其泛型類型的scope方法返回的內容即DefaultScopeDefine.SEGMENT作為KEY緩存到一個Map對象dispatcherMap,forward方法會根據傳入實例的類型的scope方法從dispatcherMap拿到SourceDispatcher即SegmentDispatcher調用其dispatch處理梭姓。


public void forward(ISource source) {
        if (source == null) {
            return;
        }

        List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());

        /**
         * Dispatcher is only generated by oal script analysis result.
         * So these will/could be possible, the given source doesn't have the dispatcher,
         * when the receiver is open, and oal script doesn't ask for analysis.
         */
        if (dispatchers != null) {
            source.prepare();
            for (SourceDispatcher dispatcher : dispatchers) {
                dispatcher.dispatch(source);
            }
        }
}

SegmentDispatcher.dispatch() 方法中會將 Segment 中的數據拷貝到 SegmentRecord 對象中。

SegmentRecord 繼承了 StorageData 接口嫩码,與前面介紹的 RegisterSource 以及 Metrics 的實現類似誉尖,通過注解指明了 Trace 數據存儲的 index 名稱的前綴(最終寫入的 index 是由該前綴以及 TimeBucket 后綴兩部分共同構成)以及各個字段對應的 field 名稱,如下所示:


// @Stream 注解的 name 屬性指定了 index 的名稱(index 前綴)铸题,processor 指定了處理該類型數據的 StreamProcessor 實現

@SuperDataset
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record {

    public static final String INDEX_NAME = "segment";
    public static final String SEGMENT_ID = "segment_id";
    public static final String TRACE_ID = "trace_id";
    public static final String SERVICE_ID = "service_id";
    public static final String SERVICE_INSTANCE_ID = "service_instance_id";
    public static final String ENDPOINT_ID = "endpoint_id";
    public static final String START_TIME = "start_time";
    public static final String LATENCY = "latency";
    public static final String IS_ERROR = "is_error";
    public static final String DATA_BINARY = "data_binary";
    public static final String TAGS = "tags";

    @Setter
    @Getter
    @Column(columnName = SEGMENT_ID, length = 150)
    private String segmentId;
    @Setter
    @Getter
    @Column(columnName = TRACE_ID, length = 150)
    @BanyanDBGlobalIndex(extraFields = {})
    private String traceId;
    @Setter
    @Getter
    @Column(columnName = SERVICE_ID)
    @BanyanDBShardingKey(index = 0)
    private String serviceId;
    @Setter
    @Getter
    @Column(columnName = SERVICE_INSTANCE_ID)
    @BanyanDBShardingKey(index = 1)
    private String serviceInstanceId;
    @Setter
    @Getter
    @Column(columnName = ENDPOINT_ID)
    private String endpointId;
    @Setter
    @Getter
    @Column(columnName = START_TIME)
    private long startTime;
    @Setter
    @Getter
    @Column(columnName = LATENCY)
    private int latency;
    @Setter
    @Getter
    @Column(columnName = IS_ERROR)
    @BanyanDBShardingKey(index = 2)
    private int isError;
    @Setter
    @Getter
    @Column(columnName = DATA_BINARY, storageOnly = true)
    private byte[] dataBinary;
    @Setter
    @Getter
    @Column(columnName = TAGS, indexOnly = true)
    private List<String> tags;

    // ...其它代碼

在 SegmentRecord 的父類 —— Record 中還定義了一個 timeBucket 字段(long 類型)铡恕,對應的 field 名稱是 "time_bucket"。

RecordStreamProcessor 的核心功能是為每個 Record 類型創(chuàng)建相應的 worker 鏈丢间。在 RecordStreamProcessor 中探熔,每個 Record 類型對應的 worker 鏈中只有一個worker 實例 —— RecordPersistentWorker。RecordPersistentWorker 負責 SegmentRecord 數據的持久化烘挫。

@Override
public void in(Record record) {
    try {
        InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
        batchDAO.insert(insertRequest);
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}

recordDAO由StorageDAO創(chuàng)建诀艰,StorageDAO由StorageModule初始化,StorageDAO對應的實例是 StorageEsDAO饮六,創(chuàng)建的batchDAO為RecordEsDAO其垄,batchDAO其實對應的是BatchProcessEsDAO,最后RecordPersistentWorker 生成的全部 IndexRequest 請求會交給全局唯一的 BatchProcessEsDAO 實例批量發(fā)送到 ES 卤橄,完成寫入绿满。

Metrics

有了上面的基礎知道,我們來研究一下聚合到底是怎么做的窟扑。上面我們提到在RPCAnalysisListener的parseExit方法會將Tracing上報數據緩存到dbSlowStatementBuilders集合喇颁,這個集合就是用來收集上報數據中存儲的慢查詢數據用的。收集到數據之后在build方法將數據交給sourceReceiver處理辜膝,sourceReceiver接收的其實是一個DatabaseSlowStatement對象无牵,所以其對應的處理實現類是DatabaseStatementDispatcher,它的dispatch方法其實就是做了聚合的事情厂抖,因為對于慢查詢來說,我們一般都會去關注前N個慢查詢信息克懊,


@Override
public void dispatch(DatabaseSlowStatement source) {
    TopNDatabaseStatement statement = new TopNDatabaseStatement();
    statement.setId(source.getId());
    statement.setServiceId(source.getDatabaseServiceId());
    statement.setLatency(source.getLatency());
    statement.setStatement(source.getStatement());
    statement.setTimeBucket(source.getTimeBucket());
    statement.setTraceId(source.getTraceId());

    TopNStreamProcessor.getInstance().in(statement);
}

TopNStreamProcessor 為每個 TopN 類型(其實只有 TopNDatabaseStatement)提供的 Worker 鏈中只有一個 Worker —— TopNWorker忱辅。與前文介紹的 MetricsPersistentWorker 以及 RecordPersistentWorker 類似,TopNWorker 也繼承了 PersistenceWorker 抽象類谭溉,其結構如下圖所示墙懂,TopNWorker 也是先將 TopNDatabaseStatement 暫存到 DataCarrier,然后由后臺 Consumer 線程定期讀取并調用 onWork() 方法進行處理扮念。

在 TopNWorker.onWorker() 方法中會將 TopNDatabaseStatement 暫存到 LimitedSizeBufferedData 中進行排序损搬。LimitedSizeBufferedData 使用雙隊列模式,繼承了 BufferedData 中進行排序。LimitedSizeBufferedData 底層的隊列實現是 HashMap嵌套LinkedList數組HashMap<String, LinkedList<STORAGE_DATA>>巧勤,其 data 字段(Map 類型)中維護了每個存儲服務的慢查詢(即 TopNDatabaseStatement)列表嵌灰,每個列表都是定長的(由 limitedSize 字段指定,默認 50)颅悉,在調用 ReadWriteSafeCache.write() 方法寫入的時候會按照 latency 從大到小排列沽瞭,并只保留最多 50 個元素。

那么數據是如何寫入的剩瓶?

PersistenceTimer

在CoreModuleProvider的notifyAfterCompleted方法會啟動一個PersistenceTimer實例并調用其start 方法PersistenceTimer.INSTANCE.start(getManager(), moduleConfig)處理PersistenceWorker的入庫驹溃,在它的extractDataAndSave方法主要處理兩類PersistenceWorker

  • TopNStreamProcessor
  • MetricsStreamProcessor

調用PersistenceWorker的buildBatchRequests拿到List<PrepareRequest>之后再調用batchDAO進行入庫操作,batchDAO就是IBatchDAO延曙,我們之前有介紹豌鹤,這里不再贅述。

MetricsStreamProcessor

MetricsStreamProcessor相比TopNStreamProcessor更為復雜的聚合枝缔,主要是解決度量數據的按時間維度的精度模糊聚合布疙,舉個例子,JVM數據一般按年月日時分秒進行記錄魂仍,我們需要按小時或者分鐘的曲線圖拐辽,所以需要將低級單位的數據向高級單位進行聚合,這樣不但可以節(jié)省存儲擦酌,也可以提高查詢性能俱诸,因為監(jiān)控數據不太需要太精確,只需要看到一個發(fā)展的趨勢就可以了赊舶,所以我們可以采用聚合的方式對數據進行模糊化處理睁搭,我們把這個過程叫做Downsampling。

MetricsStreamProcessor 中為每個 Metrics 類型維護了一個 Worker 鏈笼平,


private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();

MetricsStreamProcessor 初始化 entryWorkers 集合的核心邏輯也是在 create() 方法中园骆,具體方法如下:


public void create(ModuleDefineHolder moduleDefineHolder,
                       StreamDefinition stream,
                       Class<? extends Metrics> metricsClass) throws StorageException {
        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
        .provider()
        .getService(StorageBuilderFactory.class);
        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(
            metricsClass, stream.getBuilder());

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IMetricsDAO metricsDAO;
        try {
            metricsDAO = storageDAO.newMetricsDao(builder.getDeclaredConstructor().newInstance());
        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
        }

        ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
        DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
        .provider()
        .getService(DownSamplingConfigService.class);

        MetricsPersistentWorker hourPersistentWorker = null;
        MetricsPersistentWorker dayPersistentWorker = null;

        MetricsTransWorker transWorker = null;

        final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
        /**
         * All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.
         */
        boolean supportDownSampling = true;
        boolean supportUpdate = true;
        boolean timeRelativeID = true;
        if (metricsExtension != null) {
            supportDownSampling = metricsExtension.supportDownSampling();
            supportUpdate = metricsExtension.supportUpdate();
            timeRelativeID = metricsExtension.timeRelativeID();
        }
        if (supportDownSampling) {
            if (configService.shouldToHour()) {
                Model model = modelSetter.add(
                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
                    false
                );
                hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
            }
            if (configService.shouldToDay()) {
                Model model = modelSetter.add(
                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Day),
                    false
                );
                dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
            }

            transWorker = new MetricsTransWorker(
                moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
        }

        Model model = modelSetter.add(
            metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Minute),
            false
        );
        MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
            moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);

        String remoteReceiverWorkerName = stream.getName() + "_rec";
        IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
        .provider()
        .getService(IWorkerInstanceSetter.class);
        workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);

        MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
        MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
            moduleDefineHolder, remoteWorker, stream.getName(), l1FlushPeriod);

        entryWorkers.put(metricsClass, aggregateWorker);
}

其中 minutePersistentWorker 是一定會存在的,其他 DownSampling(Hour寓调、Day锌唾、Month) 對應的 PersistentWorker 則會根據配置的創(chuàng)建并添加,在 CoreModuleProvider.prepare() 方法中有下面這行代碼夺英,會獲取 Downsampling 配置并保存于 DownsamplingConfigService 對象中配置晌涕。后續(xù)創(chuàng)建上述 Worker 時,會從中獲取配置的 DownSampling痛悯。


 this.registerServiceImplementation(
            DownSamplingConfigService.class, new DownSamplingConfigService(moduleConfig.getDownsampling()));
MetricsAggregateWorker

MetricsAggregateWorker為聚合入口的第一個Worker,其功能就是進行簡單的聚合,MetricsAggregateWorker 在收到 Metrics 數據的時候余黎,會先寫到內部的 DataCarrier 中緩存,然后由 Consumer 線程(都屬于名為 “METRICS_L1_AGGREGATION” 的 BulkConsumePool)消費并進行聚合载萌,并將聚合結果寫入到 MergeDataCache 中的 current 隊列暫存惧财。

同時巡扇,Consumer 會定期(默認1秒,通過 METRICS_L1_AGGREGATION_SEND_CYCLE 配置修改)觸發(fā) current 隊列和 last 隊列的切換垮衷,然后讀取 last 隊列中暫存的數據厅翔,并發(fā)送到下一個 Worker 中處理。

寫入 DataCarrier 的邏輯在前面已經分析過了帘靡,這里不再贅述知给。下面深入分析兩個點:

  • Consumer 線程消費 DataCarrier 并聚合監(jiān)控數據的相關實現。
  • Consumer 線程定期清理 MergeDataCache 緩沖區(qū)并發(fā)送監(jiān)控數據的相關實現描姚。

Consumer 線程在消費 DataCarrier 數據的時候涩赢,首先會進行 Metrics 聚合(即相同 Metrics 合并成一個),然后寫入 MergeDataCache 中轩勘,實現如下:


    /**
     * Dequeue consuming. According to {@link IConsumer#consume(List)}, this is a serial operation for every work
     * instance.
     *
     * @param metricsList from the queue.
     */
    private void onWork(List<Metrics> metricsList) {
        metricsList.forEach(metrics -> {
            aggregationCounter.inc();
            mergeDataCache.accept(metrics);
        });

        flush();
    }

將聚合后的 Metrics 寫入 MergeDataCache 之后筒扒,Consumer 線程會每隔一秒將 MergeDataCache 中的數據發(fā)送到下一個 Worker 處理,相關實現如下:


    private void flush() {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastSendTime > l1FlushPeriod) {
            mergeDataCache.read().forEach(
                data -> {
                    if (log.isDebugEnabled()) {
                        log.debug(data.toString());
                    }
                    nextWorker.in(data);
                }
            );
            lastSendTime = currentTime;
        }
    }
MetricsRemoteWorker

MetricsAggregateWorker 指向的下一個 Worker 是 MetricsRemoteWorker 绊寻,底層是通過 RemoteSenderService 將監(jiān)控數據發(fā)送到遠端節(jié)點花墩。

它提供了三種不同的發(fā)送策略:

  • HashCode 策略:根據 Hash 值選擇發(fā)送到目標 OAP 節(jié)點。MetricsRemoteWorker 默認使用該策略澄步。
  • Rolling 策略:輪訓方式選擇目標 OAP 節(jié)點冰蘑。
  • ForeverFirst 策略:始終選擇第一個 OAP 節(jié)點作為目標節(jié)點。RegisterRemoteWorker 默認使用該策略村缸。

    @Override
    public final void in(Metrics metrics) {
        try {
            remoteSender.send(remoteReceiverWorkerName, metrics, Selector.HashCode);
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }

為什么要發(fā)到其他 OAP 節(jié)點進行處理呢祠肥?

在 CoreModuleProvider 啟動過程中,我們可以看到 OAP 節(jié)點的角色選擇邏輯梯皿,如下所示:


if (Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || 

      Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {

    RemoteInstance gRPCServerInstance = new RemoteInstance(

        new Address(moduleConfig.getGRPCHost(), 

            moduleConfig.getGRPCPort(), true));

    // 只有Mixed仇箱、Aggregator兩種角色的OAP節(jié)點才會通過Cluster模塊進行注冊

    this.getManager().find(ClusterModule.NAME).provider()

       .getService(ClusterRegister.class)

           .registerRemote(gRPCServerInstance);

}

在 application.yml 配置文件中可以配置 Mixed、Receiver东羹、Aggregator 三種角色:

  • Receiver 節(jié)點:負責接收 Agent 請求并進行 L1 級別的聚合處理剂桥,后續(xù)的 L2 級別的聚合操作由其他兩種類型的節(jié)點處理。
  • Mixed 節(jié)點:負責接收 Agent 請求以及其他 OAP 節(jié)點 L1 聚合結果属提,進行 L1 級別和 L2 級別的聚合處理权逗。
  • Aggregator 節(jié)點和:負責接收其他 OAP節(jié)點的 L1 聚合結果,進行 L2 級別的聚合處理冤议。

正是因為集群環(huán)境中不同的服務有不同的角色旬迹,不同步角色做的聚合是不一樣的,所以進行請求轉發(fā)處理求类。

MetricsTransWorker

MetricsRemoteWorker 之后的下一個 worker 是 MetricsTransWorker,其中有兩個字段分別指向兩個不同 Downsampling 粒度的 PersistenceWorker 對象屹耐,如下:


   private final MetricsPersistentWorker hourPersistenceWorker;
   private final MetricsPersistentWorker dayPersistenceWorker;

MetricsTransWorker.in() 方法會根據上述字段是否為空尸疆,將 Metrics 數據分別轉發(fā)到不同的 PersistenceWorker 中進行處理:


public void in(Metrics metrics) {

    // 檢測 Hour椿猎、Day、Month 對應的 PersistenceWorker 是否為空寿弱,若不為空犯眠,

    // 則將 Metrics 數據拷貝一份并調整時間窗口粒度,交到相應的 

    // PersistenceWorker 處理症革,這里省略了具體邏輯

    // 最后筐咧,直接轉發(fā)給 minutePersistenceWorker 進行處理

    if (Objects.nonNull(minutePersistenceWorker)) { 

        aggregationMinCounter.inc();

        minutePersistenceWorker.in(metrics);

    }

}
MetricsPersistentWorker

這個之前已經描述過,這里不再贅述

Logging

SkyWalking OAP 底層支持 ElasticSearch噪矛、H2量蕊、MySQL 等多種持久化存儲,同時也支持讀取其他分布式鏈路追蹤系統(tǒng)的數據艇挨,例如残炮,jaeger(Uber 開源的分布式跟蹤系統(tǒng))、zipkin(Twitter 開源的分布式跟蹤系統(tǒng))缩滨。

首先势就,OAP 存儲了兩種類型的數據:時間相關的數據和非時間相關的數據(與“時序”這個專有名詞區(qū)分一下)。注冊到 OAP 集群的 Service脉漏、ServiceInstance 以及同步的 EndpointName苞冯、NetworkAddress 都是非時間相關的數據,一個穩(wěn)定的服務產生的這些數據是有限的侧巨,我們可以用幾個固定的 ES 索引(或數據庫表)來存儲這些數據舅锄。

而像 JVM 監(jiān)控等都屬于時間相關的數據,它們的數據量會隨時間流逝而線性增加刃泡。如果將這些時間相關的數據存儲到幾個固定的 ES 索引中巧娱,就會導致這些 ES 索引(或數據庫表)非常大,這種顯然是不能落地的烘贴。既然一個 ES 索引(或數據庫表)存不下禁添,一般會考慮切分,常見的切分方式按照時間窗口以及 DownSampling 進行切分桨踪。

Model 與 ES 索引

常見的 ORM 框架會將數據中的表結構映射成 Java 類老翘,表中的一個行數據會映射成一個 Java Bean 對象,在 OAP 的存儲抽象中也有類似的操作锻离。SkyWalking 會將 [Metric + DownSampling] 對應的一系列 ES 索引與一個 Model 對象進行映射铺峭。

Model 對象記錄了對應 ES 索引的核心信息:

  • name(String類型):Metric 名稱,在 OAP 創(chuàng)建 ES 索引時會使用
  • columns(List類型):ES 索引 中的 Field 集合汽纠,一個 ModelColumn 對象記錄了一個 Field 的名稱卫键、類型等信息。
  • capableOfTimeSeries(boolean類型):對應 ES 索引中存儲的數據是否為時間相關的數據虱朵。
  • downsampling(Downsampling類型):如果是時間相關的數據莉炉,則需要指定其 Downsampling 單位钓账,可選值有 Second、 Minute絮宁、 Hour梆暮、 Day、 Month绍昂。對于非時間相關的數據啦粹,則該字段值為 Downsampling.NONE。
  • deleteHistory(boolean類型):是否刪除歷史數據窘游。
  • scopeId(int類型):對應指標的全局唯一 ID唠椭。
    很明顯,ES 索引的名稱由三部分構成:Metric 名稱张峰、DownSampling泪蔫、時間窗口(后面兩部分只有時序數據才會有),而 ES 索引的別名由 Metric 名稱和 DownSampling 構成喘批。

在 CoreModuleProvider 啟動過程中撩荣,會掃描 @Stream 注解,創(chuàng)建相應的 Model 對象饶深,并由 StorageModels 統(tǒng)一管理(底層維護了 List 集合)餐曹。 @Stream 中會包含 Model 需要的信息即可。StorageModels 同時實現了 IModelManager敌厘、ModelCreator台猴、ModelManipulator 三個 Service 接口,在 CoreModuleProvider 的 prepare() 方法中會將 StorageModels 作為上述三個 Service 接口的實現注冊到 services 集合中。如果回看一下上面的代碼你就會發(fā)現在org.MetricsStreamProcessor.create從CoreModuleProvider獲取ModelCreator進行注冊俱两。

//...
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);

//...
if (configService.shouldToHour()) {
    Model model = modelSetter.add(
                    metricsClass, stream.getScopeId(), new Storage(stream.getName(), timeRelativeID, DownSampling.Hour),
                    false
                );
    hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
}
//...

prepare 方法先注冊監(jiān)聽


annotationScan.registerListener(new StreamAnnotationListener(getManager()));

start 方法啟動@Stream 注解的掃描

// ...
annotationScan.scan();

// ...

下面介紹一下Model的初始化過程饱狂,主要還是調用StorageModels的add方法進行注冊,我們分析一下metricsClass來自于哪里

  • AnalyzerModuleProvider 的start方法中會調用MeterProcessService的start方法
  • MeterProcessService會初始化一個MetricConvert
      public void start(List<MeterConfig> configs) {
        final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
        this.metricConverts = configs.stream().map(c -> new MetricConvert(c, meterSystem)).collect(Collectors.toList());
    }

  • MetricConvert會調用Analyzer的build方法

  public MetricConvert(MetricRuleConfig rule, MeterSystem service) {
        Preconditions.checkState(!Strings.isNullOrEmpty(rule.getMetricPrefix()));
        this.analyzers = rule.getMetricsRules().stream().map(
            r -> Analyzer.build(
                formatMetricName(rule, r.getName()),
                rule.getFilter(),
                Strings.isNullOrEmpty(rule.getExpSuffix()) ?
                    r.getExp() : String.format("(%s).%s", r.getExp(), rule.getExpSuffix()),
                service
            )
        ).collect(toList());
    }
  • Analyzer 會MeterSystem的create來初始化MetricsStreamProcessor宪彩,并調用MetricsStreamProcessor的create方法休讳。

初始化存儲結構

很多 ORM 框架(例如,Hibernate )提供了在應用啟動時根據 Java Bean 初始化表結構的功能尿孔。SkyWalking OAP 也提供了類似的功能俊柔,主要在 ModelInstaller 中完成,核心在 whenCreating() 方法中:


@Override
public void whenCreating(Model model) throws StorageException {
        if (RunningMode.isNoInitMode()) {
            while (!isExists(model)) {
                try {
                    log.info(
                        "table: {} does not exist. OAP is running in 'no-init' mode, waiting... retry 3s later.",
                        model
                            .getName()
                    );
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    log.error(e.getMessage());
                }
            }
        } else {
            if (!isExists(model)) {
                log.info("table: {} does not exist", model.getName());
                createTable(model);
            }
        }
}

在StorageModels的add方法中會調用List<CreatingListener> listeners來遍歷具體實現類來訪問whenCreating 來創(chuàng)建表結構活合,這個CreatingListener接口的唯一實現是ModelInstaller雏婶,峰回路轉,你有沒有發(fā)現所有流程已經串接起來白指,也就是AnalyzerModuleProvider初始化model的時候同時創(chuàng)建了表結構留晚。

createTable方法是個虛方法,主要有兩個實現

  • StorageEsInstaller
  • H2TableInstaller

我們主要關注StorageEsInstaller


@Override
protected void createTable(Model model) throws StorageException {
        if (model.isTimeSeries()) {
            createTimeSeriesTable(model);
        } else {
            createNormalTable(model);
        }
}

代碼很清晰就是創(chuàng)建我們介紹兩種表

數據抽象

了解了索引(或是數據庫表結構)的初始化流程之后告嘲,再來看 SkyWalking OAP是如何對持久化數據進行抽象的倔丈。

SkyWalking 與 ORM 類似憨闰,會將索引中的一個 Document (或是數據庫表中的一條數據)映射為一個 StorageData 對象。StorageData 接口中定義了一個 id() 方法需五,負責返回該條數據的唯一標識。我們上面的就是TopN就是一個StorageData 對象實現轧坎。

  • Metrics:所有監(jiān)控指標的頂級抽象宏邮,其中定義了一個 timeBucket 字段(long類型),它是所有監(jiān)控指標的公共字段缸血,用于表示該監(jiān)控點所處的時間窗口蜜氨。另外,timeBucket 字段被 @Column 注解標記捎泻,在 OAP 啟動時會被掃描到轉換成 Model 中的 ModelColumn飒炎,在初始化 ES 索引時就會創(chuàng)建相應的 Field。Metrics 抽象類中還定義了計算監(jiān)控數據的一些公共方法:

    • calculate() 方法:大部分 Metrics 都會有一個 value 字段來記錄該監(jiān)控點的值笆豁,例如郎汪,CountMetrics 中的 value 字段記錄了時間窗口內事件的次數,MaxLongMetrics 中的 value 字段記錄時間窗口內的最大值闯狱。calculate() 方法就是用來計算該 value 值煞赢。

    • combine() 方法:合并兩個監(jiān)控點。對于不同含義的監(jiān)控數據哄孤,合并方式也有所不同照筑,例如,CountMetrics 中 combine() 方法的實現是將兩個監(jiān)控點的值進行求和瘦陈;MaxLongMetrics 中 combine() 方法的實現是取兩個監(jiān)控點的最大值凝危。

  • Record:抽象了所有記錄類型的數據,其子類如下圖所示晨逝。其中 SegmentRecord 對應的是 TraceSegment 數據蛾默、AlarmRecord 對應一條告警、TopNDatabaseStatement 對應一條慢查詢記錄咏花,這些數據都是一條條的記錄趴生。

上述記錄類型的數據也有一個公共字段 —— timeBucket(long 類型),表示該條記錄數據所在的時間窗口昏翰,也同樣被 @Column 注解標記苍匆。Record 抽象類中沒有定義其他的公共方法。

RegisterSource:抽象了服務注冊棚菊、服務實例注冊浸踩、EndpointName(以及 NetworkAddress)同步三個過程中產生的數據。其中统求,定義了三個公共字段检碗,且這三個字段都被 @Column 注解標注了:
sequence(int 類型):上述三個過程中的數據都會產生一個全局唯一的 ID据块,該全局 ID 就保存在該字段中。
registerTime(long 類型):第一注冊(或同步)時的時間戳折剃。
heartbeatTime(long 類型):心跳時間戳另假。
在這三個抽象類下還有很多具體的實現類,這些實現類會根據對應的具體數據怕犁,擴展新的字段和方法

最后边篮,我們來看 StorageBuilder 這個接口,它與 StorageData 接口的關系非常緊密奏甫,在 StorageData 的全部實現類中庸娱,都有一個內部 Builder 類實現類了 StorageBuilder 接口成畦。StorageBuilder 接口中定義了 map2Data() 和 data2Map() 兩個方法,實現了 StorageData 對象與 Map 之間的相互轉換。

DAO 層架構

在創(chuàng)建 ES 索引時使用到的 ElasticSearchClient疆液, 是對 RestHighLevelClient 進行了一層封裝闽晦,位于 library-client 模塊边锁,其中還提供了 JDBC 以及 gRPC 的 Client

SkyWalking OAP 也提供了DAO 層抽象辕坝,如下圖所示,大致可以分為三類:Receiver DAO奈梳、Cache DAO杈湾、Query DAO。

數據 TTL

Metrics攘须、Trace 等(時間相關的數據)對應的 ES 索引都是按照時間進行切分的漆撞,隨著時間的推移,ES 索引會越來越多于宙。為了解決這個問題浮驳,SkyWalking 只會在 ES 中存儲一段時間內的數據,CoreModuleProvider 會啟動 DataTTLKeeperTimer 定時清理過期數據捞魁。

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末至会,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子谱俭,更是在濱河造成了極大的恐慌奉件,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昆著,死亡現場離奇詭異县貌,居然都是意外死亡,警方通過查閱死者的電腦和手機凑懂,發(fā)現死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門煤痕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事摆碉√料唬” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵巷帝,是天一觀的道長忌卤。 經常有香客問我,道長锅睛,這世上最難降的妖魔是什么埠巨? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮现拒,結果婚禮上,老公的妹妹穿的比我還像新娘望侈。我一直安慰自己印蔬,他們只是感情好,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布脱衙。 她就那樣靜靜地躺著侥猬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪捐韩。 梳的紋絲不亂的頭發(fā)上退唠,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機與錄音荤胁,去河邊找鬼瞧预。 笑死,一個胖子當著我的面吹牛仅政,可吹牛的內容都是我干的垢油。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼圆丹,長吁一口氣:“原來是場噩夢啊……” “哼滩愁!你這毒婦竟也來了?” 一聲冷哼從身側響起辫封,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤硝枉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后倦微,有當地人在樹林里發(fā)現了一具尸體妻味,經...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年璃诀,在試婚紗的時候發(fā)現自己被綠了弧可。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖棕诵,靈堂內的尸體忽然破棺而出裁良,到底是詐尸還是另有隱情,我是刑警寧澤校套,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布价脾,位于F島的核電站,受9級特大地震影響笛匙,放射性物質發(fā)生泄漏侨把。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一妹孙、第九天 我趴在偏房一處隱蔽的房頂上張望秋柄。 院中可真熱鬧,春花似錦蠢正、人聲如沸骇笔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽笨触。三九已至,卻和暖如春雹舀,著一層夾襖步出監(jiān)牢的瞬間芦劣,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工说榆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留虚吟,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓娱俺,卻偏偏與公主長得像稍味,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子荠卷,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內容