RxJava操作符及相关案例

变换操作符

Map操作符

对Observable发射的每一项数据应用一个函数,执行变换操作
Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
我们来写个例子来解释一下

1
2
3
4
5
6
将原始的数据进行转换,也就是将Integer类型转换为String类型进行输出
Observable.just(1).map(new Function<Integer, String>() {
@Override public String apply(Integer integer) throws Exception {
return 1+"";
}
});

FlatMap操作符

FlatMap 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable

案例:登录获取用户信息案例核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just(getUserParam()).flatMap(
new Function<UserParam, ObservableSource<BaseResult>>() {
@Override public ObservableSource<BaseResult> apply(UserParam userParam)
throws Exception {
BaseResult baseResult = api.login(userParam).execute().body();
return Observable.just(baseResult);
}
}).flatMap(new Function<BaseResult, ObservableSource<User>>() {
@Override public ObservableSource<User> apply(BaseResult baseResult)
throws Exception {
User user = api.getUserInfoWithPath(baseResult.getUser_id()).execute().body();
return Observable.just(user);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<User>() {
@Override public void accept(User user) throws Exception {
text.setText(user.getUsername());
}
});

在第一个flatMap中,我们先将用户名密码的对象UserParam类去请求结果转换成BaseResult对象。
然后在第二个flatMap中,将BaseResult对象转换成User对象。
接着切换线程,订阅更新UI

Debounce操作符

仅在过了一段指定的时间还没发射数据时才发射一个数据,Debounce操作符会过滤掉发射速率过快的数据项

案例:关键词搜索

一般情况我们监听EditText控件,当值发生改变去请求搜索接口,如下

1
2
3
4
5
6
7
8
9
10
11
12
editText.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override public void afterTextChanged(Editable s) {
请求搜索接口,成功后把结果显示在界面上
}
});

a:可能导致很多没用意义的请求,耗费用户流量(因为空间的值每更改一次立即就会去请求网络,而且只是最后输入的关键字是有用的)
b:可能导致最终的搜索的结果不是用户想要的

接下来我们就要了解一下Debounce操作符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
RxTextView.textChanges(editText)
.debounce(200, TimeUnit.MILLISECONDS)
//因为是输入框所以要在ui线程中操作
.subscribeOn(AndroidSchedulers.mainThread())
//过滤数据,让输入框数据长度大于0才去发送请求
.filter(new Predicate<CharSequence>() {
@Override public boolean test(CharSequence charSequence) throws Exception {
Log.i("adu","CharSequence过滤="+charSequence);
return charSequence.toString().trim().length()>0;
}
})
//.flatMap(new Function<CharSequence, ObservableSource<List<String>>>() {
// @Override public ObservableSource<List<String>> apply(CharSequence charSequence) throws Exception {
// Log.i("adu","CharSequence="+charSequence);
// List<String> list = new ArrayList<>();
// list.add("adidas");
// list.add("add");
// list.add("qwe");
//
// return Observable.just(list);
// }
//})
.switchMap(new Function<CharSequence, ObservableSource<List<String>>>() {
@Override public ObservableSource<List<String>> apply(CharSequence charSequence)
throws Exception {
Log.i("adu","CharSequence="+charSequence);
List<String> list = new ArrayList<>();
list.add("adidas");
list.add("add");
list.add("qwe");
return Observable.just(list);
}
})
.subscribeOn(Schedulers.io())//请求网络放在ui线程
.subscribeOn(AndroidSchedulers.mainThread()) //回到主线程更新ui
.subscribe(new Consumer<List<String>>() {
@Override public void accept(List<String> strings) throws Exception {
Log.i("adu","结果="+strings);
}
});

上面我们用debounce操作符来监听用户停止操作后200毫秒发送一次请求,解决了我们上面a问题。然后我们将flatMap操作符换成了switchMap操作符,完美的解决了第二个问题,它在这里的作用是,当用户停止操作后200毫秒发送请求,当数据还没有请求回来时,用户接着又输入字符串,switchMap操作符会清空上一次的请求,接着操作请求这一次的数据,我们就更好的解决了b问题。我们来看看官方对于switchMap操作符的解释

switchMap操作符

将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据

ThrottleFirst操作符

允许设置一个时间长度,之后它会发送固定时间长度内的第一个事件,而屏蔽其他事件,在间隔达到设置的时间后,可以再发送下一个事件

案例:防止按钮重复(连续)点击

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RxView.clicks(button)
.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override public void onSubscribe(Disposable d) {
}
@Override public void onNext(Object o) {
Log.i("adu","这是点击按钮");
}
@Override public void onError(Throwable e) {
}
@Override public void onComplete() {
}
});

当然你将throttleFirst操作符替换成上面的debounce操作符,结果是一样的

在上面的RxTextViewRxView都来自于rxbinding2的依赖

1
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1'

关于RxBinding的一些使用场景我们来通过这篇文章学习

merge操作符

merge操作符可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
merge可能会让合并的Observables发射的数据交错(有一个类似的操作符concat操作符不会让数据交错,他会按顺序一个接一个发射多个Observables的发射物)

案例:购物车-合并本地和网络数据

场景介绍:当我们在网页的淘宝购物时,将商品添加到购物车,然后用手机APP打开淘宝,这个时候,购物车列表数据就会将你刚才在网页添加的商品与购物车本来的商品合并,这就是网络数据与本地数据合并的意思。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
合并本地网络数据
Observable.merge(getDataFormLocal(),getDataFormNetWork())
.subscribe(new Observer<List<Course>>() {
@Override public void onSubscribe(Disposable d) {
}
@Override public void onNext(List<Course> courses) {
for (Course c: courses) {
Log.i("adu","courseName="+c.getName());
}
}
@Override public void onError(Throwable e) {
}
@Override public void onComplete() {
}
});
//获取本地数据
private Observable<List<Course>> getDataFormLocal(){
List<Course> list = new ArrayList<>();
list.add(new Course("Android第一行代码"));
list.add(new Course("Android组件化架构"));
list.add(new Course("Android开发艺术探索"));
return Observable.just(list);
}
//获取网络数据
private Observable<List<Course>> getDataFormNetWork(){
return api.getCourse().subscribeOn(Schedulers.io());
}

注意:merge操作符只会合并数据,并不会去重

Interval操作符

创建一个按固定时间间隔发射整数序列的Observable,Interval操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列

案例:发送验证码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private int count = 30;
btn.setOnClickListener(new View.OnClickListener() {
@Override public void onClick(View v) {
//时间间隔
Observable.interval(0,1, TimeUnit.SECONDS)
.take(count+1)
.map(new Function<Long, Long>() {
@Override public Long apply(Long aLong) throws Exception {
return count-aLong ;
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(new Consumer<Disposable>() {
@Override public void accept(Disposable disposable) throws Exception {
btn.setEnabled(false);
}
})
.subscribe(new Observer<Long>() {
@Override public void onSubscribe(Disposable d) {
Log.i("adu","onSubscribe"+d.isDisposed());
}
@Override public void onNext(Long aLong) {
btn.setText("剩余"+aLong+"秒");
}
@Override public void onError(Throwable e) {
}
@Override public void onComplete() {
btn.setEnabled(true);
btn.setText("获取验证码");
}
});
}
});

上面用take操作符来获取总的时间长度,map操作符来转换显示倒计时的时间,doOnSubscribe中的accept方法里将点击后的button状态切换为不可点击,然后我们在onNext中更新button的时间倒计时,在倒计时完成时,在onComplete方法中更新button状态。

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器