Retrofit+Rxjava下载文件进度的实现

2025-05-27 0 53

前言

最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看。

准备工作

本文采用Dagger2,RetrofitRxJava

?

1

2

3

4

5

6

7

8

9

10
compile'com.squareup.retrofit2:retrofit:2.0.2'

compile'com.squareup.retrofit2:converter-gson:2.0.2'

compile'com.squareup.retrofit2:adapter-rxjava:2.0.2'

//dagger2

compile'com.google.dagger:dagger:2.6'

apt'com.google.dagger:dagger-compiler:2.6'

//RxJava

compile'io.reactivex:rxandroid:1.2.0'

compile'io.reactivex:rxjava:1.1.5'

compile'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造ResponseBody

okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35
public class ProgressResponseBody extends ResponseBody {

private ResponseBody responseBody;

private BufferedSource bufferedSource;

public ProgressResponseBody(ResponseBody responseBody) {

this.responseBody = responseBody;

}

@Override

public MediaType contentType() {

return responseBody.contentType();

}

@Override

public long contentLength() {

return responseBody.contentLength();

}

@Override

public BufferedSource source() {

if (bufferedSource == null) {

bufferedSource = Okio.buffer(source(responseBody.source()));

}

return bufferedSource;

}

private Source source(Source source) {

return new ForwardingSource(source) {

long bytesReaded = 0;

@Override

public long read(Buffer sink, long byteCount) throws IOException {

long bytesRead = super.read(sink, byteCount);

bytesReaded += bytesRead == -1 ? 0 : bytesRead;

//实时发送当前已读取的字节和总字节

RxBus.getInstance().post(new FileLoadEvent(contentLength(), bytesReaded));

return bytesRead;

}

};

}

}

呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象。

FileLoadEvent

FileLoadEvent很简单,包含了当前已加载进度和文件总大小。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14
public class FileLoadEvent {

long total;

long bytesLoaded;

public long getBytesLoaded() {

return bytesLoaded;

}

public long getTotal() {

return total;

}

public FileLoadEvent(long total, long bytesLoaded) {

this.total = total;

this.bytesLoaded = bytesLoaded;

}

}

RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79
public class RxBus {

private static volatile RxBus mInstance;

private SerializedSubject<Object, Object> mSubject;

private HashMap<String, CompositeSubscription> mSubscriptionMap;

/**

* PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者

* Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,

* 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

*/

private RxBus() {

mSubject = new SerializedSubject<>(PublishSubject.create());

}

/**

* 单例 双重锁

* @return

*/

public static RxBus getInstance() {

if (mInstance == null) {

synchronized (RxBus.class) {

if (mInstance == null) {

mInstance = new RxBus();

}

}

}

return mInstance;

}

/**

* 发送一个新的事件

* @param o

*/

public void post(Object o) {

mSubject.onNext(o);

}

/**

* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者

* @param type

* @param <T>

* @return

*/

public <T> Observable<T> tObservable(final Class<T> type) {

//ofType操作符只发射指定类型的数据,其内部就是filter+cast

return mSubject.ofType(type);

}

public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {

return tObservable(type)

.onBackpressureBuffer()

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(next, error);

}

public void addSubscription(Object o, Subscription subscription) {

if (mSubscriptionMap == null) {

mSubscriptionMap = new HashMap<>();

}

String key = o.getClass().getName();

if (mSubscriptionMap.get(key) != null) {

mSubscriptionMap.get(key).add(subscription);

} else {

CompositeSubscription compositeSubscription = new CompositeSubscription();

compositeSubscription.add(subscription);

mSubscriptionMap.put(key, compositeSubscription);

// Log.e("air", "addSubscription:订阅成功 " );

}

}

public void unSubscribe(Object o) {

if (mSubscriptionMap == null) {

return;

}

String key = o.getClass().getName();

if (!mSubscriptionMap.containsKey(key)) {

return;

}

if (mSubscriptionMap.get(key) != null) {

mSubscriptionMap.get(key).unsubscribe();

}

mSubscriptionMap.remove(key);

//Log.e("air", "unSubscribe: 取消订阅" );

}

}

