下来会详细介绍subscribeOn()
和observeOn()
操作符。
要理解RxJava真正的多线程功能,我们需要深入理解这两个操作符分别如何运行以及何时要将它们组合起来。
简而言之,这个操作符指定源observable要在哪个线程中发出各个条目。读者需要理解这里“源(source)” observable的含义,如果你有一个observable链的话,源observable始终位于根部或者说是链的顶部,也就是产出物(emission)生成的地方。
读者也看到了,如果我们不使用subscribeOn()
的话,所有的产出都是直接在代码执行的线程中生成的(在我们的场景中,也就是main
线程)。
接下来,我们将所有的产出物在计算线程中生成,这需要使用subscribeOn()
和 Schedulers.computation()
Scheduler。如果你运行下面的代码片段,所有产出物是在线程池中某个可用的计算线程中生成的,RxComputationThreadPool-1
。
简洁起见,我们没有使用完整的DisposableSubscriber
,因为在这种简单的场景中,我们不需要每次都处理onError()
和onComplete()
,我们只需要处理onNext()
,它只是一个简单的消费者。
在链中,将subscribeOn()
放到什么位置其实无关紧要。它只会影响源observable并控制在哪个线程中生成条目。
在上面的样例中,读者可能会注意到map()
和filter()
操作符也会生成其他的observable,subscribeOn()
放到了链的底部。但是,如果你运行下面的代码片段的话,就会发现它只会影响源observable。如果将observeOn()
同时加入到链中,会更加清晰。即便我们将subscribeOn()
放到observeOn()
下面,它也只会影响源observable。
另外需要注意的是,我们不能在链中多次使用subscribeOn()
。从技术上讲,你可以这样做,但是它并不会产生额外的作用。在下面的代码片段中,我们将三个不同的Scheduler连接到了一起,你能猜出来,哪个Scheduler
会成为源observable吗?
如果你的答案是Schedulers.io()
,那么恭喜你答对了!
即便我们在链中放置多个subscribeOn()
操作符,只有最靠近源observable的那一个会发挥作用。
我们值得花一些时间更深入地理解一下上述的场景。为什么Schedulers.io()
Scheduler会发挥作用,而不是其他的Scheduler?正常情况下,你可能会认为Schedulers.newThread()
会生效,因为它是在链的最后才添加上去的。
我们必须要理解,在RxJava中,订阅(subscription)必须是基于上游observable实例的。如下的代码与我们前面看到的非常类似,只是更加繁琐一些。
我们从片段的最后一行开始理解这段代码。在这里,目标订阅者(或者说链中最下游的observer)基于observable o3
调用subscribe()
方法,这样会隐式地对它的直接上游observable o2
调用subscribe()
方法。o3
所提供的observer实现会将生成的数字乘以10。
这个过程会重复进行,o2
会隐式地基于o1
调用subscribe,所传入的observer实现会将偶数过滤出来并允许其通过。现在,我们到达了根部,源observable o1
没有任何上游observable来调用subscribe
。这实际上就完成了observable链,源observable就能生成其条目了。
对于RxJava中订阅是如何实现的,读者应该就会比较清楚了。到目前为止,对observable链如何组成以及事件如何从源observable开始在链中传播应该有了一个基本的了解。
正如我们刚刚看到的,subscribeOn()
能够指明源observable要在一个特定的线程中生成其条目,这个线程还会负责将这些条目一直推送到sink Subscriber
中。因此,默认情况下,订阅者也会在这个线程中消费这些条目。
但是,我们的应用所期望的行为往往并非总是如此。假设,我们想要通过网络获取一些数据并在App的UI中展现。
本质上,我们有两件事情需要完成:
我们需要有一个observable在I/O线程进行网络调用,并将产出传递给目标订阅者。如果你只是使用subscribeOn()
和Schedulers.io()
的话,最终的订阅者将会在I/O线程中进行操作。我们无法在主线程之外的其他线程中访问UI组件,因此我们如果这样做的话,会遇到麻烦。
现在,我们迫切需要切换线程,而这就是observeOn()
操作符能够发挥作用的地方。在observable链中,如果遇到observeOn()
,那么产出物将会立即切换到所指定的线程中。
在这个稍微有点牵强的样例中,我们有一个observable从网络上获取整数所组成的流。在实际的用例中,这可以换成任意的异步操作,比如读取大文件或从数据库获取数据等等。你可以尝试该代码片段并查看结果,需要关注的是日志中的线程名。
现在,我们看一个稍微复杂一点的样例,这里会使用多个observeOn()
操作符,这样在我们的observable链中会多次切换线程。
在上面的代码片段中,源observable在I/O线程生成其条目,因为这里组合使用了subscribeOn()
和Schedulers.io()
。现在,我们想要使用map()
操作符来转换每个条目,只不过需要在计算线程中进行。为了实现这一点,我们可以在map()
操作符之前组合使用observeOn()
和Schedulers.computation()
,这样的话就能切换线程并将产出物传递到计算线程中了。
接下来,需要过滤条目,但是因为某些原因,我们想在一个全新的线程中进行操作。这样的话,我们可以在filter()
操作符前面组合使用observeOn()
和Schedulers.newThread()
,这样的话,就能为每个条目切换至一个新的线程。
最后,我们希望订阅者消费最终处理后的条目并在UI上展现结果,为了实现这一点,我们需要再次切换线程,不过这一次需要切换至主线程,这里需要组合使用observeOn()
和AndroidSchedulers.mainThread()
Scheduler。
但是,如果我们多次地连续使用observeOn()
会怎样呢?在下面的代码片段中,最终的订阅者在消费结果的时候,到底使用的是哪个线程呢?是第一个还是最后一个observeOn()
能发挥作用呢?
如果你运行这个代码片段的话,你会发现所有的条目都是在RxComputationThreadPool-1
线程中消费的,这意味着最后的observeOn()
和Schedulers.computation()
发挥了作用。但是,为什么呢?
你可能已经猜到,为何是最后的observeOn()
,而不是其他的observeOn()
发挥作用了。我们已经知道,订阅只能针对上游才能发生,另外一方面,产出物只能面对下游才能发挥作用。它们从源observable开始,一路沿着链向下,直至最后的sink subscriber。
observeO