RxJava简要分析
一:RxJava执行流程:
RxJava简单使用
private final String tag = getClass().getSimpleName();
//数据源,被观察对象
Observable<String> obser = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(tag,"emit 1");
emitter.onNext("t1");
Log.d(tag,"emit 2");
emitter.onNext("t2");
Log.d(tag,"emit 3");
emitter.onNext("t3");
Log.d(tag,"emit ");
emitter.onComplete();
}
}); private void observer_test(){
Observer<String> dnObser = new Observer<String>() {//观察者,处理对应事件
@Override
public void onSubscribe(Disposable d) {
Log.d(tag,"onSubscribe");
} @Override
public void onNext(String s) {
Log.d(tag,"onNext "+s);
} @Override
public void onError(Throwable e) {
Log.e(tag,"onError");
} @Override
public void onComplete() {
Log.d(tag,"onComplete");
}
};
obser.subscribe(dnObser);
});
}
从例子中看出RxJava主要组成:
Observable:被观察者,即事件(数据)的来源
ObservableOnSubscribe:通知观察者执行哪些行为
Observer:观察者,通过实现对应方法做具体处理
订阅过程处理:
/**
 * Subscribes the given observer to this Observable.
 *
 * <p>The observer is null-checked, passed through the {@code RxJavaPlugins.onSubscribe} hook,
 * and the hook's result is handed to {@link #subscribeActual(Observer)}.
 *
 * @param observer the observer to subscribe; must not be null
 * @throws NullPointerException if {@code observer} (or the hook's result) is null, or as a
 *         wrapper around any non-fatal failure thrown while subscribing
 */
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        final Observer<? super T> hooked = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(hooked, "Plugin returned null Observer");
        // Delegate to the concrete Observable; for ObservableCreate this is where
        // ObservableOnSubscribe.subscribe gets invoked.
        subscribeActual(hooked);
    } catch (NullPointerException npe) { // NOPMD
        throw npe;
    } catch (Throwable failure) {
        Exceptions.throwIfFatal(failure);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(failure);
        final NullPointerException wrapper =
                new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        wrapper.initCause(failure);
        throw wrapper;
    }
}
/**
 * Performs the actual subscription logic for a concrete Observable type.
 * Invoked by {@code subscribe(Observer)} after the null checks and the plugin hook.
 *
 * @param observer the observer to deliver events to
 */
protected abstract void subscribeActual(Observer<? super T> observer);
可以看到subscribeActual方法为抽象方法,本例中的具体实现为ObservableCreate,这一点从Observable的create方法可以知道:
/**
 * Creates an Observable whose subscription logic is supplied by {@code source}.
 *
 * @param source the callback that receives the emitter on subscription; must not be null
 * @return an {@code ObservableCreate} wrapping {@code source}, passed through the assembly hook
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    final Observable<T> created = new ObservableCreate<T>(source);
    return RxJavaPlugins.onAssembly(created);
}
查看ObservableCreate内的源码:
/**
 * Observable returned by {@code Observable.create}: holds the user-supplied
 * {@code ObservableOnSubscribe} and invokes it when an observer subscribes.
 * (Excerpt: the "......" below marks code omitted by the article.)
 */
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
} @Override
protected void subscribeActual(Observer<? super T> observer) {
// Wrap the downstream observer in an emitter and hand it to the observer via onSubscribe first.
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); try {// Call ObservableOnSubscribe.subscribe here, which starts the event flow
source.subscribe(parent);
} catch (Throwable ex) {
// Fatal errors are rethrown by throwIfFatal; anything else is reported through the emitter.
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}
二:数据转换
收到Observable的消息之前,我们有可能会对数据流进行处理,例如使用map()、flatMap()、filter()等方法。
这里以map()方法为例:它接收上游Observable发出的数据,经转换后把新数据发给下游,即做了一次中间转化
/**
 * Returns an Observable that applies {@code mapper} to the items of this Observable.
 *
 * @param mapper the conversion function; must not be null
 * @return an {@code ObservableMap} over this Observable, passed through the assembly hook
 */
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    final Observable<R> mapped = new ObservableMap<T, R>(this, mapper);
    return RxJavaPlugins.onAssembly(mapped);
}
rxjava2使用了Function接口提供转换功能,
/**
 * Conversion callback used by operators such as {@code map}.
 *
 * @param <T> the input type
 * @param <R> the output type
 */
