由于要實(shí)時(shí)讀取redis的AOF文件喧锦,但是flume的taildir source在監(jiān)控文件的時(shí)候,如果文件的inode變化了置吓,那么會(huì)出現(xiàn)重復(fù)讀取數(shù)據(jù)的情況匠题,這里可以通過修改flume taildir源碼解決,只針對(duì)讀一個(gè)文件的情況眶诈。
- 去flume官網(wǎng)下載flume源碼下載
- 解壓后在idea中打開如下
配置好maven涨醋,到flume-ng-source中找到ReliableTaildirEventReader
- 找到updateTailFiles方法
/**
* Update tailFiles mapping if a new file is created or appends are detected
* to the existing file.
*/
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
pass
...
...
for (TaildirMatcher taildir : taildirCache) {
long inode = getInode(f);
TailFile tf = tailFiles.get(inode);
//判斷是否是新文件,inode或者文件名不同就認(rèn)為是新文件
if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
long startPos = skipToEnd ? f.length() : 0;
System.out.println(tf);
if (tf != null) {
inode = tf.getInode();
}
//找到這行逝撬,startPos是讀取文件的位置浴骂,當(dāng)有新文件時(shí)會(huì)從0開始讀
//tf = openFile(f, headers, inode, startPos);
//改成??,f.length()是此時(shí)讀到的位置
tf = openFile(f, headers, inode, f.length());
} else {
繼續(xù)找到TaildirSource類
private String toPosInfoJson() {
@SuppressWarnings("rawtypes")
List<Map> posInfos = Lists.newArrayList();
for (Long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
//這里會(huì)將新的inode寫到位置文件中
//posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath()));
//改成??球拦,inode=0
posInfos.add(ImmutableMap.of("inode", 0, "pos", tf.getPos(), "file", tf.getPath()));
}
return new Gson().toJson(posInfos);
}
右側(cè)找到package打包靠闭,或者進(jìn)入到項(xiàng)目目錄用mvn package打包帐我,復(fù)制target中的flume-taildir-source-1.8.0.jar到flume中的lib下即可,這樣即使文件的inode變化愧膀,也可以繼續(xù)讀
另外拦键,我是把taildir這個(gè)包又復(fù)制了一份,這樣在flume的配置中a1.sources.r1.type = org.apache.flume.source.taildir2.TaildirSource
直接指定修改后的類名檩淋,這樣不會(huì)影響原來的TAILDIR