参考:
https://www.jianshu.com/p/88aacbed8aa5
以下介绍基于Rxjava2
reactive是什么
一种基于观察者模式的响应式编程范式。
{待补充}
源码分析 基本架构 先看看基于Rxjava2的reactive编程最简demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception { } }).subscribe(new Observer<Object>() { @Override public void onNext(@NonNull Object s) { //订阅业务代码 } @Override public void onSubscribe(Disposable disposable) { } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { } });
可以看到,是一种链式调用
的写法
Observable.create会返回一个ObservableCreate,他本身也是一个Observable。
Observable是一个模板类,首先看一下Observable的subscribe方法,这是真正执行消费逻辑的起点。subscribeActual是模板方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @SchedulerSupport("none") public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); this.subscribeActual(observer); } catch (NullPointerException var4) { throw var4; } catch (Throwable var5) { Exceptions.throwIfFatal(var5); RxJavaPlugins.onError(var5); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(var5); throw npe; } }
ObservableCreate的subscribeActual方法实现
1 2 3 4 5 6 7 8 9 10 11 12 protected void subscribeActual(Observer<? super T> observer) { ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer); observer.onSubscribe(parent); try { this.source.subscribe(parent); } catch (Throwable var4) { Exceptions.throwIfFatal(var4); parent.onError(var4); } }
subscribeActual方法做了这几件事:
Observer包装成CreateEmitter,
Observer和Emitter彼此持有对方的引用
调用ObservableOnSubscribe的subscribe方法(ObservableCreate构造方法里注入source,类型是ObservableOnSubscribe)
执行流程
Observer包装成Emitter –> ObservableOnSubscribe生产事件,调用Emitter的onNext方法将事件传递给订阅者 –> Emitter执行一些框架自身的逻辑功能(比如流控),然后调用Observer的onNext方法将事件传递回Observer –> 执行Observer的onNext方法
执行链路
Observer –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> Observer
小结
Rxjava框架提供核心类:ObservableCreate。核心方法:subscribeActual。
ObservableCreate负责将事件源的生产与消费串联起来。当然,还有负责事件源类型映射的ObservableMap等。
ObservableCreate拿到业务方的Observer实现类,包装成CreateEmitter(RxJava提供的事件发送器)传递给事件源生产者ObservableOnSubscribe。
ObservableCreate注入业务方的ObservableOnSubscribe实现类,调用其subscribe方法执行“事件源的生产逻辑”。方法内部逻辑:生产事件源Object,通过Emitter向下传递到Observer,执行消费逻辑。
所以使用reactive编程的基本模式:
自定义Observer(负责事件源的消费)
实现ObservableOnSubscribe接口的subscribe方法(负责事件源的生产)
加工“事件源” 先看下写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Observable.create(new ObservableOnSubscribe<Object>() { @Override public void call(@NonNull Observer<Object> e) throws Exception { // 生产并发送数据源 } }).map(new Function<Object, String>() { @Override public String apply(@NonNull Object o) throws Exception { // 数据源类型转换 // return "Obj"; } }).subscribe(new Observer<String>() { @Override public void onNext(@NonNull String s) { //订阅业务代码 } });
Observable可以调用map\filter等方法对事件源进行加工
Observable调用create创建ObservableCreate,调用map创建ObservableMap
下面是ObservableMap的subscribeActual方法
1 2 3 public void subscribeActual(Observer<? super U> t) { this.source.subscribe(new ObservableMap.MapObserver(t, this.function)); }
ObservableMap把Observer包装为MapObserver,Function是业务方自定义的映射函数,然后调用ObservableCreate的subscribe方法执行“事件源的生产与发送”。对于ObservableMap,source就是级联调用上一级的Observable实现类,这个demo里就是ObservableCreate
MapObserver的“发送数据源”onNext方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void onNext(T t) { if (!this.done) { if (this.sourceMode != 0) { this.downstream.onNext((Object)null); } else { Object v; try { v = ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable var4) { this.fail(var4); return; } this.downstream.onNext(v); } } }
downstream是业务方实现类Observer的引用,mapper是业务方定义的映射函数,这两个参数均在构造函数里指定了。
上层(ObservableCreate)调用onNext生产并发送数据后,调用下层MapObserver的onNext方法。内部逻辑:执行映射函数完成数据源类型转换,最后交给业务方实现的Observer消费数据(onNext方法)。
执行链路
Observer –(包装)–> MapObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> MapObserver –(回调)–> Observer
前半程一层层包装Observer,后半程从顶到底逐层回调
每一层Observer包装类的onNext方法消费数据源,直至调到最底层业务方Observer实现类的onNext方法。
小结
ObservableCreate负责生产和发送数据。生产数据的逻辑由业务方在ObservableOnSubscribe的实现类里控制,发送数据的机制由RxJava框架提供的CreateEmitter负责。
ObservableMap负责完成数据类型的映射。
最后执行业务方Observer的onNext方法消费数据。
看到这,基本能看出reactive在实现生产-消费模型的一般思路:
ObservableCreate->ObservableMap->ObservableXXX。箭头左边的是右边的parent,右边的ObservableXXX使用一些包装类例如MapObserver、CreateEmitter包装Observer,指定从数据生产后到消费前的处理逻辑。ObservableCreate最终拿到Observer的包装类CreateEmitter,发送数据
流控 所谓流控,在Rxjava架构下,就是消费端(下游)Observer可以决定何时终止对事件源生产端(上游)的消费。
上下游的这种流控通过Disposable实现。
看下Observer的定义:
1 2 3 4 5 6 7 8 9 public interface Observer<T> { void onSubscribe(@NonNull Disposable var1); void onNext(@NonNull T var1); void onError(@NonNull Throwable var1); void onComplete(); }
Observer的onSubscribe维护一个上游事件生产端创建的Disposable引用,执行dispose方法来终止对上游的消费。
demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public void demo8() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { for (int i = 0; i < 10; i++) { System.out.println("发送" + i); emitter.onNext(i); } emitter.onComplete(); } }).subscribe(new Observer<Integer>() { private Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(Integer integer) { System.out.println("接收" + integer); if (integer > 4) disposable.dispose(); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("数据接受完成"); } }); }
那么,Disposable是何时创建并注入Observer的?
上面提到,ObservableCreate会把Observer包装成一个发射器CreateEmitter,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 protected void subscribeActual(Observer<? super T> observer) { ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer); observer.onSubscribe(parent); try { this.source.subscribe(parent); } catch (Throwable var4) { Exceptions.throwIfFatal(var4); parent.onError(var4); } }
消费者Observer不持有生产者Observable,而是持有Emitter。生产者Observable也是持有Emitter,可以理解Emitter是生产消费者之间的桥梁。生产者产出的数据通过Emitter传递给消费者,所以Emitter可以承担流控的职责。
CreateEmitter其实是Disposable的一个实现类,Disposable是一次性的意思。流控时,可以理解为消费者不再需要消费了,那么这个Emitter发送器就没有价值了,即它是一次性的,用后即丢。
1 2 3 static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { ... }
Observer想终止消费时,调用Disposable的dispose方法,实际会执行CreateEmiter的dispose方法,将发射器设置为已废弃
1 2 3 public void dispose() { DisposableHelper.dispose(this); }
再看下CreateEmitter的onNext方法
1 2 3 4 5 6 7 8 9 10 public void onNext(T t) { if (t == null) { this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); } else { if (!this.isDisposed()) { this.observer.onNext(t); } } }
因此,下游Observer调用dispose方法废弃发射器后,发射器不再回调observer的onNext来消费,达到流控的效果
小结
CreateEmitter(一个Disposable实现类)和Observer互相持有彼此的引用
CreateEmitter持有Observer来回调下游,执行事件源消费链路
Observer持有CreateEmitter,控制上游是否回调下游进行消费,达到流控的目的。
线程切换 线程切换有两种方法:subscribeOn
和observeOn
demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public static void test1() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) { System.out.println("准备"); System.out.println(Thread.currentThread().getName()); for (int i = 0; i < 5; i++) { //Thread.sleep(1000); System.out.println("发射" + " " + Thread.currentThread().getName()); observableEmitter.onNext("123"); } observableEmitter.onComplete(); } }). subscribeOn(Schedulers.computation()). subscribeOn(Schedulers.io()). observeOn(Schedulers.single()). map(new Function<String, String>() { @Override public String apply(@NonNull String o) throws Exception { //转换业务代码 System.out.println("convert" + Thread.currentThread().getName()); return "Obj"; } }). observeOn(Schedulers.io()). subscribe(new Observer<String>() { @Override public void onNext(@NonNull String s) { //订阅业务代码 System.out.println("接收" + Thread.currentThread().getName()); } @Override public void onSubscribe(Disposable disposable) { System.out.println("onSubscribe"); } @Override public void onError(Throwable throwable) { System.out.println("onError" + throwable.getMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } }); }
先说结论:
subscribeOn指定上游数据处理执行的线程。上游是相对subscribeOn调用处而言。以上面的例子说明,上游是数据生成和发送。每次调用subscribeOn都对上游生效。如果连续多次调用,第一次调用指定的线程生效
observeOn指定下游数据消费的线程。下游是相对subscribeOn调用处而言。以上面的例子说明,下游是数据Map映射、消费数据。每次调用observeOn都对下游生效。如果连续多次调用,最后一次调用指定的线程生效
因此,上面的demo中,数据生产和发送运行在computation线程,数据类型转换运行在single线程,消费端运行在io线程
subscribeOn 看下subscribeOn方法的实现
1 2 3 4 public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler)); }
和map方法类似,也是返回一个Observable,看下它的subscribeActual方法
1 2 3 4 5 public void subscribeActual(Observer<? super T> observer) { ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(observer); observer.onSubscribe(parent); parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent))); }
也是包装了下层的Observer,不过这个SubscribeOnObserver包装类并没有对事件源做其他操作,直接调用了下游observer的onNext方法,这里就不贴代码了。
重点看下subscribeTask这个任务做了啥
1 2 3 4 5 6 7 8 9 10 11 final class SubscribeTask implements Runnable { private final ObservableSubscribeOn.SubscribeOnObserver<T> parent; SubscribeTask(ObservableSubscribeOn.SubscribeOnObserver<T> parent) { this.parent = parent; } public void run() { ObservableSubscribeOn.this.source.subscribe(this.parent); } }
很简单,继续向上调用上层Observable的subscribe方法,重点是使得上游的代码都运行在scheduler指定的线程里
执行链路
Observer –(包装)–> SubscribeOnObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> SubscribeOnObserver –(回调)–> Observer
虽然SubscribeOnObserver没有对Observer做额外处理,但继续调用上层source.subscribe方法时,已经切换到scheduler定义的线程了。因此,上游的所有操作,都会在subscribeOn方法指定的scheduler线程里执行。由此可知,如果连续多次调用subscribeOn指定线程,只有第一次起作用
observeOn observeOn和subscribeOn的思路基本一致,这里只介绍差异。
ObserveOnObserver
是observeOn方法执行后,对observer的包装类,看下它的onNext方法
1 2 3 4 5 6 7 8 9 public void onNext(T t) { if (!this.done) { if (this.sourceMode != 2) { this.queue.offer(t); } this.schedule(); } }
schedule方法的实现:
1 2 3 4 5 6 void schedule() { if (this.getAndIncrement() == 0) { this.worker.schedule(this); } }
this指代ObserveOnObserver
,它是一个runnable,run方法里会调用下游observer的onNext方法。因此,下游observer的消费代码运行在observeOn方法指定的scheduler线程里。
Flowable背压 Flowable是Rxjava2较Rxjava的主要更新,可以把它看做是Observable+背压处理
那么,什么是背压
呢?
当生产速度>消费速度
时,未被及时消费的对象堆积在内存中,就产生了所谓的背压
。
在Reactive编程模型中,只有在生产端和消费端运行在不同线程
,且生产速度>消费速度
时,才会出现背压的情况。
需要说明的是,尽管Flowable支持背压,但也牺牲了一些性能
,所以除非满足背压的场景,否则还是推荐使用Observable。
另外,Flowable使用了另一套体系(Publisher-Subscriber),与Observable体系(ObservableSource-Observer)的关系可以简单理解为:
Flowable(Publisher实现类) <–> Observable(ObservableSource实现类)
Subscriber <–> Observer
看下背压的基本写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public void demo2() { Flowable .create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { System.out.println("发射----> 1"); e.onNext(1); System.out.println("发射----> 2"); e.onNext(2); System.out.println("发射----> 3"); e.onNext(3); System.out.println("发射----> 完成"); e.onComplete(); } }, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数 .subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程 .observeOn(Schedulers.newThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置 } @Override public void onNext(Integer integer) { System.out.println("接收----> " + integer); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("接收----> 完成"); } }); }
Flow.create会返回一个FlowableCreate,和ObservableCreate类似,看下它的subscribeActual方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void subscribeActual(Subscriber<? super T> t) { Object emitter; switch(this.backpressure) { case MISSING: emitter = new FlowableCreate.MissingEmitter(t); break; case ERROR: emitter = new FlowableCreate.ErrorAsyncEmitter(t); break; case DROP: emitter = new FlowableCreate.DropAsyncEmitter(t); break; case LATEST: emitter = new FlowableCreate.LatestAsyncEmitter(t); break; default: emitter = new FlowableCreate.BufferAsyncEmitter(t, bufferSize()); } t.onSubscribe((Subscription)emitter); try { this.source.subscribe((FlowableEmitter)emitter); } catch (Throwable var4) { Exceptions.throwIfFatal(var4); ((FlowableCreate.BaseEmitter)emitter).onError(var4); } }
与ObservableCreate不同的是,这里基于入参指定的背压策略枚举,创建对应的发射器Emitter。
这些Emitter对于背压有不同的处理策略,具体体现在onNext实现的差异上。
但这些Emitter也有一些共同逻辑:Emitter会维护一个初始值为128的Long型原子类。每次向下游发送一次数据,该值自减1。如果减到0,基于不同的Emitter实现类,执行不同策略:丢弃 or 忽略(指忽略背压,继续生产数据,可能产生OOM) or 调用subscriber的onError
等等
Flowable体系的Emitter发送数据和Observable体系的Emitter不同,并非依次回调下游的onNext方法,他回调下游ObserveOnSubscriber的onNext,将数据放入下游ObserveOnSubscriber的异步缓存池内(本质是一个队列Queue),然后尝试启动ObserveOnSubscriber的异步线程就结束了
ObserveOnSubscriber在异步线程里执行runAsync方法,如果已消费的数据量小于消费者subscriber指定的阈值,则从异步缓存池里取数据后回调消费者subscriber.onNext方法消费。当下游消费了96个数据对象后,Emitter的原子Long当前值value+96,即已消费的额度回补给发端
消费者subscriber在onSubscribe方法里设置消费数据个数,该值维护在ObserveOnSubscriber,一旦消费数达到了阈值,ObserveOnSubscriber在异步消费线程里不会回调下游subscriber.onNext方法消费
1 2 3 4 @Override public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置 }
总结一下:
生产出来的数据,会缓存到异步缓存池中。上游发送数据前,先判断原子Long的value,如果为0,执行背压策略,否则发送数据。然后CAS操作value-1。当下游消费了96个数据对象后,回补额度,Emitter的原子Long当前value+96。
这种设计允许下游消费速度在一定限度内比上游生产速度慢。只要生产128个数据的时间内,能消费96个数据,就能回补已消费数据额度用于再生产。否则就要执行相应的背压策略。
打个比方:女神一开始给屌丝128的好感度,好友度随女神等级不同,下跌速度不同。在好感度跌0前,如果屌丝送了超过96次礼物,回补96好感度,并重置礼物次数。如果好感度跌0,分手。。。