[IoTDB 學(xué)習(xí)筆記] [part 3] 元數(shù)據(jù)管理 MManager

說在前面: 這個list記錄了博主在學(xué)習(xí)IoTDB[1]期間的總結(jié)和思考湘换,歡迎一起討論學(xué)習(xí)哈
相關(guān)介紹可以參考list的第一篇博客:[IoTDB 學(xué)習(xí)筆記] [part1] 介紹

MManager 基本功能及結(jié)構(gòu)

MManager 提供了7種需要記錄日志的針對時間序列節(jié)點和存儲組節(jié)點的操作以及六種針對時間序列的標(biāo)簽 tag與屬性attribute的操作蛤袒,每個操作在操作前先獲得整個元數(shù)據(jù)的寫鎖,操作完后釋放:

  • 創(chuàng)建時間序列
  • 刪除時間序列
  • 設(shè)置存儲組
  • 刪除存儲組
  • 設(shè)置TTL
  • 改變時間序列標(biāo)簽信息offset
  • 改變時間序列的別名

tag & attribute operation:

  • 重命名標(biāo)簽或?qū)傩?/li>
  • 重新設(shè)置標(biāo)簽或?qū)傩缘闹?/li>
  • 刪除已經(jīng)存在的標(biāo)簽或?qū)傩?/li>
  • 添加新的標(biāo)簽
  • 添加新的屬性
  • 更新插入標(biāo)簽和屬性

MManager中的元數(shù)據(jù)主要以元數(shù)據(jù)樹的形式存在,樹中包含三種節(jié)點:StorageGroupMNode慈俯、InternalMNode(非葉子節(jié)點)旋圆、LeafMNode(葉子節(jié)點),他們都是MNode的子類用狱。

每個InternalMNode中都有一個讀寫鎖运怖,查詢元數(shù)據(jù)信息時,需要獲得路徑上每一個InternalMNode的讀鎖夏伊,修改元數(shù)據(jù)信息時摇展,如果修改的是LeafMNode,需要獲得其父節(jié)點的寫鎖溺忧,若修改的是InternalMNode咏连,則只需獲得本身的寫鎖。若該InternalMNode位于 Device 層鲁森,則還包含了一個Map<String, MNode> aliasChildren祟滴,用于存儲別名信息;
StorageGroupMNode繼承InternalMNode歌溉,作為存儲組的節(jié)點垄懂;
LeafMNode中包含了對應(yīng)時間序列的 Schema 信息,其alias以及該時間序列的標(biāo)簽/屬性信息在 tlog 文件中的offset痛垛。

一個示例如圖 1 [2]所示草慧,示例中的整個元數(shù)據(jù)樹分為4層,root 層匙头,storage group 層漫谷,device 層以及 measurement 層。

圖 1

MTree中提供了用于創(chuàng)建或刪除存儲組與時間序列的操作蹂析。

在創(chuàng)建存儲組的時候抖剿,首先創(chuàng)建所有中間節(jié)點,然后確保路徑前綴中不包含其他的存儲組(存儲組間互不包含)识窿,然后再確定目標(biāo)的存儲組不存在斩郎,如若前述的條件均滿足,則創(chuàng)建存儲組節(jié)點喻频,并添加到相應(yīng)的路徑位置下缩宜。

在創(chuàng)建時間序列時,首先確保其中間節(jié)點均存在,然后確定目標(biāo)葉節(jié)點不存在锻煌,如若均滿足則創(chuàng)建葉節(jié)點妓布,如有alias則再創(chuàng)建一個目標(biāo)節(jié)點的兄弟節(jié)點指向該葉節(jié)點。

在刪除存儲組或時間序列的時候宋梧,先將目標(biāo)節(jié)點在其父節(jié)點中的記錄刪除匣沼,然后如若刪除后父節(jié)點為空則遞歸向上刪除。

同時捂龄,MManager還提供了元數(shù)據(jù)查詢的功能:

  • 不帶過濾條件的元數(shù)據(jù)查詢
    根據(jù)是否需要根據(jù)熱度排序释涛,調(diào)用getAllMeasurementSchemaByHeatOrder (需要) 或者getAllMeasurementSchema (不需要)。

  • 帶過濾條件的元數(shù)據(jù)查詢
    其中過濾條件只能是tag屬性倦沧。
    通過在MManager中維護(hù)的tag的倒排索引唇撬,獲得所有滿足索引條件的MeasurementMNode。如若需要根據(jù)熱度排序則根據(jù)lastTimeStamp排序展融,反之根據(jù)序列名的字母序排序窖认。

其中,如果元數(shù)據(jù)較多告希,一次輸出的查詢結(jié)果可能導(dǎo)致OOM扑浸,此時考慮使用fetch size參數(shù) (服務(wù)器端一次最多只取 fetch size 個時間序列)。

