Contents

References

Adding dependencies

Creating an Observable, an Observer, and a subscription

I. Creation operators

1.1 create()

1.2 just() (at most 10 items)

1.3 The from operators

1.3.1 fromArray()

1.3.2 fromCallable()

1.3.3 fromFuture()

1.3.4 fromIterable()

1.4 defer()

1.5 timer()

1.6 interval()

1.7 intervalRange()

1.8 range() and rangeLong()

1.9 empty(), never(), error()

II. Transformation operators

2.1 map()

2.2 flatMap()

2.3 concatMap()

2.4 buffer()

2.5 groupBy()

2.6 scan()

2.7 window()

III. Combining operators

3.1 concat()

3.2 concatArray()

3.3 merge() and mergeArray()

3.4 concatArrayDelayError() and mergeArrayDelayError()

3.5 zip()

3.6 combineLatest() and combineLatestDelayError()

3.7 reduce()

3.8 collect()

3.9 startWith()&startWithArray()

3.10 count()

IV. Utility operators

4.1 delay()

4.2 doOnEach()

4.3 doOnNext()

4.4 doAfterNext()

4.5 doOnComplete()

4.6 doOnError()

4.7 doOnSubscribe()

4.8 doOnDispose()

4.9 doOnLifecycle()

4.10 doOnTerminate() and doAfterTerminate()

4.11 doFinally()

4.12 onErrorReturn()

4.13 onErrorResumeNext()

4.14 onExceptionResumeNext()

4.15 retry()

4.16 retryUntil()

4.17 retryWhen()

4.18 repeat()

4.19 repeatWhen()

4.20 subscribeOn() and observeOn()

V. Filtering operators

5.1 filter()

5.2 ofType()

5.3 skip() and skipLast()

5.4 distinct()

5.5 distinctUntilChanged()

5.6 take() and takeLast()

5.7 debounce()

5.8 throttleWithTimeout()

5.9 firstElement() and lastElement()

5.10 elementAt() and elementAtOrError()

VI. Conditional operators

6.1 all()

6.2 takeWhile()

6.3 skipWhile()

6.4 takeUntil()

6.5 skipUntil()

6.6 sequenceEqual()

6.7 contains()

6.8 isEmpty()

6.9 amb()

6.10 defaultIfEmpty()


References

1. 玉刚说 (Yugangshuo), "RxJava: This One Article Is All You Need (Parts 1, 2 and 3)"

https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w

https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ

https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw

2. Season_zlc, "An RxJava 2.0 Tutorial for Beginners (Parts 1 to 10)"

The author's Jianshu (简书) home page:

https://www.jianshu.com/u/c50b715ccaeb

Adding dependencies

    implementation 'io.reactivex.rxjava2:rxjava:2.1.17'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

Creating an Observable, an Observer, and a subscription

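import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Test {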
    public static void main(String[] args)
    {
        // Create the Observable (the source of events)
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onComplete();
            }
        });

        // Create the Observer (typed, instead of the raw Observer)
        Observer<Integer> observer=new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        // Subscribe: connect the Observer to the Observable
        observable.subscribe(observer);
    }
}

I. Creation operators

1.1 create()

// Create the Observable
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onComplete();
            }
        });

1.2 just() (at most 10 items)

 // Create the Observable
        Observable<Integer> observable=Observable.just(1,2,3,4,5);

 

1.3 The from operators

1.3.1 fromArray()

fromArray() makes up for the limitation of just(): it accepts more than 10 items.

        // Create the Observable
        Integer[] ints={0,1,3};
        Observable<Integer> observable=Observable.fromArray(ints);

 

1.3.2 fromCallable()

It emits only a single result: the value returned by the Callable.

 // Create the Observable
       Observable observable=Observable.fromCallable(new Callable() {
           @Override
           public Object call() throws Exception {
               return 1;
           }
       });

 

1.3.3 fromFuture()

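import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class Test {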
    public static void main(String[] args)
    {

        final FutureTask<String> futureTask=new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("Callable is running");
                return "result";
            }
        });

        // Create the Observable; doOnSubscribe runs only at subscription time, so the FutureTask starts when an Observer subscribes
        Observable observable=Observable.fromFuture(futureTask)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        futureTask.run();
                    }
                });

        // Create the Observer
        Observer observer=new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        // Subscribe
        observable.subscribe(observer);
    }
}

 

1.3.4 fromIterable()

Emits the elements of a List (or any other Iterable) one by one.

 List<String> list=new ArrayList<>();
        list.add("hello");
        list.add("world");
        // Create the Observable
        Observable<String> observable=Observable.fromIterable(list);

 

1.4 defer()

The Observable is not created until an Observer subscribes.

String string="hello";
        // Create the Observable
        final Observable<String> observable=Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just(string);
            }
        });

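In other words, if the value of string changes before the subscription, the output changes accordingly. For that to compile, string has to be a field rather than a local variable, because an anonymous class can only capture effectively final locals.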
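A minimal sketch of that behavior, assuming RxJava 2 is on the classpath. The class name DeferDemo and the field greeting are made up for illustration, and lambdas are used instead of anonymous classes to keep it short. It contrasts defer(), which reads the value at subscription time, with just(), which captures it when the Observable is assembled.

```java
import io.reactivex.Observable;

public class DeferDemo {
    // A field rather than a local variable, so it can be reassigned before subscribing.
    static String greeting = "hello";

    /** Returns { value seen through defer(), value seen through just() }. */
    public static String[] run() {
        greeting = "hello";
        // defer() calls the supplier only when an Observer subscribes...
        Observable<String> deferred = Observable.defer(() -> Observable.just(greeting));
        // ...while just() captures the value right here, at assembly time.
        Observable<String> eager = Observable.just(greeting);

        greeting = "world"; // changed after assembly, before subscription

        return new String[] { deferred.blockingFirst(), eager.blockingFirst() };
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("defer: " + seen[0]); // defer: world
        System.out.println("just:  " + seen[1]); // just:  hello
    }
}
```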

 

1.5 timer()

After the specified delay, emits a single value, 0L, to the observer.

Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

 

1.6 interval()

Emits an event at a fixed interval, starting from 0 and incrementing by 1 each time.

Observable.interval(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

Parameters: the period and its time unit. Another overload takes the initial delay before the first event, the period, and the time unit.
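A small sketch of the three-argument overload, assuming RxJava 2 on the classpath (IntervalDemo is a made-up name). Because interval() never completes by itself, the example cuts it off with take(3) and blocks until the three ticks have arrived.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalDemo {
    /** Collects the first three ticks; blocks for roughly 50 + 2 * 20 ms. */
    public static List<Long> firstThreeTicks() {
        return Observable.interval(50, 20, TimeUnit.MILLISECONDS) // initialDelay, period, unit
                .take(3)   // interval() would otherwise run forever
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstThreeTicks()); // [0, 1, 2]
    }
}
```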

 

1.7 intervalRange() 

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
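As a hedged illustration of the first signature (IntervalRangeDemo is a made-up name; RxJava 2 is assumed on the classpath): unlike interval(), intervalRange() emits count values beginning at start and then completes by itself.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalRangeDemo {
    /** Emits 5, 6, 7 (start = 5, count = 3), one every 10 ms with no initial delay. */
    public static List<Long> values() {
        return Observable.intervalRange(5, 3, 0, 10, TimeUnit.MILLISECONDS)
                .toList()        // possible because intervalRange() completes after count items
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(values()); // [5, 6, 7]
    }
}
```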

1.8 range() and rangeLong()

Observable.range(2,5)
               .subscribe(new Consumer<Integer>() {
                   @Override
                   public void accept(Integer integer) throws Exception {
                       System.out.println(integer);
                   }
               });

       Observable.rangeLong(2,4)
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(Long aLong) throws Exception {
                       System.out.println(aLong);
                   }
               });

 

1.9 empty(), never(), error()

empty() immediately emits onComplete().

never() emits no events at all.

error() immediately emits onError().

Note that all three still invoke onSubscribe().

 Observable.empty().subscribe(new Observer<Object>() {
           @Override
           public void onSubscribe(Disposable d) {
               System.out.println("onSubscribe");
           }

           @Override
           public void onNext(Object o) {
             
           }

           @Override
           public void onError(Throwable e) {
               System.out.println("onError");
           }

           @Override
           public void onComplete() {
               System.out.println("onComplete");
           }
       });
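To compare all three side by side, here is a small sketch that records which callbacks fire for each source. The class and method names are made up for illustration; RxJava 2 is assumed on the classpath.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

public class EmptyNeverErrorDemo {
    /** Subscribes to the source and returns the names of the callbacks that fired, in order. */
    public static List<String> callbacks(Observable<Object> source) {
        final List<String> log = new ArrayList<>();
        source.subscribe(new Observer<Object>() {
            @Override public void onSubscribe(Disposable d) { log.add("onSubscribe"); }
            @Override public void onNext(Object o) { log.add("onNext"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println("empty: " + callbacks(Observable.empty()));  // [onSubscribe, onComplete]
        System.out.println("never: " + callbacks(Observable.never()));  // [onSubscribe]
        System.out.println("error: " + callbacks(Observable.error(new RuntimeException("boom")))); // [onSubscribe, onError]
    }
}
```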

 

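II. Transformation operators

2.1 map()

Converts each emitted item into another type or value. The mapping function is applied to every emitted item individually, so it can treat each one differently.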

Observable.just(1,2,4)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) {
                        return "this is "+integer.toString();
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

 

2.2 flatMap()

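Similar to map() but more powerful. For example, if the upstream emits three items (1, 2, 3), flatMap() turns each of them into an Observable of its own, and the items of all those inner Observables are merged and sent downstream. The order of the merged items is not guaranteed.

The key difference from map() is that the function returns an Observable.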

Observable.just(1,2,3)
                .flatMap(new Function<Integer, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Integer integer) throws Exception {
                        List <String> list=new ArrayList<>();
                        list.add("this is "+integer.toString());
                        list.add("hello everyone");
                        list.add("welcome");
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });

 

2.3 concatMap()

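Essentially the same as flatMap(); the only difference is that the items are delivered strictly in the order in which the source emitted them.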
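A sketch of the ordering difference, assuming RxJava 2 on the classpath. The helper slowInner is hypothetical: it delays earlier items longer than later ones, so inner Observables finish in reverse order. concatMap() still delivers item 1, item 2, item 3, while flatMap() typically delivers them in completion order.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatMapDemo {
    // Hypothetical helper: the inner Observable for i emits after (4 - i) * 30 ms,
    // so the inner Observable for 3 finishes first and the one for 1 last.
    static Observable<String> slowInner(Integer i) {
        return Observable.just("item " + i).delay((4 - i) * 30L, TimeUnit.MILLISECONDS);
    }

    /** Inner Observables are subscribed one after another, so source order is kept. */
    public static List<String> withConcatMap() {
        return Observable.just(1, 2, 3).concatMap(ConcatMapDemo::slowInner).toList().blockingGet();
    }

    /** Inner Observables run concurrently; results arrive in whatever order they finish. */
    public static List<String> withFlatMap() {
        return Observable.just(1, 2, 3).flatMap(ConcatMapDemo::slowInner).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("concatMap: " + withConcatMap()); // [item 1, item 2, item 3]
        System.out.println("flatMap:   " + withFlatMap());   // order depends on timing
    }
}
```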

2.4 buffer()

public final Observable<List<T>> buffer(int count, int skip)

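count is the maximum number of items in each buffer; skip is how many source items to advance before the next buffer starts. For example, with the items 1, 2, 3, 4, 5, count=2 and skip=1, the first buffer emitted is [1, 2] and the next one is [2, 3].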

Observable.just(1,2,3,4,5)
                .buffer(2,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println(integers);
                    }
                });
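To make the role of skip more concrete, the sketch below (BufferDemo is a made-up name; RxJava 2 assumed) collects the buffers for two settings: skip smaller than count gives overlapping buffers, skip larger than count drops the items in between.

```java
import io.reactivex.Observable;
import java.util.List;

public class BufferDemo {
    /** Returns every buffer produced for the items 1..5 with the given count and skip. */
    public static List<List<Integer>> buffers(int count, int skip) {
        return Observable.just(1, 2, 3, 4, 5)
                .buffer(count, skip)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // A new buffer starts at every item, so neighbours overlap; the trailing [5] is flushed on completion.
        System.out.println(buffers(2, 1)); // [[1, 2], [2, 3], [3, 4], [4, 5], [5]]
        // A new buffer starts at every third item, so item 3 belongs to no buffer.
        System.out.println(buffers(2, 3)); // [[1, 2], [4, 5]]
    }
}
```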

2.5 groupBy()

Splits the emitted items into groups by key; each group is delivered as its own Observable (a GroupedObservable).

Observable.just(1,2,3,4,5,7,8,9,10)
                .groupBy(new Function<Integer, Object>() {
                    @Override
                    public Object apply(Integer integer) throws Exception {
                        return integer%5;
                    }
                }).subscribe(new Consumer<GroupedObservable<Object, Integer>>() {
            @Override
            public void accept(GroupedObservable<Object, Integer> objectIntegerGroupedObservable) throws Exception {
                System.out.println("this is :"+objectIntegerGroupedObservable.getKey());

                objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
            }
        });

 

2.6 scan()

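Accumulates the items with the given function and emits every intermediate result.

Observable.just(1,3,5,4,5,7)
                .scan(new BiFunction<Integer, Integer, Integer>() {     // the accumulator
                    @Override
                    // On the first call integer is the first item (1) and integer2 the second (3); afterwards integer holds the accumulated value
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        System.out.println("integer: "+integer);
                        System.out.println("integer2: "+integer2);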
                        return integer+integer2;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("result: "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

2.7 window()

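Once the specified number of events has been emitted, those events are grouped into a window, and each window is delivered as its own Observable.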

Observable.just(1,2,3,4,5)
                .window(2)
                .subscribe(new Observer<Observable<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Observable<Integer> integerObservable) {
                        integerObservable
                                .subscribe(new Observer<Integer>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {
                                        System.out.println("onSubscribe----");
                                    }

                                    @Override
                                    public void onNext(Integer integer) {
                                        System.out.println(integer);
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        System.out.println("onError----");
                                    }

                                    @Override
                                    public void onComplete() {
                                        System.out.println("onComplete----");
                                    }
                                });
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

 

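III. Combining operators

3.1 concat()

Combines several Observables and emits their events in order, one source after another. This form accepts at most four Observables.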

        Observable.concat(Observable.just(1,2)
                ,Observable.just(3,4)
                ,Observable.just(5,6)
                ,Observable.just(7,8))
                .subscribe(new Consumer<Number>() {
                    @Override
                    public void accept(Number number) throws Exception {
                        System.out.println(number);
                    }
                });

 

3.2 concatArray()

Same as concat(), but not limited to four Observables.

 

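3.3 merge() and mergeArray()

Same as concat() and concatArray(), except that the sources emit in parallel and their events are interleaved, whereas concat() emits serially.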

        Observable.merge(
                Observable.interval(2,TimeUnit.SECONDS)
                .map(new Function<Long,String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "A "+aLong;
                    }
                })
                ,Observable.interval(2,TimeUnit.SECONDS)
                .map(new Function<Long,String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "B "+aLong;
                    }
                })
        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });

 

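3.4 concatArrayDelayError() and mergeArrayDelayError()

concatArray() and mergeArray() both combine several Observables. As soon as one of them emits an error event, the observer stops receiving events. The two methods in the heading instead let all the other events be delivered first and only then call onError(). The code below uses plain concatArray(): the observer receives 1, 2, 3 and then onError, and 4, 5, 6 are never delivered.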

Observable.concatArray(
                Observable.create(new ObservableOnSubscribe<Object>()
                {
                    @Override
                    public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                    {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NumberFormatException());
                    }
                })
                ,Observable.create(new ObservableOnSubscribe<Object>()
                {
                    @Override
                    public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                    {
                        emitter.onNext(4);
                        emitter.onNext(5);
                        emitter.onNext(6);
                    }
                })
        ).subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
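For comparison, here is a sketch of the delayed variant, assuming RxJava 2 on the classpath (DelayErrorDemo and its helper names are made up for illustration). The second source completes normally here, since a delayed error can only be delivered once every source has terminated.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class DelayErrorDemo {
    // Emits 1, 2, 3 and then fails with a NumberFormatException.
    static Observable<Integer> failing() {
        return Observable.just(1, 2, 3)
                .concatWith(Observable.<Integer>error(new NumberFormatException()));
    }

    /** Returns the events the observer sees, with or without error delaying. */
    public static List<String> events(boolean delayError) {
        Observable<Integer> first = failing();
        Observable<Integer> second = Observable.just(4, 5, 6);
        Observable<Integer> combined;
        if (delayError) {
            combined = Observable.concatArrayDelayError(first, second);
        } else {
            combined = Observable.concatArray(first, second);
        }
        final List<String> log = new ArrayList<>();
        combined.subscribe(
                v -> log.add("onNext " + v),
                e -> log.add("onError " + e.getClass().getSimpleName()),
                () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(events(false)); // [onNext 1, onNext 2, onNext 3, onError NumberFormatException]
        System.out.println(events(true));  // onNext 1 .. onNext 6, then onError NumberFormatException
    }
}
```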

 

3.5 zip()

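Combines several Observables: the items they emit are taken in order, one from each source, processed by the combiner function, and emitted as a single event. The number of events emitted is that of the shortest source.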

        Observable.zip(
                Observable.just(1, 2, 3, 4, 5)
                        .map(new Function<Integer, Object>() {
                            @Override
                            public Object apply(Integer integer) throws Exception {
                                return "A " + integer.toString();
                            }
                        })
                , Observable.just(6, 7, 8, 9)
                        .map(new Function<Integer, Object>() {
                            @Override
                            public Object apply(Integer integer) throws Exception {
                                return "B " + integer.toString();
                            }
                        })
                , new BiFunction<Object, Object, Object>() {  // the combiner function
                    @Override
                    public Object apply(Object o, Object o2) throws Exception {
                        return o.toString()+" "+o2.toString();
                    }
                }
        ).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete(){
                System.out.println("onComplete"); 
            }
        });

 

 

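3.6 combineLatest() and combineLatestDelayError()

When the sources emit at different times, each new event is combined with the latest event from the other source. No result is produced until every source has emitted at least one event.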

Observable.combineLatest(
                Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
                        .map(new Function<Long, Object>() {
                            @Override
                            public Object apply(Long aLong) throws Exception {
                                return "A emits "+aLong;
                            }
                        })
                , Observable.intervalRange(2, 6, 2, 2, TimeUnit.SECONDS)
                        .map(new Function<Long, Object>() {
                            @Override
                            public Object apply(Long aLong) throws Exception {
                                return "B emits "+aLong;
                            }
                        })
                , new BiFunction<Object, Object, Object>() {
                    @Override
                    public Object apply(Object o, Object o2) throws Exception {
                        return "combined result: "+o+" "+o2;
                    }
                }
        ).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

 

 

3.7 reduce()

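Similar to scan(): it also combines the emitted items with a given function. The difference is that scan() emits a result after every step, whereas reduce() emits only the final result.

Observable.just(1,3,5,4,5,7)
                .reduce(new BiFunction<Integer, Integer, Integer>() {     // the accumulator
                    @Override
                    // On the first call integer is the first item (1) and integer2 the second (3); afterwards integer holds the accumulated value
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        System.out.println("integer: "+integer);
                        System.out.println("integer2: "+integer2);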
                        return integer+integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("result: "+integer);
                    }
                });
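The difference can be seen side by side in this sketch, which runs both operators over the same items (ScanVsReduceDemo is a made-up name; RxJava 2 assumed, lambdas used for brevity).

```java
import io.reactivex.Observable;
import java.util.List;

public class ScanVsReduceDemo {
    /** scan() emits the first item and then every running sum. */
    public static List<Integer> runningSums() {
        return Observable.just(1, 3, 5, 4, 5, 7)
                .scan((acc, x) -> acc + x)
                .toList()
                .blockingGet();
    }

    /** reduce() emits only the final sum (as a Maybe, since the source could be empty). */
    public static Integer total() {
        return Observable.just(1, 3, 5, 4, 5, 7)
                .reduce((acc, x) -> acc + x)
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(runningSums()); // [1, 4, 9, 13, 18, 25]
        System.out.println(total());       // 25
    }
}
```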

 

 

3.8 collect()

Collects the items into a data structure.

        Observable.just(1,2,3,3,4,5,5,6,7)
                // supplies the container
                .collect(new Callable<ArrayList<Integer>>() {   
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        ArrayList<Integer> list=new ArrayList<>();
                        return list;
                    }
                }
                // adds each item to the container
                ,new BiConsumer<ArrayList<Integer>,Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                })
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> list) throws Exception {
                        System.out.println(list);
                    }
                });

 

3.9 startWith()&startWithArray()

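Prepends events to the source's own events. startWith() prepends a single event, startWithArray() can prepend several. The prepended events are emitted first.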

        Observable.just(5,5,5,5,5)
                .startWith(1)
                .startWithArray(2,2,2,2,2)   
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

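The later a prefix is declared in the chain, the earlier it is emitted: here the output is 2, 2, 2, 2, 2, then 1, then 5, 5, 5, 5, 5.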

 

3.10 count()

Emits the number of events the Observable emitted instead of the events themselves.

        Observable.just(5,5,5,5,5)
                .startWith(2)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

 

 

IV. Utility operators

4.1 delay()

Delays the emission of events by the given amount of time.

Observable.just(5,5,5,5,5)
                .delay(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

 

4.2 doOnEach()

Called before each event the Observable emits is delivered; this includes onComplete.

        Observable.just(2,4,5,6,1)
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        System.out.println("about to emit: "+integerNotification);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("received: "+integer);
                    }
                });

 

4.3 doOnNext()

Called before each onNext() is delivered.

        Observable.just(2,4,5,6,1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("doOnNext :"+integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

4.4  doAfterNext()

As the name suggests, called after each onNext() has been delivered.

Observable.just(2,4,5,6,1)
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("doAfterNext :"+integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

 

4.5 doOnComplete()

Called before onComplete is delivered.

4.6 doOnError()

Called before onError is delivered.
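Neither doOnComplete() nor doOnError() comes with an example above, so here is a small sketch covering both (DoOnCompleteErrorDemo is a made-up name; RxJava 2 assumed). It records the order in which the hooks and the observer's own callbacks run.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class DoOnCompleteErrorDemo {
    /** Subscribes with both hooks attached and returns everything that ran, in order. */
    public static List<String> events(Observable<Integer> source) {
        final List<String> log = new ArrayList<>();
        source.doOnComplete(() -> log.add("doOnComplete"))
                .doOnError(e -> log.add("doOnError"))
                .subscribe(
                        v -> log.add("onNext " + v),
                        e -> log.add("onError"),
                        () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(events(Observable.just(1, 2)));
        // [onNext 1, onNext 2, doOnComplete, onComplete]
        System.out.println(events(Observable.error(new RuntimeException("boom"))));
        // [doOnError, onError]
    }
}
```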

4.7 doOnSubscribe()

Called before onSubscribe() is delivered.

Observable.just(2,4,5,6,1)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("doOnsubscribe");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

 

4.8 doOnDispose()

Called when dispose() is invoked on the Disposable.

 

Observable.just(2,4,5,6,1)
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("doOnDispose");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    Disposable disposable;
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable=d;
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                        if(integer == 4)
                        {
                            disposable.dispose();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

4.9 doOnLifecycle()

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)

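The first argument's callback runs before onSubscribe() is delivered; the second argument's callback runs when dispose() is invoked on the Disposable.

In other words, the first is called before any data is received and the second once the connection is cut.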

Observable.just(1,2,3,4,5,6)
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override     // called before onSubscribe() is delivered
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("on accept");
                    }
                    }
                    , new Action() {
                    @Override   // called when the Disposable's dispose() is invoked
                    public void run() throws Exception {
                        System.out.println("on Action");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    Disposable disposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                        disposable=d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        disposable.dispose();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

 

4.10 doOnTerminate() and doAfterTerminate()

As the names suggest, these are called just before and just after termination respectively, where termination means the onError or onComplete callback.

 

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("Action");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
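The example above only registers doOnTerminate(). The sketch below registers both callbacks and records the order in which they fire. It assumes RxJava 2 on the classpath and Java 8 lambdas; the class name TerminateOrderDemo is made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class TerminateOrderDemo {
    // Records the order of callbacks for a source that completes normally.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.just(1, 2)
                .doOnTerminate(() -> log.add("doOnTerminate"))       // runs before onComplete/onError
                .doAfterTerminate(() -> log.add("doAfterTerminate")) // runs after onComplete/onError
                .subscribe(
                        item -> log.add("onNext " + item),
                        error -> log.add("onError"),
                        () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        // Prints: onNext 1, onNext 2, doOnTerminate, onComplete, doAfterTerminate
        run().forEach(System.out::println);
    }
}
```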

 

 

4.11 doFinally()

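Invoked once the sequence has ended, whether through onError or onComplete. It also runs when the downstream disposes the sequence.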

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        }).doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("end");
            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

4.12 onErrorReturn()

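When an onError event is received, this function is called instead (the observer's onError is not invoked). Its return value is delivered to onNext, and the sequence then completes normally. It is typically used to inspect the cause of the error.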

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onError(new NullPointerException());
            }
        }).onErrorReturn(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) {
                System.out.println("onErrorReturn: "+throwable);
                return 404;
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

 

 

4.13 onErrorResumeNext()

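When an onError event is received, this function is called and returns a new Observable, which can be used to recover from the error (the observer's onError is not invoked).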

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new NullPointerException());
                emitter.onNext(5);
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                System.out.println("onErrorResumeNext:"+throwable);
                return Observable.just(6,7,8);
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

The output does not contain 5, because anything emitted after onError is dropped.

4.14 onExceptionResumeNext()

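Works much like onErrorResumeNext(), but it only catches Exception. A Throwable that is not an Exception (such as an Error) is not caught and is passed to onError.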

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new NullPointerException());  // raise an exception
                emitter.onNext(5);
            }
        }).onExceptionResumeNext(Observable.just(10))  // fallback sequence used after an Exception
          .subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

 

 

4.15 retry()

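If an error event occurs, the source is resubscribed and all of its events are emitted again. The argument is the maximum number of retries, and onError is delivered only once, after the last retry has failed.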

        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).retry(3).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
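To make the retry count concrete, the sketch below counts how often the source is subscribed with retry(3). It assumes RxJava 2 and Java 8 lambdas; the class name RetryCountDemo and the exception message are invented for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RetryCountDemo {
    static final AtomicInteger SUBSCRIPTIONS = new AtomicInteger();

    static List<String> run() {
        SUBSCRIPTIONS.set(0);
        List<String> log = new ArrayList<>();
        Observable.<Integer>create(emitter -> {
                    SUBSCRIPTIONS.incrementAndGet(); // one increment per (re)subscription
                    emitter.onNext(1);
                    emitter.onError(new IllegalStateException("boom"));
                })
                .retry(3) // the first attempt plus up to 3 retries
                .subscribe(
                        item -> log.add("onNext " + item),
                        error -> log.add("onError"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());                                  // four "onNext 1" entries, then "onError"
        System.out.println("subscriptions: " + SUBSCRIPTIONS.get()); // 4
    }
}
```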

 

 

4.16 retryUntil()

After an error event, this operator decides whether to keep retrying: returning true stops retrying, returning false resubscribes and emits the events again.

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return true;   // true stops retrying
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

 

 

4.17 retryWhen()

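When the source Observable signals an exception or error, the function passed to retryWhen() decides what happens next. It receives an Observable of the errors and returns a new Observable. If that Observable emits an error event, the source is not resubscribed and the error is passed to the observer. If it emits a normal event, the source is resubscribed and emits its events again.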

 

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1111");
                e.onNext("2222");
                e.onNext("3333");
                e.onError(new Exception("404"));
                e.onNext("4444");  // never delivered: the sequence has already errored
            }
        })
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                                if (!throwable.toString().equals("java.lang.Exception: 404")) {
                                    // any other error: emit an item so the source is retried
                                    return Observable.just("ignore this exception");
                                } else {
                                    // the 404 error: signal an error to stop retrying
                                    return Observable.error(new Throwable("terminate"));
                                }
                            }
                        });
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

 

4.18 repeat()

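Re-emits the source Observable's events. The argument is the total number of times the sequence is emitted, so repeat(2) below prints 1, 2, 3, 4 twice.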

Observable.just(1,2,3,4)
                .repeat(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

 

 

4.19 repeatWhen()

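Adds a condition that decides whether the events are emitted again; the function returns a new Observable.

It works much like retryWhen(), except that it reacts to onComplete rather than onError.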
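Since this section has no example, here is a minimal sketch of repeatWhen() that repeats the source exactly once. It assumes RxJava 2 and Java 8 lambdas; the class name RepeatWhenDemo and the counter are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RepeatWhenDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger completions = new AtomicInteger();
        Observable.just(1, 2)
                .repeatWhen(completed ->
                        // "completed" emits one item each time the source completes.
                        // Letting an item through resubscribes to the source;
                        // ending this stream ends the whole sequence with onComplete.
                        completed.takeWhile(signal -> completions.incrementAndGet() < 2))
                .subscribe(
                        item -> log.add("onNext " + item),
                        error -> log.add("onError"),
                        () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        // Prints: onNext 1, onNext 2, onNext 1, onNext 2, onComplete
        run().forEach(System.out::println);
    }
}
```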

 

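4.20 subscribeOn() and observeOn()

subscribeOn() specifies the thread on which the Observable is subscribed and emits its events; observeOn() specifies the thread on which the observer receives them.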

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                System.out.println("observable:"+Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                        System.out.println("observer:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");

                    }
                });

 

 

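The following schedulers can be specified:

Scheduler                         Purpose
Schedulers.newThread()            A regular new thread
Schedulers.io()                   Threads for I/O-bound work such as network access and reading or writing files
Schedulers.computation()          Threads for CPU-bound work, such as heavy calculations
AndroidSchedulers.mainThread()    The Android main thread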
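The example above only uses subscribeOn(). The sketch below combines it with observeOn() and records which thread runs each side. It assumes RxJava 2 and Java 8 lambdas; the class name SchedulerDemo is invented for illustration, and the exact thread names depend on the RxJava version.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class SchedulerDemo {
    // Returns { thread that ran the source, thread that ran the code after observeOn() }.
    static String[] run() {
        String[] threads = new String[2];
        Observable.fromCallable(() -> {
                    threads[0] = Thread.currentThread().getName(); // source side
                    return 1;
                })
                .subscribeOn(Schedulers.io())          // the source runs on an io thread
                .observeOn(Schedulers.computation())   // everything below runs on a computation thread
                .doOnNext(item -> threads[1] = Thread.currentThread().getName())
                .blockingSubscribe();                  // block the caller until the sequence completes
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("source:   " + threads[0]);
        System.out.println("observer: " + threads[1]);
    }
}
```

With RxJava 2 the two names typically start with RxCachedThreadScheduler and RxComputationThreadPool respectively.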

 

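5. Filtering Operators

5.1 filter()

Filters the events emitted by the Observable using custom logic: only events for which the predicate returns true are passed on.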

Observable.just("hel","hello","word","world")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.startsWith("h");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

 

 

5.2 ofType()

As the name suggests, everything that is not of the specified type is filtered out.

Observable.just(1,2,3,4,"hello","world")
                .ofType(String.class)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

 

 

5.3 skip() and skipLast()

Skips the specified number of events: skip() drops them from the start of the sequence, skipLast() from the end.

Observable.just(1,2,3,4,5)
                .skip(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
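The example above covers skip() only. As a side-by-side comparison with skipLast(), here is a small sketch; it assumes RxJava 2, and the class name SkipDemo is made up for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

class SkipDemo {
    // Drops the first three events.
    static List<Integer> skipFirst() {
        return Observable.just(1, 2, 3, 4, 5).skip(3).toList().blockingGet();
    }

    // Drops the last three events.
    static List<Integer> skipLast() {
        return Observable.just(1, 2, 3, 4, 5).skipLast(3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(skipFirst()); // [4, 5]
        System.out.println(skipLast());  // [1, 2]
    }
}
```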

 

 

5.4 distinct()

Filters out duplicate events, so each distinct value is delivered only once.

Observable.just(1,2,3,4,3)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

 

 

5.5 distinctUntilChanged()

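Filters out consecutive duplicate events: an event is dropped only if it equals the one immediately before it.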
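The difference from distinct() is easiest to see on the same input. The following is a minimal sketch assuming RxJava 2; the class name DistinctDemo is invented for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

class DistinctDemo {
    // Removes every repeated value, wherever it occurs.
    static List<Integer> distinct() {
        return Observable.just(1, 1, 2, 2, 1, 3).distinct().toList().blockingGet();
    }

    // Removes a value only when it repeats the previous one.
    static List<Integer> distinctUntilChanged() {
        return Observable.just(1, 1, 2, 2, 1, 3).distinctUntilChanged().toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(distinct());             // [1, 2, 3]
        System.out.println(distinctUntilChanged()); // [1, 2, 1, 3]
    }
}
```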

5.6 take() and takeLast()

Limits the number of events the observer receives: take() keeps the first n events, takeLast() keeps the last n.

 Observable.just(1,2,3,4,3)
                .take(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
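For takeLast(), which the example above does not show, a comparable sketch could look like this. It assumes RxJava 2; the class name TakeDemo is made up for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

class TakeDemo {
    // Keeps the first two events and then completes.
    static List<Integer> takeFirst() {
        return Observable.just(1, 2, 3, 4, 5).take(2).toList().blockingGet();
    }

    // Keeps the last two events; they are delivered once the source completes.
    static List<Integer> takeLast() {
        return Observable.just(1, 2, 3, 4, 5).takeLast(2).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(takeFirst()); // [1, 2]
        System.out.println(takeLast());  // [4, 5]
    }
}
```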

 

5.7 debounce()

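Drops an event if another event follows it within the specified time; an event is delivered only after that much time has passed without a newer one. In the example below, 1 is dropped because 2 follows 800 ms later, which is less than the 1-second window.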

 

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                Thread.sleep(800);
                emitter.onNext(2);
            }
        }).debounce(1,TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });

 

5.8 throttleWithTimeout()

Behaves the same as debounce() above.

 

5.9 firstElement() and lastElement()

As the names suggest, the observer receives only the first event (or only the last event).
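Both operators return a Maybe, because an empty source has no first or last element. A minimal sketch, assuming RxJava 2; the class name FirstLastDemo is invented for illustration.

```java
import io.reactivex.Observable;

class FirstLastDemo {
    static Integer first() {
        return Observable.just(1, 2, 3).firstElement().blockingGet();
    }

    static Integer last() {
        return Observable.just(1, 2, 3).lastElement().blockingGet();
    }

    // For an empty source the Maybe completes without a value; blockingGet() then returns null.
    static Integer firstOfEmpty() {
        return Observable.<Integer>empty().firstElement().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(first());        // 1
        System.out.println(last());         // 3
        System.out.println(firstOfEmpty()); // null
    }
}
```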

 

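5.10 elementAt() and elementAtOrError()

elementAt() delivers only the event at the given index of the sequence. If the index is beyond the end of the sequence, no exception is raised; the observer simply receives no data. Use elementAtOrError() if you want an error in that case. The index starts at 0.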

Observable.just(1,2,3,4)
                .elementAt(0)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
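The out-of-range behaviour described above can be sketched as follows. This assumes RxJava 2 and Java 8 lambdas; the class name ElementAtDemo and the log messages are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class ElementAtDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Index 10 does not exist: the Maybe completes without a value.
        Observable.just(1, 2, 3, 4)
                .elementAt(10)
                .subscribe(
                        item -> log.add("elementAt: " + item),
                        error -> log.add("elementAt: " + error.getClass().getSimpleName()),
                        () -> log.add("elementAt: completed without a value"));

        // Same index with elementAtOrError(): the Single signals an error instead.
        Observable.just(1, 2, 3, 4)
                .elementAtOrError(10)
                .subscribe(
                        item -> log.add("elementAtOrError: " + item),
                        error -> log.add("elementAtOrError: " + error.getClass().getSimpleName()));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```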

 

 

6. Conditional Operators

6.1 all()

Checks whether every event in the sequence satisfies a condition; emits true if so and false otherwise.

Observable.just(1,2,3,4)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer>8;
                    }
                })
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean b) throws Exception {
                        System.out.println(b);
                    }
                });

 

6.2 takeWhile()

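Emits events for as long as they satisfy the condition and completes at the first event that does not. Unlike filter(), it does not let later matching events through.

6.3 skipWhile()

The opposite of takeWhile(): skips events while the condition holds, then emits the first event that fails it and everything after it.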
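To compare takeWhile() and skipWhile() with filter(), the sketch below applies the same predicate to the same input. It assumes RxJava 2 and Java 8 lambdas; the class name WhileDemo is invented for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

class WhileDemo {
    static Observable<Integer> source() {
        return Observable.just(1, 2, 3, 1, 2);
    }

    // Keeps every event below 3, wherever it occurs.
    static List<Integer> filter() {
        return source().filter(x -> x < 3).toList().blockingGet();
    }

    // Stops at the first event that is not below 3.
    static List<Integer> takeWhile() {
        return source().takeWhile(x -> x < 3).toList().blockingGet();
    }

    // Skips the leading events below 3, then lets everything through.
    static List<Integer> skipWhile() {
        return source().skipWhile(x -> x < 3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(filter());    // [1, 2, 1, 2]
        System.out.println(takeWhile()); // [1, 2]
        System.out.println(skipWhile()); // [3, 1, 2]
    }
}
```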

6.4 takeUntil()

The observer receives events up to and including the first one that satisfies the condition.

Observable.just(1,2,3,4,2)
                .takeUntil(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer>3;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

 

6.5 skipUntil()

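The source's events are dropped until the Observable passed to skipUntil() emits its first item. The source keeps emitting in the meantime, but those events are discarded, so only the later ones are received.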

Observable.intervalRange(1,5,0,1,TimeUnit.SECONDS)
                .skipUntil(Observable.intervalRange(6,5,3,1,TimeUnit.SECONDS))
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });

6.6 sequenceEqual()

Checks whether two Observables emit the same sequence of events.

Observable.sequenceEqual(Observable.just(1,2,3)
                ,Observable.just(1,2,3))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        System.out.println(aBoolean);
                    }
                });

 

6.7 contains()

Checks whether the sequence emitted by the Observable contains a given item.

Observable.just(1,2,3)
                .contains(2)
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        System.out.println(aBoolean);
                    }
                });

 

6.8 isEmpty()

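Checks whether the sequence is empty, that is, whether it ends without any onNext event. onError and onComplete do not count as items.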
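A minimal sketch of isEmpty(), assuming RxJava 2; the class name IsEmptyDemo is made up for illustration.

```java
import io.reactivex.Observable;

class IsEmptyDemo {
    // A source that only completes counts as empty.
    static boolean emptySource() {
        return Observable.<Integer>empty().isEmpty().blockingGet();
    }

    // A source that emits at least one item does not.
    static boolean nonEmptySource() {
        return Observable.just(1).isEmpty().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(emptySource());    // true
        System.out.println(nonEmptySource()); // false
    }
}
```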

6.9 amb()

amb() takes a collection of Observables but relays only the events of the Observable that emits first; the other Observables are discarded.

List<Observable<Integer>> list=new ArrayList<>();
        list.add(Observable.just(1,2,3,4).delay(3,TimeUnit.SECONDS));
        list.add(Observable.just(5,6,7));

        Observable.amb(list)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

 

6.10 defaultIfEmpty()

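If the Observable emits only onComplete, this operator emits a default value instead. In the example below the source emits 3, so the default value 1 is not used.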

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).defaultIfEmpty(1)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        System.out.println(o);
                    }
                });
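To see the default value actually being used, the source has to be empty. A minimal sketch, assuming RxJava 2; the class name DefaultIfEmptyDemo is invented for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

class DefaultIfEmptyDemo {
    // The source emits nothing, so the default value is delivered.
    static List<Integer> emptySource() {
        return Observable.<Integer>empty().defaultIfEmpty(1).toList().blockingGet();
    }

    // The source emits 3, so the default value is ignored.
    static List<Integer> nonEmptySource() {
        return Observable.just(3).defaultIfEmpty(1).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(emptySource());    // [1]
        System.out.println(nonEmptySource()); // [3]
    }
}
```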