dcddc

西米大人的博客

0%

系统学习Rxjava

参考:

https://www.jianshu.com/p/88aacbed8aa5

以下介绍基于Rxjava2

reactive是什么

一种基于观察者模式的响应式编程范式。

{待补充}

源码分析

基本架构

先看看基于Rxjava2的reactive编程最简demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<Object>() {

@Override
public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {

}

}).subscribe(new Observer<Object>() {

@Override
public void onNext(@NonNull Object s) {
//订阅业务代码
}

@Override
public void onSubscribe(Disposable disposable) {

}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onComplete() {

}
});

可以看到,是一种链式调用的写法

Observable.create会返回一个ObservableCreate,他本身也是一个Observable。

Observable是一个模板类,首先看一下Observable的subscribe方法,这是真正执行消费逻辑的起点。subscribeActual是模板方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SchedulerSupport("none")
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");

try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
this.subscribeActual(observer);
} catch (NullPointerException var4) {
throw var4;
} catch (Throwable var5) {
Exceptions.throwIfFatal(var5);
RxJavaPlugins.onError(var5);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(var5);
throw npe;
}
}

ObservableCreate的subscribeActual方法实现

1
2
3
4
5
6
7
8
9
10
11
12
protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);

try {
this.source.subscribe(parent);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}

}

subscribeActual方法做了这几件事:

  • Observer包装成CreateEmitter,
  • Observer和Emitter彼此持有对方的引用
  • 调用ObservableOnSubscribe的subscribe方法(ObservableCreate构造方法里注入source,类型是ObservableOnSubscribe)

执行流程

Observer包装成Emitter –>
ObservableOnSubscribe生产事件,调用Emitter的onNext方法将事件传递给订阅者 –>
Emitter执行一些框架自身的逻辑功能(比如流控),然后调用Observer的onNext方法将事件传递回Observer –>
执行Observer的onNext方法

执行链路

Observer –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> Observer

小结

  • Rxjava框架提供核心类:ObservableCreate。核心方法:subscribeActual。
  • ObservableCreate负责将事件源的生产与消费串联起来。当然,还有负责事件源类型映射的ObservableMap等。
  • ObservableCreate拿到业务方的Observer实现类,包装成CreateEmitter(RxJava提供的事件发送器)传递给事件源生产者ObservableOnSubscribe。
  • ObservableCreate注入业务方的ObservableOnSubscribe实现类,调用其subscribe方法执行“事件源的生产逻辑”。方法内部逻辑:生产事件源Object,通过Emitter向下传递到Observer,执行消费逻辑。

所以使用reactive编程的基本模式:

  • 自定义Observer(负责事件源的消费)
  • 实现ObservableOnSubscribe接口的subscribe方法(负责事件源的生产)

加工“事件源”

先看下写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.create(new ObservableOnSubscribe<Object>() {            
@Override
public void call(@NonNull Observer<Object> e) throws Exception {
// 生产并发送数据源
}
}).map(new Function<Object, String>() {
@Override
public String apply(@NonNull Object o) throws Exception {
// 数据源类型转换
// return "Obj";
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(@NonNull String s) {
//订阅业务代码
}
});

Observable可以调用map\filter等方法对事件源进行加工

Observable调用create创建ObservableCreate,调用map创建ObservableMap

下面是ObservableMap的subscribeActual方法

1
2
3
public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}

ObservableMap把Observer包装为MapObserver,Function是业务方自定义的映射函数,然后调用ObservableCreate的subscribe方法执行“事件源的生产与发送”。对于ObservableMap,source就是级联调用上一级的Observable实现类,这个demo里就是ObservableCreate

MapObserver的“发送数据源”onNext方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 0) {
this.downstream.onNext((Object)null);
} else {
Object v;
try {
v = ObjectHelper.requireNonNull(this.mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable var4) {
this.fail(var4);
return;
}

this.downstream.onNext(v);
}
}
}

downstream是业务方实现类Observer的引用,mapper是业务方定义的映射函数,这两个参数均在构造函数里指定了。

上层(ObservableCreate)调用onNext生产并发送数据后,调用下层MapObserver的onNext方法。内部逻辑:执行映射函数完成数据源类型转换,最后交给业务方实现的Observer消费数据(onNext方法)。

