[IoTDB 學(xué)習筆記] [part 2] TsFile 的基本讀寫

說在前面: 這個list記錄了博主在學(xué)習IoTDB[1]期間的總結(jié)和思考,歡迎一起討論學(xué)習哈
相關(guān)介紹可以參考list的第一篇博客:[IoTDB 學(xué)習筆記] [part1] 介紹

TsFile 基本結(jié)構(gòu)

TsFile 是 IoTDB 的底層數(shù)據(jù)文件颤陶,一種專門為時間序列數(shù)據(jù)設(shè)計的列式文件格式颗管。TsFile的設(shè)計細節(jié)具體可以參考[2]

總的來說滓走,TsFile文件主要包含數(shù)據(jù)(Chunk)和元數(shù)據(jù)(Metadata)兩個部分垦江。可以認為對于我們觀測的設(shè)備網(wǎng)絡(luò)中搅方,每個設(shè)備包含若干個測點比吭,每個測點包含一個時間序列Timeseries,時間序列記錄了該測點各時間戳下的值姨涡,時間序列的值記錄在Chunk當中衩藤;對應(yīng)的,其中的元數(shù)據(jù)也分為三部分:ChunkMetadata涛漂,TimeseriesMetadataTsFileMetadata赏表。一個TsFile的示例結(jié)構(gòu)如圖 1 [2]所示,該文件包含兩個設(shè)備d1與d2匈仗,每個設(shè)備包含三個測點s1瓢剿,s2和s3,每個設(shè)備的每個測點對應(yīng)一個時間序列悠轩,每個時間序列包含兩個Chunk间狂。

圖 1 . TsFile的示例結(jié)構(gòu)

查詢流程
以查 d1.s1 為例

  • 反序列化 TsFileMetadata,得到 d1.s1 的 TimeseriesMetadata 的位置
  • 反序列化得到 d1.s1 的 TimeseriesMetadata
  • 根據(jù) d1.s1 的 TimeseriesMetadata火架,反序列化其所有 ChunkMetadata
  • 根據(jù) d1.s1 的每一個 ChunkMetadata鉴象,讀取其 Chunk 數(shù)據(jù)

每個設(shè)備中的數(shù)據(jù)存儲在一個ChunkGroup中,每個ChunkGroup由若干Chunk組成距潘,ChunkChunkHeader和若干Page一起構(gòu)成炼列。

所有元數(shù)據(jù)索引節(jié)點構(gòu)成了一顆元數(shù)據(jù)索引樹,包含傳感器索引層以及可能的設(shè)備索引層音比,各層包含內(nèi)部節(jié)點和葉節(jié)點兩種類型的節(jié)點俭尖,分別為:INTERNAL_MEASUREMENTLEAF_MEASUREMENT洞翩,INTERNAL_DEVICE稽犁,LEAF_DEVICE。其中LEAF_MEASUREMENT指向TimeseriesMetadata骚亿,如圖 2 [2]所示已亥。
max_degree_of_index_node為索引樹節(jié)點的最大子節(jié)點數(shù)目。當總設(shè)備數(shù)目不超過max_degree_of_index_node時来屠,不存在設(shè)備索引層虑椎,如圖 3 [2]所示震鹉。

圖 2 . max_degree_of_index_node為10 150個設(shè)備 每個設(shè)備中有150個傳感器
圖 3 . max_degree_of_index_node為10 5個設(shè)備 每個設(shè)備中有5個傳感器

寫流程

TsFile 的寫入流程如圖 4 [3]所示:

圖 4 . 每個設(shè)備對應(yīng)一個 ChunkGroupWriter 每個傳感器對應(yīng)一個 ChunkWriter

其中,文件的寫入主要分為三種操作捆姜,在圖上用 1传趾、2、3 標注:
1泥技、寫內(nèi)存緩沖區(qū)
2浆兰、持久化 ChunkGroup
3、關(guān)閉文件

寫內(nèi)存緩沖區(qū)

TsFile 文件層的寫入接口有兩種:

  • 寫入一個設(shè)備一個時間戳的多個測點
  • 寫入一個設(shè)備多個時間戳的多個測點