源碼分析

MManager 基本結(jié)構(gòu)

MManager 包含如下內(nèi)部屬性

  public static final String TIME_SERIES_TREE_HEADER = "===  Timeseries Tree  ===\n\n";
  private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
  private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
  private static final String DEBUG_MSG_1 =
      "%s: TimeSeries %s's tag info has been removed from tag inverted index ";
  private static final String PREVIOUS_CONDITION =
      "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";

  private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;

  private static final Logger logger = LoggerFactory.getLogger(MManager.class);

  // 用于生成自動快照的判斷 MTree 是否修改的時間間隔閾值
  private static final long MTREE_SNAPSHOT_THREAD_CHECK_TIME = 600L;

  private final int mtreeSnapshotInterval;
  private final long mtreeSnapshotThresholdTime;

  private String logFilePath;
  private String mtreeSnapshotPath;
  private String mtreeSnapshotTmpPath;
  // 元數(shù)據(jù)信息存儲在MTree當(dāng)中
  private MTree mtree;
  private MLogWriter logWriter;
  private TagLogFile tagLogFile;
  private boolean isRecovering;
  // device -> DeviceMNode
  private RandomDeleteCache<PartialPath, Pair<MNode, Template>> mNodeCache;
  // tag key -> tag value -> LeafMNode
  private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();

  // data type -> number
  private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();

  private long reportedDataTypeTotalNum;
  private AtomicLong totalSeriesNumber = new AtomicLong();
  private boolean initialized;
  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  private File logFile;
  private ScheduledExecutorService timedCreateMTreeSnapshotThread;
  private ScheduledExecutorService timedForceMLogThread;

  // MTree 的大小閾值
  private static final long MTREE_SIZE_THRESHOLD = config.getAllocateMemoryForSchema();

  private boolean allowToCreateNewSeries = true;

  private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();

  // template name -> template
  private Map<String, Template> templateMap = new ConcurrentHashMap<>();

基本操作

節(jié)點的基本操作通過MManager的內(nèi)部函數(shù)實現(xiàn)燕偶,其中多數(shù)操作主要通過調(diào)用MTree對應(yīng)方法實現(xiàn)喝噪。

在這些基本操作之中這里主要分析setStorageGroupcreateTimeseries杭跪,deleteStorageGroupsshowTimeseriesWithoutIndex

  • setStorageGroup

設(shè)置存儲組主要通過MTree下的setStorageGroup接口來實現(xiàn)

void setStorageGroup(PartialPath path) throws MetadataException {
    String[] nodeNames = path.getNodes();
    // 檢查路徑是否滿足格式
    checkStorageGroup(path.getFullPath());
    // 將當(dāng)前節(jié)點設(shè)置為根節(jié)點
    MNode cur = root;
    if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
      throw new IllegalPathException(path.getFullPath());
    }
    int i = 1;
    // e.g., path = root.a.b.sg, 生成中間節(jié)點 (類型為 MNode)
    while (i < nodeNames.length - 1) {
      MNode temp = cur.getChild(nodeNames[i]);
      // 查看中間節(jié)點是否存在驰吓,不存在則添加
      if (temp == null) {
        cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i]));
      } else if (temp instanceof StorageGroupMNode) {
        // 如果中間節(jié)點是 StorageGroupMNode涧尿,拋出錯誤
        throw new StorageGroupAlreadySetException(temp.getFullPath());
      }
      // 切換當(dāng)前節(jié)點
      cur = cur.getChild(nodeNames[i]);
      i++;
    }
    if (cur.hasChild(nodeNames[i])) {
      // 如果目標(biāo)存儲組節(jié)點存在,拋出錯誤
      if (cur.getChild(nodeNames[i]) instanceof StorageGroupMNode) {
        throw new StorageGroupAlreadySetException(path.getFullPath());
      } else {
        throw new StorageGroupAlreadySetException(path.getFullPath(), true);
      }
    } else {
      // 如若不存在則生成存儲組節(jié)點并添加到對應(yīng)位置 (當(dāng)前節(jié)點的子節(jié)點)
      StorageGroupMNode storageGroupMNode =
          new StorageGroupMNode(
              cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
      cur.addChild(nodeNames[i], storageGroupMNode);
    }
  }
  • createTimeseries

設(shè)置存儲組主要通過MTree下的createTimeseries接口來實現(xiàn)

