博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJava源码解析(一)从一个例子开始
阅读量:6243 次
发布时间:2019-06-22

本文共 18922 字,大约阅读时间需要 63 分钟。

注:本篇文章代码基于Rxjava1.x

RxJava是目前非常流行的一个响应是编程框架,它用了Java的语法特性来模拟出一套流式过程化的写法,并可以通过线程调度器,非常方便的实现线程切换。本系列文章假设读者已经是使用过Rxjava或者RxAndroid的开发者,如果你还未使用过,不妨看下下面的几篇文章:
1.
2.
3.

本篇将使用一个非常简单的例子做引线,引出在Rxjava中一些核心类和核心对象,如果你尚未使用过Rxjava,请在阅读过上面几篇文章后,编写过一些Rxjava相关代码后再阅读本文章。

我们来看下我们的例子:

Observable.just("str1", "str2", "str3", "str4")        .map(new Func1
() { @Override public String call(String t) { // TODO Auto-generated method stub return "[" + t + "]"; } }).subscribe(new Subscriber
() { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} });复制代码

上面的例子非常简单,换成我们的自然语言可以分成以下步骤:

1.构建一个String[]数组的Observable
2.通过映射方法map,将返回值映射成为"["+t+"]"格式
3.被一个订阅者所订阅
最后我们将在控制台输出:

output:

onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted

虽然是短短几行代码,简单几个类,但是已经包含了大部分RxJava中的核心元素,本章就以这个简单的例子为引子,引出RxJava的基本体系结构和一些核心功能类。我们先来看下

Observable.just("str1", "str2", "str3", "str4")复制代码

这个方法的输入是一堆数组,输出是一个Observable对象。实际上它就是一个静态工厂,我们看下它的源码:

public static 
Observable
just(T t1, T t2, T t3, T t4) { return from((T[])new Object[] { t1, t2, t3, t4 });}// call frompublic static
Observable
from(T[] array) { int n = array.length; if (n == 0) { return empty(); } else if (n == 1) { return just(array[0]);//选择构造方法 } return unsafeCreate(new OnSubscribeFromArray
(array));//使用OnSubscribeFromArray为参数构造 }复制代码

just代码中调用了from方法来执行构造,而在from方法中,会先进行数组空判断和长度判断,目的是为了选择不同的构造方法。最后我们所传入的数组对象,将被包装成为一个OnSubscribeFromArray对象。而这个对象作为一个参数被unsafeCreate方法所调用。unsafeCreate是构造Observable最核心的方法,不论哪种方式的构造器最终都会调用到这个方法。我们现在看下这个方法的声明:

public static 
Observable
unsafeCreate(OnSubscribe
f) { return new Observable
(RxJavaHooks.onCreate(f)); }复制代码

首先,我们传入的OnSubscribe对象将被RxJavaHooks的onCreate给hook住,转化为一个OnSubscribe对象。这里,如果你对aop不陌生的话,相信这块很好理解,实际上相当于你在构造Observable的时候做了一层拦截,或者说一次hook。我们不妨深入一点,看下RxJavaHooks里面究竟到底做了什么转换:

//code RxJavaHooks.javapublic static 
Observable.OnSubscribe
onCreate(Observable.OnSubscribe
onSubscribe) { Func1
f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }复制代码

RxJavaHooks.onCreate方法用到了一个函数对象onObservableCreate。这里所定义的函数对象很类似我们在动态语言中定义的闭包对象。我们看下onObservableCreate对象是怎么被赋值的:

public class RxJavaHooks {  static {        init();    }  static void init() {       ...      initCreate();  }static void initCreate() {        ...        onObservableCreate = new Func1
() { @Override public Observable.OnSubscribe call(Observable.OnSubscribe f) { return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f); } }; ...}复制代码

RxJavaHooks类在类初始化的时候通过调用init->initCreate方法给onObservableCreate函数对象赋值。而赋值函数会调用

RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);复制代码

也就是说RxJavaHooks只是提供一个简单的接口和初始化操作。实际调用者在RxJavaPlugins中。我们看下RxJavaPlugins.getObservableExecutionHook函数:

public RxJavaObservableExecutionHook getObservableExecutionHook() {        if (observableExecutionHook.get() == null) {            // check for an implementation from System.getProperty first            Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());//通过系统配置获取一个RxJavaObservableExecutionHook对象。            if (impl == null) {                // nothing set via properties so initialize with default                observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());//如果没有配置对象则使用默认对象                // we don't return from here but call get() again in case of thread-race so the winner will always get returned            } else {                // we received an implementation from the system property so use it                observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);            }        }        return observableExecutionHook.get();    }复制代码

实际上, getObservableExecutionHook()方法得到的对象也是一个单利,但是是非线程安全的,这段代码主要做以下事情:

1.如果RxJavaObservableExecutionHook对象不存在,会先通过调用getPluginImplementationViaProperty方法,也就是通过查看系统配置参数查看是否有实现类,如果有,将生成一个具体的RxJavaObservableExecutionHook实例返回

2.如果通过步骤1无法生成一个RxJavaObservableExecutionHook对象,将返回一个默认的RxJavaObservableExecutionHookDefault. getInstance()对象
3.最后将通过1,2获取的对象记录在全局变量中

getObservableExecutionHook函数流程图

这里引出一个问题,就是我们如何注入一个hook函数呢?这就需要深入到getPluginImplementationViaProperty的具体实现中去:

{//code getPluginImplementationViaProperty()final String classSimpleName = pluginClass.getSimpleName();        String pluginPrefix = "rxjava.plugin.";        String defaultKey = pluginPrefix + classSimpleName + ".implementation";...}复制代码

首先,getPluginImplementationViaProperty会先定义一个key,这个key的基本结构为:rxjava.plugin.[classSimpleName].implementation。而这里的classSimpleName依赖于我们传入的pluginClass对象。我们回到刚才的调用链:

Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());复制代码

在调用getPluginImplementationViaProperty函数的时候,我们传入的是一个RxJavaObservableExecutionHook类型,因此这里的classSimpleName 值对应的应该是"RxJavaObservableExecutionHook",所以我们就得到了配置的key为:

"rxjava.plugin.RxJavaObservableExecutionHook.implementation"

之后,getPluginImplementationViaProperty函数会通过这个key,从System.property中寻找具体的实现类,然后通过反射构建出具体的实现对象。

//code getPluginImplementationViaProperty(){...String implementingClass = props.getProperty(defaultKey); try {                Class
cls = Class.forName(implementingClass); // narrow the scope (cast) to the type we're expecting cls = cls.asSubclass(pluginClass); return cls.newInstance(); }...}复制代码

我们不妨来试一下这种写法,还是基于上面的简单例子,我们在代码前增加一段话:

{System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation",                 "demos.rx.RxJavaObservableExecutionHookImpl");//配置hook实现类Observable.just("str1", "str2", "str3", "str4")...}复制代码

RxJavaObservableExecutionHookImpl是我们实现的一个RxJavaObservableExecutionHook类型:

public class RxJavaObservableExecutionHookImpl extends  RxJavaObservableExecutionHook{    @Override    public 
OnSubscribe
onCreate(OnSubscribe
f) { System.out.println("perform intercept onCreate"); return super.onCreate(f); }}复制代码

我们执行以下输出:

output:

perform intercept onCreate //被我们的hook函数拦截
perform intercept onCreate //被我们的hook函数拦截
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted

我们可以从输出日志看出,我们所配置的hook类,确实被构造,并且成功实现了hook操作。根据上面所述,如果我们不采用配置Hook类的方式,RxJava将调用一个默认的实现类:RxJavaObservableExecutionHookDefault.getInstance()。而这个类的主要操作实际上就是直接返回,不进行任何的拦截:

//code RxJavaObservableExecutionHookDefaultpublic 
OnSubscribe
onCreate(OnSubscribe
f) { return f;//直接返回,不进行任何拦截和转换 }复制代码

好的,我们花了很大的篇幅就是讲了RxJavaHooks的onCreate函数,我们在没有配置任何的hook函数的情况下,返回值就是我们所传入的OnSubscribe对象。那么什么是OnSubscribe对象呢?我们先来看下OnSubscribe这个类吧:

public interface OnSubscribe
extends Action1
> { // cover for generics insanity }public interface Action1
extends Action { void call(T t);}复制代码

OnSubscribe继承于Action1,OnSubscribe限定了在Action1声明中的泛型变量T,是一个Subscriber类型,而T变量声明应用在Action1的call函数中,所以,OnSubscribe实际上是限定了OnSubscribe中的call方法的参数类型是一个Subscriber类型。但这并没有解释OnSubscribe是个什么东西,我们来看下OnSubscribe的继承树:

OnSubscribe的继承树

在OnSubscribe类型的顶端是一个Function。Function就是一个函数或者说一个过程,那么OnSubscribe是一个什么样的过程呢?OnSubscribe是一个当订阅者订阅的时候,执行的一个过程。正如OnSubscribe这个类名所描述的那样,这个过程的触发在Subscribe的时候。这实际上是一种策略的模式,根据不同的需求构建不同的过程策略,比如我们回到上面说的例子中,当我们传入一个数组对象的时候:

public static 
Observable
from(T[] array) { .... return unsafeCreate(new OnSubscribeFromArray
(array)); }复制代码

RxJava将采用一个叫做OnSubscribeFromArray的策略对象传递给unsafeCreate函数。为了继续说明这点我们不妨在来看下map函数:

public final 
Observable
map(Func1
func) { return unsafeCreate(new OnSubscribeMap
(this, func)); }复制代码

正如我上面所说的一样,在map函数中,基于我们上次构造的Observable对象又生成了一个新的Observable对象,而新生成的对象,将采用OnSubscribeMap策略来处理订阅事件。这种包装的写法实际上是一种职责链模式。回顾一下我们上面简单例子的那个流程:

1.通过just生成一个数组Observable对象-Observable1
2.通过map完成映射,在Observable1之上包装,生成一个新的Observable对象Observable2
3.通过subscribe函数订阅Observable2对象

引用关系图

通过上面的"引用关系图"我们可以很清楚的看到Observable类型的整条职责链,那么当我们调用Observable.subscribe的时候发生了什么呢?

public final Subscription subscribe(Subscriber
subscriber) { return Observable.subscribe(subscriber, this); }复制代码

这个方法中调用了一个静态方法subscribe(subscriber, this)来生成这种订阅关系。

static 
Subscription subscribe(Subscriber
subscriber, Observable
observable) { ....pre check subscriber.onStart(); .... try { // allow the hook to intercept and/or decorate RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { .... } return Subscriptions.unsubscribed(); } }复制代码

这个函数中,会:

1.进行pre check检查参数是否合法
2.回调subscriber.onStart方法,告诉订阅者我这边已经准备开始了
3.之后就是我们的老朋友RxJavaHooks对象执行onObservableStart,用来在onSubscribe函数执行前做一次hook。(如何hook根据我们上面的方法可以实现,不再赘述)
4.通过调用onSubscribe对象的call方法执行函数操作
5.通过RxJavaHooks的onObservableReturn去hook订阅操作执行结束以后的返回值

根据我们上面的"引用关系图",我们可以知道订阅者发生订阅的时候,最初执行的onSubscribe对象是OnSubscribeMap类型,我们来看下这个类型的实现:

//code OnSubscribeMap.javapublic OnSubscribeMap(Observable
source, Func1
transformer) { this.source = source; this.transformer = transformer; }@Override public void call(final Subscriber
o) { MapSubscriber
parent = new MapSubscriber
(o, transformer); o.add(parent); source.unsafeSubscribe(parent); }复制代码

OnSubscribeMap在构造的时候需要传递两个参数

1.输入源Observable对象source
2.映射函数:transformer

当调用call方法的时候OnSubscribeMap会生成一个新的订阅对象MapSubscriber,然后注册到source对象(对应例子中的Observable1)的订阅者中。unsafeSubscribe执行代码:

public final Subscription unsafeSubscribe(Subscriber
subscriber) { try { subscriber.onStart(); RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber);}复制代码

这时候unsafeSubscribe中的订阅Observable的onSubscribe函数对象就是"引用关系图"中的Observable1.OnSubscribeFromArray对象。一样,我们看下OnSubscribeFromArray的call方法:

@Override    public void call(Subscriber
child) { child.setProducer(new FromArrayProducer
(child, array)); }复制代码

这里我们引入了一个新的类Producer,而OnSubscribeFromArray中所引用的实现类是FromArrayProducer。我们根据上面的调用链可以知道此时,传入OnSubscribeFromArray.call方法中的参数child对象,对应着已经被OnSubscribeMap装饰过的MapSubscriber对象。而在OnSubscribeFromArray.call方法中调用了Subscriber的setProducer方法,我们看下这个方法是干什么的:

public void setProducer(Producer p) {        long toRequest;        boolean passToSubscriber = false;        synchronized (this) {            toRequest = requested;            producer = p;            if (subscriber != null) {                if (toRequest == NOT_SET) {                    passToSubscriber = true;                }            }        }        // do after releasing lock        if (passToSubscriber) {            subscriber.setProducer(producer);        } else {            if (toRequest == NOT_SET) {                producer.request(Long.MAX_VALUE);            } else {                producer.request(toRequest);            }        }    }复制代码

Producer顾名思义,就是对一个生产者的一个抽象,而生产什么东西呢?生产的是数据,Producer.request(int n)函数中的n参数代表让生产者生产多少的数据对象。为什么需要这个方法呢?toRequest变量又是从何而来呢?toRequest由Subscriber的成员变量requested,而requested通过Subscriber的request函数进行赋值:

protected final void request(long n) {        if (n < 0) {            throw new IllegalArgumentException("number requested cannot be negative: " + n);        }        Producer producerToRequestFrom;        synchronized (this) {            if (producer != null) {                producerToRequestFrom = producer;            } else {                addToRequested(n);//如果没有producer需要计数                return;            }        }        // after releasing lock (we should not make requests holding a lock)        producerToRequestFrom.request(n);    }复制代码

代码写的很清楚,当你的这个订阅者对象Subscriber并没有对应的producer的时候,每一次请求数据的操作都会被记录到你的requested变量中,这样,当你进行设置了producer的时候,就可以知道自己请求了多少次,需要多少个数据对象。那么我们回到Subscriber的setProducer方法中去,当代码执行到最后,Subscriber会调用Producer的request方法来请求数据,而这里所对应的Producer对象,就是在OnSubscribeFromArray.call方法中传递进来的FromArrayProducer类型对象。

public void call(Subscriber
child) { child.setProducer(new FromArrayProducer
(child, array)); }复制代码

根据我们最终输出的日志,我们可以推测FromArrayProducer是进行了一次数组的迭代遍历,那么是不是这样呢?我们看下FromArrayProducer的request方法:

@Override        public void request(long n) {            if (n < 0) {//异常参数检查                throw new IllegalArgumentException("n >= 0 required but it was " + n);            }            if (n == Long.MAX_VALUE) {                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {                    fastPath();                }            } else            if (n != 0) {                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {                    slowPath(n);                }            }        }复制代码

订阅函数调用时序图

这里,在FromArrayProducer的request处理的时候执行了两个分支分别对应执行fastPath方法和slowPath方法。而在执行之前有判断了一个条件:

BackpressureUtils.getAndAddRequest(this, n)//code BackpressureUtils.getAndAddRequest()public static long getAndAddRequest(AtomicLong requested, long n) {        // add n to field but check for overflow        while (true) {            long current = requested.get();            long next = addCap(current, n);            if (requested.compareAndSet(current, next)) {                return current;            }        }    }复制代码

注意这里的传递对象:

1.requested参数对应的是FromArrayProducer对象
2.n对应的就是我们所传递的请求总数
requested初始值为0,通过addCap将数值加入到requested对象中,这样就完成了生成对象的统计操作。

public static long addCap(long a, long b) {        long u = a + b;        if (u < 0L) {//防止越界            u = Long.MAX_VALUE;        }        return u;    }复制代码

同时我们也可以看到,由于产生请求以后,FromArrayProducer统计数增加,因此返回的(BackpressureUtils.getAndAddRequest(this, n) 必不为0。所以每一个数据生产者FromArrayProducer对象只能被使用一次,这时候有人会问了如果我用以下的代码,数据可以被回调两次的。

Observable
ob = Observable.just("str1", "str2", "str3", "str4"); ob.subscribe(new Subscriber
() { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} }); System.out.println("----------"); ob.subscribe(new Subscriber
() { @Override public void onCompleted() { System.out.println("onCompleted "); } @Override public void onNext(String t) { System.out.println("onNext " + t); } @Override public void onError(Throwable e) {} });复制代码

最后输出:

output:

//第一次输出
onNext str1
onNext str2
onNext str3
onNext str4
onCompleted
onNext str1//第二次输出
onNext str2
onNext str3
onNext str4
onCompleted

这是为什么呢?

回答这个问题,我们需要回到我们的OnSubscribeFromArray类中:

@Override    public void call(Subscriber
child) { child.setProducer(new FromArrayProducer
(child, array)); }复制代码

因为每发生一次订阅操作就生成一个全新的FromArrayProducer对象,因此你用到数据自然是在的。我们最后再来看下fastPath函数和slowPath函数,字面意思上好像是代表快慢的路径搜索,我们现在看下fastPath函数:

void fastPath() {            final Subscriber
child = this.child; for (T t : array) { if (child.isUnsubscribed()) { return; } child.onNext(t); } if (child.isUnsubscribed()) { return; } child.onCompleted(); }复制代码

代码非常简单,就是遍历内存中的数组array,然后执行订阅者的onNext回调和onCompleted回调函数。而slowPath方法呢?

void slowPath(long r) {            final Subscriber
child = this.child; final T[] array = this.array; final int n = array.length; long e = 0L; int i = index;//当前数据流索引 for (;;) { while (r != 0L && i != n) { child.onNext(array[i]); i++; if (i == n) { return; } r--; e--; } r = get() + e; if (r == 0L) { index = i; r = addAndGet(e); if (r == 0L) { return; } e = 0L; } } }复制代码

这个函数中几个重要的参数:

1.参数r代表你的请求数
2.e代表数据消耗数量
3.n代表你的数组长度
4.index代表你的数组数据流索引
这个函数执行的时候会先执行一个大循环,而这个大循环中包含着一个小循环:

while (r != 0L && i != n) {                    child.onNext(array[i]);                    i++;                    if (i == n) {                        return;                    }                    r--;                    e--;                }复制代码

用来判断数据流i是否结束,或者请求数r是否满足。当满足一个条件以后跳出循环执行addAndGet方法将请求数目加入到计数器中:

r = get() + e;                if (r == 0L) {                    index = i;                    r = addAndGet(e);                    if (r == 0L) {                        return;                    }                    e = 0L;                }复制代码

按照"订阅函数调用时序图"我们知道,此时我们的订阅者类是被Observable.map函数装饰过的MapSubscriber类,前面我们说过,这个类是一个Subscriber类的装饰器,我们来看下它的基本实现:

public MapSubscriber(Subscriber
actual, Func1
mapper) { this.actual = actual; this.mapper = mapper; }复制代码

从构造器上看,在构建MapSubscriber类的时候需要指定它的被装饰对象和映射函数mapper,而当我们回调到MapSubscriber的onNext回调的时候:

@Override        public void onNext(T t) {            R result;            try {                result = mapper.call(t);//映射转换            } catch (Throwable ex) {                ....                return;            }            actual.onNext(result);//回调被装饰对象        }复制代码

我们传入的原始数据t,将被mapper映射函数处理,转化为一个R类型的结果result,然后把这个结果回调给被装饰对象。按照我们上面的例子,这里面我们的映射函数就是将t外面增加"[]"的Func1接口函数,actual被装饰对象就是我们代码中的匿名订阅者对象。

好了我们总结一下,我们通过上面一个非常非常简单例子我们接触到RxJava这个大家族中的很多核心类:

1.Observable是一个被观察者对象,每个订阅者需要通过subscribe方法与Observable对象签订订阅契约
2.Observable的构建是一系列OnSubscribe对象职责链式处理过程
3.RxJava中可以在观察的每个阶段配置hook函数
4.链式处理过程中,订阅者Subscriber对象可能会被链条中的中间环节所包装
5.Producer是用来定义生产数据的类型
6.Subscriber在函数setProducer中调用Producer的request(int n)方法用于请求n个数据

转载地址:http://cppia.baihongyu.com/

你可能感兴趣的文章
Bandicam视频录制技巧总结+小丸工具箱压缩视频解决视频体积问题
查看>>
JSP实现用户登录样例
查看>>
搞笑的W3C和M$对DOM中属性命名
查看>>
[Struts]让Dreamweaver显示Struts标签的插件
查看>>
便利的html5 之 required、number 、pattern
查看>>
[LeetCode] Find K Pairs with Smallest Sums 找和最小的K对数字
查看>>
VC6.0 C++ 如何调用微软windows系统SDK 语音API
查看>>
Python 3.5 RuntimeError: can&#39;t start new thread
查看>>
POJ 1659 Frogs&#39; Neighborhood(可图性判定—Havel-Hakimi定理)【超详解】
查看>>
数字统计问题
查看>>
Windows下Redis缓存服务器的使用 .NET StackExchange.Redis Redis Desktop Manager
查看>>
SharpMap简析
查看>>
使用类加载器加载配置文件/getClassLoader().getResourceAsStream()
查看>>
配置 linux-bridge mechanism driver - 每天5分钟玩转 OpenStack(77)
查看>>
matplotlib绑定到PyQt5(有菜单)
查看>>
iOS - QRCode 二维码
查看>>
记录第一次纯手打爬虫经历
查看>>
PyCharm 开发Django ,错误汇总
查看>>
插入排序
查看>>
一个完整的C++程序SpreadSheet - 1) 类的声明和定义
查看>>