Introduction
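HDFS supports writing to off-heap memory managed by the Data Nodes. The Data Nodes flush in-memory data to disk asynchronously, which removes expensive disk IO and checksum computation from the performance-sensitive IO path; such writes are therefore called Lazy Persist writes. HDFS provides best-effort persistence guarantees for Lazy Persist writes. Rare data loss is possible if a node restarts before its replicas have been persisted to disk. Applications can choose Lazy Persist writes to reduce latency at the cost of some durability guarantees.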
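In Figure 1, the mechanism described above corresponds to steps 4 and 6: data is written to RAM and then written to disk asynchronously. The earlier steps concern setting the StorageType, which is covered later in this article. The overall flow shown in the figure can be summarized as follows:
- Set the StoragePolicy of the target file or directory to LAZY_PERSIST, the in-memory storage policy.
- The client process sends a create/write request to the NameNode.
- The request reaches a specific DataNode, which writes the blocks into RAM and starts an asynchronous thread service that persists the in-memory data to disk.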
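Asynchronous persistence is what clearly distinguishes memory storage from storage on other media. It is presumably also where the name LAZY_PERSIST comes from: data is not flushed to disk immediately but is "lazily persisted", that is, handled with a delay.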
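As a rough illustration of the flow above, the following Java sketch returns from a write as soon as the data is in memory and lets a background thread copy it to disk later. It assumes a map standing in for the RAM disk and a single-thread executor standing in for the persistence service; the class `LazyPersistStore` and its methods are hypothetical and are not part of HDFS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical model of a lazy-persist write path; not HDFS code. */
final class LazyPersistStore implements AutoCloseable {
  // Stand-in for the RAM disk: block ID -> block bytes.
  private final Map<Long, byte[]> ram = new ConcurrentHashMap<>();
  // One background thread plays the role of the asynchronous persister.
  private final ExecutorService persister = Executors.newSingleThreadExecutor();
  private final Path diskDir;

  LazyPersistStore(Path diskDir) {
    this.diskDir = diskDir;
  }

  /** Stores the data in memory and schedules the disk write for later. */
  Future<Path> write(long blockId, byte[] data) {
    ram.put(blockId, data.clone());
    Callable<Path> task = () -> persist(blockId);
    return persister.submit(task);
  }

  /** Copies one in-memory block to a file under diskDir. */
  private Path persist(long blockId) {
    try {
      return Files.write(diskDir.resolve("blk_" + blockId), ram.get(blockId));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads are served from memory whether or not the block is on disk yet. */
  byte[] readFromRam(long blockId) {
    return ram.get(blockId);
  }

  @Override
  public void close() {
    persister.shutdown();
  }
}
```

The `Future` returned by `write` completes only once the block is on disk, which makes the durability gap visible: until then, a crash would lose the block.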
Source Code
As mentioned earlier, while data is being stored, another batch of data is persisted asynchronously, so several service objects have to cooperate. Their coordinator is FsDatasetImpl, the steward that manages all disk reads and writes on a DataNode.
In FsDatasetImpl, three service objects relate to in-memory storage:
LazyWriter: a thread service that loops continuously, taking blocks from the block list and submitting them to the asynchronous persistence thread pools in RamDiskAsyncLazyPersistService.
RamDiskAsyncLazyPersistService: the asynchronous persistence service. It keeps one thread pool per disk volume, and blocks to be persisted to a given volume are submitted to that volume's pool. Each pool has a maximum of one thread.
RamDiskReplicaLruTracker: the replica tracking class. It maintains all persisted replicas, all not-yet-persisted replicas, and the overall replica map. When a replica is finally stored in memory, the queue it belongs to is updated accordingly. In addition, when the node runs short of memory, some of the least recently used replicas are removed through this class.
RamDiskReplicaLruTracker
Of the three, RamDiskReplicaLruTracker acts as the intermediary, because it maintains several related collections of block information. The main three are:
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
...
/**
* <block pool ID, <block ID, RamDiskReplicaLru>>
* Map of blockpool ID to <map of blockID to ReplicaInfo>.
*/
Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
/**
* Queue of replicas that will be written to disk.
* Queue of replicas that need to be written to disk.
* Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
Queue<RamDiskReplicaLru> replicasNotPersisted;
/**
* Replicas that have already been persisted, ordered by last use time.
* Map of persisted replicas ordered by their last use times.
*/
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
...
}
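Figure 3 shows how the three collections relate. replicaMaps is a map of <block pool ID, <block ID, RamDiskReplicaLru>> (block pools are introduced in the previous article on HDFS Federation), and every RAM replica has an entry in it. replicasNotPersisted holds the queue of replicas that will be written to disk. replicasPersisted holds the replicas that have already been persisted, ordered by last use time.
The key methods of the RamDiskReplicaLruTracker class are listed below.
- addReplica: adds a new replica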
@Override
synchronized void addReplica(final String bpid, final long blockId,
final FsVolumeImpl transientVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, RamDiskReplicaLru>();
replicaMaps.put(bpid, map);
}
RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
map.put(blockId, ramDiskReplicaLru);
// Add ramDiskReplicaLru to replicasNotPersisted
replicasNotPersisted.add(ramDiskReplicaLru);
}
- dequeueNextReplicaToPersist: takes the next replica to be persisted from the replicasNotPersisted queue
@Override
// Get the next block to be persisted from replicasNotPersisted
synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
while (replicasNotPersisted.size() != 0) {
RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
- recordStartLazyPersist: records the start of a Lazy Persist operation
@Override
synchronized void recordStartLazyPersist(
final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
// Set the volume that the replica is persisted to
ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
}
- recordEndLazyPersist: records the end of a Lazy Persist operation
@Override
synchronized void recordEndLazyPersist(
final String bpid, final long blockId, final File[] savedFiles) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
if (ramDiskReplicaLru == null) {
throw new IllegalStateException("Unknown replica bpid=" +
bpid + "; blockId=" + blockId);
}
ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
// Common case.
replicasNotPersisted.remove();
} else {
// Caller error? Fallback to O(n) removal.
replicasNotPersisted.remove(ramDiskReplicaLru);
}
ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
ramDiskReplicaLru.isPersisted = true;
}
- reenqueueReplicaNotPersisted: if persistence fails, puts the replica back on the replicasNotPersisted queue
@Override
synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
}
Figure 4 shows the approximate order in which these methods run.
When the node restarts, or when a new file is given the LAZY_PERSIST policy, new replica blocks are stored in memory and added to the replicasNotPersisted queue. dequeueNextReplicaToPersist then takes out the next replica to be persisted, and it is written to disk. recordStartLazyPersist and recordEndLazyPersist are called during this process to mark the changes in persistence state.
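The lifecycle described above can be sketched with plain JDK collections. The class below is a simplified, hypothetical stand-in for the tracker: it keys replicas by block ID only (no block pools), uses a logical counter instead of `Time.monotonicNow()`, and uses a `TreeMap` in place of Guava's `TreeMultimap`. It is meant to show the state transitions, not to mirror the HDFS implementation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

/** Simplified, hypothetical model of the replica tracker lifecycle. */
final class ReplicaTrackerSketch {
  static final class Replica {
    final long blockId;
    long lastUsed;
    boolean persisted;

    Replica(long blockId) {
      this.blockId = blockId;
    }
  }

  private final Map<Long, Replica> replicaMap = new HashMap<>();
  private final Queue<Replica> notPersisted = new ArrayDeque<>();
  // last use -> replica; the logical clock keeps keys unique in this sketch.
  private final TreeMap<Long, Replica> persisted = new TreeMap<>();
  private long clock = 0;

  /** A new in-memory replica starts out in the not-persisted queue. */
  synchronized void addReplica(long blockId) {
    Replica r = new Replica(blockId);
    replicaMap.put(blockId, r);
    notPersisted.add(r);
  }

  /** Forgets a replica; a stale queue entry may remain and is skipped later. */
  synchronized void discardReplica(long blockId) {
    Replica r = replicaMap.remove(blockId);
    if (r != null && r.persisted) {
      persisted.remove(r.lastUsed);
    }
  }

  /** Returns the next live replica to persist, skipping stale entries. */
  synchronized Replica dequeueNextReplicaToPersist() {
    Replica r;
    while ((r = notPersisted.poll()) != null) {
      if (replicaMap.get(r.blockId) == r) {
        return r;
      }
    }
    return null;
  }

  /** Marks a replica as persisted and files it under its last use time. */
  synchronized void recordEndLazyPersist(long blockId) {
    Replica r = replicaMap.get(blockId);
    if (r == null) {
      throw new IllegalStateException("Unknown replica blockId=" + blockId);
    }
    r.persisted = true;
    r.lastUsed = ++clock;
    persisted.put(r.lastUsed, r);
  }

  /** Puts a replica back on the queue after a failed persist attempt. */
  synchronized void reenqueueReplicaNotPersisted(Replica r) {
    notPersisted.add(r);
  }

  synchronized int numReplicasNotPersisted() {
    return notPersisted.size();
  }

  synchronized int numReplicasPersisted() {
    return persisted.size();
  }
}
```

A typical sequence is `addReplica`, then `dequeueNextReplicaToPersist`, then either `recordEndLazyPersist` on success or `reenqueueReplicaNotPersisted` on failure.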
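Three more methods are unrelated to persistence but are still important:
- discardReplica: when a replica is found to be no longer needed, for example because it has been deleted or is corrupt, it can be removed from memory.
- touch: named like the Linux touch command, this method records one access to a given replica block and updates its lastUsedTime. lastUsedTime plays a key role in the LRU algorithm described below.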
// touch updates the last access time
synchronized void touch(final String bpid,
final long blockId) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
...
// Reinsert the replica with its new timestamp.
// Update the last access timestamp and reinsert the entry
if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
}
}
// Step 2: get the next candidate block for eviction
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
// Get an iterator over replicasPersisted and walk through it
final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
while (it.hasNext()) {
// replicasPersisted is already ordered by time, so taking the current block is enough
final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove();
Map<Long, RamDiskReplicaLru> replicaMap =
replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
return ramDiskReplicaLru;
}
// The replica no longer exists, look for the next one.
}
return null;
}
- getNextCandidateForEviction: called when the DataNode is short of memory and has to free space for new replica blocks. It picks the block to evict according to the configured eviction scheme; the default is LRU.
/**
* Attempt to evict one or more transient block replicas until we
* have at least bytesNeeded bytes free.
*/
// Eviction candidates are chosen by the access time of already persisted blocks, not from arbitrary blocks in memory
public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
final long cacheCapacity = cacheManager.getCacheCapacity();
// Loop while free memory is below the requested size
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
(cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
// Get the replica to be evicted
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Evicting block " + replicaState);
}
...
// Remove the block from memory and free the space
// Delete the block+meta files from RAM disk and release locked
// memory.
removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
blockFileUsed, metaFileUsed, bpid);
}
}
}
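To make the interplay between touch and eviction concrete, here is a small self-contained sketch of LRU eviction over persisted replicas. It is an illustration under simplifying assumptions: sizes are tracked in a plain map, a logical counter replaces real timestamps, and the class `LruEvictionSketch` and its method `addPersisted` are hypothetical names rather than HDFS APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical LRU eviction model; names and sizes are illustrative only. */
final class LruEvictionSketch {
  private final long capacity;
  private long used = 0;
  private long clock = 0; // logical clock instead of a real timestamp
  private final Map<Long, Long> sizeOf = new HashMap<>();        // blockId -> bytes
  private final Map<Long, Long> lastUsedOf = new HashMap<>();    // blockId -> last use
  private final TreeMap<Long, Long> byLastUse = new TreeMap<>(); // last use -> blockId

  LruEvictionSketch(long capacity) {
    this.capacity = capacity;
  }

  /** Registers a replica that has already been persisted to disk. */
  void addPersisted(long blockId, long bytes) {
    sizeOf.put(blockId, bytes);
    used += bytes;
    touch(blockId);
  }

  /** Reinserts the replica under a fresh timestamp; unknown IDs are ignored. */
  void touch(long blockId) {
    if (!sizeOf.containsKey(blockId)) {
      return;
    }
    Long old = lastUsedOf.get(blockId);
    if (old != null) {
      byLastUse.remove(old);
    }
    long now = ++clock;
    lastUsedOf.put(blockId, now);
    byLastUse.put(now, blockId);
  }

  /** Evicts least recently used replicas until bytesNeeded bytes are free. */
  List<Long> evictBlocks(long bytesNeeded) {
    List<Long> evicted = new ArrayList<>();
    while (capacity - used < bytesNeeded && !byLastUse.isEmpty()) {
      long blockId = byLastUse.pollFirstEntry().getValue();
      lastUsedOf.remove(blockId);
      used -= sizeOf.remove(blockId);
      evicted.add(blockId);
    }
    return evicted;
  }
}
```

Only persisted replicas are eligible in this model, which matches the point made above: evicting a block that has no copy on disk would lose data.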
LazyWriter
LazyWriter is a thread service and acts as the engine: it loops continuously, taking blocks awaiting persistence from the queue and submitting them to the asynchronous persistence service. Here is its main run method.
public void run() {
int numSuccessiveFailures = 0;
while (fsRunning && shouldRun) {
try {
// Take the next replica and submit it to the async service; the boolean result says whether submission succeeded
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}
} catch (InterruptedException e) {
LOG.info("LazyWriter was interrupted, exiting");
break;
} catch (Exception e) {
LOG.warn("Ignoring exception in LazyWriter:", e);
}
}
}
Next, the saveNextReplica method:
private boolean saveNextReplica() {
RamDiskReplica block = null;
FsVolumeReference targetReference;
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
boolean succeeded = false;
try {
// Take the next block awaiting persistence from the queue
block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
if (block != null) {
synchronized (FsDatasetImpl.this) {
...
// Submit it to the async service
asyncLazyPersistService.submitLazyPersistTask(
block.getBlockPoolId(), block.getBlockId(),
replicaInfo.getGenerationStamp(), block.getCreationTime(),
replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
targetReference);
}
}
}
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + block, ioe);
} finally {
if (!succeeded && block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
}
}
return succeeded;
}
The flow of the LazyWriter thread service can therefore be summarized as shown in Figure 5:
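The back-off rule in run is easy to miss, so the sketch below isolates it. It assumes the save step and the queue size are supplied as functions and counts the sleeps instead of actually sleeping; `LazyWriterLoopSketch` and `countSleeps` are hypothetical names introduced only for this illustration.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

/** Hypothetical model of the LazyWriter retry/back-off rule; not HDFS code. */
final class LazyWriterLoopSketch {
  /**
   * Runs a fixed number of loop iterations and returns how many times the
   * loop would have slept. Sleeping is only counted, never performed.
   */
  static int countSleeps(int iterations, BooleanSupplier saveNextReplica,
                         IntSupplier numReplicasNotPersisted) {
    int sleeps = 0;
    int numSuccessiveFailures = 0;
    for (int i = 0; i < iterations; i++) {
      // A success resets the failure counter; a failure increments it.
      numSuccessiveFailures =
          saveNextReplica.getAsBoolean() ? 0 : numSuccessiveFailures + 1;
      // Back off when the queue is empty (0 >= 0) or when the number of
      // consecutive failures has reached the queue length.
      if (numSuccessiveFailures >= numReplicasNotPersisted.getAsInt()) {
        sleeps++;
        numSuccessiveFailures = 0;
      }
    }
    return sleeps;
  }
}
```

With three queued replicas that all fail, the loop sleeps once every three attempts; with an empty queue it sleeps on every pass, which is what keeps the thread from spinning.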
RamDiskAsyncLazyPersistService
RamDiskAsyncLazyPersistService revolves around two things, volumes (disks) and executors (thread pools), and follows one rule:
Each volume has its own thread pool, and each pool has at most one thread.
The map of thread pools is defined as follows:
class RamDiskAsyncLazyPersistService {
...
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
...
When the service starts, volume directories are added to it.
synchronized void addVolume(File volume) {
if (executors == null) {
throw new RuntimeException("AsyncLazyPersistService is already shutdown");
}
ThreadPoolExecutor executor = executors.get(volume);
// If a thread pool already exists for this volume directory, throw an exception
if (executor != null) {
throw new RuntimeException("Volume " + volume + " is already existed.");
}
// Otherwise add it
addExecutorForVolume(volume);
}
Next, the addExecutorForVolume method:
private void addExecutorForVolume(final File volume) {
...
// Create the thread pool; its maximum thread count is MAXIMUM_THREADS_PER_VOLUME
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
// This can reduce the number of running threads
executor.allowCoreThreadTimeOut(true);
// Add it to executors, using the volume as the key
executors.put(volume, executor);
}
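The one-pool-per-volume rule can be tried out in isolation. The following sketch assumes a core and maximum pool size of 1 and a 60 second keep-alive (illustrative values, not taken from the HDFS constants), and the class `PerVolumeExecutorsSketch` and its `shutdownAndWait` helper are hypothetical. Because each pool has a single thread and an unbounded FIFO queue, tasks for the same volume run one at a time in submission order.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of one single-threaded executor per volume. */
final class PerVolumeExecutorsSketch {
  private final Map<File, ThreadPoolExecutor> executors = new HashMap<>();

  /** Registers a volume and creates its dedicated single-thread pool. */
  synchronized void addVolume(File volume) {
    if (executors.containsKey(volume)) {
      throw new IllegalStateException("Volume " + volume + " already exists.");
    }
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    // Let the idle thread exit so unused volumes hold no running thread.
    executor.allowCoreThreadTimeOut(true);
    executors.put(volume, executor);
  }

  /** Queues a task on the pool that belongs to the given volume. */
  synchronized void execute(File volume, Runnable task) {
    ThreadPoolExecutor executor = executors.get(volume);
    if (executor == null) {
      throw new IllegalStateException("Cannot find volume " + volume);
    }
    executor.execute(task);
  }

  /** Stops accepting tasks and waits for the queued ones to finish. */
  synchronized void shutdownAndWait() throws InterruptedException {
    for (ThreadPoolExecutor executor : executors.values()) {
      executor.shutdown();
      executor.awaitTermination(10, TimeUnit.SECONDS);
    }
    executors.clear();
  }
}
```

Serializing writes per volume avoids several threads competing for the same disk, while different volumes can still be written in parallel.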
Another method worth noting is submitLazyPersistTask, which submits a task for execution.
void submitLazyPersistTask(String bpId, long blockId,
long genStamp, long creationTime,
File metaFile, File blockFile,
FsVolumeReference target) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
+ bpId + " block id: " + blockId);
}
// Get the target volume that the block will be persisted to
FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
File lazyPersistDir = volume.getLazyPersistDir(bpId);
if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
throw new IOException("LazyWriter fail to find or create lazy persist dir: "
+ lazyPersistDir.toString());
}
// Create the task for this service
ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
bpId, blockId, genStamp, creationTime, blockFile, metaFile,
target, lazyPersistDir);
// Submit it to the thread pool of the corresponding volume
execute(volume.getCurrentDir(), lazyPersistTask);
}
If a failure occurs during the steps above, the failure handler is called, and the replica block is put back on the replicasNotPersisted queue to wait for the next persistence attempt.
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block != null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
// Put the replica back on the queue
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
Figure 6 shows the overall structure of RamDiskAsyncLazyPersistService.
Configuration
Setting up the RAM disk
Use the Unix mount command to set up a RAM disk partition. For example, to mount a 32 GB tmpfs partition under /mnt/dn-tmpfs/:
sudo mount -t tmpfs -o size=32g tmpfs /mnt/dn-tmpfs/
It is recommended to create an entry in /etc/fstab so that the RAM disk is recreated automatically when the node restarts. Another option is to use a subdirectory under /dev/shm, a tmpfs mount that is available by default on most Linux distributions. Make sure the size of the mount is greater than or equal to your dfs.datanode.max.locked.memory setting, or override it in /etc/fstab. Using more than one tmpfs partition per Data Node for Lazy Persist writes is not recommended.
Tagging the tmpfs volume with the RAM_DISK storage type
Tag the tmpfs directory with the RAM_DISK storage type through the dfs.datanode.data.dir setting in hdfs-site.xml. For example, on a Data Node with three hard disk volumes /grid/0, /grid/1 and /grid/2 and a tmpfs mount at /mnt/dn-tmpfs, dfs.datanode.data.dir must be set as follows:
<property>
<name>dfs.datanode.data.dir</name>
<value>/grid/0,/grid/1,/grid/2,[RAM_DISK]/mnt/dn-tmpfs</value>
</property>
This step is crucial. Without the RAM_DISK tag, HDFS treats the tmpfs volume as non-volatile storage, and the data is never saved to persistent storage. You will lose data when the node restarts.
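To show what the tag changes, the sketch below splits a dfs.datanode.data.dir value into directories and storage types, treating untagged entries as DISK. This is only an illustration of the tagging convention; `DataDirTagSketch` is a hypothetical class and is not the parser HDFS itself uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative parser for storage-type tags; not the HDFS implementation. */
final class DataDirTagSketch {
  /** Maps each directory to its storage type; untagged entries get DISK. */
  static Map<String, String> parse(String dataDirValue) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String entry : dataDirValue.split(",")) {
      String dir = entry.trim();
      if (dir.isEmpty()) {
        continue;
      }
      String type = "DISK";
      if (dir.startsWith("[")) {
        int close = dir.indexOf(']');
        if (close < 0) {
          throw new IllegalArgumentException("Unclosed tag: " + dir);
        }
        // The text between the brackets names the storage type.
        type = dir.substring(1, close).toUpperCase();
        dir = dir.substring(close + 1);
      }
      result.put(dir, type);
    }
    return result;
  }
}
```

Applied to the value from the example above, only /mnt/dn-tmpfs is reported as RAM_DISK; if the tag were dropped, the same directory would be classified as DISK, which is the misconfiguration the warning describes.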
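Enabling the storage policy
Use the LAZY_PERSIST policy mentioned above instead of the default StoragePolicy.DEFAULT, whose storage medium is DISK. There are currently two ways to set the storage policy:
- The first is from the command line, with the following command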
hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST
- The second is programmatic: call the externally exposed file creation method create and pass the CreateFlag.LAZY_PERSIST flag. For example:
FSDataOutputStream fos =
fs.create(
path,
FsPermission.getFileDefault(),
EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
bufferLength,
replicationFactor,
blockSize,
null);
Summary
The Lazy Persist policy gives up some durability guarantees in exchange for lower latency, which is what makes the persistence "lazy".