// MManager 中的 createTimeseries
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
    // 檢查還能否寫入新的時間序列
    if (!allowToCreateNewSeries) {
      throw new MetadataException(
          "IoTDB system load is too large to create timeseries, "
              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
    }
    try {
      PartialPath path = plan.getPath();
      SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
      
      // 確保獲取存儲組路徑
      ensureStorageGroup(path);

      TSDataType type = plan.getDataType();
      // 利用 MTree 生成 timeseries
      MeasurementMNode leafMNode =
          mtree.createTimeseries(
              path,
              type,
              plan.getEncoding(),
              plan.getCompressor(),
              plan.getProps(),
              plan.getAlias());

      // 更新 tag
      if (plan.getTags() != null) {
        // tag key, tag value
        for (Entry<String, String> entry : plan.getTags().entrySet()) {
          if (entry.getKey() == null || entry.getValue() == null) {
            continue;
          }
          tagIndex
              .computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
              .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
              .add(leafMNode);
        }
      }

      // 更新 statistics 和 schemaDataTypeNumMap
      totalSeriesNumber.addAndGet(1);
      if (totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE >= MTREE_SIZE_THRESHOLD) {
        logger.warn("Current series number {} is too large...", totalSeriesNumber);
        allowToCreateNewSeries = false;
      }
      updateSchemaDataTypeNumMap(type, 1);

      // 寫 log
      if (!isRecovering) {
        // either tags or attributes is not empty
        if ((plan.getTags() != null && !plan.getTags().isEmpty())
            || (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) {
          offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
        }
        plan.setTagOffset(offset);
        logWriter.createTimeseries(plan);
      }
      leafMNode.setOffset(offset);

    } catch (IOException e) {
      throw new MetadataException(e);
    }
  }

// MTree 下的 createTimeseries
MeasurementMNode createTimeseries(
      PartialPath path,
      TSDataType dataType,
      TSEncoding encoding,
      CompressionType compressor,
      Map<String, String> props,
      String alias)
      throws MetadataException {
    // 獲取分級 path
    String[] nodeNames = path.getNodes();
    // 判斷是否是合法的 Timeseries 路徑 (path長度不小于3 (root -> [storage group] -> device -> Timeseries))
    if (nodeNames.length <= 2 || !nodeNames[0].equals(root.getName())) {
      throw new IllegalPathException(path.getFullPath());
    }
    // 判斷是否是合法的 Timeseries 路徑 (使用正則式判斷)
    checkTimeseries(path.getFullPath());
    MNode cur = root;
    boolean hasSetStorageGroup = false;
    Template upperTemplate = cur.getDeviceTemplate();
    // e.g, path = root.sg.d1.s1檬贰,構(gòu)建中間節(jié)點姑廉,并讓 cur 設(shè)置為 d1
    // 從 root 后第一層節(jié)點開始構(gòu)建路徑 (measurement 之前)
    for (int i = 1; i < nodeNames.length - 1; i++) {
      String nodeName = nodeNames[i];
      // cur 指向 storage group node 時
      if (cur instanceof StorageGroupMNode) {
        hasSetStorageGroup = true;
      }
      if (!cur.hasChild(nodeName)) {
        if (!hasSetStorageGroup) {
          throw new StorageGroupNotSetException("Storage group should be created first");
        }
        cur.addChild(nodeName, new MNode(cur, nodeName));
      }
      cur = cur.getChild(nodeName);

      if (cur.getDeviceTemplate() != null) {
        upperTemplate = cur.getDeviceTemplate();
      }
    }

    if (upperTemplate != null && !upperTemplate.isCompatible(path)) {
      throw new PathAlreadyExistException(
          path.getFullPath() + " ( which is incompatible with template )");
    }

    if (props != null && props.containsKey(LOSS) && props.get(LOSS).equals(SDT)) {
      checkSDTFormat(path.getFullPath(), props);
    }

    // 獲取葉節(jié)點 (measurement 節(jié)點) 名稱
    String leafName = nodeNames[nodeNames.length - 1];

    // 保持添加葉節(jié)點以及 alias 過程的原子性
    // 將寫部分設(shè)置為 synchronized (上鎖)
    synchronized (this) {
      MNode child = cur.getChild(leafName);
      if (child instanceof MeasurementMNode || child instanceof StorageGroupMNode) {
        throw new PathAlreadyExistException(path.getFullPath());
      }

      if (alias != null) {
        MNode childByAlias = cur.getChild(alias);
        // 判斷該設(shè)備節(jié)點下是否已經(jīng)存在同名 alias
        if (childByAlias instanceof MeasurementMNode) {
          throw new AliasAlreadyExistException(path.getFullPath(), alias);
        }
      }

      // 生成葉節(jié)點
      MeasurementMNode measurementMNode =
          new MeasurementMNode(cur, leafName, alias, dataType, encoding, compressor, props);
      if (child != null) {
        // 如若存在重名的葉節(jié)點,則覆蓋掉
        cur.replaceChild(measurementMNode.getName(), measurementMNode);
      } else {
        // 如若不存在翁涤,則直接添加該葉節(jié)點
        cur.addChild(leafName, measurementMNode);
      }

      // 添加 alias
      if (alias != null) {
        cur.addAlias(alias, measurementMNode);
      }

      return measurementMNode;
    }
    // 解鎖 
  }
  • deleteStorageGroups

設(shè)置存儲組主要通過MTree下的deleteStorageGroups接口來實現(xiàn)

// MManager 中的 deleteStorageGroups
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
    try {
      // 遍歷要刪除的存儲組
      for (PartialPath storageGroup : storageGroups) {
        totalSeriesNumber.addAndGet(mtree.getAllTimeseriesCount(storageGroup));
        // 清除 MNode Cache
        if (!allowToCreateNewSeries
            && totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
          logger.info("Current series number {} come back to normal level", totalSeriesNumber);
          allowToCreateNewSeries = true;
        }
        mNodeCache.clear();

        // 刪除存儲組
        List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
        for (MeasurementMNode leafMNode : leafMNodes) {
          removeFromTagInvertedIndex(leafMNode);
          // 更新 statistics (in schemaDataTypeNumMap)
          updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
        }

        // 刪除 triggers
        TriggerEngine.drop(leafMNodes);

        if (!config.isEnableMemControl()) {
          MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
        }

        // 正常進(jìn)行完成時桥言,寫 log
        if (!isRecovering) {
          logWriter.deleteStorageGroup(storageGroup);
        }
      }
    } catch (IOException e) {
      throw new MetadataException(e.getMessage());
    }
  }