當調(diào)用寫接口時珊豹,這個設(shè)備的數(shù)據(jù)會交給對應(yīng)的 ChunkGroupWriter簸呈,其中的每個測點會交給對應(yīng)的 ChunkWriter 進行寫入。ChunkWriter 完成編碼和打包(生成 Page

持久化 ChunkGroup

當內(nèi)存中的數(shù)據(jù)達到一定閾值店茶,會觸發(fā)持久化操作蜕便。每次持久化會把當前內(nèi)存中所有設(shè)備的數(shù)據(jù)全部持久化到磁盤的 TsFile 文件中。每個設(shè)備對應(yīng)一個 ChunkGroup忽妒,每個測點對應(yīng)一個 Chunk玩裙。持久化完成后會在內(nèi)存中緩存對應(yīng)的元數(shù)據(jù)信息,以供查詢和生成文件尾部 metadata段直。

關(guān)閉文件

根據(jù)內(nèi)存中緩存的元數(shù)據(jù)吃溅,生成 TsFileMetadata 追加到文件尾部,最后關(guān)閉文件鸯檬。

正如 TsFile 基本結(jié)構(gòu) 中所述决侈,生成 TsFileMetadata 的過程中比較重要的一步是建立元數(shù)據(jù)索引 (MetadataIndex) 樹,以使得檢索時間序列數(shù)據(jù)時可以不用讀取所有的TimeseriesMetadata以減少 I/O 操作喧务。

建立元數(shù)據(jù)索引樹的算法大致如下:

  1. 從索引樹的底層開始構(gòu)建赖歌。在傳感器索引層,對于每個設(shè)備功茴,我們首先初始化其葉節(jié)點(類型為LEAF_MEASUREMENT)庐冯,然后對于每個TimeseriesMetadata,在序列化后坎穿,將其加入葉節(jié)點中展父,當一個節(jié)點中的TimeseriesMetadata達到MAX_DEGREE_OF_INDEX_NODE后,將這個節(jié)點加入設(shè)備queue中玲昧,并重新初始化一個葉節(jié)點用于繼續(xù)存放接下來的TimeseriesMetadata栖茉,然后重復(fù)這個過程,直到所有TimeseriesMetadata全部添加完成孵延。然后對于queue中的葉節(jié)點吕漂,逐層地生成上層節(jié)點(INTERNAL_MEASUREMENT),具體方法與上述方法類似尘应,每個上層節(jié)點中最多包含MAX_DEGREE_OF_INDEX_NODE個子節(jié)點惶凝,不斷重復(fù)地生成上級節(jié)點吼虎,直到僅剩一個節(jié)點時,即為需要的根節(jié)點苍鲜。
  2. 然后鲸睛,判斷設(shè)備數(shù)目是否超過MAX_DEGREE_OF_INDEX_NODE,如若未超過坡贺,則直接生成元數(shù)據(jù)索引樹的根節(jié)點(子節(jié)點集包含 1 中每個設(shè)備對應(yīng)的根節(jié)點序列化后轉(zhuǎn)化成的索引項);如若超過箱舞,則需要生成設(shè)備索引層級遍坟,具體方法與 1 類似,在初始化queue后晴股,先生成類型為LEAF_DEVICE的節(jié)點愿伴,每個節(jié)點最多包含MAX_DEGREE_OF_INDEX_NODE個子級索引。然后按照和 1 類似的方法逐層生成上層節(jié)點电湘,直到生成最終的根節(jié)點隔节。

源碼解析

此處引用的為v0.12.0[4]的代碼

接口調(diào)用

利用TSRecord寫

IoTDB 提供了一個TSRecord工具,TSRecord記錄了一個設(shè)備在一個時間戳下的若干測點信息寂呛。

直接寫入新文件時

// 生成 TsFile 存儲文件怎诫,path為對應(yīng)的disk存儲路徑
File f = FSFactoryProducer.getFSFactory().getFile(path);

// 生成 tsfile writer
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {

      // 在 tsfile writer 中注冊測點信息 這里 包含了 0,1,2,3 四個設(shè)備,每個設(shè)備包含 1,2,3 3 個測點
      for (int i = 0; i < 4; i++) {
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_1),
            new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_2),
            new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_3),
            new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
      }
      // 生成時間序列信息贷痪,設(shè)備0在時間戳為0,4,8..時各測點有值幻妓,均與時間戳的值相同,設(shè)備1,2,3分別在時間戳為1,5,9..劫拢;2,6,10..肉津;3,7,11時各測點有值,且均與時間戳的值相同
      for (int i = 0; i < 100; i++) {
        // 生成設(shè)備(i % 4)在時間戳為i時的記錄
        TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
        // 生成時間戳為i時設(shè)備(i % 4)的各測點的信息
        DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
        DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
        DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
        // 將生成的測點信息加入設(shè)備記錄
        tsRecord.addTuple(dPoint1);
        tsRecord.addTuple(dPoint2);
        tsRecord.addTuple(dPoint3);

        // 將設(shè)備(i % 4)在時間戳為i時的記錄加入tsFile中
        tsFileWriter.write(tsRecord);
      }
    } catch (Exception e) {
      logger.error("meet error in TsFileWrite ", e);
    }

續(xù)寫TsFile時

// 基本流程與直接寫入新文件時一致舱沧,不同的是在生成writer的時候使用:

// f 為讀入的TsFile文件:File f = FSFactoryProducer.getFSFactory().getFile(path);
// 為了續(xù)寫f中存儲的TsFile妹沙,這里使用了ForceAppendTsFileWriter,初始化的時候讀取了f中存儲的TsFile相關(guān)信息
ForceAppendTsFileWriter fwriter = new ForceAppendTsFileWriter(f);
// 截取f中truncatePosition及之前的內(nèi)容(截去尾部元數(shù)據(jù))熟吏,以便添加新的內(nèi)容
fwriter.doTruncate();

利用tablet寫

IoTDB 提供了一個tablet工具距糖,tablet記錄了一個設(shè)備的多個測點的信息,按照一種表格的形式表示分俯,這些測點具有相同的時間戳序列肾筐,因此可以應(yīng)用在測點具有相同時間戳序列(每個時間戳下各個測點都具有值)的設(shè)備中。

// 生成 TsFile 存儲文件缸剪,path為對應(yīng)的disk存儲路徑
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists() && !f.delete()) {
        throw new RuntimeException("can not delete " + f.getAbsolutePath());
      }

// 初始化模式
Schema schema = new Schema();

// 此處展示了10個測點(一個設(shè)備)吗铐,每個測點1000000個時間序列值的例子
String device = Constant.DEVICE_PREFIX + 1;
String sensorPrefix = "sensor_";
int rowNum = 1000000;
int sensorNum = 10;

// 用于初始化Tablet
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
// 將測點加入模式
for (int i = 0; i < sensorNum; i++) {
IMeasurementSchema measurementSchema =
            new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF);
measurementSchemas.add(measurementSchema);
// 用于初始化TsFilewriter時注冊各測點
schema.registerTimeseries(
            new Path(device, sensorPrefix + (i + 1)),
            new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
      }

