RxJava的核心内容很简单,就是进行异步操作。类似于Handler和AsyncTask的功能,但是在代码结构上不同。
RxJava使用了观察者模式和建造者模式中的链式调用(类似于C#的LINQ)。
观察者模式:Observable(被观察者)被Observer(观察者)订阅(Subscribe)之后,Observable在发出消息的时候会通知对应的Observer,并且,一个Observable可以有被多个Observer订阅。
链式调用:和Builder模式类似,调用对应的方法对原对象进行处理后返回原对象,从而做到链式调用。
这里介绍一些下面会用到的名词:
- Observable:被观察者,也就是消息的发送者
- Observer:观察者,消息的接收者
- Subscriber:订阅者,观察者的另一种表示
- Scheduler:调度器,进行线程切换
先从简单入手模拟一下这个过程:
先配置依赖(版本为当前最新):
dependencies {
...
compile 'io.reactivex:rxjava:1.1.8'
compile 'io.reactivex:rxandroid:1.2.1'
}
接着在工程的onCreate方法中直接加入如下代码:
1 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
2 @Override
3 public void call(Subscriber<? super String> subscriber) {
4 subscriber.onNext("hello");
5 subscriber.onNext("android");
6 subscriber.onCompleted();
7 }
8 });
9
10 Subscriber<String> subscriber = new Subscriber<String>() {
11 @Override
12 public void onCompleted() {
13 Log.d(TAG, "onCompleted() called");
14 }
15
16 @Override
17 public void onError(Throwable e) {
18 Log.d(TAG, "onError() called with: e = [" + e + "]");
19 }
20
21 @Override
22 public void onNext(String s) {
23 Log.d(TAG, "onNext() called with: s = [" + s + "]");
24 }
25 };
26
27 observable.subscribe(subscriber);
1-8行创建了一个Observable对象,这个对象可以通过create方法来创建,传入一个OnSubscribe的实现类,并在其onCall方法中调用subscriber的onNext方法进行消息的发送,分别发送两个字符串然后调用onComplete方法表示发送完成。
10-25行创建了一个Subscriber(订阅者)对象,这个类是Observer的子类,Observer是传统的观察者,而Subscriber则丰富了Observer,Subsciber添加了两个方法,分别是:
- onStart:在Subscriber和Observable连接之后,消息发送之前调用,也就是先于onNext方法调用
- unsubscirbe:解除订阅关系,Subscriber不再收到从Observable发送的消息
如果不需要用到这两个方法,直接创建Observer也是可以的,创建的方法类似Subscriber。
27行调用了Observable的subscribe方法对Subscriber进行绑定。
运行程序会打印如下的Log:
最简单的流程就是这样,如果上面代码中的局部变量不是必须的话,代码可以变成如下所示:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("android");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted() called");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext() called with: s = [" + s + "]");
}
});
效果是完全一样的,相信可以看得明白,后面这种形式会比较常见。
接着要介绍两个类——Actionx和Funcx(这两个类最后的x为数字)。
Actionx:指的是一组操作(用于简化Subscriber),而这组操作有x个参数,并且返回Void
Funcx:指的是一组方法(用于对消息进行处理,下面的变换会使用到),这个方法有x个参数和1个返回值
这里还是和上面例子一样,假设我们只关心Subscriber的onNext方法,而不关心onCompleted和onError方法,那么这一个Subscriber就可以被替换为一个Action1,代码如下:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("android");
subscriber.onCompleted();
}
}).subscribe(new Action1<String>() {
@Override
public void call(Strin