// MTree 中的 deleteStorageGroups
List<MeasurementMNode> deleteStorageGroup(PartialPath path) throws MetadataException {
    // 設(shè)置當(dāng)前節(jié)點為目標(biāo)節(jié)點
    MNode cur = getNodeByPath(path);
    // 當(dāng)前節(jié)點并非存儲組節(jié)點時,拋出錯誤
    if (!(cur instanceof StorageGroupMNode)) {
      throw new StorageGroupNotSetException(path.getFullPath());
    }
    // 假設(shè)正在一個包含 root.a.b.sg1, root.a.sg2 路徑的元數(shù)據(jù)樹下刪除 root.a.b.sg1
    // 刪除 sg1
    cur.getParent().deleteChild(cur.getName());

    // 獲取該存儲組下所有葉節(jié)點
    List<MeasurementMNode> leafMNodes = new LinkedList<>();
    Queue<MNode> queue = new LinkedList<>();
    queue.add(cur);
    // 遞歸加載子節(jié)點葵礼,獲取其下所有葉節(jié)點
    while (!queue.isEmpty()) {
      MNode node = queue.poll();
      for (MNode child : node.getChildren().values()) {
        if (child instanceof MeasurementMNode) {
          leafMNodes.add((MeasurementMNode) child);
        } else {
          queue.add(child);
        }
      }
    }
    
    // 設(shè)置當(dāng)前節(jié)點為目標(biāo)節(jié)點父節(jié)點
    cur = cur.getParent();
    // 如果刪除目標(biāo)節(jié)點后号阿,父節(jié)點為空,則刪除父節(jié)點鸳粉,并遞歸向上重復(fù)該過程
    while (!IoTDBConstant.PATH_ROOT.equals(cur.getName()) && cur.getChildren().size() == 0) {
      cur.getParent().deleteChild(cur.getName());
      cur = cur.getParent();
    }
    return leafMNodes;
  }
  • showTimeseriesWithoutIndex

主要通過調(diào)用MTree的對應(yīng)接口getAllMeasurementSchemaByHeatOrder或者getAllMeasurementSchema實現(xiàn)扔涧,這里選取getAllMeasurementSchema進(jìn)行分析

// MManager 中的 showTimeseriesWithoutIndex
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
    List<Pair<PartialPath, String[]>> ans;
    // 對于熱度順序排序的情況
    if (plan.isOrderByHeat()) {
      ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
    } else {
      // 對于未使用熱度順序排序的情況
      ans = mtree.getAllMeasurementSchema(plan);
    }
    // 初始化
    List<ShowTimeSeriesResult> res = new LinkedList<>();
    for (Pair<PartialPath, String[]> ansString : ans) {
      long tagFileOffset = Long.parseLong(ansString.right[5]);
      try {
        Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
            new Pair<>(Collections.emptyMap(), Collections.emptyMap());
        if (tagFileOffset >= 0) {
          tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
        }
        // 添加查找結(jié)果
        res.add(
            new ShowTimeSeriesResult(
                ansString.left.getFullPath(),
                ansString.right[0],
                ansString.right[1],
                TSDataType.valueOf(ansString.right[2]),
                TSEncoding.valueOf(ansString.right[3]),
                CompressionType.valueOf(ansString.right[4]),
                tagAndAttributePair.left,
                tagAndAttributePair.right));
      } catch (IOException e) {
        throw new MetadataException(
            "Something went wrong while deserialize tag info of " + ansString.left.getFullPath(),
            e);
      }
    }
    return res;
  }

