背景
斷點(diǎn)續(xù)傳下載一直是移動(dòng)開(kāi)發(fā)中必不可少的一項(xiàng)重要的技術(shù)糖儡,同樣的Rxjava和Retrofit的結(jié)合讓這個(gè)技術(shù)解決起來(lái)更加的靈活芭商,我們完全可以封裝一個(gè)適合自的下載框架坏怪,簡(jiǎn)單而且安全沙合!
效果
實(shí)現(xiàn)
下載和之前的http請(qǐng)求可以相互獨(dú)立再愈,所以我們單獨(dú)給download建立一個(gè)工程moudel處理
1.創(chuàng)建service接口
和以前一樣榜苫,先寫接口
注意:Streaming是判斷是否寫入內(nèi)存的標(biāo)示,如果小文件可以考慮不寫翎冲,一般情況必須寫垂睬;下載地址需要通過(guò)@url動(dòng)態(tài)指定(不適固定的),@head標(biāo)簽是指定下載的起始位置(斷點(diǎn)續(xù)傳的位置)
/*斷點(diǎn)續(xù)傳下載接口*/
@Streaming/*大文件需要加入這個(gè)判斷,防止下載過(guò)程中寫入到內(nèi)存中*/
@GET
Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);
2.復(fù)寫ResponseBody
和之前的上傳封裝一樣驹饺,下載更加的需要進(jìn)度钳枕,所以我們同樣覆蓋ResponseBody類,寫入進(jìn)度監(jiān)聽(tīng)回調(diào)
/**
* 自定義進(jìn)度的body
* @author wzg
*/
public class DownloadResponseBody extends ResponseBody {
private ResponseBody responseBody;
private DownloadProgressListener progressListener;
private BufferedSource bufferedSource;
public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
this.responseBody = responseBody;
this.progressListener = progressListener;
}
@Override
public BufferedSource source() {
if (bufferedSource == null) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return bufferedSource;
}
private Source source(Source source) {
return new ForwardingSource(source) {
long totalBytesRead = 0L;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead = super.read(sink, byteCount);
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
if (null != progressListener) {
progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
}
return bytesRead;
}
};
}
}
3.自定義進(jìn)度回調(diào)接口
/**
* 成功回調(diào)處理
* Created by WZG on 2016/10/20.
*/
public interface DownloadProgressListener {
/**
* 下載進(jìn)度
* @param read
* @param count
* @param done
*/
void update(long read, long count, boolean done);
}
4.復(fù)寫Interceptor
復(fù)寫Interceptor赏壹,可以將我們的監(jiān)聽(tīng)回調(diào)通過(guò)okhttp的client方法addInterceptor自動(dòng)加載我們的監(jiān)聽(tīng)回調(diào)和ResponseBody
/**
* 成功回調(diào)處理
* Created by WZG on 2016/10/20.
*/
public class DownloadInterceptor implements Interceptor {
private DownloadProgressListener listener;
public DownloadInterceptor(DownloadProgressListener listener) {
this.listener = listener;
}
@Override
public Response intercept(Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new DownloadResponseBody(originalResponse.body(), listener))
.build();
}
}
5.封裝請(qǐng)求downinfo數(shù)據(jù)
這個(gè)類中的數(shù)據(jù)可自由擴(kuò)展鱼炒,用戶自己選擇需要保持到數(shù)據(jù)庫(kù)中的數(shù)據(jù),可以自由選擇需要數(shù)據(jù)庫(kù)第三方框架蝌借,demo采用greenDao框架存儲(chǔ)數(shù)據(jù)
public class DownInfo {
/*存儲(chǔ)位置*/
private String savePath;
/*下載url*/
private String url;
/*基礎(chǔ)url*/
private String baseUrl;
/*文件總長(zhǎng)度*/
private long countLength;
/*下載長(zhǎng)度*/
private long readLength;
/*下載唯一的HttpService*/
private HttpService service;
/*回調(diào)監(jiān)聽(tīng)*/
private HttpProgressOnNextListener listener;
/*超時(shí)設(shè)置*/
private int DEFAULT_TIMEOUT = 6;
/*下載狀態(tài)*/
private DownState state;
}
6.DownState狀態(tài)封裝
很簡(jiǎn)單昔瞧,和大多數(shù)封裝框架一樣
public enum DownState {
START,
DOWN,
PAUSE,
STOP,
ERROR,
FINISH,
}
7.請(qǐng)求HttpProgressOnNextListener回調(diào)封裝類
注意:這里和DownloadProgressListener不同,這里是下載這個(gè)過(guò)程中的監(jiān)聽(tīng)回調(diào)菩佑,DownloadProgressListener只是進(jìn)度的監(jiān)聽(tīng)
通過(guò)抽象類自晰,可以自由選擇需要覆蓋的類,不需要完全覆蓋稍坯!更加靈活
/**
* 下載過(guò)程中的回調(diào)處理
* Created by WZG on 2016/10/20.
*/
public abstract class HttpProgressOnNextListener<T> {
/**
* 成功后回調(diào)方法
* @param t
*/
public abstract void onNext(T t);
/**
* 開(kāi)始下載
*/
public abstract void onStart();
/**
* 完成下載
*/
public abstract void onComplete();
/**
* 下載進(jìn)度
* @param readLength
* @param countLength
*/
public abstract void updateProgress(long readLength, long countLength);
/**
* 失敗或者錯(cuò)誤方法
* 主動(dòng)調(diào)用酬荞,更加靈活
* @param e
*/
public void onError(Throwable e){
}
/**
* 暫停下載
*/
public void onPuase(){
}
/**
* 停止下載銷毀
*/
public void onStop(){
}
}
8.封裝回調(diào)Subscriber
準(zhǔn)備的工作做完,需要將回調(diào)和傳入回調(diào)的信息統(tǒng)一封裝到sub中瞧哟,統(tǒng)一判斷混巧;和封裝二的原理一樣,我們通過(guò)自定義Subscriber來(lái)提前處理返回的數(shù)據(jù)绢涡,讓用戶字需要關(guān)系成功和失敗以及向關(guān)心的數(shù)據(jù)牲剃,避免重復(fù)多余的代碼出現(xiàn)在處理類中
- sub需要繼承DownloadProgressListener,和自帶的回調(diào)一起組成我們需要的回調(diào)結(jié)果
- 傳入DownInfo數(shù)據(jù)雄可,通過(guò)回調(diào)設(shè)置DownInfo的不同狀態(tài)凿傅,保存狀態(tài)
- 通過(guò)RxAndroid將進(jìn)度回調(diào)指定到主線程中(如果不需要進(jìn)度最好去掉該處理避免主線程處理負(fù)擔(dān))
- update進(jìn)度回調(diào)在斷點(diǎn)續(xù)傳使用時(shí),需要手動(dòng)判斷斷點(diǎn)后加載的長(zhǎng)度数苫,因?yàn)橹付〝帱c(diǎn)下載長(zhǎng)度下載后總長(zhǎng)度=(物理長(zhǎng)度-起始下載長(zhǎng)度)
/**
* 用于在Http請(qǐng)求開(kāi)始時(shí)聪舒,自動(dòng)顯示一個(gè)ProgressDialog
* 在Http請(qǐng)求結(jié)束是,關(guān)閉ProgressDialog
* 調(diào)用者自己對(duì)請(qǐng)求數(shù)據(jù)進(jìn)行處理
* Created by WZG on 2016/7/16.
*/
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
//弱引用結(jié)果回調(diào)
private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
/*下載數(shù)據(jù)*/
private DownInfo downInfo;
public ProgressDownSubscriber(DownInfo downInfo) {
this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
this.downInfo=downInfo;
}
/**
* 訂閱開(kāi)始時(shí)調(diào)用
* 顯示ProgressDialog
*/
@Override
public void onStart() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onStart();
}
downInfo.setState(DownState.START);
}
/**
* 完成虐急,隱藏ProgressDialog
*/
@Override
public void onCompleted() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onComplete();
}
downInfo.setState(DownState.FINISH);
}
/**
* 對(duì)錯(cuò)誤進(jìn)行統(tǒng)一處理
* 隱藏ProgressDialog
*
* @param e
*/
@Override
public void onError(Throwable e) {
/*停止下載*/
HttpDownManager.getInstance().stopDown(downInfo);
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onError(e);
}
downInfo.setState(DownState.ERROR);
}
/**
* 將onNext方法中的返回結(jié)果交給Activity或Fragment自己處理
*
* @param t 創(chuàng)建Subscriber時(shí)的泛型類型
*/
@Override
public void onNext(T t) {
if (mSubscriberOnNextListener.get() != null) {
mSubscriberOnNextListener.get().onNext(t);
}
}
@Override
public void update(long read, long count, boolean done) {
if(downInfo.getCountLength()>count){
read=downInfo.getCountLength()-count+read;
}else{
downInfo.setCountLength(count);
}
downInfo.setReadLength(read);
if (mSubscriberOnNextListener.get() != null) {
/*接受進(jìn)度消息箱残,造成UI阻塞,如果不需要顯示進(jìn)度可去掉實(shí)現(xiàn)邏輯止吁,減少壓力*/
rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
/*如果暫捅患或者停止?fàn)顟B(tài)延遲,不需要繼續(xù)發(fā)送回調(diào)敬惦,影響顯示*/
if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
downInfo.setState(DownState.DOWN);
mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
}
});
}
}
}
9.下載管理類封裝HttpDownManager
- 單利獲取
/**
* 獲取單例
* @return
*/
public static HttpDownManager getInstance() {
if (INSTANCE == null) {
synchronized (HttpDownManager.class) {
if (INSTANCE == null) {
INSTANCE = new HttpDownManager();
}
}
}
return INSTANCE;
}
- 因?yàn)閱卫孕枰涗浾谙螺d的數(shù)據(jù)和回到sub
/*回調(diào)sub隊(duì)列*/
private HashMap<String,ProgressDownSubscriber> subMap;
/*單利對(duì)象*/
private volatile static HttpDownManager INSTANCE;
private HttpDownManager(){
downInfos=new HashSet<>();
subMap=new HashMap<>();
}
- 開(kāi)始下載需要記錄下載的service避免每次都重復(fù)創(chuàng)建盼理,然后請(qǐng)求sercie接口,得到ResponseBody數(shù)據(jù)后將數(shù)據(jù)流寫入到本地文件中(6.0系統(tǒng)后需要提前申請(qǐng)權(quán)限)
/**
* 開(kāi)始下載
*/
public void startDown(DownInfo info){
/*正在下載不處理*/
if(info==null||subMap.get(info.getUrl())!=null){
return;
}
/*添加回調(diào)處理類*/
ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
/*記錄回調(diào)sub*/
subMap.put(info.getUrl(),subscriber);
/*獲取service俄删,多次請(qǐng)求公用一個(gè)sercie*/
HttpService httpService;
if(downInfos.contains(info)){
httpService=info.getService();
}else{
DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
OkHttpClient.Builder builder = new OkHttpClient.Builder();
//手動(dòng)創(chuàng)建一個(gè)OkHttpClient并設(shè)置超時(shí)時(shí)間
builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
builder.addInterceptor(interceptor);
Retrofit retrofit = new Retrofit.Builder()
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(info.getBaseUrl())
.build();
httpService= retrofit.create(HttpService.class);
info.setService(httpService);
}
/*得到rx對(duì)象-上一次下載的位置開(kāi)始下載*/
httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
/*指定線程*/
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
/*失敗后的retry配置*/
.retryWhen(new RetryWhenNetworkException())
/*讀取下載寫入文件*/
.map(new Func1<ResponseBody, DownInfo>() {
@Override
public DownInfo call(ResponseBody responseBody) {
try {
writeCache(responseBody,new File(info.getSavePath()),info);
} catch (IOException e) {
/*失敗拋出異常*/
throw new HttpTimeException(e.getMessage());
}
return info;
}
})
/*回調(diào)線程*/
.observeOn(AndroidSchedulers.mainThread())
/*數(shù)據(jù)回調(diào)*/
.subscribe(subscriber);
}
- 寫入文件
注意:一開(kāi)始調(diào)用進(jìn)度回調(diào)是第一次寫入在進(jìn)度回調(diào)之前宏怔,所以需要判斷一次DownInfo是否獲取到下載總長(zhǎng)度奏路,沒(méi)有這選擇當(dāng)前ResponseBody 讀取長(zhǎng)度為總長(zhǎng)度
/**
* 寫入文件
* @param file
* @param info
* @throws IOException
*/
public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
if (!file.getParentFile().exists())
file.getParentFile().mkdirs();
long allLength;
if (info.getCountLength()==0){
allLength=responseBody.contentLength();
}else{
allLength=info.getCountLength();
}
FileChannel channelOut = null;
RandomAccessFile randomAccessFile = null;
randomAccessFile = new RandomAccessFile(file, "rwd");
channelOut = randomAccessFile.getChannel();
MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
info.getReadLength(),allLength-info.getReadLength());
byte[] buffer = new byte[1024*8];
int len;
int record = 0;
while ((len = responseBody.byteStream().read(buffer)) != -1) {
mappedBuffer.put(buffer, 0, len);
record += len;
}
responseBody.byteStream().close();
if (channelOut != null) {
channelOut.close();
}
if (randomAccessFile != null) {
randomAccessFile.close();
}
}
- 停止下載
調(diào)用 subscriber.unsubscribe()解除監(jiān)聽(tīng),然后remove記錄的下載數(shù)據(jù)和sub回調(diào)臊诊,并且設(shè)置下載狀態(tài)(同步數(shù)據(jù)庫(kù)自己添加)
/**
* 停止下載
*/
public void stopDown(DownInfo info){
if(info==null)return;
info.setState(DownState.STOP);
info.getListener().onStop();
if(subMap.containsKey(info.getUrl())) {
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*同步數(shù)據(jù)庫(kù)*/
}
- 暫停下載
原理和停止下載原理一樣
/**
* 暫停下載
* @param info
*/
public void pause(DownInfo info){
if(info==null)return;
info.setState(DownState.PAUSE);
info.getListener().onPuase();
if(subMap.containsKey(info.getUrl())){
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*這里需要講info信息寫入到數(shù)據(jù)中鸽粉,可自由擴(kuò)展,用自己項(xiàng)目的數(shù)據(jù)庫(kù)*/
}
*暫停全部和停止全部下載任務(wù)
/**
* 停止全部下載
*/
public void stopAllDown(){
for (DownInfo downInfo : downInfos) {
stopDown(downInfo);
}
subMap.clear();
downInfos.clear();
}
/**
* 暫停全部下載
*/
public void pauseAll(){
for (DownInfo downInfo : downInfos) {
pause(downInfo);
}
subMap.clear();
downInfos.clear();
}
- 整合代碼HttpDownManager
同樣使用了封裝二中的retry處理和運(yùn)行時(shí)異常自定義處理封裝(不復(fù)述了)
總結(jié)
到此我們的Rxjava+ReTrofit+okHttp深入淺出-封裝就基本完成了抓艳,已經(jīng)可以完全勝任開(kāi)發(fā)和學(xué)習(xí)的全部工作触机,如果后續(xù)再使用過(guò)程中有任何問(wèn)題歡迎留言給我,會(huì)一直維護(hù)玷或!
1.Retrofit+Rxjava+okhttp基本使用方法
2.統(tǒng)一處理請(qǐng)求數(shù)據(jù)格式
3.統(tǒng)一的ProgressDialog和回調(diào)Subscriber處理
4.取消http請(qǐng)求
5.預(yù)處理http請(qǐng)求
6.返回?cái)?shù)據(jù)的統(tǒng)一判斷
7.失敗后的retry封裝處理
8.RxLifecycle管理生命周期威兜,防止泄露
9.文件上傳和文件下載(支持多文件斷點(diǎn)續(xù)傳)