public interface Function<T, R> {
// Converts a value of type T into a value of type R.
@NonNull
R apply(@NonNull T t) throws Exception;
}
具体操作交给ObservableMap内部类MapObserver处理
/**
 * Observable created by {@code map}: subscribes to the upstream source with a
 * {@code MapObserver} that converts each item before passing it downstream.
 * (Excerpt: the "......" markers denote code omitted by the article.)
 */
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
} @Override
public void subscribeActual(Observer<? super U> t) {
// Wrap the downstream observer so upstream items go through the mapper first.
source.subscribe(new MapObserver<T, U>(t, function));
} static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
} @Override
public void onNext(T t) {
....... U v;
try {
// Call Function.apply to convert the item; a null result is rejected.
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}// Pass the converted value on to the original Observer
actual.onNext(v);
}
......
}
}
MapObserver实现了Observer并持有传入的下游Observer:在onNext中先通过mapper.apply(t)完成转换,再把结果传递给原Observer的onNext()
三:任务调度(scheduler)
通过使用subscribeOn()、observeOn()方法传入对应的Scheduler去指定每个操作应该运行在何种线程之中
Observable.create(...)
...
.subscribeOn(Schedulers.io()) // make subscribe() happen on an IO thread
...
.subscribeOn(Schedulers.newThread())
...
.subscribeOn(Schedulers.computation())
...
.observeOn(AndroidSchedulers.mainThread()) // deliver the subscriber's callbacks on the main thread
.subscribe(...)
/**
 * Returns an Observable that subscribes to this Observable via the given scheduler.
 *
 * @param scheduler the scheduler to perform the subscription on; must not be null
 * @return an {@code ObservableSubscribeOn} over this Observable, passed through the assembly hook
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    final Observable<T> rescheduled = new ObservableSubscribeOn<T>(this, scheduler);
    return RxJavaPlugins.onAssembly(rescheduled);
}
subscribeOn创建了一个新的Observable,即ObservableSubscribeOn对象,它保存了原始Observable对象(source)和调度器scheduler:
/**
 * Observable created by {@code subscribeOn}: keeps the upstream source and a scheduler,
 * and performs the upstream subscription inside a task handed to that scheduler.
 * (Excerpt: the "..." marks code omitted by the article.)
 */
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
} @Override
public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent);
// Call the Scheduler's scheduleDirect with a Runnable that subscribes to the original (upstream) Observable;
// the Disposable it returns is stored on the parent observer.
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
...
}
以IoScheduler为例
先看Scheduler的scheduleDirect方法:
/**
 * Schedules {@code run} on a freshly created worker after the given delay.
 * The worker is disposed once the task has finished, whether or not it threw.
 *
 * @param run the task to execute
 * @param delay the delay before execution
 * @param unit the unit of {@code delay}
 * @return the worker, which also serves as the Disposable for the scheduled task
 */
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker worker = createWorker();
    final Runnable hookedTask = RxJavaPlugins.onSchedule(run);

    // The worker is what moves execution onto the scheduler's thread.
    final Runnable disposingTask = new Runnable() {
        @Override
        public void run() {
            try {
                hookedTask.run();
            } finally {
                worker.dispose();
            }
        }
    };
    worker.schedule(disposingTask, delay, unit);

    return worker;
}
// Excerpt from the IO scheduler; the "。。。" markers denote code omitted by the article.
// createWorker() wraps the current pool in a new EventLoopWorker.
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
} public int size() {
// Number of entries in the current pool's allWorkers collection.
return pool.get().allWorkers.size();
} static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) {
// Take a ThreadWorker from the pool; all scheduling below is delegated to it.
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
。。。。 @NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
} return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
} static final class ThreadWorker extends NewThreadWorker {
private long expirationTime; ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
} 。。。
}
我们从缓存池里拿到需要的worker,并封装了一层成为EventLoopWorker;它的schedule方法最后调用NewThreadWorker的scheduleActual。
下面看NewThreadWorker的实现(此处摘录的是其scheduleDirect方法):
// Excerpt: "..." / "......" mark omitted code, and the class's closing brace is not part of the excerpt.
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;// executor that runs the scheduled tasks
... public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
...... // Runs the Runnable passed down from above on the executor, which is how Observer methods end up being called on a different thread
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future<?> f;
// No delay: submit right away; otherwise schedule with the requested delay.
if (delayTime <= 0) {
f = executor.submit(decoratedRun);// submit to the executor for execution
} else {
f = executor.schedule(decoratedRun, delayTime, unit);
}
return Disposables.fromFuture(f);
} catch (RejectedExecutionException ex) {
// The executor refused the task: report it through the plugin hook and return an empty Disposable.
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
再看看observeOn
/**
 * Returns an Observable that delivers this Observable's events through the given scheduler.
 *
 * @param scheduler the scheduler to deliver events on; must not be null
 * @param delayError forwarded unchanged to {@code ObservableObserveOn}
 * @param bufferSize forwarded to {@code ObservableObserveOn}; must be positive
 * @return an {@code ObservableObserveOn} over this Observable, passed through the assembly hook
 */
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    final Observable<T> observed = new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    return RxJavaPlugins.onAssembly(observed);
}
/**
 * Observable created by {@code observeOn}: subscribes to the upstream source with an
 * {@code ObserveOnObserver} that forwards events to the downstream observer from a worker's task.
 * (Excerpt: "..." / "......." mark omitted code. NOTE(review): some elisions appear to hide
 * opening braces, so the visible braces do not balance on their own.)
 */
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
} @Override
protected void subscribeActual(Observer<? super T> observer) {
// With a TrampolineScheduler the observer is subscribed to the source directly;
// otherwise a worker is created and the observer is wrapped in an ObserveOnObserver.
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
} static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable { private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize; SimpleQueue<T> queue; Disposable s; Throwable error;
volatile boolean done; volatile boolean cancelled; int sourceMode; boolean outputFused; ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
} @Override
public void onSubscribe(Disposable s) {
... actual.onSubscribe(this);
}
} @Override
public void onNext(T t) {
// Ignore items once done; unless the source mode is ASYNC, queue the item, then request a drain.
if (done) {
return;
} if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
} @Override
public void onError(Throwable t) {
// An error after termination goes to the plugin hook; otherwise record it, mark done and drain.
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
} @Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
...
void schedule() {// Task scheduling: hand this Runnable to the worker, which calls run() back
// Only the call that sees a previous count of 0 submits this to the worker.
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {// Callback from the worker: forwards to the original Observer's methods
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
.......
a.onNext(v);
......
}
}
... }
}
这里通过ObservableObserveOn内部的ObserveOnObserver代理原Observer,把onNext等回调交给Worker调度执行,从而实现observeOn的线程切换处理
未完待续。。。
最新文章
- c++从文件中读取特定字符串问题的总结
- 如何配置ssh免密码登录
- android 纯c/c++开发(转)
- 转:db2 backup 及 restore
- .Net分布式缓存应用实例:Couchbase
- php与mysql通讯那点事
- Android开发之 android:windowSoftInputMode属性详解
- linux扩展权限
- shell入门之流程控制语句 分类: 学习笔记 linux ubuntu 2015-07-10 16:38 89人阅读 评论(0) 收藏
- sybase从表A创建表B
- Windows查看端口被哪个进程占用
- iOS学习——iOS常用的存储方式
- poj3436(拆点最大流)
- 使用eclipse整合ssh项目的例子--lljf(1)
- 安装php rabbitmq扩展,继上一篇安装Rabbitmq
- SOJ 1685:chopsticks(dp)
- Android代码安全工具集
- MySql数据库基础笔记(一)
- stl源码剖析 详细学习笔记 RB_tree (1)
- C++设计模式 之 “接口隔离” 模式:Facade、Proxy、Mediator、Adapter
热门文章
- gitlab中CI/CD过程中的坑
- IO 多路复用 select/poll/epoll ---> Reactor ---> Netty
- Idea2020.2.3 创建JavaWeb项目(部署Tomcat)方法
- ABAP学习(35):常用Function
- angular js 实现模糊查询并分页
- db2查看表结构、表索引
- 【Java】BigDecimal
- Java-Java调用mysqldump进行数据库备份
- A Novel Cross-domain Access Control Protocol in Mobile Edge Computing
- vue面试题及答案(1)