Background
When multiple threads write to the same file, thread synchronization has to be handled so that every record is fully flushed to disk as a backup.
Operating systems:
win10: no problem
centos7: problem
public static void writeFileLock(String content, String filePath) {
    File file = new File(filePath);
    RandomAccessFile raf = null;
    FileChannel fileChannel = null;
    FileLock fileLock = null;
    try {
        raf = new RandomAccessFile(file, "rw");
        fileChannel = raf.getChannel();
        // spin until the file lock is acquired
        while (true) {
            try {
                fileLock = fileChannel.tryLock();
                if (fileLock != null) {
                    break;
                }
            } catch (Exception e) {
                Thread.sleep(0);
            }
        }
        // append: move to the end of the file, then write
        raf.seek(raf.length());
        raf.write(content.getBytes());
        fileLock.release();
        fileChannel.close();
        raf.close();
    } catch (Exception e) {
        log.error("Failed to write file", e);
        log.error("File path: {}, file content: {}", filePath, content);
    }
}
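RandomAccessFile opens the file, raf yields a file channel, and the channel yields a file lock. tryLock has two characteristics: first, it is non-blocking and returns immediately; second, when the lock is not acquired it may return null or it may throw an exception. That is why the if check sits in a loop, and why the catch block swallows the exception and tries again. Note that Thread.sleep(0) is not meant to sleep for 0 seconds; the intent is to put the thread straight back on the runnable queue to wait for its next CPU time slice.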
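To make the two outcomes of tryLock concrete, here is a minimal sketch; the class and method names are hypothetical and not part of the original code. According to the FileChannel javadoc, tryLock returns null when another program holds an overlapping lock, and throws OverlappingFileLockException when an overlapping lock is already held by the same Java virtual machine. The FileLock javadoc also notes that file locks are held on behalf of the entire JVM and are not suitable for controlling access to a file by multiple threads within the same JVM.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

class TryLockDemo {
    /**
     * Locks the file through one channel, then calls tryLock through a second
     * channel in the same JVM and reports what happened.
     */
    static String secondTryLock(File file) throws Exception {
        try (RandomAccessFile first = new RandomAccessFile(file, "rw");
             RandomAccessFile second = new RandomAccessFile(file, "rw")) {
            FileLock held = first.getChannel().lock();
            try {
                FileLock other = second.getChannel().tryLock();
                if (other == null) {
                    return "null";      // lock held by another program
                }
                other.release();
                return "locked";
            } catch (OverlappingFileLockException e) {
                return "overlap";       // lock already held inside this JVM
            } finally {
                held.release();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("trylock", ".txt");
        f.deleteOnExit();
        System.out.println(secondTryLock(f));
    }
}
```

In the writer above, every thread opens its own channel in the same JVM, so contention is expected to show up as the exception path rather than as a null return.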
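This code carries the production job of backing up Kafka messages from multiple threads, using the lock to coordinate the concurrent writes. Instrumentation showed that the backup occasionally drops records: out of roughly 230 million records, about 5 go missing.
The stress-test approach and process are recorded below.
Preparation
Stress-test code: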
private static final ExecutorService FILE_THREADS = Executors.newFixedThreadPool(100);

public void execute(String... strings) throws Exception {
    int cnt = 100 * 100 * 100;
    int idx = 1;
    long begin = 1574305200000L;
    while (idx <= cnt) {
        Map<String, Object> map = new HashMap<>();
        map.put("id", idx);
        map.put("time", begin);
        String timeDirectory = DateUtil.getBeforeOneHour("yyyyMMddHHmm", 8, begin);
        String mm = DateUtil.getBeforeOneHour("mm", 0, begin).concat(".txt");
        String json = JsonUtil.getJosnString(map).concat(System.getProperty("line.separator"));
        FILE_THREADS.execute(new PersistThread(timeDirectory, mm, json));
        // every 10,000 records, move on to the next minute (new directory and file)
        if (idx % 10000 == 0) {
            begin += 60000L;
        }
        idx++;
    }
}

private class PersistThread extends Thread {
    String time;
    String filename;
    String content;

    PersistThread(String time, String filename, String content) {
        this.time = time;
        this.filename = filename;
        this.content = content;
    }

    @Override
    public void run() {
        String folder = "/data/job_project/txt/" + time + "/";
        FileUtil.createDirectory(folder);
        FileUtil.writeFileIO(content, folder + filename);
    }
}
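A thread pool of 100 threads is created and file-writing Thread tasks are submitted to it, so files are written by multiple threads. Directories and files are created dynamically (mimicking production): every 10,000 ids a new timestamp directory is created, named in the format yyyyMMddHHmm, and one file in it receives 10,000 lines. In effect, 100 threads write dynamically into 100 files in 100 directories, 10,000 lines per file.
The first suspect was directory and file creation.
The code: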
public static File createDirectory(String path) {
    File file = new File(path);
    if (!file.exists() && !file.isDirectory()) {
        file.mkdirs();
    }
    return file;
}

public static File createFile(String file) {
    File f = null;
    try {
        f = new File(file);
        if (!f.exists()) {
            f.createNewFile();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return f;
}
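Both directory and file creation check first and then create, which is clearly not atomic. So the suspicion was that, in a multi-threaded environment, a directory might be created more than once. The code was changed to a synchronized double check, as follows: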
public static File createDirectory(String path) {
    File file = new File(path);
    if (!file.exists() && !file.isDirectory()) {
        synchronized (FileUtil.class) {
            if (!file.exists() && !file.isDirectory()) {
                file.mkdirs();
            }
        }
    }
    return file;
}

public static File createFile(String file) {
    File f = null;
    try {
        f = new File(file);
        if (!f.exists()) {
            synchronized (FileUtil.class) {
                if (!f.exists()) {
                    f.createNewFile();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return f;
}
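As an aside, the same "create if absent" behavior can be sketched with java.nio.file, which needs neither the existence check nor the synchronized block. This is only an illustrative alternative, not the code used in the tests; the class and method names are hypothetical. Files.createDirectories does not throw when the directory already exists, and Files.createFile performs the existence check and the creation as a single atomic operation, signalling a lost race with FileAlreadyExistsException.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

class EnsurePaths {
    /** Creates the directory and any missing parents; a no-op if it already exists. */
    static Path ensureDirectory(Path dir) throws IOException {
        return Files.createDirectories(dir);
    }

    /** Creates the file if absent; losing the race to another creator is not an error. */
    static Path ensureFile(Path file) throws IOException {
        try {
            Files.createFile(file); // atomic check-and-create
        } catch (FileAlreadyExistsException alreadyThere) {
            // another thread (or an earlier call) created it first
        }
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("ensure");
        Path dir = ensureDirectory(base.resolve("201911211100"));
        ensureDirectory(dir);                       // second call changes nothing
        Path file = ensureFile(dir.resolve("00.txt"));
        ensureFile(file);                           // second call changes nothing
        System.out.println(Files.isDirectory(dir) && Files.isRegularFile(file));
    }
}
```

Note that File.createNewFile is itself documented as atomic, and File.mkdirs simply returns false when the directory already exists, so a duplicate creation attempt is unlikely to lose data on its own.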
After pushing in 1,000,000 records, the result was a big disappointment:
/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982
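In fact the vast majority of files were missing lines. Next, all directories and files were laid out in advance and the test was repeated.
Script for laying out the directories (the rm and touch lines are commented out here; as shown, it reports every file whose line count is not 10000):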
#!/bin/bash
# [[ ]] and ${var:offset} are bash features, hence bash rather than sh
txt=/data/job_project/txt/*
for folder in $txt; do
    filename=${folder##*/}
    if [[ $filename = "f.sh" ]] || [[ $filename = "search.sh" ]]; then
        echo "$filename is a shell file"
    else
        # directory name is yyyyMMddHHmm; the last two characters (mm) name the file
        filename=${filename:10}
        filepath=${folder}/${filename}.txt
        #rm -f $filepath
        #touch $filepath
        lines=$(wc -l "${filepath}" | awk '{print $1}')
        if [ "$lines" -ne 10000 ]; then
            echo "$filepath lines: $lines"
        fi
    fi
done
Data was still lost.
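The per-file line-count check done by the shell script can also be sketched in Java, which may be handy when the check has to run on a machine without bash. This is a hypothetical helper, not part of the original tooling. One difference to keep in mind: Files.lines counts a final line even if it has no trailing newline, whereas wc -l counts newline characters; for records that always end with a line separator the two agree.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LineCountCheck {
    /** Returns path -> line count for every .txt file under root whose count differs from expected. */
    static Map<String, Long> findMismatches(Path root, long expected) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(root)) {
            files = paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.toString().endsWith(".txt"))
                    .collect(Collectors.toList());
        }
        Map<String, Long> mismatches = new TreeMap<>();
        for (Path p : files) {
            long count;
            try (Stream<String> lines = Files.lines(p)) {
                count = lines.count();
            }
            if (count != expected) {
                mismatches.put(p.toString(), count);
            }
        }
        return mismatches;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: root directory, e.g. /data/job_project/txt
        Path root = Path.of(args.length > 0 ? args[0] : ".");
        findMismatches(root, 10000)
                .forEach((path, count) -> System.out.println(path + " lines: " + count));
    }
}
```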
To rule out any effect from directory and file creation, all files and directories were created in advance for every stress test below.
Writing synchronously with RandomAccessFile in rws mode
Test result:
/data/job_project/txt/201911211101/01.txt lines: 9998
/data/job_project/txt/201911211106/06.txt lines: 9999
/data/job_project/txt/201911211107/07.txt lines: 9999
/data/job_project/txt/201911211109/09.txt lines: 9999
/data/job_project/txt/201911211112/12.txt lines: 9999
/data/job_project/txt/201911211116/16.txt lines: 9998
/data/job_project/txt/201911211119/19.txt lines: 9999
/data/job_project/txt/201911211120/20.txt lines: 9998
...
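The stress test was extremely slow and write performance was very poor, yet the result was shocking: lines were still missing. I then read the official API documentation carefully: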
* <p>The <tt>"rwd"</tt> mode can be used to reduce the number of I/O
* operations performed. Using <tt>"rwd"</tt> only requires updates to the
* file's content to be written to storage; using <tt>"rws"</tt> requires
* updates to both the file's content and its metadata to be written, which
* generally requires at least one more low-level I/O operation.
*
* <p>If there is a security manager, its {@code checkRead} method is
* called with the pathname of the {@code file} argument as its
* argument to see if read access to the file is allowed. If the mode
* allows writing, the security manager's {@code checkWrite} method is
* also called with the path argument to see if write access to the file is
* allowed.
rwd mode syncs the file content; rws mode syncs both the file content and the file metadata. The stricter rws was naturally the first choice for the stress test, and data was still lost. At this point I was starting to doubt the JDK source.
Adjusting the close order and validating the lock
First change:
if (fileLock != null) {
    break;
}
Add one more check, changing it to:
if (fileLock != null && fileLock.isValid()) {
    break;
}
Second change:
fileLock.release();
fileChannel.close();
raf.close();
Adjust the close order, changing it to:
fileLock.release();
raf.close();
fileChannel.close();
Test result:
/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982
...
The result shows that even more data was lost. By now I was thoroughly demoralized, but the work had to go on.
Writing a buffer through the channel
public static void writeFileLock(String content, String filePath, String time) {
    File file = createFile(filePath);
    RandomAccessFile raf = null;
    FileChannel fileChannel = null;
    FileLock fileLock = null;
    try {
        raf = new RandomAccessFile(file, "rw");
        fileChannel = raf.getChannel();
        while (true) {
            try {
                fileLock = fileChannel.tryLock();
                if (fileLock != null && fileLock.isValid()) {
                    break;
                }
            } catch (Exception e) {
                Thread.sleep(0);
            }
        }
        // positional write at the current end of the file
        fileChannel.write(ByteBuffer.wrap(content.getBytes()), fileChannel.size());
        fileLock.release();
        raf.close();
        fileChannel.close();
    } catch (Exception e) {
        log.error("Failed to write file", e);
        log.error("File path: {}, file content: {}", filePath, content);
    }
}
The write path was changed to go through the NIO channel, and the result was still disappointing.
Log instrumentation: Redis counters
Instrumentation code:
public static void writeFileLock(String content, String filePath, String time) {
    File file = createFile(filePath);
    RandomAccessFile raf = null;
    FileChannel fileChannel = null;
    FileLock fileLock = null;
    try {
        redisHelper.incr("filelock0:".concat(time));
        raf = new RandomAccessFile(file, "rw");
        fileChannel = raf.getChannel();
        while (true) {
            try {
                fileLock = fileChannel.tryLock();
                if (fileLock != null && fileLock.isValid()) {
                    break;
                }
            } catch (Exception e) {
                Thread.sleep(0);
            }
        }
        redisHelper.incr("filelock1:".concat(time));
        raf.seek(raf.length());
        redisHelper.incr("filelock2:".concat(time));
        raf.write(content.getBytes());
        redisHelper.incr("filelock3:".concat(time));
        fileLock.release();
        redisHelper.incr("filelock4:".concat(time));
        raf.close();
        redisHelper.incr("filelock5:".concat(time));
        fileChannel.close();
        redisHelper.incr("filelock6:".concat(time));
    } catch (Exception e) {
        log.error("Failed to write file", e);
        log.error("File path: {}, file content: {}", filePath, content);
    }
}
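By this point I had lost all faith in this code and needed to find exactly where the data was being dropped, so Redis counters were added. incr is thread-safe, so it should quickly reveal which step goes wrong. The problem seemed about to surface, and I was quietly pleased.
One more note: the Redis key contains the directory name, so there is one key per directory and file. The density of the instrumentation shows how confident I was of winning.
The result: every key held a perfect 10000, with no flaw at all. My heart sank. A colleague then suggested a reverse check to see whether RandomAccessFile's file pointer was actually being updated.
Checking whether the RandomAccessFile file pointer ever fails to update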
long filelength = raf.length();
raf.seek(filelength);
raf.write(content.getBytes());
if (filelength == raf.length()) {
    log.error("errorrrrrrrrrrrrr: " + content);
}
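If write had not written to the file, the file pointer (checked here through the file length) could not have moved, so checking it again after write tells us whether write actually wrote anything. The result was again disappointing: the expected log line never appeared, which means write did advance the file, yet a few lines were still missing. Between the Redis counters and this check, the stress test had reached a dead end; everything had been tried. At least two things could be said: first, the file lock was fine, and no thread escaped the while loop without it; second, every line of the code under test was executed, none was skipped. Baffled, I called it a day and resumed the next morning.
The java.io package plus a reentrant lock
Yesterday's stress tests covered pretty much every case; the blocking lock() variant and writing a buffer through fileChannel were also tried but are not shown here. Today I decided to change tack and drop the fancy stuff: use the java.io package from JDK 1.0 plus a ReentrantLock. The code: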
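// Assumed declaration: the original snippet uses fileLock without showing it.
private static final ReentrantLock fileLock = new ReentrantLock();

public static void writeSyncFile(String content, String filePath) {
    // acquire the lock before the try block so that finally only unlocks a held lock
    fileLock.lock();
    try {
        File file = createFile(filePath);
        // closing the BufferedWriter flushes it and closes the underlying FileWriter
        try (BufferedWriter bw = new BufferedWriter(new FileWriter(file, true))) {
            bw.write(content);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        fileLock.unlock();
    }
}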
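As expected, every file in every directory had a perfect 10,000 lines. Thanks to the buffer, write throughput also improved greatly. The gain was not measured rigorously, but the lock plus buffered write finished in about 2 minutes, whereas the approaches above could take more than an hour. Since plain file IO had no problem, a colleague suggested trying FileOutputStream in place of RandomAccessFile.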
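The in-process lock plus buffered append described above can be checked with a small self-contained experiment. The sketch below is an illustration under assumptions, not the original test: the class name, thread count, and line format are made up, and it uses Files.newBufferedWriter in append mode instead of FileWriter. It writes from several threads and then counts the lines.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

class LockedAppendDemo {
    private static final ReentrantLock FILE_LOCK = new ReentrantLock();

    /** Appends one line; the writer is closed (and flushed) before the lock is released. */
    static void appendLine(Path file, String line) throws IOException {
        FILE_LOCK.lock();
        try (BufferedWriter w = Files.newBufferedWriter(file, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            w.write(line);
            w.newLine();
        } finally {
            FILE_LOCK.unlock();
        }
    }

    /** Writes threads * linesPerThread lines concurrently and returns the resulting line count. */
    static long writeConcurrently(Path file, int threads, int linesPerThread) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < linesPerThread; i++) {
                    appendLine(file, "{\"thread\":" + id + ",\"seq\":" + i + "}");
                }
                return null; // Callable, so the checked IOException is allowed
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.MINUTES)) {
            throw new IllegalStateException("writers did not finish in time");
        }
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("locked-append", ".txt");
        System.out.println(writeConcurrently(file, 8, 200) + " lines written");
    }
}
```

A single global lock serializes writes to all files; that matches the code above, at the cost of threads writing to different files waiting on each other.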
Replacing RandomAccessFile: getting the channel from FileOutputStream
I decided to drop RandomAccessFile and get the channel from FileOutputStream instead. The code:
public static void writeFileIO(String content, String path) {
    FileLock lock = null;
    try {
        // second constructor argument true = append mode
        FileChannel channel = new FileOutputStream(path, true).getChannel();
        while (true) {
            try {
                lock = channel.lock();
                if (lock != null && lock.isValid()) {
                    break;
                }
            } catch (Exception e) {
                Thread.sleep(10);
            }
        }
        channel.write(ByteBuffer.wrap(content.getBytes()));
        lock.release();
        channel.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
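RandomAccessFile is a class for random-access reads and writes, which FileOutputStream does not offer. To append to the end of the file with FileOutputStream, just pass true to the constructor, which gives us the behavior we want. The first stress test finished in 3 minutes: 1,000,000 records into 100 files, 10,000 lines per file, exactly as expected. Perfect! Pressing the advantage, a run with 10,000,000 records produced wrong data; the cause was an OOM, because all the submitted records piled up in the thread pool's blocking queue. Raising the heap to 6 GB did not help, and with limited machine resources the test was reduced to 4,000,000 records, whose results matched expectations exactly. So the truth came out: I had not expected the pitfall to be in RandomAccessFile. Looking back at this class, whose comments I have by now read to death, the odd thing is that it has been around since JDK 1.0 yet its author is listed as unknown, perhaps for fear of being flamed, hehe.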
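On the OOM seen in the 10,000,000-record run: Executors.newFixedThreadPool uses an unbounded LinkedBlockingQueue, so a producer that submits faster than the workers can write will queue tasks without limit. One common way to cap memory is a bounded queue combined with CallerRunsPolicy, which makes the submitting thread run the task itself when the queue is full. The sketch below is an assumption-laden illustration, not something that was tried in the tests above; the class name and sizes are hypothetical, and a counter stands in for the file write.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {
    /** Fixed-size pool whose queue holds at most queueCapacity pending tasks. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // when the queue is full, the submitting thread runs the task itself,
                // which slows the producer down instead of growing the queue
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits taskCount trivial tasks and returns how many actually ran. */
    static int runTasks(int taskCount, int threads, int queueCapacity) throws InterruptedException {
        AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = newBoundedPool(threads, queueCapacity);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet); // stand-in for the file write
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(100_000, 4, 1_000) + " tasks executed");
    }
}
```

With this policy no task is dropped; the trade-off is that the producer thread occasionally does write work itself.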
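Summary
1. Code is not copy and paste; relying only on Google or Baidu searches often brings in a lot of noise.
2. High-concurrency scenarios need repeated, rigorous stress testing to guarantee data quality.
3. Always distinguish between Windows and Linux; their file systems are completely different. The code above works fine on Windows but runs into all kinds of trouble on Linux.
4. Keep a skeptical mindset: code is written by people, so it will have bugs. Test cases should cover every scenario and probe every possibility.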