RxJAVA在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。
就是异步RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性。
Android中RxAndorid与RxJava配合使用; RxAndorid 封装了
AndroidSchedulers.mainThread(),Android开发者使用过程中,可以轻松的将任务post Andorid主线程中,执行页面更新操作。
简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它。 我总结它的优点主要有两个方面:
刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了。 下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符
要了解操作符的原理,肯定要从源码入手喽。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码。 Rx的源码目录结构是比较清晰的,我们先从Observable.create方法来分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});复制代码
create方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}复制代码
代码很简单,第一行判空不用管,第二行调用RxJavaPlugins的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即ObservableCreate对象,ObservableCreate是Observable的子类,实现了Observable的一些抽象方法比如subscribeActual。事实上Rx的每个操作符都对应Observable的一个子类。 这里create方法接受的是一个ObservableOnSubscribe的接口实现类:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}复制代码
通过注释可以知道这个接口的作用是通过一个subscribe方法接受一个ObservableEmitter类型的实例,俗称发射器。 Observable.create方法执行时,我们传入的就是一个ObservableOnSubscribe类型的匿名内部类,并实现了它的subscribe方法,然后它又被传入create方法的返回对象ObservableCreate,最终成为ObservableCreate的成员source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...复制代码
接着我们来看Observable的subscribe方法,它的入参是一个Observer(即观察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}复制代码
最终它会调用它的子类ObservableCreate的subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}复制代码
在subscribeActual里首先创建了用于发射事件的CreateEmitter对象parent,CreateEmitter实现了接口Emitter和Disposable,并持有observer。 这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用。细心的同学也会注意到这行代码之前,parent先被传入了observer的onSubscribe()方法,而在上面我们说过,observer的onSubscribe()方法接受一个Disposable类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer的onSubscribe(),给了我们调用CreateEmitter的解除订阅的方法dispose()的机会。 继续来看CreateEmitter的onNext()方法,它最终是通过调用observer的onNext()方法将事件发射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}复制代码
至此,Rx事件源的创建和订阅的流程就走通了。
下面我们从map操作符来入手看一下Rx操作符的原理,map方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}复制代码
map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}复制代码
可以看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源source,MapObserver持有来自它下游的事件接收者和我们实现的转换方法function,在subscribeActual()方法中完成ObservableMap对source的订阅,触发MapObserver的onNext()方法,继而将来自source的原始数据经过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能。
现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});复制代码
执行代码时,自上而下每一步操作符都会创建一个新的Observable(均为Observable的子类,对应不同的操作符),当执行create时,创建并返回了ObservableCreate,当执行map时,创建并返回了ObservableMap,并且每一个新的Observable都持有它上游的源Observable(即source)及当前涉及到的操作函数function。当最后一步执行订阅方法subscribe时会触发ObservableMap的subscribeActual()方法,并将最下游的Observer包装成MapObserver,同时该方法又会继续调用它所持有ObservableCreate的订阅方法(即执行source.subscribe),由此也会触发ObservableCreate的subscribeActual()方法,此时我们的发射器CreateEmitter才会调用它的onNext()方法发射事件,再依次调用MapObserver的操作函数mapper和onNext()方法,最终将事件传递给了最下游的Observer的onNext()方法。
我简单的将这段逻辑用下面这幅图来表示
map
可以看到,这幅图表达的意思是一个源先后发射了1、2、3的三个item,而经过操作符一转换,就变成了一个发射了10、20、30三个item的新的。描述操作符的长方框中也清楚的说明了该操作符进行了何种具体的转换操作(图中的10*x只是一个例子,这个具体的转换函数是可以自定义的)。于是,我们就很快速地理解了操作符的含义和用法,简单来讲,它就是通过一个函数将一个发射的item逐个进行某种转换。示例代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"n" );
}
});复制代码
输出结果:
根据的宝石图,可以知道zip操作符的作用是把多个源发射的item通过特定函数组合在一起,然后发射组合后的item。从图中还可以看到一个重要的信息是,最终发射的item是对上面的两个源发射的item按照发射顺序逐个组合的结果,而且最终发射的等item的发射时间是由组合它的和等item中发射时间较晚的那个item决定的,也正是如此,操作符经常可以用在需要同时组合处理多个网络请求的结果的业务场景中。示例代码:
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "n");
}
});复制代码
输出结果:
从宝石图可以看出,操作符的作用就是将两个源发射的item连接在一起发射出来。这里的连接指的是整体连接,被操作后产生的会先发射第一个源的所有item,然后紧接着再发射第二个源的所有的item。示例代码:
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "n");
}
});复制代码
输出结果:
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
看看ObservableFlatMap代码
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
是不是和MAP超级像,我们这几看MergeObserver onNext做了什么
@Override
public void onNext(T t) {
...
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
...
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
addInner(inner);
p.subscribe(inner);
break;
}
}
}
省略了很多代码,我们看主要逻辑,获取到flatMap生成的observableSource,然后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext做了什么
//这里的onNext事件由 p.subscribe(inner)触发
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
在这里我们终于看到我们定义的observer接收到了onNext事件
说了这么多,其实我们最关心的还是Rx操作符的应用场景。其实只要存在异步的地方,都可以优雅地使用Rx操作符。比如很多流行的Rx周边开源项目