FileCallBack

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69
public abstract class FileCallBack<T> {

private String destFileDir;

private String destFileName;

public FileCallBack(String destFileDir, String destFileName) {

this.destFileDir = destFileDir;

this.destFileName = destFileName;

subscribeLoadProgress();

}

public abstract void onSuccess(T t);

public abstract void progress(long progress, long total);

public abstract void onStart();

public abstract void onCompleted();

public abstract void onError(Throwable e);

public void saveFile(ResponseBody body) {

InputStream is = null;

byte[] buf = new byte[2048];

int len;

FileOutputStream fos = null;

try {

is = body.byteStream();

File dir = new File(destFileDir);

if (!dir.exists()) {

dir.mkdirs();

}

File file = new File(dir, destFileName);

fos = new FileOutputStream(file);

while ((len = is.read(buf)) != -1) {

fos.write(buf, 0, len);

}

fos.flush();

unsubscribe();

//onCompleted();

} catch (FileNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

if (is != null) is.close();

if (fos != null) fos.close();

} catch (IOException e) {

Log.e("saveFile", e.getMessage());

}

}

}

/**

* 订阅加载的进度条

*/

public void subscribeLoadProgress() {

Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent.class, new Action1<FileLoadEvent>() {

@Override

public void call(FileLoadEvent fileLoadEvent) {

progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());

}

}, new Action1<Throwable>() {

@Override

public void call(Throwable throwable) {

//TODO 对异常的处理

}

});

RxBus.getInstance().addSubscription(this, subscription);

}

/**

* 取消订阅,防止内存泄漏

*/

public void unsubscribe() {

RxBus.getInstance().unSubscribe(this);

}

}

开始下载

使用自己的ProgressResponseBody

通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度

?

1

2

3

4

5

6

7

8

9
public class ProgressInterceptor implements Interceptor {

@Override

public Response intercept(Chain chain) throws IOException {

Response originalResponse = chain.proceed(chain.request());

return originalResponse.newBuilder()

.body(new ProgressResponseBody(originalResponse.body()))

.build();

}

}

构建Retrofit

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32
@Module

public class ApiModule {

@Provides

@Singleton

public OkHttpClient provideClient() {

OkHttpClient client = new OkHttpClient.Builder()

.addInterceptor(new ProgressInterceptor())

.build();

return client;

}

@Provides

@Singleton

public Retrofit provideRetrofit(OkHttpClient client){

Retrofit retrofit = new Retrofit.Builder()

.client(client)

.baseUrl(Constant.HOST)

.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

.addConverterFactory(GsonConverterFactory.create())

.build();

return retrofit;

}

@Provides

@Singleton

public ApiInfo provideApiInfo(Retrofit retrofit){

return retrofit.create(ApiInfo.class);

}

@Provides

@Singleton

public ApiManager provideApiManager(Application application, ApiInfo apiInfo){

return new ApiManager(application,apiInfo);

}

}

请求接口

?

1

2

3

4

5
public interface ApiInfo {

@Streaming

@GET

Observable<ResponseBody> download(@Url String url);

}

执行请求

?

1

2

3

4

5

6

7

8

9

10

11

12

13
public void load(String url, final FileCallBack<ResponseBody> callBack){

apiInfo.download(url)

.subscribeOn(Schedulers.io())//请求网络 在调度者的io线程

.observeOn(Schedulers.io()) //指定线程保存文件

.doOnNext(new Action1<ResponseBody>() {

@Override

public void call(ResponseBody body) {

callBack.saveFile(body);

}

})

.observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui

.subscribe(new FileSubscriber<ResponseBody>(application,callBack));

}

在presenter层中执行网络请求。

通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29
public void load(String url){

String fileName = "app.apk";

String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();

Log.e(TAG, "load: "+fileStoreDir.toString() );

FileCallBack<ResponseBody> callBack = new FileCallBack<ResponseBody>(fileStoreDir,fileName) {

@Override

public void onSuccess(final ResponseBody responseBody) {

Toast.makeText(App.getInstance(),"下载文件成功",Toast.LENGTH_SHORT).show();

}

@Override

public void progress(long progress, long total) {

iHomeView.update(total,progress);

}

@Override

public void onStart() {

iHomeView.showLoading();

}

@Override

public void onCompleted() {

iHomeView.hideLoading();

}

@Override

public void onError(Throwable e) {

//TODO: 对异常的一些处理

e.printStackTrace();

}

};

apiManager.load(url, callBack);

}

踩到的坑。

依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊。

保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程。

总结

看似代码很多,其实过程并不复杂:

在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象。

FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新。

在下载保存文件完成后,取消订阅,防止内存泄漏。

Demo地址:https://github.com/AirMiya/DownloadDemo

原文链接:http://www.jianshu.com/p/060d55fc1c82

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Retrofit+Rxjava下载文件进度的实现 https://www.kuaiidc.com/77393.html

相关文章

发表评论
暂无评论