Directory類(lèi)繼承樹(shù)
在IndicesService的createIndexService函數(shù)中馒铃,IndicesService創(chuàng)建IndexModule實(shí)例對(duì)象痕惋,IndexModule創(chuàng)建IndexStore實(shí)例對(duì)象娃殖,IndexStore創(chuàng)建FsDirectoryService實(shí)例對(duì)象
IndicesService::createIndexService
private synchronized IndexService createIndexService(final String reason, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
...
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry); // 創(chuàng)建IndexModule
...
return indexModule.newIndexService(nodeEnv, xContentRegistry, this, circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, indicesQueryCache, mapperRegistry, indicesFieldDataCache); // 托管IndexModule創(chuàng)建IndexService
}
IndexModule::newIndexService
public IndexService newIndexService(NodeEnvironment environment, NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter, CircuitBreakerService circuitBreakerService, BigArrays bigArrays,
ThreadPool threadPool, ScriptService scriptService,
ClusterService clusterService, Client client, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache) throws IOException {
...;
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final IndexStore store;
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) { // IndexModule提供了5種類(lèi)型的文件系統(tǒng):NIOFS述寡,MMAPFS叶洞,SIMPLEFS,F(xiàn)S螟炫,DEFAULT
store = new IndexStore(indexSettings, indexStoreConfig);
} else { // 也可實(shí)現(xiàn)自定義的IndexStore艺晴,通過(guò)調(diào)用addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider)函數(shù)去注冊(cè)自定義IndexStore
BiFunction<IndexSettings, IndexStoreConfig, IndexStore> factory = storeTypes.get(storeType);
if (factory == null) {
throw new IllegalArgumentException("Unknown store type [" + storeType + "]");
}
store = factory.apply(indexSettings, indexStoreConfig);
if (store == null) {
throw new IllegalStateException("store must not be null");
}
}
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType); // index.store.throttle.type, 節(jié)流控制
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
store::setMaxRate); // index.store.throttle.max_bytes_per_sec, 節(jié)流控制
...;
return new IndexService(indexSettings, environment, xContentRegistry, new SimilarityService(indexSettings, similarities),
shardStoreDeleter, analysisRegistry, engineFactory.get(), circuitBreakerService, bigArrays, threadPool, scriptService,
clusterService, client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
}
IndexStore::newDirectoryService
public DirectoryService newDirectoryService(ShardPath path) {
return new FsDirectoryService(indexSettings, this, path);
}
FsDirectoryService
FsDirectoryService創(chuàng)建目錄時(shí)主要做兩件事:獲取文件鎖封寞,以及根據(jù)指定類(lèi)型創(chuàng)建對(duì)應(yīng)類(lèi)型的目錄對(duì)象
FsDirectoryService::newDirectory
@Override
public Directory newDirectory() throws IOException {
final Path location = path.resolveIndex(); // 獲取目錄位置
final LockFactory lockFactory = indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING); // 獲取文件鎖
Files.createDirectories(location);
Directory wrapped = newFSDirectory(location, lockFactory); // 根據(jù)指定類(lèi)型創(chuàng)建對(duì)應(yīng)類(lèi)型的目錄對(duì)象
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
wrapped = setPreload(wrapped, location, lockFactory, preLoadExtensions); // MMapFS文件系統(tǒng)在將文件映射到應(yīng)用程序內(nèi)存時(shí)進(jìn)行預(yù)加載操作狈究,將數(shù)據(jù)寫(xiě)進(jìn)MappedByteBuffer
if (indexSettings.isOnSharedFilesystem()) {
wrapped = new SleepingLockWrapper(wrapped, 5000);
}
return new RateLimitedFSDirectory(wrapped, this, this) ;
}
protected Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException {
final String storeType = indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
IndexModule.Type.FS.getSettingsKey());
if (IndexModule.Type.FS.match(storeType)) {
return FSDirectory.open(location, lockFactory); // use lucene defaults
} else if (IndexModule.Type.DEFAULT.match(storeType)) {
deprecationLogger.deprecated("[default] store type is deprecated, use [fs] instead");
return FSDirectory.open(location, lockFactory); // use lucene defaults
} else if (IndexModule.Type.SIMPLEFS.match(storeType)) {
return new SimpleFSDirectory(location, lockFactory);
} else if (IndexModule.Type.NIOFS.match(storeType)) {
return new NIOFSDirectory(location, lockFactory);
} else if (IndexModule.Type.MMAPFS.match(storeType)) {
return new MMapDirectory(location, lockFactory);
}
throw new IllegalArgumentException("No directory found for type [" + storeType + "]");
}
FSDirectoryService文件鎖
由index.store.fs.fs_lock參數(shù)指定抖锥,可選[native, simple]磅废,默認(rèn)native
創(chuàng)建方式 | 校驗(yàn)方式 | 解鎖 | 備注 | |
---|---|---|---|---|
NativeFSLockFactory | Files.createFile(lockFile); // 文件存在時(shí)忽略異常 FileChannel channel = FileChannel.open(realPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE); FileLock lock = channel.tryLock(); FileTime creationTime = Files.readAttributes(realPath, BasicFileAttributes.class).creationTime(); |
lock.isValid(); channel.size() == 0; creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime()); |
沒(méi)有顯示釋放文件鎖【怪海可能是等垃圾回收時(shí)自動(dòng)釋放 | 依托NIO的文件鎖API實(shí)現(xiàn) |
SimpleFSLockFactory | Files.createFile(lockFile); // 文件存在時(shí)拋異常 FileTime creationTime = Files.readAttributes(lockFile, BasicFileAttributes.class).creationTime(); |
creationTime.equals(Files.readAttributes(path, BasicFileAttributes.class).creationTime()); | Files.delete(path); | 文件創(chuàng)建時(shí)間屬性校驗(yàn) |
FSDirectory文件類(lèi)型
type | openInput | read | createOutput and write | 備注 | |
---|---|---|---|---|---|
FSDirectory | Type.FS 或 Type.DEFAULT | if (Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) { return new MMapDirectory(path, lockFactory); } else if (Constants.WINDOWS) { return new SimpleFSDirectory(path, lockFactory); } else { return new NIOFSDirectory(path, lockFactory); } |
OutputStream out = Files.newOutputStream(directory.resolve(name), options); out.write(b, offset, chunk); |
FSDirectory是lucene庫(kù)實(shí)現(xiàn)的文件系統(tǒng)類(lèi)岔帽,根據(jù)操作系統(tǒng)是否為WINDOWS斗遏,操作系統(tǒng)是否為64位鞋邑,操作系統(tǒng)的文件系統(tǒng)是否支持MMAP來(lái)決定使用SimpleFSDirectory账蓉、NIOFSDirectory逾一、MMapDirectory中的一種方式 | |
SimpleFSDirectory | Type.SIMPLEFS | SeekableByteChannel channel = Files.newByteChannel(path, StandardOpenOption.READ); | ByteBuffer bb = ByteBuffer.wrap(b, offset, len); int i = channel.read(bb); |
面向WINDOWS平臺(tái)的文件系統(tǒng) | |
NIOFSDirectory | Type.NIOFS | FileChannel fc = FileChannel.open(path, StandardOpenOption.READ); | ByteBuffer bb = ByteBuffer.wrap(b, offset, len); int i = channel.read(bb, pos); |
面向LINUX平臺(tái)的NIO庫(kù)的文件系統(tǒng) | |
MMapDirectory | Type.MMAPFS | FileChannel c = FileChannel.open(path, StandardOpenOption.READ); | MappedByteBuffer buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize); if (preload) { buffer.load(); } |
面向LINUX平臺(tái)的支持MMap方式的文件系統(tǒng) |