I: RxJava execution flow

Basic RxJava usage

private final String tag = getClass().getSimpleName();

// Data source: the Observable (the object being observed)
Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(tag, "emit 1");
        emitter.onNext("t1");
        Log.d(tag, "emit 2");
        emitter.onNext("t2");
        Log.d(tag, "emit 3");
        emitter.onNext("t3");
        Log.d(tag, "emit complete");
        emitter.onComplete();
    }
});

private void observer_test() {
    // Observer: handles each event it receives
    Observer<String> dnObser = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(tag, "onSubscribe");
        }

        @Override
        public void onNext(String s) {
            Log.d(tag, "onNext " + s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(tag, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(tag, "onComplete");
        }
    };
    obser.subscribe(dnObser);
}

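The example shows the main components of RxJava:

Observable: the object being observed, i.e. the source of events

ObservableOnSubscribe: defines which events are emitted to the observer

Observer: the observer, which handles events by implementing the corresponding callbacks

How subscription is handled: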

@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        // This is where ObservableOnSubscribe.subscribe ends up being called
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

protected abstract void subscribeActual(Observer<? super T> observer);

subscribeActual is an abstract method. In this example its concrete implementation is ObservableCreate, as the create method of Observable shows:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

Now look at the source of ObservableCreate:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            // Calls ObservableOnSubscribe.subscribe, which starts the event flow
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
}
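
The call order described above (nothing runs at creation time, onSubscribe comes first, then the source callback emits) can be reproduced without the library. The following is a minimal sketch, not RxJava's implementation: MiniCreateDemo, MiniObservable, Emitter and OnSubscribe are hypothetical stand-ins, and disposal and plugin hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for RxJava's types; not the library's real classes.
class MiniCreateDemo {
    interface Observer<T> {
        void onSubscribe();
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
    }

    interface OnSubscribe<T> {
        void subscribe(Emitter<T> emitter) throws Exception;
    }

    // Plays the role of ObservableCreate: it only stores the source callback.
    static final class MiniObservable<T> {
        private final OnSubscribe<T> source;

        MiniObservable(OnSubscribe<T> source) {
            this.source = source;
        }

        // Plays the role of subscribe() plus subscribeActual().
        void subscribe(final Observer<T> observer) {
            observer.onSubscribe(); // the observer is notified first
            try {
                // only now does the source callback run
                source.subscribe(new Emitter<T>() {
                    @Override public void onNext(T value) { observer.onNext(value); }
                    @Override public void onComplete() { observer.onComplete(); }
                });
            } catch (Throwable ex) {
                observer.onError(ex); // exceptions thrown by the source become onError
            }
        }
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> observable = new MiniObservable<String>(emitter -> {
            log.add("emit t1");
            emitter.onNext("t1");
            log.add("emit t2");
            emitter.onNext("t2");
            emitter.onComplete();
        });
        log.add("created"); // nothing has been emitted at this point
        observable.subscribe(new Observer<String>() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onNext(String value) { log.add("onNext " + value); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log starts with "created" and "onSubscribe" before any "emit" entry, which matches the order of the calls in subscribeActual.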

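II: Data transformation

Before the Observer receives items from an Observable, we often want to process the stream first, using operators such as map(), flatMap() and filter().

Take map() as an example: it receives each item from the upstream Observable, transforms it, and emits the new value downstream. In other words, it is an intermediate conversion step.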

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

RxJava 2 uses the Function interface to provide the transformation:

public interface Function<T, R> {
    // Converts a value of type T into a value of type R
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

The actual work is done by MapObserver, a nested class of ObservableMap:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            .......
            U v;
            try {
                // Apply the Function to the incoming item
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // Pass the converted value on to the original Observer
            actual.onNext(v);
        }
        ......
    }
}

MapObserver implements Observer and holds the Observer that was passed in. It converts each item with mapper.apply(t) and then passes the result to the original Observer's onNext().
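
The wrapping idea can be shown in a few lines of plain Java. This is a sketch under simplifying assumptions rather than the library's code: MiniMapDemo, Source and the static map helper are hypothetical names, java.util.function.Function stands in for RxJava's Function, and completion and operator fusion are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins that mirror the ObservableMap / MapObserver relationship.
class MiniMapDemo {
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable e);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Plays the role of ObservableMap: on subscribe, wrap the downstream observer
    // in one that converts each item before passing it on.
    static <T, R> Source<R> map(final Source<T> upstream, final Function<T, R> mapper) {
        return downstream -> upstream.subscribe(new Observer<T>() {
            @Override
            public void onNext(T value) {
                R mapped;
                try {
                    mapped = mapper.apply(value);
                } catch (Throwable ex) {
                    downstream.onError(ex); // a failing mapper ends up in onError
                    return;
                }
                downstream.onNext(mapped);
            }

            @Override
            public void onError(Throwable e) {
                downstream.onError(e);
            }
        });
    }

    static List<Integer> run() {
        Source<String> words = observer -> {
            observer.onNext("a");
            observer.onNext("bb");
            observer.onNext("ccc");
        };
        Function<String, Integer> length = s -> s.length();
        Source<Integer> lengths = map(words, length);

        final List<Integer> received = new ArrayList<>();
        lengths.subscribe(new Observer<Integer>() {
            @Override public void onNext(Integer value) { received.add(value); }
            @Override public void onError(Throwable e) { throw new IllegalStateException(e); }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The source never sees the final observer directly. It only sees the wrapper, which is why operators can be chained freely.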

III: Task scheduling (Scheduler)

Passing a Scheduler to subscribeOn() and observeOn() specifies which thread each part of the chain runs on.

Observable.create(...)
    ...
    .subscribeOn(Schedulers.io()) // subscribe() on the source happens on an IO thread
    ...
    .subscribeOn(Schedulers.newThread())
    ...
    .subscribeOn(Schedulers.computation())
    ...
    .observeOn(AndroidSchedulers.mainThread()) // the Observer's callbacks run on the main thread
    .subscribe(...)

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

subscribeOn() returns a new Observable, an ObservableSubscribeOn instance, which holds the original Observable and the Scheduler:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
        // Calls Scheduler.scheduleDirect; the Runnable subscribes to the
        // original Observable on the Scheduler's thread
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
    ...
}
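
A small stand-alone sketch can make the effect of chaining several subscribeOn() calls visible. It is not RxJava code: MiniSubscribeOnDemo, Source and the subscribeOn helper are hypothetical, a plain ExecutorService stands in for a Scheduler, and the thread names are made up for the demo. Each wrapper posts the upstream subscribe call to its own executor, so the wrapper closest to the source decides where the source runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-ins; an ExecutorService plays the role of a Scheduler.
class MiniSubscribeOnDemo {
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Plays the role of ObservableSubscribeOn: the upstream subscribe call
    // is wrapped in a task and handed to the executor.
    static <T> Source<T> subscribeOn(final Source<T> upstream, final ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    static ExecutorService namedExecutor(final String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread on which the source emitted.
    static String run() {
        ExecutorService io = namedExecutor("mini-io");
        ExecutorService computation = namedExecutor("mini-computation");
        try {
            // The source reports the thread it is running on.
            Source<String> source = observer -> observer.accept(Thread.currentThread().getName());
            // Same shape as source.subscribeOn(io).subscribeOn(computation)
            Source<String> chained = subscribeOn(subscribeOn(source, io), computation);

            final AtomicReference<String> emittingThread = new AtomicReference<>();
            final CountDownLatch done = new CountDownLatch(1);
            chained.subscribe(name -> {
                emittingThread.set(name);
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the source");
            }
            return emittingThread.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            computation.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source emitted on: " + run());
    }
}
```

In this sketch the source emits on "mini-io": the outer wrapper only moves the subscription call onto "mini-computation", and the inner wrapper then moves it again before the source is reached.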

Take IoScheduler as an example.

Scheduler's scheduleDirect method, followed by the relevant parts of IoScheduler:

// Scheduler
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // Hand the task to the Worker, which runs it on its own thread
    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                w.dispose();
            }
        }
    }, delay, unit);

    return w;
}

// IoScheduler
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

public int size() {
    return pool.get().allWorkers.size();
}

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }
    ...

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }

        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

static final class ThreadWorker extends NewThreadWorker {
    private long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }
    ...
}

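A ThreadWorker is taken from the cached pool and wrapped in an EventLoopWorker, whose schedule() ends up calling NewThreadWorker's scheduleActual.

Here is how NewThreadWorker hands a task to its executor: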

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor; // the thread executor
    ...

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ......

    // The Runnable passed down from above is run by the executor, so the calls
    // it makes happen on that executor's thread
    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        try {
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(decoratedRun); // submit to the thread pool
            } else {
                f = executor.schedule(decoratedRun, delayTime, unit);
            }
            return Disposables.fromFuture(f);
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }
    ...
}
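
The submit-or-schedule branch, and the reason the resulting Future is wrapped in a Disposable, can be tried out with the JDK alone. The sketch below is an illustration, not library code: MiniWorkerDemo and its scheduleDirect helper are hypothetical, and cancelling the Future is used here as a rough equivalent of disposing the returned Disposable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; only JDK executors are used.
class MiniWorkerDemo {
    // Same branching as the scheduleDirect shown above: submit now, or schedule for later.
    static Future<?> scheduleDirect(ScheduledExecutorService executor, Runnable task,
                                    long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(task);
        }
        return executor.schedule(task, delayTime, unit);
    }

    // Returns how many of the two tasks actually ran.
    static int run() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            final AtomicInteger executed = new AtomicInteger();
            Runnable task = () -> executed.incrementAndGet();

            Future<?> immediate = scheduleDirect(executor, task, 0, TimeUnit.SECONDS);
            Future<?> delayed = scheduleDirect(executor, task, 10, TimeUnit.SECONDS);

            immediate.get(5, TimeUnit.SECONDS); // wait until the first task has finished
            delayed.cancel(true);               // the delayed task is cancelled before it starts
            return executed.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + run());
    }
}
```

Only the immediate task runs. Keeping a handle on the Future is what allows a scheduled task to be abandoned when the subscriber goes away.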

Now look at observeOn:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
            implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            ...
            actual.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
        ...

        // Scheduling: hand this Runnable to the worker, which calls run() on its thread
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

        // Runs on the worker thread and forwards events to the original Observer
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
            .......
            a.onNext(v);
            ......
        }
        ...
    }
}

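Here ObservableObserveOn acts as a proxy: ObserveOnObserver queues each event and forwards it to the original Observer on the Scheduler's worker thread, which is how observeOn switches threads.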
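
The queue-and-drain pattern behind schedule() and drainNormal() can be sketched with JDK classes only. This is an illustration under simplifying assumptions, not the library's code: MiniObserveOnDemo and QueueingObserver are hypothetical names, a single-thread ExecutorService named "mini-main" stands in for the Scheduler.Worker, and errors, completion and disposal are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-ins; a single-thread executor plays the role of the Worker.
class MiniObserveOnDemo {
    // Plays the role of ObserveOnObserver: the producer queues items,
    // the worker thread drains them and calls the real consumer.
    static final class QueueingObserver<T> extends AtomicInteger implements Runnable {
        private static final long serialVersionUID = 1L;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final ExecutorService worker;
        private final Consumer<T> actual;

        QueueingObserver(ExecutorService worker, Consumer<T> actual) {
            this.worker = worker;
            this.actual = actual;
        }

        void onNext(T item) {
            queue.offer(item);
            schedule();
        }

        // Only the caller that moves the counter from 0 to 1 starts a drain.
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T item;
                while ((item = queue.poll()) != null) {
                    actual.accept(item);
                }
                // Subtract the requests handled so far; a non-zero result means
                // more items arrived while draining, so loop again.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-main"));
        try {
            final List<String> received = new ArrayList<>(); // written only on the worker thread
            final CountDownLatch done = new CountDownLatch(3);
            QueueingObserver<Integer> observer = new QueueingObserver<Integer>(worker, item -> {
                received.add(Thread.currentThread().getName() + ":" + item);
                done.countDown();
            });
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i); // emitted on the calling thread
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for items");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter guarantees that at most one drain is in flight, so items reach the consumer in order and always on the worker thread, regardless of which thread produced them.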

To be continued...