// 生成TsFileWriter,并注冊各測點
try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {

        // 初始化Tablet
        Tablet tablet = new Tablet(device, measurementSchemas);

        long[] timestamps = tablet.timestamps;
        Object[] values = tablet.values;

        // 生成的時間序列初始時間戳以及初始值(各時間戳下的測點的值隨著時間戳的增長而增長)
        long timestamp = 1;
        long value = 1000000L;
        
        // 為Tablet賦值
        for (int r = 0; r < rowNum; r++, value++) {
          // 記錄時間戳的序列杏节,每次增加1(其值為row)唬渗,value也每次加1
          int row = tablet.rowSize++;
          timestamps[row] = timestamp++;
          // 將value賦給每個sensor的當前時間戳(row)的位置
          for (int i = 0; i < sensorNum; i++) {
            long[] sensor = (long[]) values[i];
            sensor[row] = value;
          }
          // 當時間戳數(shù)目達到閾值(tablet.getMaxRowNumber())時典阵,將當前的tablet寫入TsFile,并清空tablet以繼續(xù)重復(fù)該流程來將剩余時間序列信息寫入tsfile
          if (tablet.rowSize == tablet.getMaxRowNumber()) {
            tsFileWriter.write(tablet);
            tablet.reset();
          }
        }
        // 如果最后還有沒有寫入TsFile的內(nèi)容镊逝,將其寫入
        if (tablet.rowSize != 0) {
          tsFileWriter.write(tablet);
          tablet.reset();
        }
      }

流程解析

寫內(nèi)存緩沖區(qū)部分中壮啊,我們了解到在寫入TsFile時具有兩種接口:

  • 寫入一個設(shè)備一個時間戳多個測點
  • 寫入一個設(shè)備多個時間戳多個測點
    分別對應(yīng)上述展示的接口調(diào)用部分中的基于TSRecord和Tablet的寫入方法。

我們了解到撑蒜,當調(diào)用 write 接口時歹啼,這個設(shè)備的數(shù)據(jù)會交給對應(yīng)的 ChunkGroupWriter,其中的每個測點會交給對應(yīng)的 ChunkWriter 進行寫入座菠。ChunkWriter 完成編碼和打包(生成 Page)狸眼。

查看TsFile writer 中寫入方法write,我們發(fā)現(xiàn):

寫內(nèi)存緩沖區(qū)

// 用于基于TSRecord的寫入方法
public boolean write(TSRecord record) throws IOException, WriteProcessException {
    // 確保對應(yīng)的groupwriter和chunkwriter存在浴滴,不存在則生成
    checkIsTimeSeriesExist(record);
    // 獲取所要寫入的設(shè)備的groupwriter拓萌,并調(diào)用其來寫入所要寫入的datapoint
    groupWriters.get(record.deviceId).write(record.time, record.dataPointList);
    ++recordCount;
    return checkMemorySizeAndMayFlushChunks();
  }

// 用于基于Tablet的寫入方法
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
    // 確保對應(yīng)的groupwriter和chunkwriter存在,不存在則生成
    checkIsTimeSeriesExist(tablet);
    // 獲取所要寫入的設(shè)備的groupwriter升略,并調(diào)用其來寫入所要導(dǎo)入的Tablet
    groupWriters.get(tablet.deviceId).write(tablet);
    recordCount += tablet.rowSize;
    return checkMemorySizeAndMayFlushChunks();
  }

持久化 ChunkGroup

// 檢查占用的內(nèi)存是否超過閾值(chunkGroupSizeThreshold)微王,如若超過,則調(diào)用flushAllChunkGroups()將其存至OutputStream中品嚣,并清空各group writer及其下chunk writer中數(shù)據(jù)炕倘,以進行剩余數(shù)據(jù)的輸入
// 返回false表示所占存儲空間未達閾值,true則表示達到
private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
    // recordCountForNextMemCheck為data point數(shù)目閾值的最小值翰撑,此處初步放縮估計內(nèi)存激才,以進行加速
    if (recordCount >= recordCountForNextMemCheck) {
      // 
      long memSize = calculateMemSizeForAllGroup();
      assert memSize > 0;
      if (memSize > chunkGroupSizeThreshold) {
        LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
        // 更新recordCountForNextMemCheck
        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
        // 存至OutputStream,并清空writer數(shù)據(jù)
        return flushAllChunkGroups();
      } else {
        // 更新recordCountForNextMemCheck
        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
        return false;
      }
    }
    return false;
  }

// 存至OutputStream额嘿,并清空各group writer及其下chunk writer中數(shù)據(jù)瘸恼,以進行剩余數(shù)據(jù)的輸入
public boolean flushAllChunkGroups() throws IOException {
    if (recordCount > 0) {
      for (Map.Entry<String, IChunkGroupWriter> entry : groupWriters.entrySet()) {
        String deviceId = entry.getKey();
        IChunkGroupWriter groupWriter = entry.getValue();
        // 載入當前設(shè)備
        fileWriter.startChunkGroup(deviceId);
        long pos = fileWriter.getPos();
        // 轉(zhuǎn)移該設(shè)備下的pages
        long dataSize = groupWriter.flushToFileWriter(fileWriter);
        if (fileWriter.getPos() - pos != dataSize) {
          throw new IOException(
              String.format(
                  "Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
                  dataSize, fileWriter.getPos() - pos));
        }
        fileWriter.endChunkGroup();
      }
      // 清空其余相關(guān)記錄
      reset();
    }
    return false;
  }

關(guān)閉文件

fileWriter.endFile() 中,生成了path (測點) 到chunkMetadata 的映射 chunkMetadataListMap册养,flushMetadataIndex() 中基于 chunkMetadataListMap 生成了設(shè)備到TimeseriesMetadata 的映射 deviceTimeseriesMetadataMap东帅。

public void close() throws IOException {
    LOG.info("start close file");
    // 寫入剩余的ChunkGroup
    flushAllChunkGroups();
    // 根據(jù)內(nèi)存中緩存的元數(shù)據(jù),生成 TsFileMetadata 追加到文件尾部
    fileWriter.endFile();
  }