// MTree 中的 getAllMeasurementSchema
List<Pair<PartialPath, String[]>> getAllMeasurementSchema(
      ShowTimeSeriesPlan plan, boolean removeCurrentOffset) throws MetadataException {
    List<Pair<PartialPath, String[]>> res = new LinkedList<>();
    // 獲取分層節(jié)點名稱
    String[] nodes = plan.getPath().getNodes(); 
    if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
      throw new IllegalPathException(plan.getPath().getFullPath());
    }
    // 設(shè)置 limit
    limit.set(plan.getLimit()); 
    //設(shè)置 offset
    offset.set(plan.getOffset()); 
    curOffset.set(-1);
    count.set(0);
    // 查詢目標(biāo)時間序列
    findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null, null);
    // 避免內(nèi)存泄漏
    limit.remove();
    offset.remove();
    if (removeCurrentOffset) {
      curOffset.remove();
    }
    count.remove();
    return res;
  }

// 其中的 findPath 部分
private void findPath(
      MNode node,
      String[] nodes,
      int idx,
      List<Pair<PartialPath, String[]>> timeseriesSchemaList,
      boolean hasLimit,
      boolean needLast,
      QueryContext queryContext,
      Template upperTemplate)
      throws MetadataException {
    if (node instanceof MeasurementMNode && nodes.length <= idx) {
      if (hasLimit) {
        curOffset.set(curOffset.get() + 1);
        if (curOffset.get() < offset.get() || count.get().intValue() == limit.get().intValue()) {
          return;
        }
      }
      IMeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema();
      if (measurementSchema instanceof MeasurementSchema) {
        // 查詢完葉節(jié)點時
        addMeasurementSchema(
            node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
      } else if (measurementSchema instanceof VectorMeasurementSchema) {
        String lastWord = nodes[nodes.length - 1];
        addVectorMeasurementSchema(
            node,
            timeseriesSchemaList,
            needLast,
            queryContext,
            measurementSchema,
            nodes.length == idx ? lastWord : "*");
      }
      if (hasLimit) {
        count.set(count.get() + 1);
      }
    }

    // 獲取下一層節(jié)點名稱
    String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes); 
    if (node.getDeviceTemplate() != null) {
      upperTemplate = node.getDeviceTemplate();
    }

    // 判斷是否使用通配符*;或者查詢完葉節(jié)點
    if (!nodeReg.contains(PATH_WILDCARD)) {
      MNode next = null;
      // 判斷是否是并列表示的節(jié)點
      if (nodeReg.contains("(") && nodeReg.contains(",")) { 
        next = node.getChildOfAlignedTimeseries(nodeReg);
      } else {
        // 獲取下一層節(jié)點
        next = node.getChild(nodeReg);
      }
      if (next != null) {
        // 下一層節(jié)點非空時遞歸查詢
        findPath(
            next,
            nodes,
            idx + 1,
            timeseriesSchemaList,
            hasLimit,
            needLast,
            queryContext,
            upperTemplate);
      }
    } else {
      // 對于存在通配符時遍歷該節(jié)點所有子節(jié)點
      for (MNode child : node.getDistinctMNodes()) {
        boolean continueSearch = false;
        if (child instanceof MeasurementMNode
            && ((MeasurementMNode) child).getSchema() instanceof VectorMeasurementSchema) {
          // 對于 VectorMeasurementSchema 的 MeasurementMNode 的情況
          List<String> measurementsList =
              ((MeasurementMNode) child).getSchema().getValueMeasurementIdList();
          for (String measurement : measurementsList) {
            // 判斷是否查到盡頭
            if (Pattern.matches(nodeReg.replace("*", ".*"), measurement)) {
              continueSearch = true;
            }
          }
        } else {
          // 判斷是否查到盡頭
          if (Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
            continueSearch = true;
          }
        }
        if (!continueSearch) {
          continue;
        }
      
        // 遞歸查詢
        findPath(
            child,
            nodes,
            idx + 1,
            timeseriesSchemaList,
            hasLimit,
            needLast,
            queryContext,
            upperTemplate);
        if (hasLimit && count.get().intValue() == limit.get().intValue()) {
          break;
        }
      }
    }

    // 使用 template 時的部分
    if (!(node instanceof MeasurementMNode) && node.isUseTemplate()) {
      if (upperTemplate != null) {
        HashSet<IMeasurementSchema> set = new HashSet<>();
        for (IMeasurementSchema schema : upperTemplate.getSchemaMap().values()) {
          if (set.add(schema)) {
            if (schema instanceof MeasurementSchema) {
              addMeasurementSchema(
                  new MeasurementMNode(node, schema.getMeasurementId(), schema, null),
                  timeseriesSchemaList,
                  needLast,
                  queryContext,
                  schema,
                  nodeReg);
            } else if (schema instanceof VectorMeasurementSchema) {
              String firstNode = schema.getValueMeasurementIdList().get(0);
              addVectorMeasurementSchema(
                  new MeasurementMNode(node, firstNode, schema, null),
                  timeseriesSchemaList,
                  needLast,
                  queryContext,
                  schema,
                  nodeReg);
            }
          }
        }
      }
    }
  }