执行链路

Observer –(包装)–> MapObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> MapObserver –(回调)–> Observer

前半程一层层包装Observer,后半程从顶到底逐层回调每一层Observer包装类的onNext方法消费数据源,直至调到最底层业务方Observer实现类的onNext方法。

小结

  • ObservableCreate负责生产和发送数据。生产数据的逻辑由业务方在ObservableOnSubscribe的实现类里控制,发送数据的机制由RxJava框架提供的CreateEmitter负责。
  • ObservableMap负责完成数据类型的映射。
  • 最后执行业务方Observer的onNext方法消费数据。

看到这,基本能看出reactive在实现生产-消费模型的一般思路:

ObservableCreate->ObservableMap->ObservableXXX。箭头左边的是右边的parent,右边的ObservableXXX使用一些包装类例如MapObserver、CreateEmitter包装Observer,指定从数据生产后到消费前的处理逻辑。ObservableCreate最终拿到Observer的包装类CreateEmitter,发送数据

流控

所谓流控,在Rxjava架构下,就是消费端(下游)Observer可以决定何时终止对事件源生产端(上游)的消费。

上下游的这种流控通过Disposable实现。

看下Observer的定义:

1
2
3
4
5
6
7
8
9
public interface Observer<T> {
void onSubscribe(@NonNull Disposable var1);

void onNext(@NonNull T var1);

void onError(@NonNull Throwable var1);

void onComplete();
}

Observer的onSubscribe维护一个上游事件生产端创建的Disposable引用,执行dispose方法来终止对上游的消费。

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void demo8() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("发送" + i);
emitter.onNext(i);
}
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
private Disposable disposable;

@Override
public void onSubscribe(Disposable d) {
disposable = d;
}

@Override
public void onNext(Integer integer) {
System.out.println("接收" + integer);
if (integer > 4) disposable.dispose();
}

@Override
public void onError(Throwable e) {
e.printStackTrace();
}

@Override
public void onComplete() {
System.out.println("数据接受完成");
}
});
}

那么,Disposable是何时创建并注入Observer的?

上面提到,ObservableCreate会把Observer包装成一个发射器CreateEmitter,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
protected void subscribeActual(Observer<? super T> observer) {
ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
observer.onSubscribe(parent);

try {
this.source.subscribe(parent);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
parent.onError(var4);
}

}

消费者Observer不持有生产者Observable,而是持有Emitter。生产者Observable也是持有Emitter,可以理解Emitter是生产消费者之间的桥梁。生产者产出的数据通过Emitter传递给消费者,所以Emitter可以承担流控的职责。

CreateEmitter其实是Disposable的一个实现类,Disposable是一次性的意思。流控时,可以理解为消费者不再需要消费了,那么这个Emitter发送器就没有价值了,即它是一次性的,用后即丢。

1
2
3
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
...
}

Observer想终止消费时,调用Disposable的dispose方法,实际会执行CreateEmiter的dispose方法,将发射器设置为已废弃

1
2
3
public void dispose() {
DisposableHelper.dispose(this);
}

再看下CreateEmitter的onNext方法

1
2
3
4
5
6
7
8
9
10
public void onNext(T t) {
if (t == null) {
this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
if (!this.isDisposed()) {
this.observer.onNext(t);
}

}
}

因此,​下游Observer调用dispose方法废弃发射器后,发射器不再回调observer的onNext来消费,达到流控的效果

小结

  • CreateEmitter(一个Disposable实现类)和Observer互相持有彼此的引用
  • CreateEmitter持有Observer来回调下游,执行事件源消费链路
  • Observer持有CreateEmitter,控制上游是否回调下游进行消费,达到流控的目的。

​​​

线程切换

