参考博文:给初学者的RxJava2.0教程-简书     源码 :https://github.com/ssseasonnn/RxJava2Demo

1 若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃:

package com.shiqing.rxjava2;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import io.reactivex.*;
public class MainActivity extends AppCompatActivity {
public static final String TAG = "sqrxjava MainActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// emitter.onComplete();
          //连续两次执行 onError
emitter.onError(new Throwable("抛出异常...1"));
emitter.onError(new Throwable("抛出异常...2"
));
}
});
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Object value) {
Log.d(TAG, "onNext value " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
observable.subscribe(observer);
}
}

崩溃日志:

ObservableEmitter: Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)onComplete()onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:

  • 上游可以发送无限个onNext, 下游也可以接收无限个onNext.
  • 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
  • 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
  • 上游可以不发送onComplete或onError.
  • 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.

以上几个规则用示意图表示如下:

只发送onNext事件

 发送onComplete事件  

 发送onError事件  

2 Disposable 好比两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件,但是上游仍会继续发送剩余的事件。

subscribe()有多个重载的方法:

    public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

最后一个带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明.

  • 不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
  • 带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
 observable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "onError: " + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "onComplete");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "onSubscribe");
}
});

输出值:

03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onSubscribe...
03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 1
03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 2
03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onNext... : 3
03-23 07:13:12.438 3997-3997/com.shiqing.rxjava2 D/sqrxjava MainActivity: onComplete...

4 在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程。

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

最新文章

  1. loj 1030概率dp
  2. ArcEngine 通过SpatialRelDescription删除不相交要素
  3. Test complete测试工具介绍
  4. ASP.NET MVC轻教程 Step By Step 4——Model、View和Controller
  5. PAT (Advanced Level) 1092. To Buy or Not to Buy (20)
  6. Redis详解
  7. AVL树(平衡二叉查找树)
  8. vmware虚拟机的克隆
  9. Lenovo System x3650 设置管理接口地址
  10. Saslauthd服务实现SMTP发信认证
  11. appium+java(四)微信公众号自动化测试实践
  12. 使用 cacti 监控 windows 服务器硬盘的 I/O 状况
  13. 无根树的计数——prufer序列
  14. Junit测试中找不到junit.framework.testcase
  15. 使用IntelliJ IDEA 15和Maven创建Java Web项目
  16. RPM 安装oracle18c 修改字符集的方法
  17. Firefox 火狐 页面特殊符号乱码解决方法
  18. 【转】django 与 vue 的完美结合 实现前后端的分离开发之后在整合
  19. bzoj1715 虫洞
  20. Atitit.异步编程 java .net php python js 的比较

热门文章

  1. JIT和AOT编译详解
  2. (转)查询或修改iPhone的短信服务中心号码(iOS通用)
  3. Vue百度搜索
  4. $.extend与$.fn.extend()
  5. (转)C# WebApi 接口参数不再困惑:传参详解
  6. 小米造最强超分辨率算法 | Fast, Accurate and Lightweight Super-Resolution with Neural Architecture Search
  7. CF1133E K Balanced Teams(DP)
  8. 避免crontab输出日志
  9. tornado-版本迁移工具alembic
  10. 前端通过js-xlsx获取Excel完整数据