Log 文件

元數(shù)據(jù)日志管理

所有元數(shù)據(jù)的操作均會記錄到元數(shù)據(jù)日志文件中,此文件默認(rèn)為 data/system/schema/mlog.bin枯夜。

系統(tǒng)重啟時會重做 mlog 中的日志弯汰,重做之前需要標(biāo)記不需要記錄日志。當(dāng)重啟結(jié)束后湖雹,標(biāo)記需要記錄日志咏闪。

元數(shù)據(jù)日志的類型由 MetadataOperationType 類記錄。mlog 直接存儲字符串編碼摔吏。

源碼分析

  • 一些 sql 對應(yīng)的 mlog 記錄
set storage group to root.turbine --> 2,root.turbine
delete storage group root.turbine --> 1,root.turbine
create timeseries root.turbine.d1.s1(temprature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2) --> 0,root.turbine.d1.s1,3,2,1,,溫度,offset

標(biāo)簽文件

所有時間序列的標(biāo)簽/屬性信息都會保存在標(biāo)簽文件中鸽嫂,此文件默認(rèn)為 data/system/schema/tlog.txt。

  • 每條時間序列的tagsattributes持久化后總字節(jié)數(shù)為 L舔腾,在 iotdb-engine.properties 中配置溪胶。

  • 持久化內(nèi)容:Map<String,String> tagsMap<String,String> attributes稳诚,如果內(nèi)容不足 L哗脖,則需補(bǔ)空。

源碼分析

六種針對時間序列的標(biāo)簽與屬性的操作則直接作為MManager的內(nèi)置函數(shù)出現(xiàn):renameTagOrAttributeKey扳还,setTagsOrAttributesValue才避,dropTagsOrAttributesaddTags氨距,addAttributes桑逝,upsertTagsAndAttributes

在這些基本操作之中這里主要分析addAttributes

public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
      throws MetadataException, IOException {
    // 獲取目標(biāo)節(jié)點
    MNode mNode = mtree.getNodeByPath(fullPath);
    // 如若目標(biāo)節(jié)點非是 MeasurementMNode俏让,拋出異常
    if (!(mNode instanceof MeasurementMNode)) {
      throw new PathNotExistException(fullPath.getFullPath());
    }
    MeasurementMNode leafMNode = (MeasurementMNode) mNode;
    // 沒有 tag 或者attribute 的情況楞遏,需要在 tlog 中添加記錄
    if (leafMNode.getOffset() < 0) {
      long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
      logWriter.changeOffset(fullPath, offset);
      leafMNode.setOffset(offset);
      return;
    }

    Pair<Map<String, String>, Map<String, String>> pair =
        tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());

    // 遍歷需要添加的 attribute
    for (Entry<String, String> entry : attributesMap.entrySet()) {
      // 獲取鍵
      String key = entry.getKey();
      // 獲取值
      String value = entry.getValue();
      // 目標(biāo) attribute 存在時,拋出異常
      if (pair.right.containsKey(key)) {
        throw new MetadataException(
            String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
      }
      // 不存在時首昔,進(jìn)行添加
      pair.right.put(key, value);
    }

    // 持久化
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
  }

MTree 檢查點

為了加快 IoTDB 重啟速度寡喝,IoTDB為MTree設(shè)置了檢查點,這樣避免了在重啟時按行讀取并復(fù)現(xiàn) mlog.bin 中的信息勒奇。創(chuàng)建MTree的快照有兩種方式:

  • 自動創(chuàng)建预鬓。每隔10分鐘,后臺線程檢查MTree的最后修改時間赊颠,需要同時滿足:用戶超過1小時(可配置)沒修改MTree且 mlog.bin 中積累了100000行日志(可配置)兩個條件格二;

  • 手動創(chuàng)建。使用create snapshot for schema命令手動觸發(fā)創(chuàng)建 MTree 快照竣蹦。