private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
      throws IOException {

    deviceTimeseriesMetadataMap = new LinkedHashMap<>();
    // device -> TimeseriesMetaDataList 的映射
    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
      Path path = entry.getKey();
      String device = path.getDevice();

      // 生成 TimeseriesMetaData
      PublicBAOS publicBAOS = new PublicBAOS();
      TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
      Statistics seriesStatistics = Statistics.getStatsByType(dataType);

      int chunkMetadataListLength = 0;
      boolean serializeStatistic = (entry.getValue().size() > 1);
      // flush chunkMetadataList 
      for (IChunkMetadata chunkMetadata : entry.getValue()) {
        if (!chunkMetadata.getDataType().equals(dataType)) {
          continue;
        }
        chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
        seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
      }
      TimeseriesMetadata timeseriesMetadata =
          new TimeseriesMetadata(
              (byte)
                  ((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
              chunkMetadataListLength,
              path.getMeasurement(),
              dataType,
              seriesStatistics,
              publicBAOS);
      deviceTimeseriesMetadataMap
          .computeIfAbsent(device, k -> new ArrayList<>())
          .add(timeseriesMetadata);
    }

    return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
  }

然后通過MetadataIndexConstructor.constructMetadataIndex() 結(jié)合存儲在輸出流中的信息生成元數(shù)據(jù)樹球拦。

public static MetadataIndexNode constructMetadataIndex(
      Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap, TsFileOutput out)
      throws IOException {
    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();

    // 對于每個設(shè)備靠闭,有:
    for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
      if (entry.getValue().isEmpty()) {
        continue;
      }

      // 傳感器索引層,初始化此設(shè)備的索引節(jié)點的隊列
      Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
      TimeseriesMetadata timeseriesMetadata;
      
      // 傳感器索引層坎炼,初始化傳感器索引層級的葉子節(jié)點
      MetadataIndexNode currentIndexNode =
          new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
       
      // 對于每個 TimeseriesMetadata愧膀,有:
      for (int i = 0; i < entry.getValue().size(); i++) {
        timeseriesMetadata = entry.getValue().get(i);
        // 每隔 MAX_DEGREE_OF_INDEX_NODE 個,加一條 entry 到 currentIndexNode 中
        if (i % config.getMaxDegreeOfIndexNode() == 0) {
          // 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后
          if (currentIndexNode.isFull()) {
            // 將 currentIndexNode 加入 queue 中
            addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
            // 并將 currentIndexNode 指向一個新的 MetadataIndexNode
            currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
          }
          currentIndexNode.addEntry(
              new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
        }
      
        // 序列化 timeseriesMetadata
        timeseriesMetadata.serializeTo(out.wrapAsStream());
      }
      // 將 currentIndexNode 加入 queue 中
      addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);

      // 根據(jù) queue 中已經(jīng)存儲的葉子節(jié)點谣光,逐層生成上層節(jié)點檩淋,直至最終的根節(jié)點,并將"設(shè)備-根節(jié)點"對應(yīng)的映射加入 deviceMetadataIndexMap 中
      deviceMetadataIndexMap.put(
          entry.getKey(),
          generateRootNode(
              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
    }

    // 判斷設(shè)備數(shù)是否超過 MAX_DEGREE_OF_INDEX_NODE萄金,如果未超過則可以直接形成元數(shù)據(jù)索引樹的根節(jié)點并返回
    if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {  
      MetadataIndexNode metadataIndexNode =
          new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
      
      // 對于 deviceMetadataIndexMap 中的每一個 entry蟀悦,有:
      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
        
        // 將其轉(zhuǎn)化成一個索引項媚朦,加入到 metadataIndexNode 中
        metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));

        // 序列化 entry
        entry.getValue().serializeTo(out.wrapAsStream());
      }
      
      // 設(shè)置根節(jié)點的 endOffset 并返回
      metadataIndexNode.setEndOffset(out.getPosition());
      return metadataIndexNode;
    }

    // 如若不然,則生成設(shè)備索引層
    // 初始化存放設(shè)備索引層級節(jié)點的隊列 queue
    Queue<MetadataIndexNode> deviceMetadaIndexQueue = new ArrayDeque<>();
    // 初始化設(shè)備索引層級的葉子節(jié)點 currentIndexNode
    MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);

    // 對于 deviceMetadataIndexMap 中的每一個 entry日戈,有:
    for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
      // 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后询张,  
      if (currentIndexNode.isFull()) {
        addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
        // 將 currentIndexNode 指向一個新的 MetadataIndexNode
        currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
      }
      // 將其轉(zhuǎn)化成一個索引項,加入到 currentIndexNode 中
      currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
      // 序列化 entry
      entry.getValue().serializeTo(out.wrapAsStream());
    }
    // 將 currentIndexNode 加入 queue 中
    addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
    // 根據(jù) queue 中已經(jīng)存儲的葉子節(jié)點浙炼,逐層生成上層節(jié)點份氧,直至最終的根節(jié)點
    MetadataIndexNode deviceMetadataIndexNode =
        generateRootNode(deviceMetadaIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
    // 設(shè)置根節(jié)點的 endOffset 并返回
    deviceMetadataIndexNode.setEndOffset(out.getPosition());
    return deviceMetadataIndexNode;
  }

對于其中的generateRootNode(),有:

// 該方法需要將隊列中的 MetadataIndexNode 形成樹級結(jié)構(gòu)弯屈,并返回根節(jié)點
private static MetadataIndexNode generateRootNode(
      Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
      throws IOException {
    int queueSize = metadataIndexNodeQueue.size();
    MetadataIndexNode metadataIndexNode;
    
    // 根據(jù)需要的類型 type 初始化 currentIndexNode
    MetadataIndexNode currentIndexNode = new MetadataIndexNode(type);
    
    // 當隊列中有多余一個節(jié)點時 (不是僅剩根節(jié)點時)半火,循環(huán)處理隊列,對于隊列中存在的每個節(jié)點季俩,有:
    while (queueSize != 1) {
      for (int i = 0; i < queueSize; i++) {
        metadataIndexNode = metadataIndexNodeQueue.poll();
        // 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后
        if (currentIndexNode.isFull()) {

          // 將 currentIndexNode 加入 queue 中
          addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);

          // 并將 currentIndexNode 指向一個新的 MetadataIndexNode
          currentIndexNode = new MetadataIndexNode(type);
        }

        // 將其轉(zhuǎn)化成一個索引項,加入到 currentIndexNode 中
        currentIndexNode.addEntry(
            new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
        
        // 序列化 metadataIndexNode
        metadataIndexNode.serializeTo(out.wrapAsStream());
      }

      // 將 currentIndexNode 加入 queue 中
      addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
      
      // 并將 currentIndexNode 指向一個新的 MetadataIndexNode
      currentIndexNode = new MetadataIndexNode(type);
      queueSize = metadataIndexNodeQueue.size();
    }

    // 隊列中只剩下一個節(jié)點時梅掠,返回隊列中最終剩余的節(jié)點 (即為根節(jié)點)
    return metadataIndexNodeQueue.poll();
  }

讀流程

過濾條件和查詢表達式

TsFile 查詢時的過濾器Filter定義了12種基本類型如下酌住,詳細定義可以參考[5]

等于 大于 大于等于 小于 小于等于 不等于
TimeEq TimeGt TimeGtEq TimeLt TimeLtEq TimeNotEq
ValueEq ValueGt ValueGtEq ValueLt ValueLtEq ValueNotEq

Filter整體上分為一元條件UnaryFilter以及二元條件BinaryFilter,二元條件存在邏輯與關(guān)系AndFilter以及邏輯或關(guān)系OrFilter兩種條件間關(guān)系阎抒。

IoTDB中提供了兩種查詢表達式:SingleSeriesExpression以及GlobalTimeExpression酪我。SingleSeriesExpression針對一個時間序列進行過濾,而GlobalTimeExpression的過濾條件則應(yīng)用于全部時間序列且叁。因此都哭,SingleSeriesExpression包含pathfilter兩個參數(shù),path用于確定針對的時間序列逞带,filter則表示查詢的過濾條件欺矫。而GlobalTimeExpression則只包含filter

查詢過濾條件(IExpression)為執(zhí)行查詢時的表達式展氓,有如下的定義:

IExpression := SingleSeriesExpression | GlobalTimeExpression | AndExpression | OrExpression
AndExpression := IExpression && IExpression
OrExpression := IExpression || IExpression

然而這些IExpression并不全部可以直接應(yīng)用于查詢穆趴,需要利用特殊的優(yōu)化算法轉(zhuǎn)化為可執(zhí)行的表達式,滿足以下任一條件的查詢表達式為可執(zhí)行表達式:

  1. IExpression 為單一的 GlobalTimeExpression
  2. IExpression 為單一的 SingleSeriesExpression
  3. IExpressionAndExpression遇汞,且葉子節(jié)點均為 SingleSeriesExpression
  4. IExpressionOrExpression,且葉子節(jié)點均為 SingleSeriesExpression

上述優(yōu)化算法大致如下:

首先,根據(jù)上述的定義弥咪,一個不是可執(zhí)行表達式的查詢表達式應(yīng)是AndExpression或者OrExpression咖城,且同時包含GlobalTimeExpressionSingleSeriesExpression

因此歪赢,我們主要的主要問題是處理GlobalTimeExpressionSingleSeriesExpression的查詢統(tǒng)一問題化戳,選擇的解決辦法是把GlobalTimeExpression的查詢條件投影到全部待查詢的path(測點)中,然后就可以化歸成 SingleSeriesExpression間的合并埋凯,這里我們的 IExpression 代指由若干條SingleSeriesExpression合并而成的條件迂烁,因此有:對于IExpression間和合并直接根據(jù)兩者的關(guān)系按照生成AndExpression或者 ORExpression的方法處理看尼,生成一個新的 IExpression (SingleSeriesExpression直接作為IExpression處理);
而對于GlobalTimeExpression間的合并盟步,則直接根據(jù)兩者的關(guān)系按照生成AndExpression或者 ORExpression的方法處理藏斩,生成一個新的 GlobaLTimeFilter
對于GlobalTimeExpressionIExpression的合并却盘,我們按照上述的映射方法化歸問題狰域,生成新的 IExpression,值得注意的是黄橘,如若兩者間的關(guān)系是AND兆览,則映射的path即為被合并的IExpression中包含的有效 path,而如若關(guān)系式OR塞关,則需首先查詢 GlobalTimeExpression的投影集合抬探。
IoTDB對于上述的算法提供了optimize()方法來進行實現(xiàn)(包含combineTwoGlobalTimeExpression()handleOneGlobalTimeExpressionr()MergeIExpression()的實現(xiàn)方法帆赢,詳細定義可以參照[5])

TsFile 查詢執(zhí)行過程

TsFile 文件層查詢接口只包含原始數(shù)據(jù)查詢小压,根據(jù)是否包含值過濾條件,可以將查詢分為兩類“無過濾條件或僅包含時間過濾條件查詢”和“包含值過濾條件的查詢”椰于。這里重點介紹其中“無過濾條件或僅包含時間過濾條件查詢”怠益,“包含值過濾條件的查詢”的詳細介紹可以參照[5]
為了執(zhí)行以上兩類查詢瘾婿,有兩套查詢流程:

  • 歸并查詢

生成多個 reader蜻牢,按照 time 對齊,返回結(jié)果集偏陪。

  • 連接查詢

根據(jù)查詢條件生成滿足過濾條件的時間戳抢呆,通過滿足條件的時間戳查詢投影列的值,返回結(jié)果集笛谦。

歸并查詢

歸并查詢對于每一個時間序列都構(gòu)建了一個FileSeriesReader镀娶,基于最小堆來實現(xiàn)數(shù)據(jù)的合并。
查詢過程中揪罕,系統(tǒng)根據(jù)所有的FileSeriesReader生成一個DataSetWithoutTimeGenerator梯码,由于每個FileSeriesReader會按照時間戳從小到大的順序迭代地返回數(shù)據(jù)點,所以可以采用“多路歸并”對所有FileSeriesReader的結(jié)果進行按時間戳對齊好啰。

詳細的算法描述如下[5]

(1) 創(chuàng)建一個最小堆轩娶,堆里面存儲“時間戳”,該堆將按照每個時間戳的大小進行組織框往。
(2) 初始化堆鳄抒,依次訪問每一個 FileSeriesReader,如果該 FileSeriesReader 中還有數(shù)據(jù)點,則獲取數(shù)據(jù)點的時間戳并放入堆中许溅。此時每個時間序列最多有1個時間戳被放入到堆中瓤鼻,即該序列最小的時間戳。
(3) 如果堆的 size > 0贤重,獲取堆頂?shù)臅r間戳茬祷,記為t,并將其在堆中刪除并蝗,進入步驟(4)祭犯;如果堆的 size 等于0,則跳到步驟(5)滚停,結(jié)束數(shù)據(jù)合并過程沃粗。
(4) 創(chuàng)建新的 RowRecord。依次遍歷每一條時間序列键畴。在處理其中一條時間序列時最盅,如果該序列沒有更多的數(shù)據(jù)點,則將該列標記為 null 并添加在 RowRecord 中起惕;否則涡贱,判斷最小的時間戳是否與 t 相同,若不相同疤祭,則將該列標記為 null 并添加在 RowRecord 中。若相同饵婆,將該數(shù)據(jù)點添加在 RowRecord 中勺馆,同時判斷該時間序列是否有新的數(shù)據(jù)點,若存在侨核,則將下一個時間戳 t' 添加在堆中草穆,并將 t' 設(shè)為當前時間序列的最小時間戳。最后搓译,返回步驟(3)悲柱。
(5) 結(jié)束數(shù)據(jù)合并過程。

