說在前面: 這個list記錄了博主在學(xué)習IoTDB[1]期間的總結(jié)和思考,歡迎一起討論學(xué)習哈
相關(guān)介紹可以參考list的第一篇博客:[IoTDB 學(xué)習筆記] [part1] 介紹
TsFile 基本結(jié)構(gòu)
TsFile 是 IoTDB 的底層數(shù)據(jù)文件颤陶,一種專門為時間序列數(shù)據(jù)設(shè)計的列式文件格式颗管。TsFile的設(shè)計細節(jié)具體可以參考[2]。
總的來說滓走,TsFile文件主要包含數(shù)據(jù)(Chunk
)和元數(shù)據(jù)(Metadata
)兩個部分垦江。可以認為對于我們觀測的設(shè)備網(wǎng)絡(luò)中搅方,每個設(shè)備包含若干個測點比吭,每個測點包含一個時間序列Timeseries
,時間序列記錄了該測點各時間戳下的值姨涡,時間序列的值記錄在Chunk
當中衩藤;對應(yīng)的,其中的元數(shù)據(jù)也分為三部分:ChunkMetadata
涛漂,TimeseriesMetadata
和TsFileMetadata
赏表。一個TsFile
的示例結(jié)構(gòu)如圖 1 [2]所示,該文件包含兩個設(shè)備d1與d2匈仗,每個設(shè)備包含三個測點s1瓢剿,s2和s3,每個設(shè)備的每個測點對應(yīng)一個時間序列悠轩,每個時間序列包含兩個Chunk
间狂。
查詢流程
以查 d1.s1 為例
- 反序列化
TsFileMetadata
,得到 d1.s1 的TimeseriesMetadata
的位置- 反序列化得到 d1.s1 的
TimeseriesMetadata
- 根據(jù) d1.s1 的
TimeseriesMetadata
火架,反序列化其所有ChunkMetadata
- 根據(jù) d1.s1 的每一個
ChunkMetadata
鉴象,讀取其Chunk
數(shù)據(jù)
每個設(shè)備中的數(shù)據(jù)存儲在一個ChunkGroup
中,每個ChunkGroup
由若干Chunk
組成距潘,Chunk
由ChunkHeader
和若干Page
一起構(gòu)成炼列。
所有元數(shù)據(jù)索引節(jié)點構(gòu)成了一顆元數(shù)據(jù)索引樹,包含傳感器索引層以及可能的設(shè)備索引層音比,各層包含內(nèi)部節(jié)點和葉節(jié)點兩種類型的節(jié)點俭尖,分別為:INTERNAL_MEASUREMENT
,LEAF_MEASUREMENT
洞翩,INTERNAL_DEVICE
稽犁,LEAF_DEVICE
。其中LEAF_MEASUREMENT
指向TimeseriesMetadata
骚亿,如圖 2 [2]所示已亥。
max_degree_of_index_node
為索引樹節(jié)點的最大子節(jié)點數(shù)目。當總設(shè)備數(shù)目不超過max_degree_of_index_node
時来屠,不存在設(shè)備索引層虑椎,如圖 3 [2]所示震鹉。
寫流程
TsFile 的寫入流程如圖 4 [3]所示:
其中,文件的寫入主要分為三種操作捆姜,在圖上用 1传趾、2、3 標注:
1泥技、寫內(nèi)存緩沖區(qū)
2浆兰、持久化ChunkGroup
3、關(guān)閉文件
寫內(nèi)存緩沖區(qū)
TsFile 文件層的寫入接口有兩種:
- 寫入一個設(shè)備一個時間戳的多個測點
- 寫入一個設(shè)備多個時間戳的多個測點
當調(diào)用寫接口時珊豹,這個設(shè)備的數(shù)據(jù)會交給對應(yīng)的 ChunkGroupWriter
簸呈,其中的每個測點會交給對應(yīng)的 ChunkWriter
進行寫入。ChunkWriter
完成編碼和打包(生成 Page
)
持久化 ChunkGroup
當內(nèi)存中的數(shù)據(jù)達到一定閾值店茶,會觸發(fā)持久化操作蜕便。每次持久化會把當前內(nèi)存中所有設(shè)備的數(shù)據(jù)全部持久化到磁盤的 TsFile 文件中。每個設(shè)備對應(yīng)一個 ChunkGroup
忽妒,每個測點對應(yīng)一個 Chunk
玩裙。持久化完成后會在內(nèi)存中緩存對應(yīng)的元數(shù)據(jù)信息,以供查詢和生成文件尾部 metadata段直。
關(guān)閉文件
根據(jù)內(nèi)存中緩存的元數(shù)據(jù)吃溅,生成 TsFileMetadata
追加到文件尾部,最后關(guān)閉文件鸯檬。
正如 TsFile 基本結(jié)構(gòu) 中所述决侈,生成 TsFileMetadata
的過程中比較重要的一步是建立元數(shù)據(jù)索引 (MetadataIndex
) 樹,以使得檢索時間序列數(shù)據(jù)時可以不用讀取所有的TimeseriesMetadata
以減少 I/O 操作喧务。
建立元數(shù)據(jù)索引樹的算法大致如下:
- 從索引樹的底層開始構(gòu)建赖歌。在傳感器索引層,對于每個設(shè)備功茴,我們首先初始化其葉節(jié)點(類型為
LEAF_MEASUREMENT
)庐冯,然后對于每個TimeseriesMetadata
,在序列化后坎穿,將其加入葉節(jié)點中展父,當一個節(jié)點中的TimeseriesMetadata
達到MAX_DEGREE_OF_INDEX_NODE
后,將這個節(jié)點加入設(shè)備queue
中玲昧,并重新初始化一個葉節(jié)點用于繼續(xù)存放接下來的TimeseriesMetadata
栖茉,然后重復(fù)這個過程,直到所有TimeseriesMetadata
全部添加完成孵延。然后對于queue
中的葉節(jié)點吕漂,逐層地生成上層節(jié)點(INTERNAL_MEASUREMENT
),具體方法與上述方法類似尘应,每個上層節(jié)點中最多包含MAX_DEGREE_OF_INDEX_NODE
個子節(jié)點惶凝,不斷重復(fù)地生成上級節(jié)點吼虎,直到僅剩一個節(jié)點時,即為需要的根節(jié)點苍鲜。 - 然后鲸睛,判斷設(shè)備數(shù)目是否超過
MAX_DEGREE_OF_INDEX_NODE
,如若未超過坡贺,則直接生成元數(shù)據(jù)索引樹的根節(jié)點(子節(jié)點集包含 1 中每個設(shè)備對應(yīng)的根節(jié)點序列化后轉(zhuǎn)化成的索引項);如若超過箱舞,則需要生成設(shè)備索引層級遍坟,具體方法與 1 類似,在初始化queue
后晴股,先生成類型為LEAF_DEVICE
的節(jié)點愿伴,每個節(jié)點最多包含MAX_DEGREE_OF_INDEX_NODE
個子級索引。然后按照和 1 類似的方法逐層生成上層節(jié)點电湘,直到生成最終的根節(jié)點隔节。
源碼解析
此處引用的為v0.12.0[4]的代碼
接口調(diào)用
利用TSRecord寫
IoTDB 提供了一個TSRecord工具,TSRecord記錄了一個設(shè)備在一個時間戳下的若干測點信息寂呛。
直接寫入新文件時
// 生成 TsFile 存儲文件怎诫,path為對應(yīng)的disk存儲路徑
File f = FSFactoryProducer.getFSFactory().getFile(path);
// 生成 tsfile writer
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
// 在 tsfile writer 中注冊測點信息 這里 包含了 0,1,2,3 四個設(shè)備,每個設(shè)備包含 1,2,3 3 個測點
for (int i = 0; i < 4; i++) {
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_1),
new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_2),
new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_3),
new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
}
// 生成時間序列信息贷痪,設(shè)備0在時間戳為0,4,8..時各測點有值幻妓,均與時間戳的值相同,設(shè)備1,2,3分別在時間戳為1,5,9..劫拢;2,6,10..肉津;3,7,11時各測點有值,且均與時間戳的值相同
for (int i = 0; i < 100; i++) {
// 生成設(shè)備(i % 4)在時間戳為i時的記錄
TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
// 生成時間戳為i時設(shè)備(i % 4)的各測點的信息
DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
// 將生成的測點信息加入設(shè)備記錄
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
// 將設(shè)備(i % 4)在時間戳為i時的記錄加入tsFile中
tsFileWriter.write(tsRecord);
}
} catch (Exception e) {
logger.error("meet error in TsFileWrite ", e);
}
續(xù)寫TsFile時
// 基本流程與直接寫入新文件時一致舱沧,不同的是在生成writer的時候使用:
// f 為讀入的TsFile文件:File f = FSFactoryProducer.getFSFactory().getFile(path);
// 為了續(xù)寫f中存儲的TsFile妹沙,這里使用了ForceAppendTsFileWriter,初始化的時候讀取了f中存儲的TsFile相關(guān)信息
ForceAppendTsFileWriter fwriter = new ForceAppendTsFileWriter(f);
// 截取f中truncatePosition及之前的內(nèi)容(截去尾部元數(shù)據(jù))熟吏,以便添加新的內(nèi)容
fwriter.doTruncate();
利用tablet寫
IoTDB 提供了一個tablet工具距糖,tablet記錄了一個設(shè)備的多個測點的信息,按照一種表格的形式表示分俯,這些測點具有相同的時間戳序列肾筐,因此可以應(yīng)用在測點具有相同時間戳序列(每個時間戳下各個測點都具有值)的設(shè)備中。
// 生成 TsFile 存儲文件缸剪,path為對應(yīng)的disk存儲路徑
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists() && !f.delete()) {
throw new RuntimeException("can not delete " + f.getAbsolutePath());
}
// 初始化模式
Schema schema = new Schema();
// 此處展示了10個測點(一個設(shè)備)吗铐,每個測點1000000個時間序列值的例子
String device = Constant.DEVICE_PREFIX + 1;
String sensorPrefix = "sensor_";
int rowNum = 1000000;
int sensorNum = 10;
// 用于初始化Tablet
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
// 將測點加入模式
for (int i = 0; i < sensorNum; i++) {
IMeasurementSchema measurementSchema =
new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF);
measurementSchemas.add(measurementSchema);
// 用于初始化TsFilewriter時注冊各測點
schema.registerTimeseries(
new Path(device, sensorPrefix + (i + 1)),
new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}
// 生成TsFileWriter,并注冊各測點
try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
// 初始化Tablet
Tablet tablet = new Tablet(device, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
// 生成的時間序列初始時間戳以及初始值(各時間戳下的測點的值隨著時間戳的增長而增長)
long timestamp = 1;
long value = 1000000L;
// 為Tablet賦值
for (int r = 0; r < rowNum; r++, value++) {
// 記錄時間戳的序列杏节,每次增加1(其值為row)唬渗,value也每次加1
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
// 將value賦給每個sensor的當前時間戳(row)的位置
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// 當時間戳數(shù)目達到閾值(tablet.getMaxRowNumber())時典阵,將當前的tablet寫入TsFile,并清空tablet以繼續(xù)重復(fù)該流程來將剩余時間序列信息寫入tsfile
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.write(tablet);
tablet.reset();
}
}
// 如果最后還有沒有寫入TsFile的內(nèi)容镊逝,將其寫入
if (tablet.rowSize != 0) {
tsFileWriter.write(tablet);
tablet.reset();
}
}
流程解析
在寫內(nèi)存緩沖區(qū)部分中壮啊,我們了解到在寫入TsFile時具有兩種接口:
- 寫入一個設(shè)備一個時間戳多個測點
- 寫入一個設(shè)備多個時間戳多個測點
分別對應(yīng)上述展示的接口調(diào)用部分中的基于TSRecord和Tablet的寫入方法。
我們了解到撑蒜,當調(diào)用 write 接口時歹啼,這個設(shè)備的數(shù)據(jù)會交給對應(yīng)的 ChunkGroupWriter,其中的每個測點會交給對應(yīng)的 ChunkWriter 進行寫入座菠。ChunkWriter 完成編碼和打包(生成 Page)狸眼。
查看TsFile writer 中寫入方法write,我們發(fā)現(xiàn):
寫內(nèi)存緩沖區(qū)
// 用于基于TSRecord的寫入方法
public boolean write(TSRecord record) throws IOException, WriteProcessException {
// 確保對應(yīng)的groupwriter和chunkwriter存在浴滴,不存在則生成
checkIsTimeSeriesExist(record);
// 獲取所要寫入的設(shè)備的groupwriter拓萌,并調(diào)用其來寫入所要寫入的datapoint
groupWriters.get(record.deviceId).write(record.time, record.dataPointList);
++recordCount;
return checkMemorySizeAndMayFlushChunks();
}
// 用于基于Tablet的寫入方法
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
// 確保對應(yīng)的groupwriter和chunkwriter存在,不存在則生成
checkIsTimeSeriesExist(tablet);
// 獲取所要寫入的設(shè)備的groupwriter升略,并調(diào)用其來寫入所要導(dǎo)入的Tablet
groupWriters.get(tablet.deviceId).write(tablet);
recordCount += tablet.rowSize;
return checkMemorySizeAndMayFlushChunks();
}
持久化 ChunkGroup
// 檢查占用的內(nèi)存是否超過閾值(chunkGroupSizeThreshold)微王,如若超過,則調(diào)用flushAllChunkGroups()將其存至OutputStream中品嚣,并清空各group writer及其下chunk writer中數(shù)據(jù)炕倘,以進行剩余數(shù)據(jù)的輸入
// 返回false表示所占存儲空間未達閾值,true則表示達到
private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
// recordCountForNextMemCheck為data point數(shù)目閾值的最小值翰撑,此處初步放縮估計內(nèi)存激才,以進行加速
if (recordCount >= recordCountForNextMemCheck) {
//
long memSize = calculateMemSizeForAllGroup();
assert memSize > 0;
if (memSize > chunkGroupSizeThreshold) {
LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
// 更新recordCountForNextMemCheck
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
// 存至OutputStream,并清空writer數(shù)據(jù)
return flushAllChunkGroups();
} else {
// 更新recordCountForNextMemCheck
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
return false;
}
}
return false;
}
// 存至OutputStream额嘿,并清空各group writer及其下chunk writer中數(shù)據(jù)瘸恼,以進行剩余數(shù)據(jù)的輸入
public boolean flushAllChunkGroups() throws IOException {
if (recordCount > 0) {
for (Map.Entry<String, IChunkGroupWriter> entry : groupWriters.entrySet()) {
String deviceId = entry.getKey();
IChunkGroupWriter groupWriter = entry.getValue();
// 載入當前設(shè)備
fileWriter.startChunkGroup(deviceId);
long pos = fileWriter.getPos();
// 轉(zhuǎn)移該設(shè)備下的pages
long dataSize = groupWriter.flushToFileWriter(fileWriter);
if (fileWriter.getPos() - pos != dataSize) {
throw new IOException(
String.format(
"Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
dataSize, fileWriter.getPos() - pos));
}
fileWriter.endChunkGroup();
}
// 清空其余相關(guān)記錄
reset();
}
return false;
}
關(guān)閉文件
fileWriter.endFile() 中,生成了path (測點) 到chunkMetadata 的映射 chunkMetadataListMap册养,flushMetadataIndex() 中基于 chunkMetadataListMap 生成了設(shè)備到TimeseriesMetadata 的映射 deviceTimeseriesMetadataMap东帅。
public void close() throws IOException {
LOG.info("start close file");
// 寫入剩余的ChunkGroup
flushAllChunkGroups();
// 根據(jù)內(nèi)存中緩存的元數(shù)據(jù),生成 TsFileMetadata 追加到文件尾部
fileWriter.endFile();
}
private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
throws IOException {
deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// device -> TimeseriesMetaDataList 的映射
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
Path path = entry.getKey();
String device = path.getDevice();
// 生成 TimeseriesMetaData
PublicBAOS publicBAOS = new PublicBAOS();
TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
Statistics seriesStatistics = Statistics.getStatsByType(dataType);
int chunkMetadataListLength = 0;
boolean serializeStatistic = (entry.getValue().size() > 1);
// flush chunkMetadataList
for (IChunkMetadata chunkMetadata : entry.getValue()) {
if (!chunkMetadata.getDataType().equals(dataType)) {
continue;
}
chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
TimeseriesMetadata timeseriesMetadata =
new TimeseriesMetadata(
(byte)
((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
chunkMetadataListLength,
path.getMeasurement(),
dataType,
seriesStatistics,
publicBAOS);
deviceTimeseriesMetadataMap
.computeIfAbsent(device, k -> new ArrayList<>())
.add(timeseriesMetadata);
}
return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
}
然后通過MetadataIndexConstructor.constructMetadataIndex() 結(jié)合存儲在輸出流中的信息生成元數(shù)據(jù)樹球拦。
public static MetadataIndexNode constructMetadataIndex(
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap, TsFileOutput out)
throws IOException {
Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
// 對于每個設(shè)備靠闭,有:
for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
if (entry.getValue().isEmpty()) {
continue;
}
// 傳感器索引層,初始化此設(shè)備的索引節(jié)點的隊列
Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
TimeseriesMetadata timeseriesMetadata;
// 傳感器索引層坎炼,初始化傳感器索引層級的葉子節(jié)點
MetadataIndexNode currentIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
// 對于每個 TimeseriesMetadata愧膀,有:
for (int i = 0; i < entry.getValue().size(); i++) {
timeseriesMetadata = entry.getValue().get(i);
// 每隔 MAX_DEGREE_OF_INDEX_NODE 個,加一條 entry 到 currentIndexNode 中
if (i % config.getMaxDegreeOfIndexNode() == 0) {
// 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后
if (currentIndexNode.isFull()) {
// 將 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
// 并將 currentIndexNode 指向一個新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
}
currentIndexNode.addEntry(
new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
}
// 序列化 timeseriesMetadata
timeseriesMetadata.serializeTo(out.wrapAsStream());
}
// 將 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
// 根據(jù) queue 中已經(jīng)存儲的葉子節(jié)點谣光,逐層生成上層節(jié)點檩淋,直至最終的根節(jié)點,并將"設(shè)備-根節(jié)點"對應(yīng)的映射加入 deviceMetadataIndexMap 中
deviceMetadataIndexMap.put(
entry.getKey(),
generateRootNode(
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
}
// 判斷設(shè)備數(shù)是否超過 MAX_DEGREE_OF_INDEX_NODE萄金,如果未超過則可以直接形成元數(shù)據(jù)索引樹的根節(jié)點并返回
if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
MetadataIndexNode metadataIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
// 對于 deviceMetadataIndexMap 中的每一個 entry蟀悦,有:
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
// 將其轉(zhuǎn)化成一個索引項媚朦,加入到 metadataIndexNode 中
metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
// 序列化 entry
entry.getValue().serializeTo(out.wrapAsStream());
}
// 設(shè)置根節(jié)點的 endOffset 并返回
metadataIndexNode.setEndOffset(out.getPosition());
return metadataIndexNode;
}
// 如若不然,則生成設(shè)備索引層
// 初始化存放設(shè)備索引層級節(jié)點的隊列 queue
Queue<MetadataIndexNode> deviceMetadaIndexQueue = new ArrayDeque<>();
// 初始化設(shè)備索引層級的葉子節(jié)點 currentIndexNode
MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
// 對于 deviceMetadataIndexMap 中的每一個 entry日戈,有:
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
// 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后询张,
if (currentIndexNode.isFull()) {
addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
// 將 currentIndexNode 指向一個新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
}
// 將其轉(zhuǎn)化成一個索引項,加入到 currentIndexNode 中
currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
// 序列化 entry
entry.getValue().serializeTo(out.wrapAsStream());
}
// 將 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
// 根據(jù) queue 中已經(jīng)存儲的葉子節(jié)點浙炼,逐層生成上層節(jié)點份氧,直至最終的根節(jié)點
MetadataIndexNode deviceMetadataIndexNode =
generateRootNode(deviceMetadaIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
// 設(shè)置根節(jié)點的 endOffset 并返回
deviceMetadataIndexNode.setEndOffset(out.getPosition());
return deviceMetadataIndexNode;
}
對于其中的generateRootNode()
,有:
// 該方法需要將隊列中的 MetadataIndexNode 形成樹級結(jié)構(gòu)弯屈,并返回根節(jié)點
private static MetadataIndexNode generateRootNode(
Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
throws IOException {
int queueSize = metadataIndexNodeQueue.size();
MetadataIndexNode metadataIndexNode;
// 根據(jù)需要的類型 type 初始化 currentIndexNode
MetadataIndexNode currentIndexNode = new MetadataIndexNode(type);
// 當隊列中有多余一個節(jié)點時 (不是僅剩根節(jié)點時)半火,循環(huán)處理隊列,對于隊列中存在的每個節(jié)點季俩,有:
while (queueSize != 1) {
for (int i = 0; i < queueSize; i++) {
metadataIndexNode = metadataIndexNodeQueue.poll();
// 每當 currentIndexNode 中攢夠 MAX_DEGREE_OF_INDEX_NODE 個 entry 后
if (currentIndexNode.isFull()) {
// 將 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
// 并將 currentIndexNode 指向一個新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(type);
}
// 將其轉(zhuǎn)化成一個索引項,加入到 currentIndexNode 中
currentIndexNode.addEntry(
new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
// 序列化 metadataIndexNode
metadataIndexNode.serializeTo(out.wrapAsStream());
}
// 將 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
// 并將 currentIndexNode 指向一個新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(type);
queueSize = metadataIndexNodeQueue.size();
}
// 隊列中只剩下一個節(jié)點時梅掠,返回隊列中最終剩余的節(jié)點 (即為根節(jié)點)
return metadataIndexNodeQueue.poll();
}
讀流程
過濾條件和查詢表達式
TsFile 查詢時的過濾器Filter定義了12種基本類型如下酌住,詳細定義可以參考[5]:
等于 | 大于 | 大于等于 | 小于 | 小于等于 | 不等于 |
---|---|---|---|---|---|
TimeEq | TimeGt | TimeGtEq | TimeLt | TimeLtEq | TimeNotEq |
ValueEq | ValueGt | ValueGtEq | ValueLt | ValueLtEq | ValueNotEq |
Filter
整體上分為一元條件UnaryFilter
以及二元條件BinaryFilter
,二元條件存在邏輯與關(guān)系AndFilter
以及邏輯或關(guān)系OrFilter
兩種條件間關(guān)系阎抒。
IoTDB中提供了兩種查詢表達式:SingleSeriesExpression
以及GlobalTimeExpression
酪我。SingleSeriesExpression
針對一個時間序列進行過濾,而GlobalTimeExpression
的過濾條件則應(yīng)用于全部時間序列且叁。因此都哭,SingleSeriesExpression
包含path
和filter
兩個參數(shù),path
用于確定針對的時間序列逞带,filter
則表示查詢的過濾條件欺矫。而GlobalTimeExpression
則只包含filter
。
查詢過濾條件(IExpression)為執(zhí)行查詢時的表達式展氓,有如下的定義:
IExpression := SingleSeriesExpression | GlobalTimeExpression | AndExpression | OrExpression
AndExpression := IExpression && IExpression
OrExpression := IExpression || IExpression
然而這些IExpression并不全部可以直接應(yīng)用于查詢穆趴,需要利用特殊的優(yōu)化算法轉(zhuǎn)化為可執(zhí)行的表達式,滿足以下任一條件的查詢表達式為可執(zhí)行表達式:
IExpression
為單一的GlobalTimeExpression
IExpression
為單一的SingleSeriesExpression
IExpression
為AndExpression
遇汞,且葉子節(jié)點均為 SingleSeriesExpressionIExpression
為OrExpression
,且葉子節(jié)點均為SingleSeriesExpression
上述優(yōu)化算法大致如下:
首先,根據(jù)上述的定義弥咪,一個不是可執(zhí)行表達式的查詢表達式應(yīng)是AndExpression
或者OrExpression
咖城,且同時包含GlobalTimeExpression
與 SingleSeriesExpression
。
因此歪赢,我們主要的主要問題是處理GlobalTimeExpression
與SingleSeriesExpression
的查詢統(tǒng)一問題化戳,選擇的解決辦法是把GlobalTimeExpression
的查詢條件投影到全部待查詢的path
(測點)中,然后就可以化歸成 SingleSeriesExpression
間的合并埋凯,這里我們的 IExpression 代指由若干條SingleSeriesExpression
合并而成的條件迂烁,因此有:對于IExpression
間和合并直接根據(jù)兩者的關(guān)系按照生成AndExpression
或者 ORExpression
的方法處理看尼,生成一個新的 IExpression (SingleSeriesExpression
直接作為IExpression
處理);
而對于GlobalTimeExpression
間的合并盟步,則直接根據(jù)兩者的關(guān)系按照生成AndExpression
或者 ORExpression
的方法處理藏斩,生成一個新的 GlobaLTimeFilter
;
對于GlobalTimeExpression
和IExpression
的合并却盘,我們按照上述的映射方法化歸問題狰域,生成新的 IExpression
,值得注意的是黄橘,如若兩者間的關(guān)系是AND兆览,則映射的path
即為被合并的IExpression
中包含的有效 path,而如若關(guān)系式OR塞关,則需首先查詢 GlobalTimeExpression
的投影集合抬探。
IoTDB對于上述的算法提供了optimize()
方法來進行實現(xiàn)(包含combineTwoGlobalTimeExpression()
,handleOneGlobalTimeExpressionr()
和MergeIExpression()
的實現(xiàn)方法帆赢,詳細定義可以參照[5])
TsFile 查詢執(zhí)行過程
TsFile 文件層查詢接口只包含原始數(shù)據(jù)查詢小压,根據(jù)是否包含值過濾條件,可以將查詢分為兩類“無過濾條件或僅包含時間過濾條件查詢”和“包含值過濾條件的查詢”椰于。這里重點介紹其中“無過濾條件或僅包含時間過濾條件查詢”怠益,“包含值過濾條件的查詢”的詳細介紹可以參照[5]。
為了執(zhí)行以上兩類查詢瘾婿,有兩套查詢流程:
- 歸并查詢
生成多個 reader蜻牢,按照 time 對齊,返回結(jié)果集偏陪。
- 連接查詢
根據(jù)查詢條件生成滿足過濾條件的時間戳抢呆,通過滿足條件的時間戳查詢投影列的值,返回結(jié)果集笛谦。
歸并查詢
歸并查詢對于每一個時間序列都構(gòu)建了一個FileSeriesReader
镀娶,基于最小堆來實現(xiàn)數(shù)據(jù)的合并。
查詢過程中揪罕,系統(tǒng)根據(jù)所有的FileSeriesReader
生成一個DataSetWithoutTimeGenerator
梯码,由于每個FileSeriesReader
會按照時間戳從小到大的順序迭代地返回數(shù)據(jù)點,所以可以采用“多路歸并”對所有FileSeriesReader
的結(jié)果進行按時間戳對齊好啰。
詳細的算法描述如下[5]:
(1) 創(chuàng)建一個最小堆轩娶,堆里面存儲“時間戳”,該堆將按照每個時間戳的大小進行組織框往。
(2) 初始化堆鳄抒,依次訪問每一個 FileSeriesReader,如果該 FileSeriesReader 中還有數(shù)據(jù)點,則獲取數(shù)據(jù)點的時間戳并放入堆中许溅。此時每個時間序列最多有1個時間戳被放入到堆中瓤鼻,即該序列最小的時間戳。
(3) 如果堆的 size > 0贤重,獲取堆頂?shù)臅r間戳茬祷,記為t,并將其在堆中刪除并蝗,進入步驟(4)祭犯;如果堆的 size 等于0,則跳到步驟(5)滚停,結(jié)束數(shù)據(jù)合并過程沃粗。
(4) 創(chuàng)建新的 RowRecord。依次遍歷每一條時間序列键畴。在處理其中一條時間序列時最盅,如果該序列沒有更多的數(shù)據(jù)點,則將該列標記為 null 并添加在 RowRecord 中起惕;否則涡贱,判斷最小的時間戳是否與 t 相同,若不相同疤祭,則將該列標記為 null 并添加在 RowRecord 中。若相同饵婆,將該數(shù)據(jù)點添加在 RowRecord 中勺馆,同時判斷該時間序列是否有新的數(shù)據(jù)點,若存在侨核,則將下一個時間戳 t' 添加在堆中草穆,并將 t' 設(shè)為當前時間序列的最小時間戳。最后搓译,返回步驟(3)悲柱。
(5) 結(jié)束數(shù)據(jù)合并過程。
連接查詢
連接查詢生成滿足“選擇條件”的時間戳些己、查詢被投影列在對應(yīng)時間戳下的數(shù)據(jù)點豌鸡、合成RowRecord
,即針對滿足條件的時間戳段标,將其依次投影至各時間序列上涯冠,記錄滿足條件的數(shù)據(jù)。
主要流程如下[5]:
(1) 根據(jù) QueryExpression逼庞,初始化時間戳計算模塊 TimeGeneratorImpl
(2) 為每個被投影的時間序列創(chuàng)建 FileSeriesReaderByTimestamp
(3) 如果“時間戳計算模塊”中還有下一個時間戳蛇更,則計算出下一個時間戳 t ,進入步驟(4);否則派任,結(jié)束查詢砸逊。
(4) 根據(jù) t,在每個時間序列上使用 FileSeriesReaderByTimestamp 組件獲取在時間戳 t 下的數(shù)據(jù)點掌逛;如果在該時間戳下沒有對應(yīng)的數(shù)據(jù)點师逸,則用 null 表示。
(5) 將步驟(4)中得到的所有數(shù)據(jù)點合并成一個 RowRecord颤诀,此時得到一條查詢結(jié)果字旭,返回步驟(3)計算下一個查詢結(jié)果。
查詢流程
TsFileExecutor
接收一個QueryExpression
崖叫,執(zhí)行該查詢并返回相應(yīng)的 QueryDataSet
遗淳。
其基本流程如下:
(1)接收一個 QueryExpression
(2)如果無過濾條件,執(zhí)行歸并查詢心傀。如果該 QueryExpression 包含 Filter(過濾條件)屈暗,則通過 ExpressionOptimizer 對該 QueryExpression 的 Filter 進行優(yōu)化。如果是 GlobalTimeExpression脂男,執(zhí)行歸并查詢养叛。如果包含值過濾,交給 ExecutorWithTimeGenerator 執(zhí)行連接查詢宰翅。
(3) 生成對應(yīng)的 QueryDataSet弃甥,迭代地生成 RowRecord,將查詢結(jié)果返回汁讼。
源碼分析
此處引用的為v0.12.0[4]的代碼
這里未介紹 value filter 相關(guān)的查詢淆攻,詳細的介紹可以參考[5],代碼可以參考[4]嘿架。
接口調(diào)用
// 這里展示了一個具有 0,1,2,3 4個設(shè)備瓶珊,每個設(shè)備具有1,2,3 3個測點的TsFile的例子
// 初始化 TsFile reader
try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
無 filter
// 將查詢映射到設(shè)備0的各個 path (測點)上
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("device_0", "sensor_1"));
paths.add(new Path("device_0", "sensor_2"));
paths.add(new Path("device_0", "sensor_3"));
// 不使用 filter的情況,應(yīng)該查詢出所有設(shè)備0某一測點具有有效值的數(shù)據(jù)點對應(yīng)的時間戳下的設(shè)備0的測點1,2,3的情況
queryAndPrint(paths, readTsFile, null);
}
存在 time filter
// 使用 time filter (4 <= time <= 10) 的情況耸彪,應(yīng)該選出時間戳在[4,10]間的有效記錄
IExpression timeFilter =
BinaryExpression.and(
new GlobalTimeExpression(TimeFilter.gtEq(4L)),
new GlobalTimeExpression(TimeFilter.ltEq(10L)));
queryAndPrint(paths, readTsFile, timeFilter);
其中伞芹,查詢執(zhí)行部分接口調(diào)用如下:
// 其中,paths 為查詢的全體測點集蝉娜,statement 為查詢條件
private static void queryAndPrint(
ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement) throws IOException {
// 根據(jù)查詢條件生成queryExpression
QueryExpression queryExpression = QueryExpression.create(paths, statement);
// 執(zhí)行查詢唱较,如若存在過濾條件 (filter) 則先進行優(yōu)化
QueryDataSet queryDataSet = readTsFile.query(queryExpression);
// 完成查詢,并輸出全部查詢結(jié)果
while (queryDataSet.hasNext()) {
System.out.println(queryDataSet.next());
}
System.out.println("------------");
}
另外值得一提的是IoTDB還提供了可以根據(jù) offset 逐 byte 讀取 TsFile 的接口:
marker = reader.readMarker()
流程解析
如上文所述召川,查詢過程的主要流程包括生成QueryExpression
绊汹,(對于存在filter
時) 優(yōu)化, 查詢 (歸并查詢 / 連接查詢)扮宠。我們發(fā)現(xiàn)西乖,在生成QueryExpression
后狐榔,由readTsFile.query()
生成queryDataSet
,再迭代地生成RowRecord
获雕。
查看readTsFile.query()
模塊有:
public QueryDataSet query(QueryExpression queryExpression) throws IOException {
return tsFileExecutor.execute(queryExpression);
}
public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
// 使用 bloom filter 判斷檢索對象 path 是否存在
BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
List<Path> filteredSeriesPath = new ArrayList<>();
if (bloomFilter != null) {
for (Path path : queryExpression.getSelectedSeries()) {
if (bloomFilter.contains(path.getFullPath())) {
filteredSeriesPath.add(path);
}
}
queryExpression.setSelectSeries(filteredSeriesPath);
}
// 加載metadata
metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
// 存在 filter 時
if (queryExpression.hasQueryFilter()) {
try {
// 獲取查詢表達式
IExpression expression = queryExpression.getExpression();
// 獲取可執(zhí)行查詢表達式
IExpression regularIExpression =
ExpressionOptimizer.getInstance()
.optimize(expression, queryExpression.getSelectedSeries());
// 更新為對應(yīng)的可執(zhí)行表達式
queryExpression.setExpression(regularIExpression);
// 當可執(zhí)行表達式為 GlobalTimeExpression 時薄腻,及全由 GlobalTimeExpression 組成時,執(zhí)行歸并查詢
if (regularIExpression instanceof GlobalTimeExpression) {
return execute(
queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
} else {
// 反之届案,執(zhí)行連接查詢
return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
.execute(queryExpression);
}
} catch (QueryFilterOptimizationException | NoMeasurementException e) {
throw new IOException(e);
}
} else {
try {
// 沒有 filter 時庵楷,執(zhí)行歸并查詢
return execute(queryExpression.getSelectedSeries());
} catch (NoMeasurementException e) {
throw new IOException(e);
}
}
}
// 其中對于存在 GlobalTimeExpression 的 filter 時,以及無 filter 時楣颠,有:
private QueryDataSet executeMayAttachTimeFiler(
List<Path> selectedPathList, GlobalTimeExpression timeExpression)
throws IOException, NoMeasurementException {
List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (Path path : selectedPathList) {
List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
AbstractFileSeriesReader seriesReader;
if (chunkMetadataList.isEmpty()) {
seriesReader = new EmptyFileSeriesReader();
dataTypes.add(metadataQuerier.getDataType(path));
} else {
// 沒有filter 時
if (timeExpression == null) {
seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
} else {
// 存在 GlobalTimeExpression 的 filter 時尽纽,將 GlobalTimeExpression 的條件加入每一個時間序列的 reader 中
seriesReader =
new FileSeriesReader(chunkLoader, chunkMetadataList, timeExpression.getFilter());
}
dataTypes.add(chunkMetadataList.get(0).getDataType());
}
readersOfSelectedSeries.add(seriesReader);
}
// 開始歸并查詢
return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries);
}
歸并查詢
// 初始化堆
private void initHeap() throws IOException {
hasDataRemaining = new ArrayList<>();
batchDataList = new ArrayList<>();
timeHeap = new PriorityQueue<>();
timeSet = new HashSet<>();
// 初始化堆,依次訪問每個 series reader
for (int i = 0; i < paths.size(); i++) {
AbstractFileSeriesReader reader = readers.get(i);
// 如果沒有數(shù)據(jù)點時
if (!reader.hasNextBatch()) {
batchDataList.add(new BatchData());
hasDataRemaining.add(false);
} else {
// 如果還有數(shù)據(jù)點童漩,將其加入list
batchDataList.add(reader.nextBatch());
hasDataRemaining.add(true);
}
}
// 獲取 list 中有效數(shù)據(jù)點中時間戳放入堆中
for (BatchData data : batchDataList) {
if (data.hasCurrent()) {
timeHeapPut(data.currentTime());
}
}
}
在queryDataSet.hasNext()
中判斷堆的size是否大于0弄贿,
如若大于0,有:
public RowRecord nextWithoutConstraint() throws IOException {
// 獲取堆頂記錄的最小時間
long minTime = timeHeapGet();
// 新建 RowRecord
RowRecord record = new RowRecord(minTime);
// 訪問每一個選中的時間序列
for (int i = 0; i < paths.size(); i++) {
Field field = new Field(dataTypes.get(i));
// 沒有更多的數(shù)據(jù)點時矫膨,標記為null
if (!hasDataRemaining.get(i)) {
record.addField(null);
continue;
}
// 獲取數(shù)據(jù)點
BatchData data = batchDataList.get(i);
// 如果當前的最小時間戳等于該測點當前時間戳
if (data.hasCurrent() && data.currentTime() == minTime) {
// 添加該數(shù)據(jù)點
putValueToField(data, field);
// 移動到下一個時間戳
data.next();
// 沒有下一個時間戳時
if (!data.hasCurrent()) {
AbstractFileSeriesReader reader = readers.get(i);
// 判斷有無nextBatch差凹,如有,則判斷其中有無新的時間戳侧馅,如有則將其加入堆中
if (reader.hasNextBatch()) {
data = reader.nextBatch();
if (data.hasCurrent()) {
batchDataList.set(i, data);
timeHeapPut(data.currentTime());
} else {
hasDataRemaining.set(i, false);
}
} else {
hasDataRemaining.set(i, false);
}
} else {
// 如果有下一個時間戳危尿,將其加入堆中
timeHeapPut(data.currentTime());
}
record.addField(field);
} else {
// 如果兩個時間不相同,則將其標志為 null
record.addField(null);
}
}
return record;
}
連接查詢
public DataSetWithTimeGenerator execute(QueryExpression queryExpression) throws IOException {
// 獲取查詢表達式
IExpression expression = queryExpression.getExpression();
List<Path> selectedPathList = queryExpression.getSelectedSeries();
// 根據(jù)查詢表達式獲取時間戳計算單元
TimeGenerator timeGenerator = new TsFileTimeGenerator(expression, chunkLoader, metadataQuerier);
List<Boolean> cached =
markFilterdPaths(expression, selectedPathList, timeGenerator.hasOrNode());
// 為每個被投影的時間序列創(chuàng)建 FileSeriesReaderByTimestamp
List<FileSeriesReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
Iterator<Boolean> cachedIterator = cached.iterator();
Iterator<Path> selectedPathIterator = selectedPathList.iterator();
// 如果“時間戳計算模塊”中還有下一個時間戳馁痴,則計算出下一個時間戳
while (cachedIterator.hasNext()) {
boolean cachedValue = cachedIterator.next();
Path selectedPath = selectedPathIterator.next();
List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(selectedPath);
if (chunkMetadataList.size() != 0) {
dataTypes.add(chunkMetadataList.get(0).getDataType());
// 如果該時間戳下沒有滿足 filter 的數(shù)據(jù)點谊娇,則將各對應(yīng)的測點下該時間戳處設(shè)為null
if (cachedValue) {
readersOfSelectedSeries.add(null);
continue;
}
FileSeriesReaderByTimestamp seriesReader =
new FileSeriesReaderByTimestamp(chunkLoader, chunkMetadataList);
// 將該 FileSeriesReaderByTimestamp 組件加入 readersOfSelectedSeries 以便獲取在該時間戳下的數(shù)據(jù)點
readersOfSelectedSeries.add(seriesReader);
} else {
// 如果 selectedPath 為空
selectedPathIterator.remove();
cachedIterator.remove();
}
}
// 返回 DataSetWithTimeGenerator
return new DataSetWithTimeGenerator(
selectedPathList, cached, dataTypes, timeGenerator, readersOfSelectedSeries);
}
// 根據(jù)上述“時間戳計算模塊”中算出的時間戳,對于每一個時間戳罗晕,分別在每個時間序列上使用之前記錄的 FileSeriesReaderByTimestamp 組件獲取在該時間戳下的數(shù)據(jù)點济欢,并合并成一個 RowRecord,得到一個查詢結(jié)果
public RowRecord nextWithoutConstraint() throws IOException {
// 獲取下一個“時間戳計算模塊”中算出的時間戳
long timestamp = timeGenerator.next();
RowRecord rowRecord = new RowRecord(timestamp);
// 對于每個時間序列攀例,獲取在該時間戳下的數(shù)據(jù)點
for (int i = 0; i < paths.size(); i++) {
// 如果是存在 filter 作用在該時間序列的情況
if (cached.get(i)) {
// 利用 time generator 中的 reader 獲取對應(yīng)數(shù)據(jù)點
Object value = timeGenerator.getValue(paths.get(i));
// 加入 RowRecord
rowRecord.addField(value, dataTypes.get(i));
continue;
}
// 如果是不存在 filter 作用在該時間序列的情況
// 利用 time generator 中的 reader 獲取對應(yīng)數(shù)據(jù)點
FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp = readers.get(i);
Object value = fileSeriesReaderByTimestamp.getValueInTimestamp(timestamp);
// 加入 RowRecord
rowRecord.addField(value, dataTypes.get(i));
}
return rowRecord;
}
-
物聯(lián)網(wǎng)時序數(shù)據(jù)庫 Apache IoTDB船逮,詳細信息可以在https://iotdb.apache.org/中找到顾腊。 ?
-
https://iotdb.apache.org/zh/SystemDesign/TsFile/Format.html ? ? ? ?
-
https://iotdb.apache.org/zh/SystemDesign/TsFile/Write.html ?
-
https://iotdb.apache.org/zh/SystemDesign/TsFile/Read.html ? ? ? ? ? ?