創(chuàng)建過程

  1. 首先給MTree加讀鎖顶猜,防止創(chuàng)建快照過程中對其進(jìn)行修改
  2. MTree序列化進(jìn)臨時 snapshot 文件(mtree.snapshot.tmp)。MTree的序列化采用“先子節(jié)點痘括、后父節(jié)點”的深度優(yōu)先序列化方式驶兜,將節(jié)點的信息按照類型轉(zhuǎn)化成對應(yīng)格式的字符串,便于反序列化時讀取和組裝MTree
    其中抄淑,字符串轉(zhuǎn)化格式如下:
  • 普通節(jié)點:0,名字,子節(jié)點個數(shù)
  • 存儲組節(jié)點:1,名字,TTL,子節(jié)點個數(shù)
  • 傳感器節(jié)點:2,名字,別名,數(shù)據(jù)類型,編碼,壓縮方式,屬性,偏移量,子節(jié)點個數(shù)
  1. 序列化結(jié)束后屠凶,將臨時文件重命名為正式文件(mtree.snapshot),防止在序列化過程中出現(xiàn)服務(wù)器人為或意外關(guān)閉肆资,導(dǎo)致序列化失敗的情況矗愧。
  2. 調(diào)用MLogWriter.clear()方法,清空 mlog.bin
  3. 釋放MTree讀鎖

源碼分析

public void createMTreeSnapshot() {
    long time = System.currentTimeMillis();
    logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
    try {
      // 將 MTree 序列化進(jìn)臨時 snapshot 文件 mtreeSnapshotTmpPath
      mtree.serializeTo(mtreeSnapshotTmpPath);
      // 獲取對應(yīng)臨時快照與快照文件
      File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
      File snapshotFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
      // 如果路徑下存在快照文件則先刪除
      if (snapshotFile.exists()) {
        Files.delete(snapshotFile.toPath());
      }
      // 將臨時文件重命名為正式文件
      if (tmpFile.renameTo(snapshotFile)) {
        logger.info(
            "Finish creating MTree snapshot to {}, spend {} ms.",
            mtreeSnapshotPath,
            System.currentTimeMillis() - time);
      }
      // 調(diào)用 clear() 方法郑原,清空 mlog.bin
      logWriter.clear();
    } catch (IOException e) {
      logger.warn("Failed to create MTree snapshot to {}", mtreeSnapshotPath, e);
      if (SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).exists()) {
        try {
          Files.delete(SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).toPath());
        } catch (IOException e1) {
          logger.warn("delete file {} failed: {}", mtreeSnapshotTmpPath, e1.getMessage());
        }
      }
    }
  }

// 其中序列化的時候從子節(jié)點由下往上序列化唉韭,按照深度優(yōu)先順序
public void serializeTo(MLogWriter logWriter) throws IOException {
    serializeChildren(logWriter);

    logWriter.serializeStorageGroupMNode(this);
  }

public void serializeTo(MLogWriter logWriter) throws IOException {
    serializeChildren(logWriter);

    logWriter.serializeMNode(this);
  }

void serializeChildren(MLogWriter logWriter) throws IOException {
    if (children == null) {
      return;
    }
    for (Entry<String, MNode> entry : children.entrySet()) {
      entry.getValue().serializeTo(logWriter);
    }
  }

恢復(fù)過程

  1. 檢查臨時文件 mtree.snapshot.tmp 是否存在,如果存在證明在創(chuàng)建快照的序列化過程中出現(xiàn)服務(wù)器人為或意外關(guān)閉犯犁,導(dǎo)致序列化失敗属愤,刪除臨時文件;
  2. 檢查快照文件 mtree.snapshot 是否存在酸役。如果不存在住诸,則使用新的MTree;否則啟動反序列化過程涣澡,得到MTree
  3. 對于 mlog.bin 中的內(nèi)容贱呐,逐行讀取并操作,完成MTree的恢復(fù)入桂。讀取過程中更新logNumber奄薇,并返回,用于后面mlog.bin行數(shù)的記錄抗愁。

源碼分析

加載快照并通過反序列化得到MTree后讀取 mlog.bin 時馁蒂,通過 log 中的記錄生成 redo 的plan,然后通過調(diào)用operation來進(jìn)行操作的執(zhí)行:

public void operation(PhysicalPlan plan) throws IOException, MetadataException {
    switch (plan.getOperatorType()) {
      case CREATE_TIMESERIES:
        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
        break;
      case CREATE_ALIGNED_TIMESERIES:
        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
            (CreateAlignedTimeSeriesPlan) plan;
        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
        break;
      case DELETE_TIMESERIES:
        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
        // cause we only has one path for one DeleteTimeSeriesPlan
        deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
        break;
      case SET_STORAGE_GROUP:
        SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
        setStorageGroup(setStorageGroupPlan.getPath());
        break;
      case DELETE_STORAGE_GROUP:
        DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
        deleteStorageGroups(deleteStorageGroupPlan.getPaths());
        break;
      case TTL:
        SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
        setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
        break;
      case CHANGE_ALIAS:
        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
        break;
      case CHANGE_TAG_OFFSET:
        ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
        break;
      case CREATE_TEMPLATE:
        CreateTemplatePlan createTemplatePlan = (CreateTemplatePlan) plan;
        createDeviceTemplate(createTemplatePlan);
        break;
      case SET_DEVICE_TEMPLATE:
        SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan;
        setDeviceTemplate(setDeviceTemplatePlan);
        break;
      case SET_USING_DEVICE_TEMPLATE:
        SetUsingDeviceTemplatePlan setUsingDeviceTemplatePlan = (SetUsingDeviceTemplatePlan) plan;
        setUsingDeviceTemplate(setUsingDeviceTemplatePlan);
        break;
      case AUTO_CREATE_DEVICE_MNODE:
        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
        break;
      default:
        logger.error("Unrecognizable command {}", plan.getOperatorType());
    }
  }

