RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解。(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑的伪代码)

入手体验

RxJava 中切换线程非常简单,例如最常见的异步线程处理,主线程回调的模型,可以很优雅的用如下代码来做处理:

  1. Observable.just("magic")
  2. .map(str -> doExpensiveWork(str))
  3. .subscribeOn(Schedulers.io())
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(obj -> System.out.print(String.valueOf(obj)));

如上,subscribeOn(Schedulers.io())保证了doExpensiveWork 函数发生在io线程,observeOn(AndroidSchedulers.mainThread())保证了subscribe 回调发生在Android 的主线程。所以,这自然而然的引出了本文的关键点,subscribeOnobserveOn到底区别在哪里?

流程浅析

要想回答上面的问题,我们首先需要对RxJava的流程有大体了解,一个Observable从产生,到最终执行subscribe,中间可以经历n个变换,每次变换会产生一个新的Observable,就像奥运开幕的传递火炬一样,每次火炬都会传递到下一个人,最终点燃圣火的是最后一个火炬手,即最终执行subscribe操作的是最后一个Observable,所以,每个Observable之间必须有联系,这种关系在代码中的体现就是,每个变换后的Observable都会持有上一个Observable 中OnSubscribe对象的引用(Observable.create 函数所需的参数),最终 Observable的subscribe函数中的关键代码是这一句:

  1. observable.onSubscribe.call(subscriber)

这个observable就是最后一个变换后的observable,那这个onSubscribe对象是谁呢?如何一个observable没有经过任何变换,直接执行了subscribe,当然就是我们在create中传入的onSubscribe, 但如果中间经过map、reduce等变换,这个onSubscribe显然就应该是创建变换后的observable传入的参数,大部分变换最终都交由lift函数:

  1. public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
  2. return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
  3. }

所以,上文所提到的onSubscribe对象应该是OnSubscribeLift的实例,而这个OnSubscribeLift所接收的两个参数,一个是前文提到的,上一个Observable中的OnSubscribe对象,而operator则是每种变换的一个抽象接口。再来看这个OnSubscribeLift对象的call方法:

  1. public void call(Subscriber<? super R> o) {
  2. Subscriber<? super T> st = operator.call(o);
  3. parent.call(st);

operator与parent就是前文提到的两个参数,可见,operator接口会拥有call方法,接收一个Subscriber, 并返回一个新的Subscriber对象,而接下来的parent.call(st)是回调上一层observable的onSubscribe的call方法,这样如此继续,一直到一个onSubscribe截止。这样我们首先理清了一条线路,就是从最后一个observable的subscribe后,OnSubscribe调用的顺序是从后向前的。

这就带来了另外一个疑问,从上面的代码可以看到,在执行parent.call(st)之前已经执行了operator.call(o)方法,如果call方法里就把变换的操作执行了的话,那似乎变换也会是从后向前传递的呀?所以这个operator.call方法绝对不是我们想象的那么简单。这里以map操作符为例,看源码:

  1. public Subscriber<? super T> call(final Subscriber<? super R> s) {
  2. MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
  3. o.add(parent);
  4. return parent;
  5. }

这里果然没有执行变换操作,而是生成一个MapSubscriber对象,这里需要注意MapSubscriber构造函数的两个参数,transformer是真正要执行变换的Func1对象,这很好理解,那对于o这个Subscriber是哪一个呢?什么意思?举个

最新文章

  1. bootstrap入门
  2. VMware桥接模式无法自动化获取IP的解决方法
  3. comms.nottingham.ac.uk/learningtechnology
  4. uedit富文本编辑器
  5. Python写UTF8文件,UE、记事本打开依然乱码的问题
  6. JS中 submit提交与Form表单里的onsubmit的调用问题?
  7. Oracle——PL/SQL 语句
  8. iframe 元素
  9. python操作redis--string
  10. Struts2中在Action里面向前端页面传值的方法总结
  11. 四轴飞行器1.3 MPU6050(大端)和M4的FPU开启方法
  12. cocos2dx进阶学习之CCTMXLayer
  13. Linux(CentOS)系统下安装好apache(httpd)服务后,其他电脑无法访问的原因
  14. 华为oj之质数因子
  15. [LeetCode] Smallest Rotation with Highest Score 得到最高分的最小旋转
  16. mysqlGTID主从配置
  17. 《转》推荐几个精致的web UI框架
  18. C语言应用操作之文件
  19. centos7 Minimal安装没有ifconfig
  20. MatLab Swap Rows or Cols 交换行或列

热门文章

  1. CentOS下搭建nginx+php环境
  2. SinalR+WebSocket
  3. Intelligencia.UrlRewriter在IIS 7.0下的完全配置攻略
  4. Python之路-python(rabbitmq、redis)
  5. Python之路-python(面向对象一)
  6. 使用百度编辑器时,报错:从客户端(&quot;...)中检测到有潜在危险的 Request.Form 值
  7. PL/SQL不支持64位Oracle Client 解决办法
  8. haproxy log config
  9. MongoDb 聚合报错
  10. Linux 下修改配置实现在当前目录下寻找可执行文件