线程切换有两种方法:subscribeOnobserveOn

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void test1() {

Observable.create(new ObservableOnSubscribe<String>() {

@Override
public void subscribe(ObservableEmitter<String> observableEmitter) {
System.out.println("准备");
System.out.println(Thread.currentThread().getName());

for (int i = 0; i < 5; i++) {
//Thread.sleep(1000);
System.out.println("发射" + " " + Thread.currentThread().getName());
observableEmitter.onNext("123");
}
observableEmitter.onComplete();
}

}).
subscribeOn(Schedulers.computation()).
subscribeOn(Schedulers.io()).
observeOn(Schedulers.single()).
map(new Function<String, String>() {

@Override
public String apply(@NonNull String o) throws Exception {
//转换业务代码
System.out.println("convert" + Thread.currentThread().getName());
return "Obj";
}
}).
observeOn(Schedulers.io()).
subscribe(new Observer<String>() {

@Override
public void onNext(@NonNull String s) {
//订阅业务代码
System.out.println("接收" + Thread.currentThread().getName());
}

@Override
public void onSubscribe(Disposable disposable) {
System.out.println("onSubscribe");
}

@Override
public void onError(Throwable throwable) {
System.out.println("onError" + throwable.getMessage());
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}

先说结论:

  • subscribeOn指定上游数据处理执行的线程。上游是相对subscribeOn调用处而言。以上面的例子说明,上游是数据生成和发送。每次调用subscribeOn都对上游生效。如果连续多次调用,第一次调用指定的线程生效
  • observeOn指定下游数据消费的线程。下游是相对subscribeOn调用处而言。以上面的例子说明,下游是数据Map映射、消费数据。每次调用observeOn都对下游生效。如果连续多次调用,最后一次调用指定的线程生效

因此,上面的demo中,数据生产和发送运行在computation线程,数据类型转换运行在single线程,消费端运行在io线程

subscribeOn

看下subscribeOn方法的实现

1
2
3
4
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

和map方法类似,也是返回一个Observable,看下它的subscribeActual方法

1
2
3
4
5
public void subscribeActual(Observer<? super T> observer) {
ObservableSubscribeOn.SubscribeOnObserver<T> parent = new ObservableSubscribeOn.SubscribeOnObserver(observer);
observer.onSubscribe(parent);
parent.setDisposable(this.scheduler.scheduleDirect(new ObservableSubscribeOn.SubscribeTask(parent)));
}

也是包装了下层的Observer,不过这个SubscribeOnObserver包装类并没有对事件源做其他操作,直接调用了下游observer的onNext方法,这里就不贴代码了。

​重点看下subscribeTask这个任务做了啥

1
2
3
4
5
6
7
8
9
10
11
final class SubscribeTask implements Runnable {
private final ObservableSubscribeOn.SubscribeOnObserver<T> parent;

SubscribeTask(ObservableSubscribeOn.SubscribeOnObserver<T> parent) {
this.parent = parent;
}

public void run() {
ObservableSubscribeOn.this.source.subscribe(this.parent);
}
}

很简单,继续向上调用上层Observable的subscribe方法,重点是使得上游的代码都运行在scheduler指定的线程里

执行链路

Observer –(包装)–> SubscribeOnObserver –(包装)–> Emitter –(传递)–> ObservableOnSubscribe –(回调)–> Emitter –(回调)–> SubscribeOnObserver –(回调)–> Observer

虽然SubscribeOnObserver没有对Observer做额外处理,但继续调用上层source.subscribe方法时,已经切换到scheduler定义的线程了。因此,上游的所有操作,都会在subscribeOn方法指定的scheduler线程里执行。​​由此可知,如果连续多次调用subscribeOn指定线程,只有第一次起作用

observeOn

observeOn和subscribeOn的思路基本一致,这里只介绍差异。

ObserveOnObserver是observeOn方法执行后,对observer的包装类,看下它的onNext方法

1
2
3
4
5
6
7
8
9
public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 2) {
this.queue.offer(t);
}

this.schedule();
}
}

schedule方法的实现:

1
2
3
4
5
6
void schedule() {
if (this.getAndIncrement() == 0) {
this.worker.schedule(this);
}

}

this指代ObserveOnObserver,它是一个runnable,run方法里会调用下游observer的onNext方法。因此,下游observer的消费代码运行在observeOn方法指定的scheduler线程里。

Flowable背压

Flowable是Rxjava2较Rxjava的主要更新,可以把它看做是Observable+背压处理

那么,什么是背压呢?

生产速度>消费速度时,未被及时消费的对象堆积在内存中,就产生了所谓的背压

在Reactive编程模型中,只有在生产端和消费端运行在不同线程,且生产速度>消费速度时,才会出现背压的情况。