具體恢復(fù)過程如下:

private int initFromLog(File logFile) throws IOException {
    File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
    // 檢查臨時文件 mtree.snapshot.tmp 是否存在
    if (tmpFile.exists()) {
      logger.warn("Creating MTree snapshot not successful before crashing...");
      // 如果存在證明在創(chuàng)建快照的序列化過程中出現(xiàn)服務(wù)器人為或意外關(guān)閉蜘腌,導(dǎo)致序列化失敗沫屡,刪除臨時文件
      Files.delete(tmpFile.toPath());
    }

    // 檢查快照文件 mtree.snapshot 是否存在
    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
    long time = System.currentTimeMillis();
    // 判斷快照是否存在   
    if (!mtreeSnapshot.exists()) {
      // 快照不存在的時候使用新的 MTree
      mtree = new MTree();
    } else {
      // 快照存在的時候,反序列話快照逢捺,然后利用其生成 MTree
      mtree = MTree.deserializeFrom(mtreeSnapshot);
      logger.debug(
          "spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
    }

    time = System.currentTimeMillis();
    // 判斷 log 是否存在谁鳍,存在時通過 log 恢復(fù)元數(shù)據(jù)
    if (logFile.exists()) {
      int idx = 0;
      try (MLogReader mLogReader =
          new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG); ) {
        idx = applyMlog(mLogReader);
        logger.debug(
            "spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
        return idx;
      } catch (Exception e) {
        throw new IOException("Failed to parser mlog.bin for err:" + e);
      }
    } else {
      return 0;
    }
  }

  private int applyMlog(MLogReader mLogReader) {
    int idx = 0;
    // 不斷按順序讀取 log 中的記錄
    while (mLogReader.hasNext()) {
      PhysicalPlan plan = null;
      try {
        // 根據(jù) log 中的記錄生成plan
        plan = mLogReader.next();
        if (plan == null) {
          continue;
        }
        // 執(zhí)行操作
        operation(plan);
        idx++;
      } catch (Exception e) {
        logger.error(
            "Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
      }
    }
    return idx;
  }

  1. 物聯(lián)網(wǎng)時序數(shù)據(jù)庫 Apache IoTDB癞季,詳細(xì)信息可以在https://iotdb.apache.org/中找到劫瞳。 ?

  2. https://iotdb.apache.org/zh/SystemDesign/SchemaManager/SchemaManager.html ?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市绷柒,隨后出現(xiàn)的幾起案子志于,更是在濱河造成了極大的恐慌,老刑警劉巖废睦,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伺绽,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)奈应,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門澜掩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人杖挣,你說我怎么就攤上這事肩榕。” “怎么了惩妇?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵株汉,是天一觀的道長。 經(jīng)常有香客問我歌殃,道長乔妈,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任氓皱,我火速辦了婚禮路召,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘匀泊。我一直安慰自己优训,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布各聘。 她就那樣靜靜地躺著揣非,像睡著了一般。 火紅的嫁衣襯著肌膚如雪躲因。 梳的紋絲不亂的頭發(fā)上早敬,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機(jī)與錄音大脉,去河邊找鬼搞监。 笑死,一個胖子當(dāng)著我的面吹牛镰矿,可吹牛的內(nèi)容都是我干的琐驴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼秤标,長吁一口氣:“原來是場噩夢啊……” “哼绝淡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起苍姜,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤牢酵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后衙猪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體馍乙,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡布近,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了丝格。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片撑瞧。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖显蝌,靈堂內(nèi)的尸體忽然破棺而出季蚂,到底是詐尸還是另有隱情,我是刑警寧澤琅束,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布扭屁,位于F島的核電站,受9級特大地震影響涩禀,放射性物質(zhì)發(fā)生泄漏料滥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一艾船、第九天 我趴在偏房一處隱蔽的房頂上張望葵腹。 院中可真熱鬧,春花似錦屿岂、人聲如沸践宴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阻肩。三九已至,卻和暖如春运授,著一層夾襖步出監(jiān)牢的瞬間烤惊,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工吁朦, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留柒室,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓逗宜,卻偏偏與公主長得像雄右,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子纺讲,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容