目录
3.4 concatArrayDelayError()和mergeArrayDelayError()
3.6 combineLatest() 和combineLatestDelayError()
3.9 startWith()&startWithArray()
4.10 doOnTerminate() he doAfterTerminate()
5.9 firstElement() 和lastElement
5.10 elementAt() 和elementAtOrError()
参考
1.玉刚说《RxJava 只看这一篇文章就够了 (上、中、下)》
https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w
https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ
https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw
2. Season_zlc 《 给初学者的RxJava2.0教程(一~十) 》
给出作者《简书》主页
https://www.jianshu.com/u/c50b715ccaeb
添加依赖
implementation 'io.reactivex.rxjava2:rxjava:2.1.17'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
创建被观察者、观察者、订阅
public class Test {
public static void main(String[] args)
{
//创建被观察者
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onComplete();
}
});
//创建观察者
Observer observer=new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
//建立连接
observable.subscribe(observer);
}
}
一、创建操作符
1.1 create()
//创建被观察者
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onComplete();
}
});
1.2 just() 最多不超过10个
//创建被观察者
Observable observable=Observable.just(1,2,3,4,5);
1.3 From操作符
1.3.1 fromArray().
利用这个可以弥补just方法的缺点,可以多余10个
//创建被观察者
Integer[] ints= new Integer[]{new Integer(0),new Integer(1),new Integer(3)};
Observable observable=Observable.fromArray(ints);
1.3.2 fromCallable().
它只会返回一个结果
//创建被观察者
Observable observable=Observable.fromCallable(new Callable() {
@Override
public Object call() throws Exception {
return 1;
}
});
1.3.3 fromFuture()
public class Test {
public static void main(String[] args)
{
final FutureTask<String> futureTask=new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Callable is running");
return "result";
}
});
//建立被观察者 doOnsubscribe只有在订阅的时候才发送事件
Observable observable=Observable.fromFuture(futureTask)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
futureTask.run();
}
});
//建立观察者
Observer observer=new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
//建立连接
observable.subscribe(observer);
}
}
1.3.4 fromIterable().
直接发送一个list列表
List<String> list=new ArrayList<>();
list.add("hello");
list.add("world");
//建立被观察者
Observable<String> observable=Observable.fromIterable(list);
1.4 defer().
直到被观察者被订阅才创建被观察者
String string="hello";
//建立被观察者
final Observable<String> observable=Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just(string);
}
});
意思是只要是没有被订阅之前,string的值改了那么输出就变了;
1.5 timer().
到达指定时间之后就发送一个OL值给观察者
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
1.6 interval().
每隔一段时间发送一个事件,从0开始,不断加1
Observable.interval(2,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
参数如下:时间间隔,时间单位,还有一种,接受事件之前的初始化事件,时间间隔,时间单位
1.7 intervalRange()
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
1.8 range 和rangeLong
Observable.range(2,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
Observable.rangeLong(2,4)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
1.9 empty() ,nerver(),error()
empty()直接发送onComplate();
nerver()不发送任何事件
error()直接发送onError()
注意,都会回调onSubscribe()方法
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
二、转换操作符
2.1 map()
将发送的数据类型转换成其他类型,而且还可以针对每一个发送的事件分别做不同的反应
Observable.just(1,2,4)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return "this is "+integer.toString();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
2.2 flatMap()
类似map()方法,但是功能更为强大,比如说,上游发送三个数据(1,2,3)经过flatmap操作符,在内部将对应着三个数据做不同的改变,最后返回一个Observable对象,利用这个对象我们可以将变换的对象发出去,但是是无序的。
最大的不同就是可以返回一个Observable对象
Observable.just(1,2,3)
.flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List <String> list=new ArrayList<>();
list.add("this is "+integer.toString());
list.add("hello every on");
list.add("welcome");
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});
2.3 concatMap()
这个转换符和flatmap基本相同,唯一区别就是严格按照发送顺序接受。
2.4 buffer()
public final Observable<List<T>> buffer(int count, int skip)
count代表缓冲区的最大元素数量,skip表示当缓冲区数据发送完毕之后,再次填充缓冲区要在原来位置上跳过几个元素,比如说1,2,3,4,5 ,count=2,skip=1,那么当第一次缓冲区为1,2发送完毕之后,再次入缓冲区的元素就是2,3。
Observable.just(1,2,3,4,5)
.buffer(2,1)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println(integers);
}
});
2.5 groupBy()
将发送的数据进行分组,每个组都会返回一个Oberverable
Observable.just(1,2,3,4,5,7,8,9,10)
.groupBy(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return integer%5;
}
}).subscribe(new Consumer<GroupedObservable<Object, Integer>>() {
@Override
public void accept(GroupedObservable<Object, Integer> objectIntegerGroupedObservable) throws Exception {
System.out.println("this is :"+objectIntegerGroupedObservable.getKey());
objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
});
2.6 scan()
将数据按照一定的逻辑聚合起来
Observable.just(1,3,5,4,5,7)
.scan(new BiFunction<Integer, Integer, Integer>() { //结合方式
@Override
//起始第一个参数integer 1,起始第二个参数 2,integer 将存储最终值
public Integer apply(Integer integer, Integer integer2) throws Exception {
System.out.println("integer: "+integer);
System.out.println("integer1: "+integer2);
return integer+integer2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("最终数据:"+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
2.7 window()
发送指定数量的事件的时候,就将这些事件分为一组。
Observable.just(1,2,3,4,5)
.window(2)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Observable<Integer> integerObservable) {
integerObservable
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe----");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError----");
}
@Override
public void onComplete() {
System.out.println("onComplete----");
}
});
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
三、 组合操作符
3.1 concat()
将多个观察者组合在一起,按照顺序发送事件。最多只能组合四个观察者。
Observable.concat(Observable.just(1,2)
,Observable.just(3,4)
,Observable.just(5,6)
,Observable.just(7,8))
.subscribe(new Consumer<Number>() {
@Override
public void accept(Number number) throws Exception {
System.out.println(number);
}
});
3.2 concatArray()
和concat()一样,但是不限于4个
3.3 merge() 和mergeArray()
和concat与concatArray一样,但是这是并行发送,之前的是串行发送。
Observable.merge(
Observable.interval(2,TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "A "+aLong;
}
})
,Observable.interval(2,TimeUnit.SECONDS)
.map(new Function<Long,String>() {
@Override
public String apply(Long aLong) throws Exception {
return "B "+aLong;
}
})
).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
3.4 concatArrayDelayError()和mergeArrayDelayError()
concatArray和mergeArray都是将多个观察者组合在一起发送的,当其中一个发送了Error事件,那么observer将不接受消息,那么标题所说的两个方法可以让所有事件都发送完毕再执行onError()
Observable.concatArray(
Observable.create(new ObservableOnSubscribe<Object>()
{
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception
{
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NumberFormatException());
}
})
,Observable.create(new ObservableOnSubscribe<Object>()
{
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception
{
emitter.onNext(4);
emitter.onNext(5);
emitter.onNext(6);
}
})
).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
3.5 zip()
多个被观察者在一起, 将他们各自发送的数据,依次拿出来进行处理后当作一个事件发出去。发送事件的个数,就短原则
Observable.zip(
Observable.just(1, 2, 3, 4, 5)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "A " + integer.toString();
}
})
, Observable.just(6, 7, 8, 9)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) throws Exception {
return "B " + integer.toString();
}
})
, new BiFunction<Object, Object, Object>() { //组合方式
@Override
public Object apply(Object o, Object o2) throws Exception {
return o.toString()+" "+o2.toString();
}
}
).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete(){
System.out.println("onComplete");
}
});
3.6 combineLatest() 和combineLatestDelayError()
发送时间错开之后,总是结合两个被观察者最新的事件,只要有一方没有产生那么就不会产生结果
Observable.combineLatest(
Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
.map(new Function<Long, Object>() {
@Override
public Object apply(Long aLong) throws Exception {
return "A 发送 "+aLong;
}
})
, Observable.intervalRange(2, 6, 2, 2, TimeUnit.SECONDS)
.map(new Function<Long, Object>() {
@Override
public Object apply(Long aLong) throws Exception {
return "B 发送 "+aLong;
}
})
, new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object o, Object o2) throws Exception {
return "最终结合发送数据: "+o+" "+o2;
}
}
).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
3.7 reduce()
与scan()操作符类似,也是发送数据以一定的方式结合起来,不同的是scan每次结合都发送一次结果值,而reduce直接发送最终结果
Observable.just(1,3,5,4,5,7)
.reduce(new BiFunction<Integer, Integer, Integer>() { //结合方式
@Override
//起始第一个参数integer 1,起始第二个参数 2,integer 将存储最终值
public Integer apply(Integer integer, Integer integer2) throws Exception {
System.out.println("integer: "+integer);
System.out.println("integer1: "+integer2);
return integer+integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("最终数据: "+integer);
}
});
3.8 collect()
将数据收集到数据结构当中
Observable.just(1,2,3,3,4,5,5,6,7)
//返回数据结构
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
ArrayList<Integer> list=new ArrayList<>();
return list;
}
}
//向数据结构添加数据
,new BiConsumer<ArrayList<Integer>,Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
list.add(integer);
}
})
.subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> list) throws Exception {
System.out.println(list);
}
});
3.9 startWith()&startWithArray()
发送事件之前追加事件,前者只能追加一个事件,后者可以追加多个事件。追加的事件会先发出。
Observable.just(5,5,5,5,5)
.startWith(1)
.startWithArray(2,2,2,2,2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
越后声明的事件将会先输出
3.10 count()
返回被观察者发送事件的数量而不发送事件
Observable.just(5,5,5,5,5)
.startWith(2)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
四、 功能操作符
4.1 delay()
延迟一段时间发送事件
Observable.just(5,5,5,5,5)
.delay(2,TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
4.2 doOnEach()
被观察者每次发送一个事件之前都会回调这个方法,onComplage也会调用
Observable.just(2,4,5,6,1)
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("将要发送: "+integerNotification);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("接收:"+integer);
}
});
4.3 doOnNext()
被观察者每次发送onNext()之前都会回调这个方法
Observable.just(2,4,5,6,1)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("doOnNext :"+integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext "+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.4 doAfterNext()
顾名思义,每次发送onNext()之后回调
Observable.just(2,4,5,6,1)
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("doAfterNext :"+integer);
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext "+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.5 doOnComplete()
被观察者每次发送onComplete之前回调
4.6 doOnError()
被观察者每次发送onError之前回调
4.7 doOnSubscribe()
被观察者每次发送onSubscribe()之前回调
Observable.just(2,4,5,6,1)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("doOnsubscribe");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext "+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.8 doOnDispose()
当调用Disposable的dispose()之后回调这个方法
Observable.just(2,4,5,6,1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnDispose");
}
})
.subscribe(new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
disposable=d;
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext "+integer);
if(integer.equals(new Integer(4)))
{
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.9 doOnLifecycle()
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
第一个参数的回调方法在回调onSubscribe()方法之前回调,第二个参数的回调方法在调用Disposable的dispose()方法之后被回调
就是接受数据之前调用,断开连接之后再调用
Observable.just(1,2,3,4,5,6)
.doOnLifecycle(new Consumer<Disposable>() {
@Override //在回调onSubscribe()之前调用
public void accept(Disposable disposable) throws Exception {
System.out.println("on accept");
}
}
, new Action() {
@Override //在Disposable的dispose方法之后调用
public void run() throws Exception {
System.out.println("on Action");
}
})
.subscribe(new Observer<Integer>() {
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
disposable=d;
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
disposable.dispose();
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.10 doOnTerminate() he doAfterTerminate()
顾名思义就是分别在结束之前调用和在结束之后调用,这里的结束就是回调onError或者onComplete方法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("Action");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.11 doFinally()
所有的事件发送完了之后调用,包括Error和Complete
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
}
}).doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("end");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.12 onErrorReturn()
当接收到onError事件之后,回调这个方法(不回调onError方法),这个方法的返回值会回调onNext方法(一般用来监测错误原因),并正常结束该事件序列
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onNext(5);
emitter.onError(new NullPointerException());
}
}).onErrorReturn(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable throwable) {
System.out.println("onErrorReturn: "+throwable);
return 404;
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.13 onErrorResumeNext()
当接收到onError事件之后回调这个方法,并且返回一个新的Observable,可以用来错误弥补(并不会回调onError方法)
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onError(new NullPointerException());
emitter.onNext(5);
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
System.out.println("onErrorResumeNext:"+throwable);
return Observable.just(6,7,8);
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
返回结果里面并没有5这个数字
4.14 onExceptionResumeNext()
与 onErrorResumeNext()作用基本一致,但是这个只能用来扑捉Exception,不能用来扑捉Error事件
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onError(new NullPointerException()); //产生异常
emitter.onNext(5);
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer observer) {
observer.onNext(10);
observer.onComplete();
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.15 retry()
如果出现错误事件就会重新发送所有事件,参数是重发的次数,onError只是最后回调一次
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
emitter.onError(new NullPointerException());
}
}).retry(3).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.16 retryUntil()
出现错误事件之后,可以通过这个功能操作符判断是否继续发送事件,return true表示否,return fales表示继续重复发送
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(1);
emitter.onError(new NullPointerException());
}
}).retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return true; //禁止
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.17 retryWhen()
当被观察者接收到异常或者错误事件的时候会回调这个方法,这个方法返回一个新的被观察者,如果被观察者返回的是Error事件那么之前的观察者就不会继续发送事件,如果被观察者发送的事正常事件,那么之前的被观察者将不断重复发送事件。
Observable.create(new ObservableOnSubscribe < String > () {
@Override
public void subscribe(ObservableEmitter < String > e) throws Exception {
e.onNext("1111");
e.onNext("2222");
e.onNext("3333");
e.onError(new Exception("404"));
e.onNext("4444");
}
})
.retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
@Override
public ObservableSource <? > apply(Throwable throwable) throws Exception {
if(!throwable.toString().equals("java.lang.Exception: 404")) {
return Observable.just("忽略这个异常");
} else {
return Observable.error(new Throwable("终止"));
}
}
});
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
4.18 repeat()
重复发送被观察者的事件,参数为重复发送的次数
Observable.just(1,2,3,4)
.repeat(2)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
4.19 repeatWhen()
增加条件,是否重复发送事件,返回一个新的被观察者
和retrywhen()差不多
4.20 subscribeOn 和observeOn()
前者指定被观察者运行的线程,后者指定观察者的运行线程
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
System.out.println("observable:"+Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
System.out.println("observer:"+Thread.currentThread().getName());
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
可以指定的值如下:
调度器 | 作用 |
| 代表一个常规的新线程 |
| 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 |
| 代表CPU计算密集型的操作, 例如需要大量计算的操作 |
| 代表Android的主线程 |
五、过滤操作符
5.1 filter()
自定义一定的逻辑,来过滤被观察者发送的事件
Observable.just("hel","hello","word","world")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.startsWith("h");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
5.2 ofType()
顾名思义,除了参数所指定的类型,其他的都过滤
Observable.just(1,2,3,4,"hello","world")
.ofType(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
5.3 skip()和skipLast
按照顺序跳过参数所指定的数量的事件,前者正序后者反序
Observable.just(1,2,3,4,5)
.skip(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
5.4 distinct()
过滤重复事件
Observable.just(1,2,3,4,3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
5.5 distinctUntilChanged()
过滤重复连续事件
5.6 take() 和takeLast()
控制观察者接收事件的数量,前者正序,后者反序
Observable.just(1,2,3,4,3)
.take(4)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
5.7 debounce()
过滤那些发送间隔时间小于参数指定时间的事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
Thread.sleep(800);
emitter.onNext(2);
}
}).debounce(1,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer o) {
System.out.println(o);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
5. 8 throttleWithTimeout()
和上面的方法作用一样
5.9 firstElement() 和lastElement
顾名思义观察者只接收第一个事件(最后一个事件)
5.10 elementAt() 和elementAtOrError()
前者可以让接受者接收事件序列的指定事件,但是如果index超出了序列的范围的话并不会发出任何异常,只不过没有数据被接收,如果想要发出异常用后者。 index 从0开始
Observable.just(1,2,3,4)
.elementAt(0)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
六、条件操作符
6.1 all()
判断事件序列是否全部满足某个条件,如果是返回true
Observable.just(1,2,3,4)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>8;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean b) throws Exception {
System.out.println(b);
}
});
6.2 takeWhile()
功能和filter一样,过滤不满足条件的事件.
6.3 skipWhile()
和takeWhile()作用相反
6.4 takeUntil()
接受者接收满足条件的事件之前的事件(包括本事件)
Observable.just(1,2,3,4,2)
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer>3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
6.5 skipUntil()
当skipUntil()里的数据发送完毕才接收被观察者的数据,但是发送skipUntil()里的数据的时候,被观察者也在发送数据,但是没有被接收,所以只能接收到后面的数据
Observable.intervalRange(1,5,0,1,TimeUnit.SECONDS)
.skipUntil(Observable.intervalRange(6,5,3,1,TimeUnit.SECONDS))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
6.6 sequenceEqual()
判断连个被观察者发送的事件是否一样
Observable.sequenceEqual(Observable.just(1,2,3)
,Observable.just(1,2,3))
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println(aBoolean);
}
});
6.7 contains()
判断被观察者发送的事件序列里面有没有某个事件
Observable.just(1,2,3)
.contains(2)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println(aBoolean);
}
});
6.8 isEmpty()
判断事件序列是否为空,不包括onError和onComplete
6.9 amb()
amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。
List<Observable<Integer>> list=new ArrayList<>();
list.add(Observable.just(1,2,3,4).delay(3,TimeUnit.SECONDS));
list.add(Observable.just(5,6,7));
Observable.amb(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
6.10 defaultIfEmpty()
如果被观察者只发送了一个oncomplete那么可以利用这个方法发送一个值
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext(3);
emitter.onComplete();
}
}).defaultIfEmpty(1)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println(o);
}
});