注:本篇文章代码基于Rxjava1.x
RxJava是目前非常流行的一个响应是编程框架,它用了Java的语法特性来模拟出一套流式过程化的写法,并可以通过线程调度器,非常方便的实现线程切换。本系列文章假设读者已经是使用过Rxjava或者RxAndroid的开发者,如果你还未使用过,不妨看下下面的几篇文章:1.2.3.本篇将使用一个非常简单的例子做引线,引出在Rxjava中一些核心类和核心对象,如果你尚未使用过Rxjava,请在阅读过上面几篇文章后,编写过一些Rxjava相关代码后再阅读本文章。
我们来看下我们的例子:
Observable.just("str1", "str2", "str3", "str4") .map(new Func1() { @Override public String call(String t) { // TODO Auto-generated method stub return "[" + t + "]"; } }).subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} });复制代码
上面的例子非常简单,换成我们的自然语言可以分成以下步骤:
1.构建一个String[]
数组的Observable
2.通过映射方法map
,将返回值映射成为"["+t+"]"
格式3.被一个订阅者所订阅最后我们将在控制台输出:output:
onNext [str1]onNext [str2]onNext [str3]onNext [str4]onCompleted虽然是短短几行代码,简单几个类,但是已经包含了大部分RxJava中的核心元素,本章就以这个简单的例子为引子,引出RxJava的基本体系结构和一些核心功能类。我们先来看下
Observable.just("str1", "str2", "str3", "str4")复制代码这个方法的输入是一堆数组,输出是一个
Observable
对象。实际上它就是一个静态工厂,我们看下它的源码:
public staticObservable just(T t1, T t2, T t3, T t4) { return from((T[])new Object[] { t1, t2, t3, t4 });}// call frompublic static Observable from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]);//选择构造方法 } return unsafeCreate(new OnSubscribeFromArray (array));//使用OnSubscribeFromArray为参数构造 }复制代码
just代码中调用了from方法来执行构造,而在from方法中,会先进行数组空判断和长度判断,目的是为了选择不同的构造方法。最后我们所传入的数组对象,将被包装成为一个OnSubscribeFromArray对象。而这个对象作为一个参数被unsafeCreate方法所调用。unsafeCreate是构造Observable最核心的方法,不论哪种方式的构造器最终都会调用到这个方法。我们现在看下这个方法的声明:
public staticObservable unsafeCreate(OnSubscribe f) { return new Observable (RxJavaHooks.onCreate(f)); }复制代码
首先,我们传入的OnSubscribe对象将被RxJavaHooks的onCreate给hook住,转化为一个OnSubscribe对象。这里,如果你对aop不陌生的话,相信这块很好理解,实际上相当于你在构造Observable的时候做了一层拦截,或者说一次hook。我们不妨深入一点,看下RxJavaHooks里面究竟到底做了什么转换:
//code RxJavaHooks.javapublic staticObservable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) { Func1 f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }复制代码
RxJavaHooks.onCreate方法用到了一个函数对象onObservableCreate。这里所定义的函数对象很类似我们在动态语言中定义的闭包对象。我们看下onObservableCreate对象是怎么被赋值的:
public class RxJavaHooks { static { init(); } static void init() { ... initCreate(); }static void initCreate() { ... onObservableCreate = new Func1() { @Override public Observable.OnSubscribe call(Observable.OnSubscribe f) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f); } }; ...}复制代码
RxJavaHooks类在类初始化的时候通过调用init->initCreate方法给onObservableCreate函数对象赋值。而赋值函数会调用
RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);复制代码也就是说RxJavaHooks只是提供一个简单的接口和初始化操作。实际调用者在RxJavaPlugins中。我们看下RxJavaPlugins.getObservableExecutionHook函数:
public RxJavaObservableExecutionHook getObservableExecutionHook() { if (observableExecutionHook.get() == null) { // check for an implementation from System.getProperty first Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());//通过系统配置获取一个RxJavaObservableExecutionHook对象。 if (impl == null) { // nothing set via properties so initialize with default observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());//如果没有配置对象则使用默认对象 // we don't return from here but call get() again in case of thread-race so the winner will always get returned } else { // we received an implementation from the system property so use it observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl); } } return observableExecutionHook.get(); }复制代码
实际上,
getObservableExecutionHook()
方法得到的对象也是一个单利,但是是非线程安全的,这段代码主要做以下事情:1.如果RxJavaObservableExecutionHook对象不存在,会先通过调用getPluginImplementationViaProperty方法,也就是通过查看系统配置参数查看是否有实现类,如果有,将生成一个具体的RxJavaObservableExecutionHook实例返回
2.如果通过步骤1无法生成一个RxJavaObservableExecutionHook对象,将返回一个默认的RxJavaObservableExecutionHookDefault. getInstance()对象3.最后将通过1,2获取的对象记录在全局变量中
这里引出一个问题,就是我们如何注入一个hook函数呢?这就需要深入到getPluginImplementationViaProperty的具体实现中去:
{//code getPluginImplementationViaProperty()final String classSimpleName = pluginClass.getSimpleName(); String pluginPrefix = "rxjava.plugin."; String defaultKey = pluginPrefix + classSimpleName + ".implementation";...}复制代码
首先,getPluginImplementationViaProperty会先定义一个key,这个key的基本结构为:rxjava.plugin.[classSimpleName].implementation。而这里的classSimpleName依赖于我们传入的pluginClass对象。我们回到刚才的调用链:
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());复制代码
在调用getPluginImplementationViaProperty函数的时候,我们传入的是一个RxJavaObservableExecutionHook类型,因此这里的classSimpleName 值对应的应该是"RxJavaObservableExecutionHook",所以我们就得到了配置的key为:
"rxjava.plugin.RxJavaObservableExecutionHook.implementation"
之后,getPluginImplementationViaProperty函数会通过这个key,从System.property中寻找具体的实现类,然后通过反射构建出具体的实现对象。
//code getPluginImplementationViaProperty(){...String implementingClass = props.getProperty(defaultKey); try { Class cls = Class.forName(implementingClass); // narrow the scope (cast) to the type we're expecting cls = cls.asSubclass(pluginClass); return cls.newInstance(); }...}复制代码
我们不妨来试一下这种写法,还是基于上面的简单例子,我们在代码前增加一段话:
{System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", "demos.rx.RxJavaObservableExecutionHookImpl");//配置hook实现类Observable.just("str1", "str2", "str3", "str4")...}复制代码
RxJavaObservableExecutionHookImpl是我们实现的一个RxJavaObservableExecutionHook类型:
public class RxJavaObservableExecutionHookImpl extends RxJavaObservableExecutionHook{ @Override publicOnSubscribe onCreate(OnSubscribe f) { System.out.println("perform intercept onCreate"); return super.onCreate(f); }}复制代码
我们执行以下输出:
output:
perform intercept onCreate //被我们的hook函数拦截perform intercept onCreate //被我们的hook函数拦截onNext [str1]onNext [str2]onNext [str3]onNext [str4]onCompleted我们可以从输出日志看出,我们所配置的hook类,确实被构造,并且成功实现了hook操作。根据上面所述,如果我们不采用配置Hook类的方式,RxJava将调用一个默认的实现类:RxJavaObservableExecutionHookDefault.getInstance()。而这个类的主要操作实际上就是直接返回,不进行任何的拦截:
//code RxJavaObservableExecutionHookDefaultpublicOnSubscribe onCreate(OnSubscribe f) { return f;//直接返回,不进行任何拦截和转换 }复制代码
好的,我们花了很大的篇幅就是讲了RxJavaHooks的onCreate函数,我们在没有配置任何的hook函数的情况下,返回值就是我们所传入的OnSubscribe对象。那么什么是OnSubscribe对象呢?我们先来看下OnSubscribe这个类吧:
public interface OnSubscribeextends Action1 > { // cover for generics insanity }public interface Action1 extends Action { void call(T t);}复制代码
OnSubscribe继承于Action1,OnSubscribe限定了在Action1声明中的泛型变量T,是一个Subscriber类型,而T变量声明应用在Action1的call函数中,所以,OnSubscribe实际上是限定了OnSubscribe中的call方法的参数类型是一个Subscriber类型。但这并没有解释OnSubscribe是个什么东西,我们来看下OnSubscribe的继承树:
在OnSubscribe类型的顶端是一个Function。Function就是一个函数或者说一个过程,那么OnSubscribe是一个什么样的过程呢?OnSubscribe是一个当订阅者订阅的时候,执行的一个过程。正如OnSubscribe这个类名所描述的那样,这个过程的触发在Subscribe的时候。这实际上是一种策略的模式,根据不同的需求构建不同的过程策略,比如我们回到上面说的例子中,当我们传入一个数组对象的时候:
public staticObservable from(T[] array) { .... return unsafeCreate(new OnSubscribeFromArray (array)); }复制代码
RxJava将采用一个叫做OnSubscribeFromArray的策略对象传递给unsafeCreate函数。为了继续说明这点我们不妨在来看下map函数:
public finalObservable map(Func1 func) { return unsafeCreate(new OnSubscribeMap (this, func)); }复制代码
正如我上面所说的一样,在map函数中,基于我们上次构造的Observable对象又生成了一个新的Observable对象,而新生成的对象,将采用OnSubscribeMap策略来处理订阅事件。这种包装的写法实际上是一种职责链模式。回顾一下我们上面简单例子的那个流程:
1.通过just生成一个数组Observable对象-Observable12.通过map完成映射,在Observable1之上包装,生成一个新的Observable对象Observable23.通过subscribe函数订阅Observable2对象
通过上面的"引用关系图"我们可以很清楚的看到Observable类型的整条职责链,那么当我们调用Observable.subscribe的时候发生了什么呢?
public final Subscription subscribe(Subscriber subscriber) { return Observable.subscribe(subscriber, this); }复制代码
这个方法中调用了一个静态方法subscribe(subscriber, this)来生成这种订阅关系。
staticSubscription subscribe(Subscriber subscriber, Observable observable) { ....pre check subscriber.onStart(); .... try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { .... } return Subscriptions.unsubscribed(); } }复制代码
这个函数中,会:
1.进行pre check检查参数是否合法2.回调subscriber.onStart方法,告诉订阅者我这边已经准备开始了3.之后就是我们的老朋友RxJavaHooks对象执行onObservableStart,用来在onSubscribe函数执行前做一次hook。(如何hook根据我们上面的方法可以实现,不再赘述)4.通过调用onSubscribe对象的call方法执行函数操作5.通过RxJavaHooks的onObservableReturn
去hook订阅操作执行结束以后的返回值根据我们上面的"引用关系图",我们可以知道订阅者发生订阅的时候,最初执行的onSubscribe对象是OnSubscribeMap类型,我们来看下这个类型的实现:
//code OnSubscribeMap.javapublic OnSubscribeMap(Observablesource, Func1 transformer) { this.source = source; this.transformer = transformer; }@Override public void call(final Subscriber o) { MapSubscriber parent = new MapSubscriber (o, transformer); o.add(parent); source.unsafeSubscribe(parent); }复制代码
OnSubscribeMap在构造的时候需要传递两个参数
1.输入源Observable对象source2.映射函数:transformer当调用call方法的时候OnSubscribeMap会生成一个新的订阅对象MapSubscriber,然后注册到source对象(对应例子中的Observable1)的订阅者中。unsafeSubscribe执行代码:
public final Subscription unsafeSubscribe(Subscriber subscriber) { try { subscriber.onStart(); RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber);}复制代码
这时候unsafeSubscribe中的订阅Observable的onSubscribe函数对象就是"引用关系图"中的Observable1.OnSubscribeFromArray对象。一样,我们看下OnSubscribeFromArray的call方法:
@Override public void call(Subscriber child) { child.setProducer(new FromArrayProducer(child, array)); }复制代码
这里我们引入了一个新的类Producer,而OnSubscribeFromArray中所引用的实现类是FromArrayProducer。我们根据上面的调用链可以知道此时,传入OnSubscribeFromArray.call方法中的参数child对象,对应着已经被OnSubscribeMap装饰过的MapSubscriber对象。而在OnSubscribeFromArray.call方法中调用了Subscriber的setProducer方法,我们看下这个方法是干什么的:
public void setProducer(Producer p) { long toRequest; boolean passToSubscriber = false; synchronized (this) { toRequest = requested; producer = p; if (subscriber != null) { if (toRequest == NOT_SET) { passToSubscriber = true; } } } // do after releasing lock if (passToSubscriber) { subscriber.setProducer(producer); } else { if (toRequest == NOT_SET) { producer.request(Long.MAX_VALUE); } else { producer.request(toRequest); } } }复制代码
Producer顾名思义,就是对一个生产者的一个抽象,而生产什么东西呢?生产的是数据,Producer.request(int n)函数中的n参数代表让生产者生产多少的数据对象。为什么需要这个方法呢?toRequest变量又是从何而来呢?toRequest由Subscriber的成员变量requested,而requested通过Subscriber的request函数进行赋值:
protected final void request(long n) { if (n < 0) { throw new IllegalArgumentException("number requested cannot be negative: " + n); } Producer producerToRequestFrom; synchronized (this) { if (producer != null) { producerToRequestFrom = producer; } else { addToRequested(n);//如果没有producer需要计数 return; } } // after releasing lock (we should not make requests holding a lock) producerToRequestFrom.request(n); }复制代码
代码写的很清楚,当你的这个订阅者对象Subscriber并没有对应的producer的时候,每一次请求数据的操作都会被记录到你的requested变量中,这样,当你进行设置了producer的时候,就可以知道自己请求了多少次,需要多少个数据对象。那么我们回到Subscriber的setProducer方法中去,当代码执行到最后,Subscriber会调用Producer的request方法来请求数据,而这里所对应的Producer对象,就是在OnSubscribeFromArray.call方法中传递进来的FromArrayProducer类型对象。
public void call(Subscriber child) { child.setProducer(new FromArrayProducer(child, array)); }复制代码
根据我们最终输出的日志,我们可以推测FromArrayProducer是进行了一次数组的迭代遍历,那么是不是这样呢?我们看下FromArrayProducer的request方法:
@Override public void request(long n) { if (n < 0) {//异常参数检查 throw new IllegalArgumentException("n >= 0 required but it was " + n); } if (n == Long.MAX_VALUE) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { fastPath(); } } else if (n != 0) { if (BackpressureUtils.getAndAddRequest(this, n) == 0) { slowPath(n); } } }复制代码
这里,在FromArrayProducer的request处理的时候执行了两个分支分别对应执行fastPath方法和slowPath方法。而在执行之前有判断了一个条件:
BackpressureUtils.getAndAddRequest(this, n)//code BackpressureUtils.getAndAddRequest()public static long getAndAddRequest(AtomicLong requested, long n) { // add n to field but check for overflow while (true) { long current = requested.get(); long next = addCap(current, n); if (requested.compareAndSet(current, next)) { return current; } } }复制代码
注意这里的传递对象:
1.requested参数对应的是FromArrayProducer对象2.n对应的就是我们所传递的请求总数requested初始值为0,通过addCap将数值加入到requested对象中,这样就完成了生成对象的统计操作。public static long addCap(long a, long b) { long u = a + b; if (u < 0L) {//防止越界 u = Long.MAX_VALUE; } return u; }复制代码同时我们也可以看到,由于产生请求以后,FromArrayProducer统计数增加,因此返回的(BackpressureUtils.getAndAddRequest(this, n) 必不为0。所以每一个数据生产者FromArrayProducer对象只能被使用一次,这时候有人会问了如果我用以下的代码,数据可以被回调两次的。
Observableob = Observable.just("str1", "str2", "str3", "str4"); ob.subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} }); System.out.println("----------"); ob.subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} });复制代码
最后输出:
output://第一次输出onNext str1onNext str2onNext str3onNext str4onCompletedonNext str1//第二次输出onNext str2onNext str3onNext str4onCompleted
这是为什么呢?
回答这个问题,我们需要回到我们的OnSubscribeFromArray类中:
@Override public void call(Subscriber child) { child.setProducer(new FromArrayProducer(child, array)); }复制代码
因为每发生一次订阅操作就生成一个全新的FromArrayProducer对象,因此你用到数据自然是在的。我们最后再来看下fastPath函数和slowPath函数,字面意思上好像是代表快慢的路径搜索,我们现在看下fastPath函数:
void fastPath() { final Subscriber child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted(); }复制代码
代码非常简单,就是遍历内存中的数组array,然后执行订阅者的onNext回调和onCompleted回调函数。而slowPath方法呢?
void slowPath(long r) { final Subscriber child = this.child; final T[] array = this.array; final int n = array.length; long e = 0L; int i = index;//当前数据流索引 for (;;) { while (r != 0L && i != n) { child.onNext(array[i]); i++; if (i == n) { return; } r--; e--; } r = get() + e; if (r == 0L) { index = i; r = addAndGet(e); if (r == 0L) { return; } e = 0L; } } }复制代码
这个函数中几个重要的参数:
1.参数r代表你的请求数2.e代表数据消耗数量3.n代表你的数组长度4.index代表你的数组数据流索引这个函数执行的时候会先执行一个大循环,而这个大循环中包含着一个小循环:
while (r != 0L && i != n) { child.onNext(array[i]); i++; if (i == n) { return; } r--; e--; }复制代码
用来判断数据流i是否结束,或者请求数r是否满足。当满足一个条件以后跳出循环执行addAndGet方法将请求数目加入到计数器中:
r = get() + e; if (r == 0L) { index = i; r = addAndGet(e); if (r == 0L) { return; } e = 0L; }复制代码
按照"订阅函数调用时序图"我们知道,此时我们的订阅者类是被Observable.map函数装饰过的MapSubscriber类,前面我们说过,这个类是一个Subscriber类的装饰器,我们来看下它的基本实现:
public MapSubscriber(Subscriber actual, Func1 mapper) { this.actual = actual; this.mapper = mapper; }复制代码
从构造器上看,在构建MapSubscriber类的时候需要指定它的被装饰对象和映射函数mapper,而当我们回调到MapSubscriber的onNext回调的时候:
@Override public void onNext(T t) { R result; try { result = mapper.call(t);//映射转换 } catch (Throwable ex) { .... return; } actual.onNext(result);//回调被装饰对象 }复制代码
我们传入的原始数据t,将被mapper映射函数处理,转化为一个R类型的结果result,然后把这个结果回调给被装饰对象。按照我们上面的例子,这里面我们的映射函数就是将t外面增加"[]"的Func1接口函数,actual被装饰对象就是我们代码中的匿名订阅者对象。
好了我们总结一下,我们通过上面一个非常非常简单例子我们接触到RxJava这个大家族中的很多核心类:
1.Observable是一个被观察者对象,每个订阅者需要通过subscribe方法与Observable对象签订订阅契约2.Observable的构建是一系列OnSubscribe对象职责链式处理过程3.RxJava中可以在观察的每个阶段配置hook函数4.链式处理过程中,订阅者Subscriber对象可能会被链条中的中间环节所包装5.Producer是用来定义生产数据的类型6.Subscriber在函数setProducer中调用Producer的request(int n)方法用于请求n个数据