OkDownload是一款多線程斷點(diǎn)續(xù)傳下載引擎摹察,它的功能完整恩掷,性能高,可配置性高,可以注入自定義組件來修改下載策略、替換網(wǎng)絡(luò)請求框架等等账嚎,而且在項(xiàng)目中已有成熟應(yīng)用(英語流利說),是一個很不錯的開源下載框架逼争。
項(xiàng)目地址:https://github.com/lingochamp/okdownload
OkDownload的簡單使用
OkDownload的使用非常簡單:
1.引入該開源庫:
implementation 'com.liulishuo.okdownload:okhttp:1.0.5' (提供okhttp連接,ps:如果使用的話劝赔,需要引入okhttp網(wǎng)絡(luò)請求庫)
implementation 'com.liulishuo.okdownload:okdownload:1.0.5' (下載核心庫)
implementation 'com.liulishuo.okdownload:sqlite:1.0.5' (存儲斷點(diǎn)信息的數(shù)據(jù)庫)
2.開始一個任務(wù):
task = new DownloadTask.Builder(url, parentFile)
.setFilename(filename)
// 下載進(jìn)度回調(diào)的間隔時(shí)間(毫秒)
.setMinIntervalMillisCallbackProcess(30)
// 任務(wù)過去已完成是否要重新下載
.setPassIfAlreadyCompleted(false)
.build();
//異步執(zhí)行任務(wù)
task.enqueue(listener);
// 取消任務(wù)
task.cancel();
// 同步執(zhí)行任務(wù)
task.execute(listener);
當(dāng)然也可以同時(shí)異步執(zhí)行多個任務(wù)
DownloadTask.enqueue(tasks, listener);
3.任務(wù)隊(duì)列的構(gòu)建誓焦、開始和停止
DownloadContext.Builder builder = new DownloadContext.QueueSet()
.setParentPathFile(parentFile)
.setMinIntervalMillisCallbackProcess(150)
.commit();
builder.bind(url1);
builder.bind(url2).addTag(key, value);
builder.bind(url3).setTag(tag);
builder.setListener(contextListener);
DownloadTask task = new DownloadTask.Builder(url4, parentFile)
.setPriority(10).build();
builder.bindSetTask(task);
DownloadContext context = builder.build();
context.startOnParallel(listener);
// stop
context.stop();
4.獲取任務(wù)狀態(tài)
Status status = StatusUtil.getStatus(task)
status = StatusUtil.getStatus(url, parentPath, null);
status = StatusUtil.getStatus(url, parentPath, filename);
boolean isCompleted = StatusUtil.isCompleted(task);
isCompleted = StatusUtil.isCompleted(url, parentPath, null);
isCompleted = StatusUtil.isCompleted(url, parentPath, filename);
Status completedOrUnknown = StatusUtil.isCompletedOrUnknown(task);
5.獲取斷點(diǎn)信息
// 注意:任務(wù)完成后,斷點(diǎn)信息將會被刪除
BreakpointInfo info = OkDownload.with().breakpointStore().get(id);
info = StatusUtil.getCurrentInfo(url, parentPath, null);
info = StatusUtil.getCurrentInfo(url, parentPath, filename);
// 斷點(diǎn)信息將被緩存在任務(wù)對象中着帽,即使任務(wù)已經(jīng)完成了
info = task.getInfo();
6.設(shè)置任務(wù)監(jiān)聽
可以為任務(wù)設(shè)置五種不同類型的監(jiān)聽器杂伟,同時(shí),也可以給任務(wù)和監(jiān)聽器建立1對1仍翰、1對多赫粥、多對1、多對多的關(guān)聯(lián)歉备。
給一個任務(wù)設(shè)置多種監(jiān)聽:
DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();
DownloadListener combinedListener = new DownloadListenerBunch.Builder()
.append(listener1)
.append(listener2)
.build();
DownloadTask task = new DownloadTask.build(url, file).build();
task.enqueue(combinedListener);
為多個任務(wù)動態(tài)設(shè)置監(jiān)聽:
UnifiedListenerManager manager = new UnifiedListenerManager();
DownloadListener listener1 = new DownloadListener1();
DownloadListener listener2 = new DownloadListener2();
DownloadListener listener3 = new DownloadListener3();
DownloadListener listener4 = new DownloadListener4();
DownloadTask task = new DownloadTask.build(url, file).build();
manager.attachListener(task, listener1);
manager.attachListener(task, listener2);
manager.detachListener(task, listener2);
// 當(dāng)一個任務(wù)結(jié)束時(shí)傅是,這個任務(wù)的所有監(jiān)聽器都被移除
manager.addAutoRemoveListenersWhenTaskEnd(task.getId());
// enqueue task to start.
manager.enqueueTaskWithUnifiedListener(task, listener3);
manager.attachListener(task, listener4);
下面我們來分析一下這個下載框架的源碼:
OkDownload
首先看一下OkDownload這個類匪燕,這個類定義了所有的下載策略蕾羊,我們可以自定義一些下載策略喧笔,可以通過OkDownload的Builder構(gòu)造自定義的一個OkDownload實(shí)例,再通過OkDownload.setSingletonInstance進(jìn)行設(shè)置:
OkDownload.Builder builder = new OkDownload.Builder(context)
.downloadStore(downloadStore) //斷點(diǎn)信息存儲的位置龟再,默認(rèn)是SQLite數(shù)據(jù)庫
.callbackDispatcher(callbackDispatcher) //監(jiān)聽回調(diào)分發(fā)器书闸,默認(rèn)在主線程回調(diào)
.downloadDispatcher(downloadDispatcher) //下載管理機(jī)制,最大下載任務(wù)數(shù)利凑、同步異步執(zhí)行下載任務(wù)的處理
.connectionFactory(connectionFactory) //選擇網(wǎng)絡(luò)請求框架浆劲,默認(rèn)是OkHttp
.outputStreamFactory(outputStreamFactory) //構(gòu)建文件輸出流DownloadOutputStream,是否支持隨機(jī)位置寫入
.downloadStrategy(downloadStrategy) //下載策略哀澈,文件分為幾個線程下載
.processFileStrategy(processFileStrategy) //多文件寫文件的方式牌借,默認(rèn)是根據(jù)每個線程寫文件的不同位置,支持同時(shí)寫入割按。
.monitor(monitor); //下載狀態(tài)監(jiān)聽
OkDownload.setSingletonInstance(builder.build());
DownloadTask
DownloadTask下載任務(wù)類膨报,可通過它的Builder來構(gòu)造一個下載任務(wù),我們看它是如何執(zhí)行的:
public void execute(DownloadListener listener) {
this.listener = listener;
OkDownload.with().downloadDispatcher().execute(this);
}
public void enqueue(DownloadListener listener) {
this.listener = listener;
OkDownload.with().downloadDispatcher().enqueue(this);
}
可以看到都是通過downloadDispatcher來執(zhí)行下載任務(wù)的适荣,默認(rèn)的downloadDispatcher是一個DownloadDispatcher實(shí)例现柠,我們以同步執(zhí)行一個下載任務(wù)為例,看它是如何下載的:
public void execute(DownloadTask task) {
Util.d(TAG, "execute: " + task);
final DownloadCall call;
synchronized (this) {
if (inspectCompleted(task)) return;
if (inspectForConflict(task)) return;
call = DownloadCall.create(task, false, store);
runningSyncCalls.add(call);
}
syncRunCall(call);
}
void syncRunCall(DownloadCall call) {
call.run();
}
在execute方法里將一個DownloadTask實(shí)例又封裝為了一個DownloadCall對象弛矛,然后在syncRunCall方法里執(zhí)行了DownloadCall對象的run方法够吩。通過看DownloadCall源碼可以知道該類繼承自NamedRunnable,而NamedRunnable實(shí)現(xiàn)了Runnable丈氓,在run方法里調(diào)用了execute方法周循。(enqueue執(zhí)行任務(wù)最終則是調(diào)用 getExecutorService().execute(call);來異步執(zhí)行的)
那我們看一下DownloadCall這個類。
DownloadCall
先看一下DownloadCall是如何實(shí)現(xiàn)execute方法的万俗,該方法比較長鱼鼓,首先執(zhí)行的是inspectTaskStart:
先看一下這個store是什么:
通過看OkDownload這個類的源碼可以知道,DownloadCall的store是調(diào)用BreakpointStoreOnSQLite的createRemitSelf方法生成的一個實(shí)例:
可以看到是RemitStoreOnSQLite的一個實(shí)例该编,其主要用來保存任務(wù)及斷點(diǎn)信息至本地?cái)?shù)據(jù)庫迄本。RemitStoreOnSQLite里持有BreakpointStoreOnSQLite對象,BreakpointStoreOnSQLite里面包含了BreakpointSQLiteHelper(用于操作數(shù)據(jù))和BreakpointStoreOnCache(用于做數(shù)據(jù)操作之前的數(shù)據(jù)緩存)课竣。
@Override public void syncCacheToDB(int id) throws IOException {
sqLiteHelper.removeInfo(id);
final BreakpointInfo info = sqliteCache.get(id);
if (info == null || info.getFilename() == null || info.getTotalOffset() <= 0) return;
sqLiteHelper.insert(info);
}
最終會調(diào)用上述syncCacheToDB方法嘉赎,先刪除數(shù)據(jù)庫中的任務(wù)信息,若緩存(創(chuàng)建BreakpointStoreOnCache對象時(shí)于樟,會調(diào)用loadToCache方法將數(shù)據(jù)庫中所有任務(wù)信息進(jìn)行緩存)
this.onCache = new BreakpointStoreOnCache(helper.loadToCache(),
helper.loadResponseFilenameToMap());
中有該任務(wù)公条,則檢查任務(wù)信息是否合法,若合法則再次將該任務(wù)及斷點(diǎn)信息保存在本地?cái)?shù)據(jù)庫中迂曲。
inspectTaskStart方法結(jié)束后靶橱,會進(jìn)入一個do-while循環(huán),首先做一些下載前的準(zhǔn)備工作:
do {
//1.判斷當(dāng)前任務(wù)的下載鏈接長度是否大于0,否則就拋出異常关霸;
if (task.getUrl().length() <= 0) {
this.cache = new DownloadCache.PreError(
new IOException("unexpected url: " + task.getUrl()));
break;
}
if (canceled) break;
//2.從緩存中獲取任務(wù)的斷點(diǎn)信息传黄,若沒有斷點(diǎn)信息,則創(chuàng)建斷點(diǎn)信息并保存至數(shù)據(jù)庫队寇;
@NonNull final BreakpointInfo info;
try {
BreakpointInfo infoOnStore = store.get(task.getId());
if (infoOnStore == null) {
info = store.createAndInsert(task);
} else {
info = infoOnStore;
}
setInfoToTask(info);
} catch (IOException e) {
this.cache = new DownloadCache.PreError(e);
break;
}
if (canceled) break;
// 3.創(chuàng)建帶緩存的下載輸出流膘掰;
@NonNull final DownloadCache cache = createCache(info);
this.cache = cache;
// 4.訪問下載鏈接判斷斷點(diǎn)信息是否合理;
final BreakpointRemoteCheck remoteCheck = createRemoteCheck(info);
try {
remoteCheck.check();
} catch (IOException e) {
cache.catchException(e);
break;
}
//5.確定文件路徑后等待文件鎖釋放佳遣;
fileStrategy.getFileLock().waitForRelease(task.getFile().getAbsolutePath());
// 6. 判斷緩存中是否有相同的任務(wù)识埋,若有則復(fù)用緩存中的任務(wù)的分塊信息;
OkDownload.with().downloadStrategy()
.inspectAnotherSameInfo(task, info, remoteCheck.getInstanceLength());
try {
//7.檢查斷點(diǎn)信息是否是可恢復(fù)的零渐,若不可恢復(fù)窒舟,則根據(jù)文件大小進(jìn)行分塊,重新下載诵盼,否則繼續(xù)進(jìn)行下一步辜纲;
if (remoteCheck.isResumable()) {
// 8.判斷斷點(diǎn)信息是否是臟數(shù)據(jù)(文件存在且斷點(diǎn)信息正確且下載鏈接支持?jǐn)帱c(diǎn)續(xù)傳);
final BreakpointLocalCheck localCheck = createLocalCheck(info,
remoteCheck.getInstanceLength());
localCheck.check();
// 9.若是臟數(shù)據(jù)則根據(jù)文件大小進(jìn)行分塊拦耐,重新開始下載耕腾,否則從斷點(diǎn)位置開始下載;
if (localCheck.isDirty()) {
Util.d(TAG, "breakpoint invalid: download from beginning because of "
+ "local check is dirty " + task.getId() + " " + localCheck);
fileStrategy.discardProcess(task);
assembleBlockAndCallbackFromBeginning(info, remoteCheck,
localCheck.getCauseOrThrow());
} else {
okDownload.callbackDispatcher().dispatch()
.downloadFromBreakpoint(task, info);
}
} else {
Util.d(TAG, "breakpoint invalid: download from beginning because of "
+ "remote check not resumable " + task.getId() + " " + remoteCheck);
fileStrategy.discardProcess(task);
assembleBlockAndCallbackFromBeginning(info, remoteCheck,
remoteCheck.getCauseOrThrow());
}
} catch (IOException e) {
cache.setUnknownError(e);
break;
}
// 10. 開始下載
start(cache, info);
if (canceled) break;
// 11. 錯誤重試機(jī)制
if (cache.isPreconditionFailed()
&& retryCount++ < MAX_COUNT_RETRY_FOR_PRECONDITION_FAILED) {
store.remove(task.getId());
retry = true;
} else {
retry = false;
}
} while (retry);
1.判斷當(dāng)前任務(wù)的下載鏈接長度是否大于0杀糯,否則就拋出異常扫俺;2.從緩存中獲取任務(wù)的斷點(diǎn)信息,若沒有斷點(diǎn)信息固翰,則創(chuàng)建斷點(diǎn)信息并保存至數(shù)據(jù)庫狼纬;3.創(chuàng)建帶緩存的下載輸出流;4.訪問下載鏈接判斷斷點(diǎn)信息是否合理骂际;5.確定文件路徑后等待文件鎖釋放疗琉; 6. 判斷緩存中是否有相同的任務(wù),若有則復(fù)用緩存中的任務(wù)的分塊信息歉铝;7.檢查斷點(diǎn)信息是否是可恢復(fù)的盈简,若不可恢復(fù),則根據(jù)文件大小進(jìn)行分塊太示,重新下載柠贤,否則繼續(xù)進(jìn)行下一步;8.判斷斷點(diǎn)信息是否是臟數(shù)據(jù)(文件存在且斷點(diǎn)信息正確且下載鏈接支持?jǐn)帱c(diǎn)續(xù)傳)类缤;9.若是臟數(shù)據(jù)則根據(jù)文件大小進(jìn)行分塊臼勉,重新開始下載,否則從斷點(diǎn)位置開始下載餐弱;10.開始下載宴霸。
文件分成多少塊進(jìn)行下載由DownloadStrategy決定的:
// 1 connection: [0, 1MB)
private static final long ONE_CONNECTION_UPPER_LIMIT = 1024 * 1024; // 1MiB
// 2 connection: [1MB, 5MB)
private static final long TWO_CONNECTION_UPPER_LIMIT = 5 * 1024 * 1024; // 5MiB
// 3 connection: [5MB, 50MB)
private static final long THREE_CONNECTION_UPPER_LIMIT = 50 * 1024 * 1024; // 50MiB
// 4 connection: [50MB, 100MB)
private static final long FOUR_CONNECTION_UPPER_LIMIT = 100 * 1024 * 1024; // 100MiB
public ResumeAvailableResponseCheck resumeAvailableResponseCheck(
DownloadConnection.Connected connected,
int blockIndex,
BreakpointInfo info) {
return new ResumeAvailableResponseCheck(connected, blockIndex, info);
}
public int determineBlockCount(@NonNull DownloadTask task, long totalLength) {
if (task.getSetConnectionCount() != null) return task.getSetConnectionCount();
if (totalLength < ONE_CONNECTION_UPPER_LIMIT) {
return 1;
}
if (totalLength < TWO_CONNECTION_UPPER_LIMIT) {
return 2;
}
if (totalLength < THREE_CONNECTION_UPPER_LIMIT) {
return 3;
}
if (totalLength < FOUR_CONNECTION_UPPER_LIMIT) {
return 4;
}
return 5;
}
文件大小在0-1MB囱晴、1-5MB、5-50MB瓢谢、50-100MB畸写、100MB以上時(shí)分別開啟1、2恩闻、3、4剧董、5個線程進(jìn)行下載幢尚。
我們重點(diǎn)看一下下載部分的源碼,也就是start(cache翅楼,info)方法:
void start(final DownloadCache cache, BreakpointInfo info) throws InterruptedException {
final int blockCount = info.getBlockCount();
final List<DownloadChain> blockChainList = new ArrayList<>(info.getBlockCount());
for (int i = 0; i < blockCount; i++) {
final BlockInfo blockInfo = info.getBlock(i);
if (Util.isCorrectFull(blockInfo.getCurrentOffset(), blockInfo.getContentLength())) {
continue;
}
Util.resetBlockIfDirty(blockInfo);
blockChainList.add(DownloadChain.createChain(i, task, info, cache, store));
}
if (canceled) {
return;
}
startBlocks(blockChainList);
}
可以看到它是分塊下載的尉剩,每一個分塊都是一個DownloadChain實(shí)例,DownloadChain實(shí)現(xiàn)了Runnable接口毅臊,繼續(xù)看startBlocks方法:
對于每一個分塊任務(wù)理茎,都調(diào)用了submitChain方法,由一個線程池去處理每一個DownloadChain分塊管嬉,核心代碼就在這里:
void start() throws IOException {
final CallbackDispatcher dispatcher = OkDownload.with().callbackDispatcher();
// 處理請求攔截鏈
final RetryInterceptor retryInterceptor = new RetryInterceptor();
final BreakpointInterceptor breakpointInterceptor = new BreakpointInterceptor();
connectInterceptorList.add(retryInterceptor);
connectInterceptorList.add(breakpointInterceptor);
connectInterceptorList.add(new RedirectInterceptor());
connectInterceptorList.add(new HeaderInterceptor());
connectInterceptorList.add(new CallServerInterceptor());
connectIndex = 0;
final DownloadConnection.Connected connected = processConnect();
if (cache.isInterrupt()) {
throw InterruptException.SIGNAL;
}
dispatcher.dispatch().fetchStart(task, blockIndex, getResponseContentLength());
// 獲取數(shù)據(jù)攔截鏈
final FetchDataInterceptor fetchDataInterceptor =
new FetchDataInterceptor(blockIndex, connected.getInputStream(),
getOutputStream(), task);
fetchInterceptorList.add(retryInterceptor);
fetchInterceptorList.add(breakpointInterceptor);
fetchInterceptorList.add(fetchDataInterceptor);
fetchIndex = 0;
final long totalFetchedBytes = processFetch();
dispatcher.dispatch().fetchEnd(task, blockIndex, totalFetchedBytes);
}
可以看到它主要使用責(zé)任鏈模式進(jìn)行了兩個鏈?zhǔn)秸{(diào)用:處理請求攔截鏈和獲取數(shù)據(jù)攔截鏈皂林。
處理請求攔截鏈包含了RetryInterceptor重試攔截器、BreakpointInterceptor斷點(diǎn)攔截器蚯撩、RedirectInterceptor重定向攔截器础倍、HeaderInterceptor頭部信息處理攔截器、CallServerInterceptor請求攔截器胎挎,該鏈?zhǔn)秸{(diào)用過程會逐個調(diào)用攔截器的interceptConnect方法:
public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final DownloadCache cache = chain.getCache();
// 如果產(chǎn)生了RetryException沟启,則重新執(zhí)行該鏈?zhǔn)秸{(diào)用
while (true) {
try {
if (cache.isInterrupt()) {
throw InterruptException.SIGNAL;
}
return chain.processConnect();
} catch (IOException e) {
if (e instanceof RetryException) {
chain.resetConnectForRetry();
continue;
}
chain.getCache().catchException(e);
throw e;
}
}
}
......
}
public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
private static final String TAG = "BreakpointInterceptor";
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final DownloadConnection.Connected connected = chain.processConnect();
final BreakpointInfo info = chain.getInfo();
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
if (info.getBlockCount() == 1 && !info.isChunked()) {
// 當(dāng)只有一個線程進(jìn)行下載文件時(shí),如果斷點(diǎn)信息中保存的文件長度和服務(wù)端返回的文件長度不一致犹菇,則以服務(wù)端返回的為準(zhǔn)重新進(jìn)行下載
final long blockInstanceLength = getExactContentLengthRangeFrom0(connected);
final long infoInstanceLength = info.getTotalLength();
if (blockInstanceLength > 0 && blockInstanceLength != infoInstanceLength) {
Util.d(TAG, "SingleBlock special check: the response instance-length["
+ blockInstanceLength + "] isn't equal to the instance length from trial-"
+ "connection[" + infoInstanceLength + "]");
final BlockInfo blockInfo = info.getBlock(0);
boolean isFromBreakpoint = blockInfo.getRangeLeft() != 0;
final BlockInfo newBlockInfo = new BlockInfo(0, blockInstanceLength);
info.resetBlockInfos();
info.addBlock(newBlockInfo);
if (isFromBreakpoint) {
final String msg = "Discard breakpoint because of on this special case, we have"
+ " to download from beginning";
Util.w(TAG, msg);
throw new RetryException(msg);
}
OkDownload.with().callbackDispatcher().dispatch()
.downloadFromBeginning(chain.getTask(), info, CONTENT_LENGTH_CHANGED);
}
}
// update for connected.
final DownloadStore store = chain.getDownloadStore();
try {
if (!store.update(info)) {
throw new IOException("Update store failed!");
}
} catch (Exception e) {
throw new IOException("Update store failed!", e);
}
return connected;
}
......
}
public class RedirectInterceptor implements Interceptor.Connect {
//最大重定向次數(shù)
static final int MAX_REDIRECT_TIMES = 10;
private static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final int HTTP_PERMANENT_REDIRECT = 308;
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
int redirectCount = 0;
String url;
DownloadConnection connection;
while (true) {
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
final DownloadConnection.Connected connected = chain.processConnect();
final int code = connected.getResponseCode();
if (!isRedirect(code)) {
return connected;
}
//若需要重定向德迹,則根據(jù)返回的新的url重新進(jìn)行網(wǎng)絡(luò)請求
if (++redirectCount >= MAX_REDIRECT_TIMES) {
throw new ProtocolException("Too many redirect requests: " + redirectCount);
}
url = connected.getResponseHeaderField("Location");
if (url == null) {
throw new ProtocolException(
"Response code is " + code + " but can't find Location field");
}
chain.releaseConnection();
connection = OkDownload.with().connectionFactory().create(url);
chain.setConnection(connection);
chain.setRedirectLocation(url);
}
}
private static boolean isRedirect(int code) {
return code == HttpURLConnection.HTTP_MOVED_PERM
|| code == HttpURLConnection.HTTP_MOVED_TEMP
|| code == HttpURLConnection.HTTP_SEE_OTHER
|| code == HttpURLConnection.HTTP_MULT_CHOICE
|| code == HTTP_TEMPORARY_REDIRECT
|| code == HTTP_PERMANENT_REDIRECT;
}
}
public class HeaderInterceptor implements Interceptor.Connect {
private static final String TAG = "HeaderInterceptor";
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
final BreakpointInfo info = chain.getInfo();
final DownloadConnection connection = chain.getConnectionOrCreate();
final DownloadTask task = chain.getTask();
// 添加User-Agent字段
final Map<String, List<String>> userHeader = task.getHeaderMapFields();
if (userHeader != null) Util.addUserRequestHeaderField(userHeader, connection);
if (userHeader == null || !userHeader.containsKey(USER_AGENT)) {
Util.addDefaultUserAgent(connection);
}
//添加Range字段
final int blockIndex = chain.getBlockIndex();
final BlockInfo blockInfo = info.getBlock(blockIndex);
if (blockInfo == null) {
throw new IOException("No block-info found on " + blockIndex);
}
String range = "bytes=" + blockInfo.getRangeLeft() + "-";
range += blockInfo.getRangeRight();
connection.addHeader(RANGE, range);
Util.d(TAG, "AssembleHeaderRange (" + task.getId() + ") block(" + blockIndex + ") "
+ "downloadFrom(" + blockInfo.getRangeLeft() + ") currentOffset("
+ blockInfo.getCurrentOffset() + ")");
// 如果有Etag信息,則添加If-Match字段
final String etag = info.getEtag();
if (!Util.isEmpty(etag)) {
connection.addHeader(IF_MATCH, etag);
}
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
OkDownload.with().callbackDispatcher().dispatch()
.connectStart(task, blockIndex, connection.getRequestProperties());
DownloadConnection.Connected connected = chain.processConnect();
Map<String, List<String>> responseHeaderFields = connected.getResponseHeaderFields();
if (responseHeaderFields == null) responseHeaderFields = new HashMap<>();
OkDownload.with().callbackDispatcher().dispatch().connectEnd(task, blockIndex,
connected.getResponseCode(), responseHeaderFields);
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
// 檢查Etag字段是否一致
final DownloadStrategy strategy = OkDownload.with().downloadStrategy();
final DownloadStrategy.ResumeAvailableResponseCheck responseCheck =
strategy.resumeAvailableResponseCheck(connected, blockIndex, info);
responseCheck.inspect();
//獲取Content-Length揭芍、Content-Range字段信息
final long contentLength;
final String contentLengthField = connected.getResponseHeaderField(CONTENT_LENGTH);
if (contentLengthField == null || contentLengthField.length() == 0) {
final String contentRangeField = connected.getResponseHeaderField(CONTENT_RANGE);
contentLength = Util.parseContentLengthFromContentRange(contentRangeField);
} else {
contentLength = Util.parseContentLength(contentLengthField);
}
chain.setResponseContentLength(contentLength);
return connected;
}
}
public class CallServerInterceptor implements Interceptor.Connect {
@NonNull @Override
public DownloadConnection.Connected interceptConnect(DownloadChain chain) throws IOException {
OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
OkDownload.with().downloadStrategy().inspectNetworkAvailable();
\\進(jìn)行網(wǎng)絡(luò)請求胳搞,獲得響應(yīng)
return chain.getConnectionOrCreate().execute();
}
}
獲取數(shù)據(jù)攔截鏈包含了RetryInterceptor重試攔截器、BreakpointInterceptor斷點(diǎn)攔截器称杨、RedirectInterceptor重定向攔截器流酬、HeaderInterceptor頭部信息處理攔截器、FetchDataInterceptor獲取數(shù)據(jù)攔截器列另,該鏈?zhǔn)秸{(diào)用過程會逐個調(diào)用攔截器的interceptFetch方法:
public class RetryInterceptor implements Interceptor.Connect, Interceptor.Fetch {
......
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
try {
return chain.processFetch();
} catch (IOException e) {
chain.getCache().catchException(e);
throw e;
}
}
}
public class BreakpointInterceptor implements Interceptor.Connect, Interceptor.Fetch {
......
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
final long contentLength = chain.getResponseContentLength();
final int blockIndex = chain.getBlockIndex();
final boolean isNotChunked = contentLength != CHUNKED_CONTENT_LENGTH;
long fetchLength = 0;
long processFetchLength;
final MultiPointOutputStream outputStream = chain.getOutputStream();
try {
while (true) {
//循環(huán)調(diào)用FetchDataInterceptor攔截器讀寫文件
processFetchLength = chain.loopFetch();
if (processFetchLength == -1) {
break;
}
fetchLength += processFetchLength;
}
} finally {
chain.flushNoCallbackIncreaseBytes();
if (!chain.getCache().isUserCanceled()) outputStream.done(blockIndex);
}
if (isNotChunked) {
outputStream.inspectComplete(blockIndex);
if (fetchLength != contentLength) {
throw new IOException("Fetch-length isn't equal to the response content-length, "
+ fetchLength + "!= " + contentLength);
}
}
return fetchLength;
}
......
public class FetchDataInterceptor implements Interceptor.Fetch {
private final InputStream inputStream;
private final byte[] readBuffer;
private final MultiPointOutputStream outputStream;
private final int blockIndex;
private final DownloadTask task;
private final CallbackDispatcher dispatcher;
public FetchDataInterceptor(int blockIndex,
@NonNull InputStream inputStream,
@NonNull MultiPointOutputStream outputStream,
DownloadTask task) {
this.blockIndex = blockIndex;
this.inputStream = inputStream;
this.readBuffer = new byte[task.getReadBufferSize()];
this.outputStream = outputStream;
this.task = task;
this.dispatcher = OkDownload.with().callbackDispatcher();
}
@Override
public long interceptFetch(DownloadChain chain) throws IOException {
if (chain.getCache().isInterrupt()) {
throw InterruptException.SIGNAL;
}
OkDownload.with().downloadStrategy().inspectNetworkOnWifi(chain.getTask());
// 讀取數(shù)據(jù)
int fetchLength = inputStream.read(readBuffer);
if (fetchLength == -1) {
return fetchLength;
}
//寫文件
outputStream.write(blockIndex, readBuffer, fetchLength);
// 判斷是否回調(diào)下載進(jìn)度
chain.increaseCallbackBytes(fetchLength);
if (this.dispatcher.isFetchProcessMoment(task)) {
chain.flushNoCallbackIncreaseBytes();
}
return fetchLength;
}
}
每一個DownloadChain都完成后芽腾,最終會調(diào)用inspectTaskEnd方法,從數(shù)據(jù)庫中刪除該任務(wù)页衙,并回調(diào)通知任務(wù)完成摊滔。這樣阴绢,一個完整的下載任務(wù)就完成了〖杼桑總體流程如下:
OkDownload的優(yōu)勢在于:
1.OkDownload內(nèi)部使用的網(wǎng)絡(luò)請求框架默認(rèn)為OkHttp呻袭,OkHttp底層使用的IO庫為Okio,相較于原生Java IO流腺兴,它更加簡便高效左电。
2.使用了數(shù)據(jù)庫緩存+內(nèi)存緩存的二級緩存模式,操作效率更高页响。
3.功能更完善篓足,除了多線程斷點(diǎn)續(xù)傳外,還提供了暫停功能闰蚕,多種回調(diào)監(jiān)聽功能栈拖,多任務(wù)管理功能等。
4.更加可靠:下載前有多重檢查機(jī)制來判斷重新下載還是從斷點(diǎn)處下載没陡;每次從斷點(diǎn)續(xù)傳時(shí)涩哟,都會對比響應(yīng)信息跟之前是否一致;對重定向做了處理盼玄;有錯誤重試機(jī)制贴彼。
5.可配置性高,可以注入自定義組件埃儿。