連接查詢

連接查詢生成滿足“選擇條件”的時間戳些己、查詢被投影列在對應(yīng)時間戳下的數(shù)據(jù)點豌鸡、合成RowRecord,即針對滿足條件的時間戳段标,將其依次投影至各時間序列上涯冠,記錄滿足條件的數(shù)據(jù)。

主要流程如下[5]

(1) 根據(jù) QueryExpression逼庞,初始化時間戳計算模塊 TimeGeneratorImpl
(2) 為每個被投影的時間序列創(chuàng)建 FileSeriesReaderByTimestamp
(3) 如果“時間戳計算模塊”中還有下一個時間戳蛇更,則計算出下一個時間戳 t ,進入步驟(4);否則派任,結(jié)束查詢砸逊。
(4) 根據(jù) t,在每個時間序列上使用 FileSeriesReaderByTimestamp 組件獲取在時間戳 t 下的數(shù)據(jù)點掌逛;如果在該時間戳下沒有對應(yīng)的數(shù)據(jù)點师逸,則用 null 表示。
(5) 將步驟(4)中得到的所有數(shù)據(jù)點合并成一個 RowRecord颤诀,此時得到一條查詢結(jié)果字旭,返回步驟(3)計算下一個查詢結(jié)果。

查詢流程

TsFileExecutor接收一個QueryExpression崖叫,執(zhí)行該查詢并返回相應(yīng)的 QueryDataSet遗淳。

其基本流程如下:

(1)接收一個 QueryExpression
(2)如果無過濾條件,執(zhí)行歸并查詢心傀。如果該 QueryExpression 包含 Filter(過濾條件)屈暗,則通過 ExpressionOptimizer 對該 QueryExpression 的 Filter 進行優(yōu)化。如果是 GlobalTimeExpression脂男,執(zhí)行歸并查詢养叛。如果包含值過濾,交給 ExecutorWithTimeGenerator 執(zhí)行連接查詢宰翅。
(3) 生成對應(yīng)的 QueryDataSet弃甥,迭代地生成 RowRecord,將查詢結(jié)果返回汁讼。

源碼分析

此處引用的為v0.12.0[4]的代碼
這里未介紹 value filter 相關(guān)的查詢淆攻,詳細的介紹可以參考[5],代碼可以參考[4]嘿架。

接口調(diào)用

// 這里展示了一個具有 0,1,2,3 4個設(shè)備瓶珊,每個設(shè)備具有1,2,3 3個測點的TsFile的例子 
// 初始化 TsFile reader
try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {

無 filter

      // 將查詢映射到設(shè)備0的各個 path (測點)上
      ArrayList<Path> paths = new ArrayList<>();
      paths.add(new Path("device_0", "sensor_1"));
      paths.add(new Path("device_0", "sensor_2"));
      paths.add(new Path("device_0", "sensor_3"));


      // 不使用 filter的情況,應(yīng)該查詢出所有設(shè)備0某一測點具有有效值的數(shù)據(jù)點對應(yīng)的時間戳下的設(shè)備0的測點1,2,3的情況
      queryAndPrint(paths, readTsFile, null);
}

存在 time filter

      // 使用 time filter (4 <= time <= 10) 的情況耸彪,應(yīng)該選出時間戳在[4,10]間的有效記錄
      IExpression timeFilter =
          BinaryExpression.and(
              new GlobalTimeExpression(TimeFilter.gtEq(4L)),
              new GlobalTimeExpression(TimeFilter.ltEq(10L)));
      queryAndPrint(paths, readTsFile, timeFilter);

其中伞芹,查詢執(zhí)行部分接口調(diào)用如下:

// 其中,paths 為查詢的全體測點集蝉娜,statement 為查詢條件
private static void queryAndPrint(
      ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement) throws IOException {
    // 根據(jù)查詢條件生成queryExpression
    QueryExpression queryExpression = QueryExpression.create(paths, statement);
    // 執(zhí)行查詢唱较,如若存在過濾條件 (filter) 則先進行優(yōu)化
    QueryDataSet queryDataSet = readTsFile.query(queryExpression);
    
    // 完成查詢,并輸出全部查詢結(jié)果
    while (queryDataSet.hasNext()) {
      System.out.println(queryDataSet.next());
    }
    System.out.println("------------");
  }

另外值得一提的是IoTDB還提供了可以根據(jù) offset 逐 byte 讀取 TsFile 的接口:

marker = reader.readMarker()

流程解析

