A note up front: this series collects my notes and thoughts from studying IoTDB [1]. Discussion is welcome!
For background, see the first post in the series: [IoTDB Study Notes] [part1] Introduction
MManager: Basic Functions and Structure
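MManager provides seven operations on time series nodes and storage group nodes that must be recorded in the log, plus six operations on time series tags and attributes. Each operation acquires the write lock of the whole metadata before it runs and releases it when it finishes: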
- Create a time series
- Delete a time series
- Set a storage group
- Delete a storage group
- Set TTL
- Change the offset of a time series' tag information
- Change the alias of a time series
Tag & attribute operations:
- Rename a tag or attribute
- Reset the value of a tag or attribute
- Drop an existing tag or attribute
- Add new tags
- Add new attributes
- Upsert tags and attributes
In MManager, metadata is kept mainly as a metadata tree with three kinds of nodes: StorageGroupMNode, InternalMNode (non-leaf node), and LeafMNode (leaf node). All of them are subclasses of MNode.
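Every InternalMNode has a read-write lock. A metadata query must acquire the read lock of every InternalMNode on the path. A metadata modification works as follows:
- If the modified node is a LeafMNode, it must acquire the write lock of the leaf's parent.
- If the modified node is an InternalMNode, it only needs that node's own write lock.

An InternalMNode at the device level also holds a Map<String, MNode> aliasChildren that stores alias entries;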
StorageGroupMNode extends InternalMNode and represents a storage group;
LeafMNode holds the schema of its time series, its alias, and the offset of the series' tag/attribute record in the tlog file.
Figure 1 [2] shows an example. The metadata tree has four levels: the root level, the storage group level, the device level, and the measurement level.
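The node types and the locking rule above can be made concrete with a minimal sketch. The class names (SketchNode, SketchStorageGroupNode, SketchLeafNode, LockedLookup) are hypothetical simplifications, not IoTDB's actual MNode classes. The lookup takes the read lock of every internal node on the path, as described above, and releases the locks in reverse order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified counterpart of an internal MNode: children plus a read-write lock.
class SketchNode {
  final String name;
  final SketchNode parent;
  final Map<String, SketchNode> children = new HashMap<>();
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  SketchNode(SketchNode parent, String name) {
    this.parent = parent;
    this.name = name;
  }

  // Adds the child if no child with that name exists, and returns the stored child.
  SketchNode addChild(SketchNode node) {
    children.putIfAbsent(node.name, node);
    return children.get(node.name);
  }
}

// A storage group is an internal node that also carries a TTL.
class SketchStorageGroupNode extends SketchNode {
  final long ttl;

  SketchStorageGroupNode(SketchNode parent, String name, long ttl) {
    super(parent, name);
    this.ttl = ttl;
  }
}

// A leaf keeps the series' alias and the offset of its tlog record (-1 = no record).
class SketchLeafNode extends SketchNode {
  final String alias;
  long tagOffset = -1;

  SketchLeafNode(SketchNode parent, String name, String alias) {
    super(parent, name);
    this.alias = alias;
  }
}

class LockedLookup {
  // Resolves a full path such as root.sg.d1.s1 while holding the read lock of every
  // internal node on the way, then releases the locks in reverse order.
  static SketchNode find(SketchNode root, String fullPath) {
    String[] nodes = fullPath.split("\\.");
    if (nodes.length == 0 || !nodes[0].equals(root.name)) {
      return null;
    }
    List<SketchNode> locked = new ArrayList<>();
    try {
      SketchNode cur = root;
      for (int i = 1; i < nodes.length && cur != null; i++) {
        cur.lock.readLock().lock();
        locked.add(cur);
        cur = cur.children.get(nodes[i]);
      }
      return cur;
    } finally {
      for (int i = locked.size() - 1; i >= 0; i--) {
        locked.get(i).lock.readLock().unlock();
      }
    }
  }
}
```

Building root -> sg -> d1 -> s1 and calling `LockedLookup.find(root, "root.sg.d1.s1")` returns the leaf. When the call returns, no read lock is held any more.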
MTree provides the operations for creating and deleting storage groups and time series.
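Creating a storage group proceeds in three steps:
1. Walk the path and create every missing intermediate node. Along the way, make sure that no prefix of the path is already a storage group (storage groups cannot contain each other).
2. Check that the target storage group does not exist yet.
3. If these conditions hold, create the storage group node and attach it at the corresponding position.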
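The storage-group rules above can be sketched on a bare tree of names. `SgTree` and its nested `Node` are hypothetical. IoTDB throws StorageGroupAlreadySetException in these cases, but this sketch uses IllegalStateException and IllegalArgumentException instead.

```java
import java.util.HashMap;
import java.util.Map;

class SgTree {
  // Hypothetical minimal node: children plus a flag marking storage groups.
  static final class Node {
    final Map<String, Node> children = new HashMap<>();
    boolean storageGroup;
  }

  final Node root = new Node();

  // Applies the rules described above to a path such as "root.a.b.sg".
  void setStorageGroup(String path) {
    String[] nodes = path.split("\\.");
    if (nodes.length <= 1 || !nodes[0].equals("root")) {
      throw new IllegalArgumentException("illegal path: " + path);
    }
    Node cur = root;
    // Create missing intermediate nodes; no prefix may already be a storage group.
    for (int i = 1; i < nodes.length - 1; i++) {
      cur = cur.children.computeIfAbsent(nodes[i], k -> new Node());
      if (cur.storageGroup) {
        throw new IllegalStateException("prefix is already a storage group: " + nodes[i]);
      }
    }
    String last = nodes[nodes.length - 1];
    Node existing = cur.children.get(last);
    if (existing != null) {
      // Either the group itself exists, or the node is a prefix of other storage groups.
      throw new IllegalStateException(
          existing.storageGroup
              ? "storage group already set: " + path
              : "some children of " + path + " are already storage groups");
    }
    Node sg = new Node();
    sg.storageGroup = true;
    cur.children.put(last, sg);
  }
}
```

With root.a.b.sg1 already set, both root.a.b.sg1.x (nested inside a storage group) and root.a.b (a prefix of one) are rejected.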
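To create a time series, MTree first makes sure that all intermediate nodes exist and that the target leaf node does not. If both hold, it creates the leaf node. If an alias is given, it also adds a sibling entry under the alias name that points to the same leaf node.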
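Here is a minimal sketch of leaf creation with an alias. It assumes a device node that keeps its measurements and their aliases in two maps; in IoTDB, the device-level InternalMNode uses an aliasChildren map for the latter. `DeviceSketch` and `Leaf` are hypothetical. The synchronized method stands in for the lock that makes adding the leaf and its alias atomic.

```java
import java.util.HashMap;
import java.util.Map;

class DeviceSketch {
  // Hypothetical leaf: measurement name plus optional alias.
  static final class Leaf {
    final String name;
    final String alias;

    Leaf(String name, String alias) {
      this.name = name;
      this.alias = alias;
    }
  }

  private final Map<String, Leaf> children = new HashMap<>();
  private final Map<String, Leaf> aliasChildren = new HashMap<>();

  // Adds the leaf and (optionally) its alias as one atomic step.
  synchronized Leaf createTimeseries(String name, String alias) {
    if (children.containsKey(name) || aliasChildren.containsKey(name)) {
      throw new IllegalStateException("path already exists: " + name);
    }
    if (alias != null && (aliasChildren.containsKey(alias) || children.containsKey(alias))) {
      throw new IllegalStateException("alias already exists: " + alias);
    }
    Leaf leaf = new Leaf(name, alias);
    children.put(name, leaf);
    if (alias != null) {
      // The alias entry points at the same leaf object, acting as a sibling entry.
      aliasChildren.put(alias, leaf);
    }
    return leaf;
  }

  // Looks a measurement up by its real name first, then by alias.
  synchronized Leaf getChild(String nameOrAlias) {
    Leaf leaf = children.get(nameOrAlias);
    return leaf != null ? leaf : aliasChildren.get(nameOrAlias);
  }
}
```

Because the checks run before anything is inserted, a rejected create leaves no half-registered leaf behind.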
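To delete a storage group or a time series, MTree first removes the target node from its parent's children. If the parent becomes empty as a result, the deletion continues recursively upward.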
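The upward cleanup can be sketched with parent pointers. `PruneSketch` is hypothetical. It stops at the node without a parent (root), whereas MTree compares the node name with IoTDBConstant.PATH_ROOT.

```java
import java.util.HashMap;
import java.util.Map;

class PruneSketch {
  // Hypothetical node with a parent pointer so deletion can walk upward.
  static final class Node {
    final String name;
    final Node parent;
    final Map<String, Node> children = new HashMap<>();

    Node(Node parent, String name) {
      this.parent = parent;
      this.name = name;
    }

    Node add(String childName) {
      return children.computeIfAbsent(childName, n -> new Node(this, n));
    }
  }

  // Detaches the target from its parent, then removes ancestors that became empty.
  // Root is never removed.
  static void delete(Node target) {
    if (target.parent == null) {
      throw new IllegalArgumentException("cannot delete root");
    }
    Node cur = target.parent;
    cur.children.remove(target.name);
    while (cur.parent != null && cur.children.isEmpty()) {
      cur.parent.children.remove(cur.name);
      cur = cur.parent;
    }
  }
}
```

Consider a tree holding root.a.b.sg1 and root.a.sg2. Deleting sg1 also removes the now-empty b, but keeps a because it still holds sg2.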
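MManager also provides metadata queries:
- Query without a filter condition
Depending on whether the results must be sorted by heat, it calls getAllMeasurementSchemaByHeatOrder (sorted) or getAllMeasurementSchema (not sorted).
- Query with a filter condition
The filter condition can only be on tags. Using the inverted tag index maintained in MManager, it collects every MeasurementMNode that satisfies the condition. If the results must be sorted by heat, they are sorted by lastTimeStamp; otherwise they are sorted by series name in lexicographic order.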
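The tag-filtered query can be pictured with the same nested-map shape as MManager's tagIndex field (tag key -> tag value -> set of leaves). This hypothetical sketch makes two simplifications:
- A leaf is reduced to its full path and a lastTimestamp.
- "Hotter" is assumed to mean a more recent lastTimestamp, so heat order is descending.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

class TagIndexSketch {
  // Hypothetical stand-in for a MeasurementMNode.
  static final class Series {
    final String path;
    final long lastTimestamp;

    Series(String path, long lastTimestamp) {
      this.path = path;
      this.lastTimestamp = lastTimestamp;
    }
  }

  // tag key -> tag value -> series carrying that tag
  final Map<String, Map<String, Set<Series>>> tagIndex = new ConcurrentHashMap<>();

  void index(Series s, Map<String, String> tags) {
    for (Map.Entry<String, String> e : tags.entrySet()) {
      tagIndex
          .computeIfAbsent(e.getKey(), k -> new ConcurrentHashMap<>())
          .computeIfAbsent(e.getValue(), v -> new CopyOnWriteArraySet<>())
          .add(s);
    }
  }

  // Equality filter on one tag; sorted by heat (newest first) or by path.
  List<Series> query(String key, String value, boolean orderByHeat) {
    Map<String, Set<Series>> byValue = tagIndex.get(key);
    Set<Series> hits = byValue == null ? null : byValue.get(value);
    List<Series> result = hits == null ? new ArrayList<>() : new ArrayList<>(hits);
    Comparator<Series> order =
        orderByHeat
            ? Comparator.comparingLong((Series s) -> s.lastTimestamp).reversed()
            : Comparator.comparing((Series s) -> s.path);
    result.sort(order);
    return result;
  }
}
```

The index answers the filter with two map lookups instead of a tree walk. Only the (usually small) hit set is sorted.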
If there is a lot of metadata, returning all results at once may cause an OOM. In that case, use the fetch size parameter (the server fetches at most fetch size time series at a time).
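Bounding each round trip comes down to offset/limit counting, the same idea as the limit, offset, curOffset and count ThreadLocals in MTree.getAllMeasurementSchema. This sketch assumes each fetch is translated into an offset/limit pair. `PageSketch.page` is hypothetical. It skips the first `offset` matches and stops after `limit` (which must be positive), so one fetch materializes at most `limit` results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class PageSketch {
  // Returns at most `limit` matches after skipping the first `offset` matches.
  static List<String> page(
      Iterable<String> candidates, Predicate<String> matches, int offset, int limit) {
    List<String> out = new ArrayList<>();
    int curOffset = -1; // index of the current match; starts at -1 like curOffset in MTree
    for (String c : candidates) {
      if (!matches.test(c)) {
        continue;
      }
      curOffset++;
      if (curOffset < offset) {
        continue;
      }
      if (out.size() == limit) {
        break; // page is full: stop walking instead of collecting everything
      }
      out.add(c);
    }
    return out;
  }
}
```

A client that reads page after page simply advances `offset` by `limit` each time, and the server never holds more than one page of results.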
Source Code Analysis
MManager Basic Structure
MManager contains the following internal fields:
public static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n";
private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
private static final String DEBUG_MSG_1 =
"%s: TimeSeries %s's tag info has been removed from tag inverted index ";
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
private static final Logger logger = LoggerFactory.getLogger(MManager.class);
// interval at which the snapshot thread checks whether the MTree was modified (used for automatic snapshots)
private static final long MTREE_SNAPSHOT_THREAD_CHECK_TIME = 600L;
private final int mtreeSnapshotInterval;
private final long mtreeSnapshotThresholdTime;
private String logFilePath;
private String mtreeSnapshotPath;
private String mtreeSnapshotTmpPath;
// metadata is stored in the MTree
private MTree mtree;
private MLogWriter logWriter;
private TagLogFile tagLogFile;
private boolean isRecovering;
// device -> DeviceMNode
private RandomDeleteCache<PartialPath, Pair<MNode, Template>> mNodeCache;
// tag key -> tag value -> LeafMNode
private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
// data type -> number
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
private long reportedDataTypeTotalNum;
private AtomicLong totalSeriesNumber = new AtomicLong();
private boolean initialized;
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private File logFile;
private ScheduledExecutorService timedCreateMTreeSnapshotThread;
private ScheduledExecutorService timedForceMLogThread;
// memory budget for the MTree (size threshold)
private static final long MTREE_SIZE_THRESHOLD = config.getAllocateMemoryForSchema();
private boolean allowToCreateNewSeries = true;
private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();
// template name -> template
private Map<String, Template> templateMap = new ConcurrentHashMap<>();
Basic Operations
The basic node operations are implemented as internal methods of MManager, most of which delegate to the corresponding MTree methods.
Among them, we analyze setStorageGroup, createTimeseries, deleteStorageGroups, and showTimeseriesWithoutIndex:
- setStorageGroup
Setting a storage group is implemented mainly through MTree's setStorageGroup method:
void setStorageGroup(PartialPath path) throws MetadataException {
String[] nodeNames = path.getNodes();
// check that the path has a valid format
checkStorageGroup(path.getFullPath());
// start from the root node
MNode cur = root;
if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(path.getFullPath());
}
int i = 1;
// e.g., path = root.a.b.sg: create the intermediate nodes (of type MNode)
while (i < nodeNames.length - 1) {
MNode temp = cur.getChild(nodeNames[i]);
// create the intermediate node if it does not exist
if (temp == null) {
cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i]));
} else if (temp instanceof StorageGroupMNode) {
// an intermediate node is already a storage group: throw
throw new StorageGroupAlreadySetException(temp.getFullPath());
}
// move down to the next node
cur = cur.getChild(nodeNames[i]);
i++;
}
if (cur.hasChild(nodeNames[i])) {
// the target node already exists: throw (if it is not a storage group itself, some of its descendants already are)
if (cur.getChild(nodeNames[i]) instanceof StorageGroupMNode) {
throw new StorageGroupAlreadySetException(path.getFullPath());
} else {
throw new StorageGroupAlreadySetException(path.getFullPath(), true);
}
} else {
// otherwise create the storage group node and add it as a child of the current node
StorageGroupMNode storageGroupMNode =
new StorageGroupMNode(
cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
cur.addChild(nodeNames[i], storageGroupMNode);
}
}
- createTimeseries
Creating a time series is implemented mainly through MTree's createTimeseries method:
// createTimeseries in MManager
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
// check whether new time series may still be created
if (!allowToCreateNewSeries) {
throw new MetadataException(
"IoTDB system load is too large to create timeseries, "
+ "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
}
try {
PartialPath path = plan.getPath();
SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
// make sure the storage group of this path exists
ensureStorageGroup(path);
TSDataType type = plan.getDataType();
// create the time series in the MTree
MeasurementMNode leafMNode =
mtree.createTimeseries(
path,
type,
plan.getEncoding(),
plan.getCompressor(),
plan.getProps(),
plan.getAlias());
// update the tag inverted index
if (plan.getTags() != null) {
// tag key, tag value
for (Entry<String, String> entry : plan.getTags().entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
tagIndex
.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
.computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
// update the statistics and schemaDataTypeNumMap
totalSeriesNumber.addAndGet(1);
if (totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE >= MTREE_SIZE_THRESHOLD) {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
updateSchemaDataTypeNumMap(type, 1);
// write the log (skipped while replaying the log during recovery)
if (!isRecovering) {
// either tags or attributes is not empty
if ((plan.getTags() != null && !plan.getTags().isEmpty())
|| (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) {
offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
}
plan.setTagOffset(offset);
logWriter.createTimeseries(plan);
}
leafMNode.setOffset(offset);
} catch (IOException e) {
throw new MetadataException(e);
}
}
// createTimeseries in MTree
MeasurementMNode createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
String alias)
throws MetadataException {
// split the path into node names
String[] nodeNames = path.getNodes();
// check that this is a legal time series path (at least 3 nodes: root -> [storage group] -> device -> timeseries)
if (nodeNames.length <= 2 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(path.getFullPath());
}
// check that this is a legal time series path (using a regular expression)
checkTimeseries(path.getFullPath());
MNode cur = root;
boolean hasSetStorageGroup = false;
Template upperTemplate = cur.getDeviceTemplate();
// e.g., path = root.sg.d1.s1: build the intermediate nodes and leave cur pointing at d1
// build the path starting from the first level below root (stopping before the measurement)
for (int i = 1; i < nodeNames.length - 1; i++) {
String nodeName = nodeNames[i];
// cur points at a storage group node
if (cur instanceof StorageGroupMNode) {
hasSetStorageGroup = true;
}
if (!cur.hasChild(nodeName)) {
if (!hasSetStorageGroup) {
throw new StorageGroupNotSetException("Storage group should be created first");
}
cur.addChild(nodeName, new MNode(cur, nodeName));
}
cur = cur.getChild(nodeName);
if (cur.getDeviceTemplate() != null) {
upperTemplate = cur.getDeviceTemplate();
}
}
if (upperTemplate != null && !upperTemplate.isCompatible(path)) {
throw new PathAlreadyExistException(
path.getFullPath() + " ( which is incompatible with template )");
}
if (props != null && props.containsKey(LOSS) && props.get(LOSS).equals(SDT)) {
checkSDTFormat(path.getFullPath(), props);
}
// name of the leaf (measurement) node
String leafName = nodeNames[nodeNames.length - 1];
// keep adding the leaf node and its alias atomic
// by making the write section synchronized (locked)
synchronized (this) {
MNode child = cur.getChild(leafName);
if (child instanceof MeasurementMNode || child instanceof StorageGroupMNode) {
throw new PathAlreadyExistException(path.getFullPath());
}
if (alias != null) {
MNode childByAlias = cur.getChild(alias);
// check whether this device already has a measurement with the same alias
if (childByAlias instanceof MeasurementMNode) {
throw new AliasAlreadyExistException(path.getFullPath(), alias);
}
}
// create the leaf node
MeasurementMNode measurementMNode =
new MeasurementMNode(cur, leafName, alias, dataType, encoding, compressor, props);
if (child != null) {
// a non-measurement node with the same name exists (measurements were rejected above): replace it
cur.replaceChild(measurementMNode.getName(), measurementMNode);
} else {
// otherwise add the leaf node directly
cur.addChild(leafName, measurementMNode);
}
// register the alias
if (alias != null) {
cur.addAlias(alias, measurementMNode);
}
return measurementMNode;
}
// the lock is released when leaving the synchronized block
}
- deleteStorageGroups
Deleting storage groups is implemented mainly through MTree's deleteStorageGroup method:
// deleteStorageGroups in MManager
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
try {
// iterate over the storage groups to delete
for (PartialPath storageGroup : storageGroups) {
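// subtract the series of this storage group from the total
totalSeriesNumber.addAndGet(-mtree.getAllTimeseriesCount(storageGroup));
// allow creating series again once the estimated size is back below the threshold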
if (!allowToCreateNewSeries
&& totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
}
// clear the MNode cache
mNodeCache.clear();
// delete the storage group
List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
for (MeasurementMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
// update statistics (in schemaDataTypeNumMap)
updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
// drop the triggers
TriggerEngine.drop(leafMNodes);
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
}
// once everything has succeeded, write the log
if (!isRecovering) {
logWriter.deleteStorageGroup(storageGroup);
}
}
} catch (IOException e) {
throw new MetadataException(e.getMessage());
}
}
// deleteStorageGroup in MTree
List<MeasurementMNode> deleteStorageGroup(PartialPath path) throws MetadataException {
// locate the target node
MNode cur = getNodeByPath(path);
// throw if the node is not a storage group node
if (!(cur instanceof StorageGroupMNode)) {
throw new StorageGroupNotSetException(path.getFullPath());
}
// suppose we delete root.a.b.sg1 from a metadata tree that contains root.a.b.sg1 and root.a.sg2
// detach sg1 from its parent
cur.getParent().deleteChild(cur.getName());
// collect all leaf nodes under this storage group
List<MeasurementMNode> leafMNodes = new LinkedList<>();
Queue<MNode> queue = new LinkedList<>();
queue.add(cur);
// traverse the subtree breadth-first (with a queue) to collect all of its leaf nodes
while (!queue.isEmpty()) {
MNode node = queue.poll();
for (MNode child : node.getChildren().values()) {
if (child instanceof MeasurementMNode) {
leafMNodes.add((MeasurementMNode) child);
} else {
queue.add(child);
}
}
}
// move to the parent of the deleted node
cur = cur.getParent();
// if the parent is now empty, delete it and repeat upward (in the example, root.a.b is deleted; root.a stays because it still holds sg2)
while (!IoTDBConstant.PATH_ROOT.equals(cur.getName()) && cur.getChildren().size() == 0) {
cur.getParent().deleteChild(cur.getName());
cur = cur.getParent();
}
return leafMNodes;
}
- showTimeseriesWithoutIndex
Implemented mainly by calling MTree's getAllMeasurementSchemaByHeatOrder or getAllMeasurementSchema; here we analyze getAllMeasurementSchema:
// showTimeseriesWithoutIndex in MManager
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
List<Pair<PartialPath, String[]>> ans;
// results sorted by heat
if (plan.isOrderByHeat()) {
ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
} else {
// results not sorted by heat
ans = mtree.getAllMeasurementSchema(plan);
}
// initialize the result list
List<ShowTimeSeriesResult> res = new LinkedList<>();
for (Pair<PartialPath, String[]> ansString : ans) {
long tagFileOffset = Long.parseLong(ansString.right[5]);
try {
Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
new Pair<>(Collections.emptyMap(), Collections.emptyMap());
if (tagFileOffset >= 0) {
tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
}
// add the query result
res.add(
new ShowTimeSeriesResult(
ansString.left.getFullPath(),
ansString.right[0],
ansString.right[1],
TSDataType.valueOf(ansString.right[2]),
TSEncoding.valueOf(ansString.right[3]),
CompressionType.valueOf(ansString.right[4]),
tagAndAttributePair.left,
tagAndAttributePair.right));
} catch (IOException e) {
throw new MetadataException(
"Something went wrong while deserialize tag info of " + ansString.left.getFullPath(),
e);
}
}
return res;
}
// getAllMeasurementSchema in MTree
List<Pair<PartialPath, String[]>> getAllMeasurementSchema(
ShowTimeSeriesPlan plan, boolean removeCurrentOffset) throws MetadataException {
List<Pair<PartialPath, String[]>> res = new LinkedList<>();
// split the path into node names
String[] nodes = plan.getPath().getNodes();
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
throw new IllegalPathException(plan.getPath().getFullPath());
}
// set limit
limit.set(plan.getLimit());
// set offset
offset.set(plan.getOffset());
curOffset.set(-1);
count.set(0);
// search for the target time series
findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null, null);
// remove the ThreadLocal values to avoid memory leaks
limit.remove();
offset.remove();
if (removeCurrentOffset) {
curOffset.remove();
}
count.remove();
return res;
}
// the findPath method it calls
private void findPath(
MNode node,
String[] nodes,
int idx,
List<Pair<PartialPath, String[]>> timeseriesSchemaList,
boolean hasLimit,
boolean needLast,
QueryContext queryContext,
Template upperTemplate)
throws MetadataException {
if (node instanceof MeasurementMNode && nodes.length <= idx) {
if (hasLimit) {
curOffset.set(curOffset.get() + 1);
if (curOffset.get() < offset.get() || count.get().intValue() == limit.get().intValue()) {
return;
}
}
IMeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema();
if (measurementSchema instanceof MeasurementSchema) {
// reached a leaf node: add its schema
addMeasurementSchema(
node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
} else if (measurementSchema instanceof VectorMeasurementSchema) {
String lastWord = nodes[nodes.length - 1];
addVectorMeasurementSchema(
node,
timeseriesSchemaList,
needLast,
queryContext,
measurementSchema,
nodes.length == idx ? lastWord : "*");
}
if (hasLimit) {
count.set(count.get() + 1);
}
}
// get the name pattern for the next level
String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes);
if (node.getDeviceTemplate() != null) {
upperTemplate = node.getDeviceTemplate();
}
// no wildcard * at this level: follow the single named child
if (!nodeReg.contains(PATH_WILDCARD)) {
MNode next = null;
// check whether the level names several aligned series in parentheses
if (nodeReg.contains("(") && nodeReg.contains(",")) {
next = node.getChildOfAlignedTimeseries(nodeReg);
} else {
// get the next-level node
next = node.getChild(nodeReg);
}
if (next != null) {
// recurse if the next-level node exists
findPath(
next,
nodes,
idx + 1,
timeseriesSchemaList,
hasLimit,
needLast,
queryContext,
upperTemplate);
}
} else {
// with a wildcard, iterate over all children of this node
for (MNode child : node.getDistinctMNodes()) {
boolean continueSearch = false;
if (child instanceof MeasurementMNode
&& ((MeasurementMNode) child).getSchema() instanceof VectorMeasurementSchema) {
// a MeasurementMNode with a VectorMeasurementSchema
List<String> measurementsList =
((MeasurementMNode) child).getSchema().getValueMeasurementIdList();
for (String measurement : measurementsList) {
// keep searching if any of the vector's measurements matches the pattern
if (Pattern.matches(nodeReg.replace("*", ".*"), measurement)) {
continueSearch = true;
}
}
} else {
// keep searching if the child's name matches the pattern
if (Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
continueSearch = true;
}
}
if (!continueSearch) {
continue;
}
// recurse
findPath(
child,
nodes,
idx + 1,
timeseriesSchemaList,
hasLimit,
needLast,
queryContext,
upperTemplate);
if (hasLimit && count.get().intValue() == limit.get().intValue()) {
break;
}
}
}
// measurements provided by a device template
if (!(node instanceof MeasurementMNode) && node.isUseTemplate()) {
if (upperTemplate != null) {
HashSet<IMeasurementSchema> set = new HashSet<>();
for (IMeasurementSchema schema : upperTemplate.getSchemaMap().values()) {
if (set.add(schema)) {
if (schema instanceof MeasurementSchema) {
addMeasurementSchema(
new MeasurementMNode(node, schema.getMeasurementId(), schema, null),
timeseriesSchemaList,
needLast,
queryContext,
schema,
nodeReg);
} else if (schema instanceof VectorMeasurementSchema) {
String firstNode = schema.getValueMeasurementIdList().get(0);
addVectorMeasurementSchema(
new MeasurementMNode(node, firstNode, schema, null),
timeseriesSchemaList,
needLast,
queryContext,
schema,
nodeReg);
}
}
}
}
}
}
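If you strip out templates, vector schemas, and limits, the recursion in findPath reduces to the sketch below. It is a hypothetical simplification over a plain name tree. It reuses the idea behind `nodeReg.replace("*", ".*")`, turning one path level into a regex. As a design choice of this sketch (not of IoTDB), it also quotes the literal parts with Pattern.quote, so that characters such as `+` in a node name are not read as regex syntax.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

class FindPathSketch {
  // Hypothetical node: an empty child map marks a leaf (measurement).
  static final class Node {
    final Map<String, Node> children = new TreeMap<>();

    Node add(String name) {
      return children.computeIfAbsent(name, n -> new Node());
    }
  }

  // Turns one path level such as "s*" into a regex, escaping everything except '*'.
  static Pattern levelPattern(String level) {
    StringBuilder regex = new StringBuilder();
    String[] parts = level.split("\\*", -1);
    for (int i = 0; i < parts.length; i++) {
      if (i > 0) {
        regex.append(".*");
      }
      regex.append(Pattern.quote(parts[i]));
    }
    return Pattern.compile(regex.toString());
  }

  // Collects the full paths of leaves matching nodes[idx..]; `prefix` is the path of `node`.
  static void findPath(Node node, String prefix, String[] nodes, int idx, List<String> out) {
    if (idx >= nodes.length) {
      if (node.children.isEmpty()) {
        out.add(prefix);
      }
      return;
    }
    String level = nodes[idx];
    if (!level.contains("*")) {
      // no wildcard: follow the single named child
      Node next = node.children.get(level);
      if (next != null) {
        findPath(next, prefix + "." + level, nodes, idx + 1, out);
      }
      return;
    }
    // wildcard: recurse into every child whose name matches
    Pattern p = levelPattern(level);
    for (Map.Entry<String, Node> e : node.children.entrySet()) {
      if (p.matcher(e.getKey()).matches()) {
        findPath(e.getValue(), prefix + "." + e.getKey(), nodes, idx + 1, out);
      }
    }
  }
}
```

For example, the pattern root.sg.*.s* visits every device under sg and keeps only the measurements whose names start with s.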
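Log Files
Metadata Log Management
All metadata operations are recorded in the metadata log file, which defaults to data/system/schema/mlog.bin.
When the system restarts, it replays the log in mlog. Before replaying, it marks that no log entries should be written; once the restart is complete, it marks that logging is required again.
The types of metadata log entries are defined in the MetadataOperationType class. mlog stores the string encoding of each entry directly.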
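The "do not log while replaying" rule is what MManager's isRecovering field implements; createTimeseries, for example, only writes to mlog when !isRecovering. Below is a hypothetical sketch with an in-memory list standing in for mlog.bin. It uses the storage-group record code 2 from the examples that follow.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RedoSketch {
  // In-memory stand-in for mlog.bin; each entry is one logged operation.
  final List<String> mlog = new ArrayList<>();
  final Set<String> storageGroups = new HashSet<>();
  private boolean isRecovering;

  // Applies an operation and appends it to the log unless the log is being replayed.
  void setStorageGroup(String path) {
    storageGroups.add(path);
    if (!isRecovering) {
      mlog.add("2," + path);
    }
  }

  // Rebuilds state from a previous log without appending duplicate entries.
  void recover(List<String> oldLog) {
    isRecovering = true;
    try {
      for (String line : oldLog) {
        String[] fields = line.split(",");
        if (fields[0].equals("2")) {
          setStorageGroup(fields[1]);
        }
      }
      mlog.addAll(oldLog); // the existing log content itself is kept as-is
    } finally {
      isRecovering = false; // later operations are logged again
    }
  }
}
```

Without the flag, every restart would append a second copy of each replayed entry, and the log would keep growing.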
Source Code Analysis
- mlog records for some SQL statements
set storage group to root.turbine --> 2,root.turbine
delete storage group root.turbine --> 1,root.turbine
create timeseries root.turbine.d1.s1(temperature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2) --> 0,root.turbine.d1.s1,3,2,1,,temperature,offset
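Here is the create-timeseries record above, read field by field:
- 0 is the operation code.
- Next come the path, then the data type, encoding and compression as numbers (3, 2 and 1 encode FLOAT, RLE and SNAPPY in this example).
- Then an empty props field, the alias, and the tag offset.

The parser below is a hypothetical sketch of this layout only. It is not MLogReader, and it keeps the numeric codes as plain integers instead of mapping them to TSDataType and friends. The offset 42 in the usage is an arbitrary illustrative value.

```java
class MlogRecordSketch {
  final String path;
  final int dataType;
  final int encoding;
  final int compressor;
  final String props;
  final String alias;
  final long tagOffset;

  private MlogRecordSketch(String[] f) {
    path = f[1];
    dataType = Integer.parseInt(f[2]);
    encoding = Integer.parseInt(f[3]);
    compressor = Integer.parseInt(f[4]);
    props = f[5];
    alias = f[6].isEmpty() ? null : f[6];
    tagOffset = Long.parseLong(f[7]);
  }

  // Parses "0,path,type,encoding,compressor,props,alias,offset".
  // The -1 limit keeps empty fields such as the missing props.
  static MlogRecordSketch parseCreateTimeseries(String line) {
    String[] f = line.split(",", -1);
    if (f.length != 8 || !f[0].equals("0")) {
      throw new IllegalArgumentException("not a create-timeseries record: " + line);
    }
    return new MlogRecordSketch(f);
  }
}
```

Usage: `MlogRecordSketch.parseCreateTimeseries("0,root.turbine.d1.s1,3,2,1,,temperature,42")` yields the path root.turbine.d1.s1, alias temperature and tag offset 42.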
Tag File
The tags and attributes of all time series are stored in the tag file, which defaults to data/system/schema/tlog.txt.
The persisted tags and attributes of each time series take L bytes in total, where L is configured in iotdb-engine.properties. Persisted content: Map<String,String> tags, Map<String,String> attributes. If the content is shorter than L, it is padded to L bytes.
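A fixed record length is what makes in-place updates possible: a series' record can be rewritten at its stored offset without moving any other record. The sketch below assumes a simple length-prefixed string encoding with zero-byte padding. TagLogFile's actual byte layout may differ, `TagRecordSketch` is hypothetical, and 700 in the usage is just an example size.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TagRecordSketch {
  // Encodes tags then attributes into exactly totalSize bytes; the unused tail stays zero.
  // Content longer than totalSize fails with java.nio.BufferOverflowException.
  static byte[] encode(Map<String, String> tags, Map<String, String> attrs, int totalSize) {
    ByteBuffer buf = ByteBuffer.allocate(totalSize); // freshly allocated buffers are zero-filled
    writeMap(buf, tags);
    writeMap(buf, attrs);
    return buf.array();
  }

  // Returns [tags, attributes]; trailing padding is simply never read.
  static List<Map<String, String>> decode(byte[] record) {
    ByteBuffer buf = ByteBuffer.wrap(record);
    return List.of(readMap(buf), readMap(buf));
  }

  private static void writeMap(ByteBuffer buf, Map<String, String> map) {
    buf.putInt(map.size());
    for (Map.Entry<String, String> e : map.entrySet()) {
      writeString(buf, e.getKey());
      writeString(buf, e.getValue());
    }
  }

  private static void writeString(ByteBuffer buf, String s) {
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    buf.putInt(bytes.length);
    buf.put(bytes);
  }

  private static Map<String, String> readMap(ByteBuffer buf) {
    int n = buf.getInt();
    Map<String, String> map = new LinkedHashMap<>();
    for (int i = 0; i < n; i++) {
      String key = readString(buf);
      map.put(key, readString(buf));
    }
    return map;
  }

  private static String readString(ByteBuffer buf) {
    byte[] bytes = new byte[buf.getInt()];
    buf.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

Since every record has the same size, the record of the n-th series written starts at byte n * L. An update never shifts its neighbours.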
Source Code Analysis
The six tag/attribute operations on time series are implemented directly as built-in methods of MManager: renameTagOrAttributeKey, setTagsOrAttributesValue, dropTagsOrAttributes, addTags, addAttributes, upsertTagsAndAttributes.
Among them, we analyze addAttributes:
public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
throws MetadataException, IOException {
// get the target node
MNode mNode = mtree.getNodeByPath(fullPath);
// throw if the target node is not a MeasurementMNode
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// the series has no tags or attributes yet: append a new record to tlog
if (leafMNode.getOffset() < 0) {
long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
logWriter.changeOffset(fullPath, offset);
leafMNode.setOffset(offset);
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
// iterate over the attributes to add
for (Entry<String, String> entry : attributesMap.entrySet()) {
// key
String key = entry.getKey();
// value
String value = entry.getValue();
// throw if the attribute already exists
if (pair.right.containsKey(key)) {
throw new MetadataException(
String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
}
// otherwise add it
pair.right.put(key, value);
}
// persist by overwriting the record at the same offset
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
}
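The control flow of addAttributes has two branches:
- If the series has no record yet, append a fresh one.
- Otherwise, read the record, reject duplicate keys, and overwrite it at its old offset.

This flow can be mirrored with an in-memory log. `AttrLogSketch` is hypothetical and only models attributes. A list index plays the role of the tlog offset, and -1 means "no record yet", as with leafMNode.getOffset() < 0. The real method also records a new offset in mlog through logWriter.changeOffset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AttrLogSketch {
  // Each slot is one series' attribute map; the slot index plays the role of the offset.
  final List<Map<String, String>> tlog = new ArrayList<>();

  // Returns the (possibly new) offset of the series' record.
  long addAttributes(long offset, Map<String, String> attributes) {
    if (offset < 0) {
      // No record yet: append one (the real code then logs the new offset to mlog).
      tlog.add(new HashMap<>(attributes));
      return tlog.size() - 1;
    }
    // Work on a copy so a rejected key leaves the stored record untouched.
    Map<String, String> current = new HashMap<>(tlog.get((int) offset));
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      if (current.containsKey(e.getKey())) {
        throw new IllegalStateException("attribute already exists: " + e.getKey());
      }
      current.put(e.getKey(), e.getValue());
    }
    // Overwrite in place: the offset stays the same.
    tlog.set((int) offset, current);
    return offset;
  }
}
```

As in the original, nothing is persisted until every key has been checked, so a duplicate key aborts the whole call.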
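MTree Checkpoint
To speed up restarts, IoTDB sets checkpoints for the MTree, so that a restart does not have to read and replay mlog.bin line by line. An MTree snapshot can be created in two ways:
- Automatically: every 10 minutes, a background thread checks the MTree's last modification time. A snapshot is created when both of these conditions hold:
  - the user has not modified the MTree for more than 1 hour (configurable);
  - mlog.bin has accumulated 100000 lines of log (configurable).
- Manually: the create snapshot for schema command triggers the creation of an MTree snapshot.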
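The automatic trigger boils down to a two-part check that the background thread evaluates on every run. This hypothetical sketch takes the two thresholds as parameters, using the defaults quoted above in the usage. The names are assumptions; MManager's mtreeSnapshotInterval and mtreeSnapshotThresholdTime fields presumably hold these values.

```java
import java.time.Duration;

class SnapshotTriggerSketch {
  final int lineThreshold; // e.g. 100000 mlog lines
  final Duration idleThreshold; // e.g. 1 hour without MTree modification

  SnapshotTriggerSketch(int lineThreshold, Duration idleThreshold) {
    this.lineThreshold = lineThreshold;
    this.idleThreshold = idleThreshold;
  }

  // Both must hold: the MTree has been idle long enough AND enough log lines piled up.
  boolean shouldSnapshot(long nowMillis, long lastModifiedMillis, int mlogLines) {
    boolean idleLongEnough = nowMillis - lastModifiedMillis > idleThreshold.toMillis();
    return idleLongEnough && mlogLines >= lineThreshold;
  }
}
```

Requiring an idle period keeps snapshots away from busy write phases, and the line threshold avoids taking a snapshot when a replay would be cheap anyway.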
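Creation process
- First, acquire the read lock of the MTree to prevent modifications while the snapshot is being created.
- Serialize the MTree into a temporary snapshot file (mtree.snapshot.tmp). The MTree is serialized depth-first, children before parents. Each node's information is converted into a string in the format for its type, which makes it easy to read and reassemble the MTree during deserialization.
  The string formats are:
  - Ordinary node: 0,name,number of children
  - Storage group node: 1,name,TTL,number of children
  - Sensor (measurement) node: 2,name,alias,data type,encoding,compression,props,offset,number of children
- After serialization, rename the temporary file to the final file (mtree.snapshot). This way, a manual or accidental server shutdown during serialization cannot leave a broken snapshot behind.
- Call MLogWriter.clear() to clear mlog.bin.
- Release the read lock of the MTree.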
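Here is a minimal sketch of the steps above on a hypothetical node type. It writes one line per node in the three formats listed, children before parents, into a temporary file, then moves that file onto the final name. Each node stores its type-specific middle fields (the TTL, or alias through offset) verbatim as a string. Two substitutions are design choices of this sketch:
- createMTreeSnapshot deletes the old file and then calls File.renameTo. This sketch instead uses java.nio's Files.move with ATOMIC_MOVE, which swaps the file in a single step where the file system supports it.
- It omits the locking and the mlog clearing.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

class SnapshotSketch {
  // Hypothetical node: type 0 = ordinary, 1 = storage group, 2 = measurement.
  // `extra` holds the type-specific middle fields verbatim (TTL, or alias..offset).
  static final class Node {
    final int type;
    final String name;
    final String extra;
    final List<Node> children = new ArrayList<>();

    Node(int type, String name, String extra) {
      this.type = type;
      this.name = name;
      this.extra = extra;
    }

    Node add(Node child) {
      children.add(child);
      return child;
    }
  }

  // Post-order: every child subtree is written before the node itself.
  static void serialize(Node node, List<String> out) {
    for (Node child : node.children) {
      serialize(child, out);
    }
    String middle = node.extra.isEmpty() ? "" : "," + node.extra;
    out.add(node.type + "," + node.name + middle + "," + node.children.size());
  }

  // Writes the snapshot to "<name>.tmp" first, then moves it into place in one step.
  static void createSnapshot(Node root, Path snapshot) throws IOException {
    List<String> lines = new ArrayList<>();
    serialize(root, lines);
    Path tmp = snapshot.resolveSibling(snapshot.getFileName() + ".tmp");
    Files.write(tmp, lines, StandardCharsets.UTF_8);
    Files.move(tmp, snapshot, StandardCopyOption.ATOMIC_MOVE);
  }
}
```

For root -> sg -> d1 -> s1, the file contains the s1 line first and the root line last. Each parent line therefore follows all of its children.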
Source Code Analysis
public void createMTreeSnapshot() {
long time = System.currentTimeMillis();
logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
try {
// serialize the MTree into the temporary snapshot file mtreeSnapshotTmpPath
mtree.serializeTo(mtreeSnapshotTmpPath);
// get the temporary snapshot file and the snapshot file
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
File snapshotFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
// delete the existing snapshot file, if any
if (snapshotFile.exists()) {
Files.delete(snapshotFile.toPath());
}
// rename the temporary file to the final file
if (tmpFile.renameTo(snapshotFile)) {
logger.info(
"Finish creating MTree snapshot to {}, spend {} ms.",
mtreeSnapshotPath,
System.currentTimeMillis() - time);
}
// call clear() to clear mlog.bin
logWriter.clear();
} catch (IOException e) {
logger.warn("Failed to create MTree snapshot to {}", mtreeSnapshotPath, e);
if (SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).exists()) {
try {
Files.delete(SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).toPath());
} catch (IOException e1) {
logger.warn("delete file {} failed: {}", mtreeSnapshotTmpPath, e1.getMessage());
}
}
}
}
// Serialization runs bottom-up from the children, in depth-first order
// StorageGroupMNode.serializeTo
public void serializeTo(MLogWriter logWriter) throws IOException {
serializeChildren(logWriter);
logWriter.serializeStorageGroupMNode(this);
}
// MNode.serializeTo
public void serializeTo(MLogWriter logWriter) throws IOException {
serializeChildren(logWriter);
logWriter.serializeMNode(this);
}
void serializeChildren(MLogWriter logWriter) throws IOException {
if (children == null) {
return;
}
for (Entry<String, MNode> entry : children.entrySet()) {
entry.getValue().serializeTo(logWriter);
}
}
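Recovery process
- Check whether the temporary file mtree.snapshot.tmp exists. If it does, a manual or accidental server shutdown interrupted the serialization of a snapshot, so the temporary file is deleted.
- Check whether the snapshot file mtree.snapshot exists. If not, start with a new MTree; otherwise, deserialize it to obtain the MTree.
- Read mlog.bin line by line and apply each operation to finish recovering the MTree. The number of lines read (logNumber) is updated along the way and returned, so that later lines of mlog.bin can be counted.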
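In the snapshot format, every line ends with its child count and children are written before parents. The tree can therefore be rebuilt in one pass with a stack: each line pops as many finished subtrees as it has children, attaches them, and pushes itself. Here is a hypothetical sketch that reads the line format listed under the creation process. It keeps only the node type and name and skips the type-specific fields.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RestoreSketch {
  static final class Node {
    final int type;
    final String name;
    final List<Node> children = new ArrayList<>();

    Node(int type, String name) {
      this.type = type;
      this.name = name;
    }
  }

  // Rebuilds a tree from post-order lines "type,name,...,childCount"; returns the root.
  static Node deserialize(List<String> lines) {
    Deque<Node> stack = new ArrayDeque<>();
    for (String line : lines) {
      String[] f = line.split(",", -1);
      Node node = new Node(Integer.parseInt(f[0]), f[1]);
      int childCount = Integer.parseInt(f[f.length - 1]);
      // The most recently finished subtrees are this node's children, popped in reverse order.
      for (int i = 0; i < childCount; i++) {
        node.children.add(0, stack.pop());
      }
      stack.push(node);
    }
    if (stack.size() != 1) {
      throw new IllegalStateException("malformed snapshot: " + stack.size() + " roots");
    }
    return stack.pop();
  }
}
```

The whole rebuild is a single linear pass over the file, with no lookups by path. After it finishes, only the mlog.bin entries written since the snapshot still need to be replayed.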
Source Code Analysis
After the snapshot is loaded and deserialized into the MTree, mlog.bin is read. Each log record is turned into a redo plan, which is then executed by calling operation:
public void operation(PhysicalPlan plan) throws IOException, MetadataException {
switch (plan.getOperatorType()) {
case CREATE_TIMESERIES:
CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
break;
case CREATE_ALIGNED_TIMESERIES:
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
(CreateAlignedTimeSeriesPlan) plan;
createAlignedTimeSeries(createAlignedTimeSeriesPlan);
break;
case DELETE_TIMESERIES:
DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
// because each DeleteTimeSeriesPlan holds only one path
deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
break;
case SET_STORAGE_GROUP:
SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
setStorageGroup(setStorageGroupPlan.getPath());
break;
case DELETE_STORAGE_GROUP:
DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
deleteStorageGroups(deleteStorageGroupPlan.getPaths());
break;
case TTL:
SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
break;
case CHANGE_ALIAS:
ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
break;
case CHANGE_TAG_OFFSET:
ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
break;
case CREATE_TEMPLATE:
CreateTemplatePlan createTemplatePlan = (CreateTemplatePlan) plan;
createDeviceTemplate(createTemplatePlan);
break;
case SET_DEVICE_TEMPLATE:
SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan;
setDeviceTemplate(setDeviceTemplatePlan);
break;
case SET_USING_DEVICE_TEMPLATE:
SetUsingDeviceTemplatePlan setUsingDeviceTemplatePlan = (SetUsingDeviceTemplatePlan) plan;
setUsingDeviceTemplate(setUsingDeviceTemplatePlan);
break;
case AUTO_CREATE_DEVICE_MNODE:
AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
break;
default:
logger.error("Unrecognizable command {}", plan.getOperatorType());
}
}
The recovery procedure in detail:
private int initFromLog(File logFile) throws IOException {
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
// check whether the temporary file mtree.snapshot.tmp exists
if (tmpFile.exists()) {
logger.warn("Creating MTree snapshot not successful before crashing...");
// if so, a manual or accidental shutdown interrupted snapshot serialization: delete the temporary file
Files.delete(tmpFile.toPath());
}
// locate the snapshot file mtree.snapshot
File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
long time = System.currentTimeMillis();
// check whether the snapshot exists
if (!mtreeSnapshot.exists()) {
// no snapshot: start with a new MTree
mtree = new MTree();
} else {
// snapshot exists: deserialize it to build the MTree
mtree = MTree.deserializeFrom(mtreeSnapshot);
logger.debug(
"spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
}
time = System.currentTimeMillis();
// if the log exists, recover the metadata from it
if (logFile.exists()) {
int idx = 0;
try (MLogReader mLogReader =
new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG); ) {
idx = applyMlog(mLogReader);
logger.debug(
"spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
return idx;
} catch (Exception e) {
throw new IOException("Failed to parser mlog.bin for err:" + e);
}
} else {
return 0;
}
}
private int applyMlog(MLogReader mLogReader) {
int idx = 0;
// read the log records one by one, in order
while (mLogReader.hasNext()) {
PhysicalPlan plan = null;
try {
// build a plan from the log record
plan = mLogReader.next();
if (plan == null) {
continue;
}
// execute the operation
operation(plan);
idx++;
} catch (Exception e) {
logger.error(
"Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
}
}
return idx;
}
[1] Apache IoTDB, an IoT time-series database. Details: https://iotdb.apache.org/
[2] https://iotdb.apache.org/zh/SystemDesign/SchemaManager/SchemaManager.html