RxJava的简单使用

#RxJava的简单使用

一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序库

RxJava: https://github.com/ReactiveX/RxJava
RxAndroid: https://github.com/ReactiveX/RxAndroid

1
2
3
4
添加依赖:
implementation "io.reactivex.rxjava2:rxjava:2.1.14"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
  • Observable :被观察者(主题Subject)
  • Observer/Subscriber :观察者
  • Subscribe:订阅

Observable 和 Observer 通过subscribe()方法实现订阅关系

基本使用3部曲

  • 1、创建Observeable

    1
    2
    3
    4
    5
    6
    7
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
    emitter.onNext("Hello");
    emitter.onNext("world");
    emitter.onComplete();
    }
    });
  • 2、创建Observer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observer<String> observer = new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
Log.d("adu","onSubscribe:"+d.isDisposed());
}
@Override public void onNext(String s) {
Log.d("adu","onNext:"+s);
}
@Override public void onError(Throwable e) {
Log.d("adu","onError:"+e.getLocalizedMessage());
}
@Override public void onComplete() {
Log.d("adu","onComplete()");
}
};
  • 3、订阅
1
observable.subscribe(observer);

##Scheduler线程控制

下面我们来模仿我们在用RxJava于Retrofit结合起来请求数据时的应用

1
2
3
4
5
首先依赖以下几个库
implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'com.squareup.retrofit2:retrofit:2.4.0'
implementation 'com.squareup.retrofit2:converter-gson:2.4.0'

我们创建User实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class User {
private String head_url;
private String id;
private String username;
public User() {
}
public User(String id, String username) {
this.id = id;
this.username = username;
}
public User(String head_url, String id, String username) {
this.head_url = head_url;
this.id = id;
this.username = username;
}
public String getHead_url() {
return head_url;
}
public void setHead_url(String head_url) {
this.head_url = head_url;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
@Override public String toString() {
return new Gson().toJson(this);
}
}

然后我们创建一个接口

1
2
3
4
5
public interface Api {
@GET("user/{id}")
Call<User> getUserInfoWithPath(@Path("id") int user_id);
}

接着我们在Activity中建一个button与一个textview来进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
主要代码
private Button btn;
private TextView text;
private Api api;
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://192.168.5.23:5000")
.addConverterFactory(GsonConverterFactory.create())
.build();
api = retrofit.create(Api.class);
btn = findViewById(R.id.btn);
text = findViewById(R.id.text);
btn.setOnClickListener(new View.OnClickListener() {
@SuppressLint("CheckResult")
@Override public void onClick(View v) {
Observable.create(new ObservableOnSubscribe<User>() {
@Override public void subscribe(ObservableEmitter<User> emitter)
throws Exception {
User user = api.getUserInfoWithPath(1).execute().body();
emitter.onNext(user);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override public void onSubscribe(Disposable d) {
}
@Override public void onNext(User user) {
Log.i("adu","onNext=="+user);
text.setText(user.getUsername());
}
@Override public void onError(Throwable e) {
}
@Override public void onComplete() {
}
});
}
});

在开始我们在RxJava的使用三部曲中订阅的时候并没有使用到.subscribeOn(Schedulers.io())与.observeOn(AndroidSchedulers.mainThread())这两个操作符,可是在这里为什么要使用呢,下面我们就来总结一下关于Scheduler的线程控制

  • Schedulers.immediate();
    • 直接在当前线程运行, 相当于不指定线程。这是默认的Scheduler。
  • Schedulers.newThread();
    • 总是启用新线程,并在新线程执行操作。
  • Schedulers.io();
    • I/O操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()。不要把计算工作放在io()中,可以避免创建不必要的线程。
  • Schedulers.computation();
    • 计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
  • AndroidSchedulers.mainThread();
    • 它指定的操作将在Android主线程运行。

observeOn()指定Observer线程;
subscribeOn指定Observable线程

如果不添加.subscribeOn(Schedulers.io())这句,我们在调用api.getUserInfoWithPath获取用户信息的时候就会报NetworkOnMainThreadException这个异常,就是说我们不能在主线程中调用
然后我们在onNext方法中添加了text.setText(user.getUsername());这句,来更新UI的操作,所以我们必须要回到Android的主线程中,因此RxAndroid这个库就给我们提供了 .observeOn(AndroidSchedulers.mainThread())这个方法,因此就可以更新UI了

关于Rxjava的操作符有很多,我们接下来结合实际的案例来和大家一起学习

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器