需要说明的是,尽管Flowable支持背压,但也牺牲了一些性能,所以除非满足背压的场景,否则还是推荐使用Observable。

另外,Flowable使用了另一套体系(Publisher-Subscriber),与Observable体系(ObservableSource-Observer)的关系可以简单理解为:

  • Flowable(Publisher实现类) <–> Observable(ObservableSource实现类)
  • Subscriber <–> Observer

看下背压的基本写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
System.out.println("发射----> 1");
e.onNext(1);
System.out.println("发射----> 2");
e.onNext(2);
System.out.println("发射----> 3");
e.onNext(3);
System.out.println("发射----> 完成");
e.onComplete();
}
}, BackpressureStrategy.BUFFER) //create方法中多了一个BackpressureStrategy类型的参数
.subscribeOn(Schedulers.newThread())//为上下游分别指定各自的线程
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}

@Override
public void onNext(Integer integer) {
System.out.println("接收----> " + integer);
}

@Override
public void onError(Throwable t) {
}

@Override
public void onComplete() {
System.out.println("接收----> 完成");
}
});
}

Flow.create会返回一个FlowableCreate,和ObservableCreate类似,看下它的subscribeActual方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void subscribeActual(Subscriber<? super T> t) {
Object emitter;
switch(this.backpressure) {
case MISSING:
emitter = new FlowableCreate.MissingEmitter(t);
break;
case ERROR:
emitter = new FlowableCreate.ErrorAsyncEmitter(t);
break;
case DROP:
emitter = new FlowableCreate.DropAsyncEmitter(t);
break;
case LATEST:
emitter = new FlowableCreate.LatestAsyncEmitter(t);
break;
default:
emitter = new FlowableCreate.BufferAsyncEmitter(t, bufferSize());
}

t.onSubscribe((Subscription)emitter);

try {
this.source.subscribe((FlowableEmitter)emitter);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
((FlowableCreate.BaseEmitter)emitter).onError(var4);
}

}

与ObservableCreate不同的是,这里基于入参指定的背压策略枚举,创建对应的发射器Emitter。

这些Emitter对于背压有不同的处理策略,具体体现在onNext实现的差异上。

但这些Emitter也有一些共同逻辑:Emitter会维护一个初始值为128的Long型原子类。每次向下游发送一次数据,该值自减1。如果减到0,基于不同的Emitter实现类,执行不同策略:丢弃 or 忽略(指忽略背压,继续生产数据,可能产生OOM) or 调用subscriber的onError 等等

Flowable体系的Emitter发送数据和Observable体系的Emitter不同,并非依次回调下游的onNext方法,他回调下游ObserveOnSubscriber的onNext,将数据放入下游ObserveOnSubscriber的异步缓存池内(本质是一个队列Queue),然后尝试启动ObserveOnSubscriber的异步线程就结束了

ObserveOnSubscriber在异步线程里执行runAsync方法,如果已消费的数据量小于消费者subscriber指定的阈值,则从异步缓存池里取数据后回调消费者subscriber.onNext方法消费。当下游消费了96个数据对象后,Emitter的原子Long当前值value+96,即已消费的额度回补给发端

消费者subscriber在onSubscribe方法里设置消费数据个数,该值维护在ObserveOnSubscriber,一旦消费数达到了阈值,ObserveOnSubscriber在异步消费线程里不会回调下游subscriber.onNext方法消费

1
2
3
4
@Override
public void onSubscribe(Subscription s) { //onSubscribe回调的参数不是Disposable而是Subscription
s.request(Long.MAX_VALUE); //注意此处,暂时先这么设置
}

总结一下:

生产出来的数据,会缓存到异步缓存池中。上游发送数据前,先判断原子Long的value,如果为0,执行背压策略,否则发送数据。然后CAS操作value-1。当下游消费了96个数据对象后,回补额度,Emitter的原子Long当前value+96。

这种设计允许下游消费速度在一定限度内比上游生产速度慢。只要生产128个数据的时间内,能消费96个数据,就能回补已消费数据额度用于再生产。否则就要执行相应的背压策略。

打个比方:女神一开始给屌丝128的好感度,好友度随女神等级不同,下跌速度不同。在好感度跌0前,如果屌丝送了超过96次礼物,回补96好感度,并重置礼物次数。如果好感度跌0,分手。。。