如上文所述召川,查詢過程的主要流程包括生成QueryExpression绊汹,(對于存在filter時) 優(yōu)化, 查詢 (歸并查詢 / 連接查詢)扮宠。我們發(fā)現(xiàn)西乖,在生成QueryExpression后狐榔,由readTsFile.query()生成queryDataSet,再迭代地生成RowRecord获雕。

查看readTsFile.query()模塊有:

public QueryDataSet query(QueryExpression queryExpression) throws IOException {
    return tsFileExecutor.execute(queryExpression);
  }

public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
    // 使用 bloom filter 判斷檢索對象 path 是否存在
    BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
    List<Path> filteredSeriesPath = new ArrayList<>();
    if (bloomFilter != null) {
      for (Path path : queryExpression.getSelectedSeries()) {
        if (bloomFilter.contains(path.getFullPath())) {
          filteredSeriesPath.add(path);
        }
      }
      queryExpression.setSelectSeries(filteredSeriesPath);
    }

    // 加載metadata
    metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
   
    // 存在 filter 時
    if (queryExpression.hasQueryFilter()) {
      try {
        // 獲取查詢表達式
        IExpression expression = queryExpression.getExpression();
        // 獲取可執(zhí)行查詢表達式
        IExpression regularIExpression =
            ExpressionOptimizer.getInstance()
                .optimize(expression, queryExpression.getSelectedSeries());
        // 更新為對應(yīng)的可執(zhí)行表達式
        queryExpression.setExpression(regularIExpression);
        
        // 當可執(zhí)行表達式為 GlobalTimeExpression 時薄腻,及全由 GlobalTimeExpression 組成時,執(zhí)行歸并查詢
        if (regularIExpression instanceof GlobalTimeExpression) {
          
          return execute(
              queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
        } else {
          // 反之届案,執(zhí)行連接查詢
          return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
              .execute(queryExpression);
        }
      } catch (QueryFilterOptimizationException | NoMeasurementException e) {
        throw new IOException(e);
      }
    } else {
      try {
        // 沒有 filter 時庵楷,執(zhí)行歸并查詢
        return execute(queryExpression.getSelectedSeries());
      } catch (NoMeasurementException e) {
        throw new IOException(e);
      }
    }
  }

// 其中對于存在 GlobalTimeExpression 的 filter 時,以及無 filter 時楣颠,有:
private QueryDataSet executeMayAttachTimeFiler(
      List<Path> selectedPathList, GlobalTimeExpression timeExpression)
      throws IOException, NoMeasurementException {
    List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
    List<TSDataType> dataTypes = new ArrayList<>();

    for (Path path : selectedPathList) {
      List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
      AbstractFileSeriesReader seriesReader;
      if (chunkMetadataList.isEmpty()) {
        seriesReader = new EmptyFileSeriesReader();
        dataTypes.add(metadataQuerier.getDataType(path));
      } else {
        // 沒有filter 時
        if (timeExpression == null) {
          seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
        } else {
          // 存在 GlobalTimeExpression 的 filter 時尽纽,將 GlobalTimeExpression 的條件加入每一個時間序列的 reader 中
          seriesReader =
              new FileSeriesReader(chunkLoader, chunkMetadataList, timeExpression.getFilter());
        }
        dataTypes.add(chunkMetadataList.get(0).getDataType());
      }
      readersOfSelectedSeries.add(seriesReader);
    }
    // 開始歸并查詢
    return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries);
  }

歸并查詢

// 初始化堆
private void initHeap() throws IOException {
    hasDataRemaining = new ArrayList<>();
    batchDataList = new ArrayList<>();
    timeHeap = new PriorityQueue<>();
    timeSet = new HashSet<>();

    // 初始化堆,依次訪問每個 series reader    
    for (int i = 0; i < paths.size(); i++) {
      AbstractFileSeriesReader reader = readers.get(i);
      
      // 如果沒有數(shù)據(jù)點時
      if (!reader.hasNextBatch()) {
        batchDataList.add(new BatchData());
        hasDataRemaining.add(false);
      } else {
        
        // 如果還有數(shù)據(jù)點童漩,將其加入list
        batchDataList.add(reader.nextBatch());
        hasDataRemaining.add(true);
      }
    }

    // 獲取 list 中有效數(shù)據(jù)點中時間戳放入堆中
    for (BatchData data : batchDataList) {
      if (data.hasCurrent()) {
        timeHeapPut(data.currentTime());
      }
    }
  }

queryDataSet.hasNext()中判斷堆的size是否大于0弄贿,

如若大于0,有:

public RowRecord nextWithoutConstraint() throws IOException {
    // 獲取堆頂記錄的最小時間
    long minTime = timeHeapGet();

    // 新建 RowRecord
    RowRecord record = new RowRecord(minTime);

    // 訪問每一個選中的時間序列
    for (int i = 0; i < paths.size(); i++) {

      Field field = new Field(dataTypes.get(i));

      // 沒有更多的數(shù)據(jù)點時矫膨,標記為null
      if (!hasDataRemaining.get(i)) {
        record.addField(null);
        continue;
      }
      
      // 獲取數(shù)據(jù)點
      BatchData data = batchDataList.get(i);
      
      // 如果當前的最小時間戳等于該測點當前時間戳
      if (data.hasCurrent() && data.currentTime() == minTime) {
        // 添加該數(shù)據(jù)點
        putValueToField(data, field);
        // 移動到下一個時間戳
        data.next();
        
        // 沒有下一個時間戳時
        if (!data.hasCurrent()) {
          AbstractFileSeriesReader reader = readers.get(i);
          // 判斷有無nextBatch差凹,如有,則判斷其中有無新的時間戳侧馅,如有則將其加入堆中
          if (reader.hasNextBatch()) {
            data = reader.nextBatch();
            if (data.hasCurrent()) {
              batchDataList.set(i, data);
              timeHeapPut(data.currentTime());
            } else {
              hasDataRemaining.set(i, false);
            }
          } else {
            hasDataRemaining.set(i, false);
          }
        } else {
          // 如果有下一個時間戳危尿,將其加入堆中
          timeHeapPut(data.currentTime());
        }
        record.addField(field);
      } else {
        // 如果兩個時間不相同,則將其標志為 null
        record.addField(null);
      }
    }
    return record;
  }

連接查詢

public DataSetWithTimeGenerator execute(QueryExpression queryExpression) throws IOException {
    
    // 獲取查詢表達式
    IExpression expression = queryExpression.getExpression();
    List<Path> selectedPathList = queryExpression.getSelectedSeries();

    // 根據(jù)查詢表達式獲取時間戳計算單元
    TimeGenerator timeGenerator = new TsFileTimeGenerator(expression, chunkLoader, metadataQuerier);
    
    List<Boolean> cached =
        markFilterdPaths(expression, selectedPathList, timeGenerator.hasOrNode());
    // 為每個被投影的時間序列創(chuàng)建 FileSeriesReaderByTimestamp
    List<FileSeriesReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
    List<TSDataType> dataTypes = new ArrayList<>();

    Iterator<Boolean> cachedIterator = cached.iterator();
    Iterator<Path> selectedPathIterator = selectedPathList.iterator();
    // 如果“時間戳計算模塊”中還有下一個時間戳馁痴,則計算出下一個時間戳
    while (cachedIterator.hasNext()) {
      boolean cachedValue = cachedIterator.next();
      Path selectedPath = selectedPathIterator.next();

      List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(selectedPath);
      if (chunkMetadataList.size() != 0) {
        dataTypes.add(chunkMetadataList.get(0).getDataType());
        
        // 如果該時間戳下沒有滿足 filter 的數(shù)據(jù)點谊娇,則將各對應(yīng)的測點下該時間戳處設(shè)為null
        if (cachedValue) {
          readersOfSelectedSeries.add(null);
          continue;
        }
        FileSeriesReaderByTimestamp seriesReader =
            new FileSeriesReaderByTimestamp(chunkLoader, chunkMetadataList);
        
        // 將該 FileSeriesReaderByTimestamp 組件加入 readersOfSelectedSeries 以便獲取在該時間戳下的數(shù)據(jù)點
        readersOfSelectedSeries.add(seriesReader);
      } else {
        // 如果 selectedPath 為空
        selectedPathIterator.remove();
        cachedIterator.remove();
      }
    }

    // 返回 DataSetWithTimeGenerator
    return new DataSetWithTimeGenerator(
        selectedPathList, cached, dataTypes, timeGenerator, readersOfSelectedSeries);
  }

// 根據(jù)上述“時間戳計算模塊”中算出的時間戳,對于每一個時間戳罗晕,分別在每個時間序列上使用之前記錄的 FileSeriesReaderByTimestamp 組件獲取在該時間戳下的數(shù)據(jù)點济欢,并合并成一個 RowRecord,得到一個查詢結(jié)果
public RowRecord nextWithoutConstraint() throws IOException {
    // 獲取下一個“時間戳計算模塊”中算出的時間戳
    long timestamp = timeGenerator.next();
    RowRecord rowRecord = new RowRecord(timestamp);
    
    // 對于每個時間序列攀例,獲取在該時間戳下的數(shù)據(jù)點
    for (int i = 0; i < paths.size(); i++) {

      // 如果是存在 filter 作用在該時間序列的情況
      if (cached.get(i)) {
        // 利用 time generator 中的 reader 獲取對應(yīng)數(shù)據(jù)點
        Object value = timeGenerator.getValue(paths.get(i));
        
        // 加入 RowRecord
        rowRecord.addField(value, dataTypes.get(i));
        continue;
      }

      // 如果是不存在 filter 作用在該時間序列的情況
      // 利用 time generator 中的 reader 獲取對應(yīng)數(shù)據(jù)點
      FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp = readers.get(i);
      Object value = fileSeriesReaderByTimestamp.getValueInTimestamp(timestamp);
      // 加入 RowRecord
      rowRecord.addField(value, dataTypes.get(i));
    }

    return rowRecord;
  }

  1. 物聯(lián)網(wǎng)時序數(shù)據(jù)庫 Apache IoTDB船逮,詳細信息可以在https://iotdb.apache.org/中找到顾腊。 ?

  2. https://iotdb.apache.org/zh/SystemDesign/TsFile/Format.html ? ? ? ?

  3. https://iotdb.apache.org/zh/SystemDesign/TsFile/Write.html ?

  4. https://github.com/apache/iotdb ? ? ?

  5. https://iotdb.apache.org/zh/SystemDesign/TsFile/Read.html ? ? ? ? ? ?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末粤铭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子杂靶,更是在濱河造成了極大的恐慌梆惯,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吗垮,死亡現(xiàn)場離奇詭異垛吗,居然都是意外死亡,警方通過查閱死者的電腦和手機烁登,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門怯屉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事锨络《奶桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵羡儿,是天一觀的道長礼患。 經(jīng)常有香客問我,道長掠归,這世上最難降的妖魔是什么缅叠? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮虏冻,結(jié)果婚禮上肤粱,老公的妹妹穿的比我還像新娘。我一直安慰自己兄旬,他們只是感情好狼犯,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著领铐,像睡著了一般悯森。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绪撵,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天瓢姻,我揣著相機與錄音,去河邊找鬼音诈。 笑死幻碱,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的细溅。 我是一名探鬼主播褥傍,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼喇聊!你這毒婦竟也來了恍风?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤誓篱,失蹤者是張志新(化名)和其女友劉穎朋贬,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窜骄,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡锦募,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了邻遏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片糠亩。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡虐骑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出赎线,到底是詐尸還是另有隱情富弦,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布氛驮,位于F島的核電站腕柜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏矫废。R本人自食惡果不足惜盏缤,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蓖扑。 院中可真熱鬧唉铜,春花似錦、人聲如沸律杠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柜去。三九已至灰嫉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間嗓奢,已是汗流浹背讼撒。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留股耽,地道東北人根盒。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像物蝙,于是被迫代替她去往敵國和親炎滞。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容