作者: maplejaw
本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。
以下操作符用于创建Observable。
create: 使用OnSubscribe从头创建一个Observable,这种方法比较简单。需要注意的是,使用该方法创建时,建议在OnSubscribe#call方法中检查订阅状态,以便及时停止发射数据或者运算。
ate(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Next("item1");Next("item2");Completed();}});
from: 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe
是通过OnSubscribeToObservableFuture
进行的,Iterable转换通过OnSubscribeFromIterable
进行。数组通过OnSubscribeFromArray
转换。
//IterableList<String> list=new ArrayList<>();...Observable.from(list).subscribe(new Action1<String>() {@Overridepublic void call(String s) {}});//FutureFuture<String> futrue= wSingleThreadExecutor().submit(new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(1000);return "maplejaw";}});Observable.from(futrue).subscribe(new Action1<String>() {@Overridepublic void call(String s) {}});
;
just: 将一个或多个对象转换成发射这个或这些对象的一个Observable。如果是单个对象,内部创建的是ScalarSynchronousObservable
对象。如果是多个对象,则是调用了from方法创建。
never: 创建一个什么都不做的Observable
Observable observable1pty();//直接调用onCompleted。Observable observable2(new RuntimeException());//直接调用onError。这里可以自定义异常Observable observable3ver();//啥都不做
timer: 创建一个在给定的延时之后发射数据项为0的Observable<Long>
,内部通过OnSubscribeTimerOnce
工作
Observable.timer(1000,TimeUnit.MILLISECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {Log.d("JG",String()); // 0}});
interval: 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
,内部通过OnSubscribeTimerPeriodically
工作。
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Action1<Long>() {@Overridepublic void call(Long aLong) {//每隔1秒发送数据项,从0开始计数//0,1,}});
range: 创建一个发射指定范围的整数序列的Observable<Integer>
Observable.range(2,5).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {Log.d("JG",String());// 2,3,4,5,6 从2开始发射5个数据}});
defer: 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。内部通过OnSubscribeDefer
在订阅时调用Func0创建Observable。
Observable.defer(new Func0<Observable<String>>() {@Overridepublic Observable<String> call() {return Observable.just("hello");}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.d("JG",s);}});
以下操作符用于组合多个Observable。
注意,为了使结构更加清晰以及缩小代码量,之后的例子部分地方将会使用Lambda表达式书写,如果你对Lambda表达式不太熟悉的话,可以阅读JAVA8 Lambda表达式完全解析这篇文章。
concat: 按顺序连接多个Observables。需要注意的是
Observable<Integer> observable1=Observable.just(1,2,3,4);Observable<Integer> observable2=Observable.just(4,5,6);at(observable1,observable2).subscribe(item->Log.d("JG",String()));//1,2,3,4,4,5,6
startWith: 在数据序列的开头增加一项数据。startWith
的内部也是调用了concat
Observable.just(1,2,3,4,5).startWith(6,7,8).subscribe(item->Log.d("JG",String()));//6,7,8,1,2,3,4,5
merge: 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError
将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge
则是一遇到异常将停止发射数据,发送onError通知。
zip: 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip
进行压合。
Observable<Integer> observable1=Observable.just(1,2,3,4);
Observable<Integer> observable2=Observable.just(4,5,6);Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {@Overridepublic String call(Integer item1, Integer item2) {return item1+"and"+item2;}}).subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
combineLatest: 。当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合。具体请看下面流程图。
zip工作流程
combineLatest工作流程
filter: 过滤数据。内部通过OnSubscribeFilter
过滤数据。
Observable.just(3,4,5,6).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer>4;}}).subscribe(item->Log.d("JG",String())); //5,6
ofType: 过滤指定类型的数据,与filter类似,
Observable.just(1,2,"3").ofType(Integer.class).subscribe(item -> Log.d("JG",String()));
take: 只发射开始的N项数据或者一定时间内的数据。内部通过OperatorTake
和OperatorTakeTimed
过滤数据。
Observable.just(3,4,5,6).take(3)//发射前三个数据项.take(100, TimeUnit.MILLISECONDS)//发射100ms内的数据
takeLast: 只发射最后的N项数据或者一定时间内的数据。内部通过OperatorTakeLast
和OperatorTakeLastTimed
过滤数据。takeLastBuffer和takeLast类似,不同点在于takeLastBuffer会收集成List后发射。
Observable.just(3,4,5,6).takeLast(3).subscribe(integer -> Log.d("JG",String()));//4,5,6
takeFirst:提取满足条件的第一项。内部实现源码如下:
public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {return filter(predicate).take(1); //先过滤,后提取
}
first/firstOrDefault:只发射第一项(或者满足某个条件的第一项)数据,可以指定默认值。
Observable.just(3,4,5,6).first().subscribe(integer -> Log.d("JG",String()));//3Observable.just(3,4,5,6).first(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer>3;}}) .subscribe(integer -> Log.d("JG",String()));//4
last/lastOrDefault:只发射最后一项(或者满足某个条件的最后一项)数据,可以指定默认值。
skip:跳过开始的N项数据或者一定时间内的数据。内部通过OperatorSkip
和OperatorSkipTimed
实现过滤。
Observable.just(3,4,5,6).skip(1).subscribe(integer -> Log.d("JG",String()));//4,5,6
skipLast:跳过最后的N项数据或者一定时间内的数据。内部通过OperatorSkipLast
和OperatorSkipLastTimed
实现过滤。
OperatorElementAt
过滤。 Observable.just(3,4,5,6).elementAt(2).subscribe(item->Log.d("JG",String())); //5
ignoreElements:丢弃所有数据,只发射错误或正常终止的通知。内部通过OperatorIgnoreElements
实现。
distinct:过滤重复数据,内部通过OperatorDistinct
实现。
Observable.just(3,4,5,6,3,3,4,9).distinct().subscribe(item->Log.d("JG",String())); //3,4,5,6,9
distinctUntilChanged:过滤掉连续重复的数据。内部通过OperatorDistinctUntilChanged
实现
Observable.just(3,4,5,6,3,3,4,9).distinctUntilChanged().subscribe(item->Log.d("JG",String())); //3,4,5,6,3,4,9
throttleFirst:定期发射Observable发射的第一项数据。内部通过OperatorThrottleFirst
实现。
ate(subscriber -> {Next(1);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(2);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(3);try {Thread.sleep(1000);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(4);Next(5);Completed();}).throttleFirst(999, TimeUnit.MILLISECONDS).subscribe(item-> Log.d("JG",String())); //结果为1,3,4
throttleWithTimeout/debounce:发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时
才进行发射
ate(subscriber -> {Next(1);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(2);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(3);try {Thread.sleep(1000);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(4);Next(5);Completed();}).debounce(999, TimeUnit.MILLISECONDS)//或者为throttleWithTimeout(1000, TimeUnit.MILLISECONDS).subscribe(item-> Log.d("JG",String())); //结果为3,5
sample/throttleLast:定期发射Observable最近的数据。内部通过OperatorSampleWithTime
实现。
ate(subscriber -> {Next(1);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(2);try {Thread.sleep(500);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(3);try {Thread.sleep(1000);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(4);Next(5);Completed();}).sample(999, TimeUnit.MILLISECONDS)//或者为throttleLast(1000, TimeUnit.MILLISECONDS).subscribe(item-> Log.d("JG",String())); //结果为2,3,5
timeout: 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。
ate(( subscriber) -> {Next(1);try {Thread.sleep(1000);} catch (InterruptedException e) {throw Exceptions.propagate(e);}Next(2);Completed();}).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定备用Observable将会抛出异常.subscribe(item-> Log.d("JG",String()),error->Log.d("JG","onError")); //结果为1,99,100 如果不指定备用Observable结果为1,onError
}
all: 判断所有的数据项是否满足某个条件,内部通过OperatorAll
实现。
Observable.just(2,3,4,5).all(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer>3;}}).subscribe(new Action1<Boolean>() {@Overridepublic void call(Boolean aBoolean) {Log.d("JG",String()); //false}});
exists: 判断是否存在数据项满足某个条件。内部通过OperatorAny
实现。
Observable.just(2,3,4,5).exists(integer -> integer>3).subscribe(aBoolean -> Log.d("JG",String())); //true
contains: 判断在发射的所有数据项中是否包含指定的数据,内部调用的其实是exists
Observable.just(2,3,4,5).contains(3).subscribe(aBoolean -> Log.d("JG",String())); //true
sequenceEqual: 用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)。
Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5)).subscribe(aBoolean -> Log.d("JG",String()));//true
isEmpty: 用于判断Observable发射完毕时,有没有发射数据。有数据false,如果只收到了onComplete通知则为true。
Observable.just(3,4,5,6).isEmpty().subscribe(item -> Log.d("JG",String()));//false
amb: 给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他Observable将会被忽略。
Observable<Integer> observable1ate(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {try {Thread.sleep(1000);} catch (InterruptedException e) {Error(e);}Next(1);Next(2);Completed();}}).subscribeOn(Schedulersputation());Observable<Integer> observable2ate(subscriber -> {Next(3);Next(4);Completed();});Observable.amb(observable1,observable2).subscribe(integer -> Log.d("JG",String())); //3,4
switchIfEmpty: 如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable。
pty().switchIfEmpty(Observable.just(2,3,4)).subscribe(o -> Log.d("JG",o.toString())); //2,3,4
takeUntil: 当发射的数据满足某个条件后(包含该数据),或者第二个Observable发送完毕,终止第一个Observable发送数据。
Observable.just(2,3,4,5).takeUntil(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer==4;}}).subscribe(integer -> Log.d("JG",String())); //2,3,4
takeWhile: 当发射的数据满足某个条件时(不包含该数据),Observable终止发送数据。
Observable.just(2,3,4,5).takeWhile(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer integer) {return integer==4;}}).subscribe(integer -> Log.d("JG",String())); //2,3
reduce: 对序列使用reduce()函数并发射最终的结果,内部使用OnSubscribeReduce
实现。
Observable.just(2,3,4,5).reduce(new Func2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer sum, Integer item) {return sum+item;}}).subscribe(integer -> Log.d("JG",String()));//14
collect: 使用collect
收集数据到一个可变的数据结构。
Observable.just(3,4,5,6).collect(new Func0<List<Integer>>() { //创建数据结构@Overridepublic List<Integer> call() {return new ArrayList<Integer>();}}, new Action2<List<Integer>, Integer>() { //收集器@Overridepublic void call(List<Integer> integers, Integer integer) {integers.add(integer);}}).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> integers) {}});
count/countLong: 计算发射的数量,内部调用的是reduce
.
toList: 收集原始Observable发射的所有数据到一个列表,然后返回这个列表.
Observable.just(2,3,4,5).toList().subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> integers) {}});
toSortedList: 收集原始Observable发射的所有数据到一个有序列表,然后返回这个列表。
Observable.just(6,2,3,4,5).toSortedList(new Func2<Integer, Integer, Integer>() {//自定义排序@Overridepublic Integer call(Integer integer, Integer integer2) {return integer-integer2; //>0 升序 ,<0 降序}}).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> integers) {Log.d("JG",String()); // [2, 3, 4, 5, 6]}});
toMap: 将序列数据转换为一个Map。我们可以根据数据项生成key和生成value。
Observable.just(6,2,3,4,5).toMap(new Func1<Integer, String>() {@Overridepublic String call(Integer integer) {return "key:" + integer; //根据数据项生成map的key}}, new Func1<Integer, String>() {@Overridepublic String call(Integer integer) {return "value:"+integer; //根据数据项生成map的kvalue}}).subscribe(new Action1<Map<String, String>>() {@Overridepublic void call(Map<String, String> stringStringMap) {Log.d("JG",String()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}}});
map: 对Observable发射的每一项数据都应用一个函数来变换。
Observable.just(6,2,3,4,5).map(integer -> "item:"+integer).subscribe(s -> Log.d("JG",s));//item:6,
flatMap: 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。
Observable.just(2,3,5).flatMap(new Func1<Integer, Observable<String>>() {@Overridepublic Observable<String> call(Integer integer) {ate(subscriber -> {Next(integer*10+"");Next(integer*100+"");Completed();});}}).subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500
flatMapIterable: 和flatMap的作用一样,只不过生成的是Iterable而不是Observable。
Observable.just(2,3,5).flatMapIterable(new Func1<Integer, Iterable<String>>() {@Overridepublic Iterable<String> call(Integer integer) {return Arrays.asList(integer*10+"",integer*100+"");}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {}});
switchMap: 和flatMap很像,将Observable发射的数据变换为Observables集合,当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable。
ate(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {for(int i=1;i<4;i++){Next(i);Utils.sleep(500,subscriber);//线程休眠Completed();}}).wThread()).switchMap(new Func1<Integer, Observable<Integer>>() {@Overridepublic Observable<Integer> call(Integer integer) {//每当接收到新的数据,之前的Observable将会被取消订阅ate(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> subscriber) {Next(integer*10);Utils.sleep(500,subscriber);Next(integer*100);Completed();}}).wThread());}}).subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300
scan: 与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。
Observable.just(2,3,5).scan(new Func2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer sum, Integer item) {return sum+item;}}).subscribe(integer -> Log.d("JG",String())) //2,5,10
groupBy: 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。
Observable.just(2,3,5,6).groupBy(new Func1<Integer, String>() {@Overridepublic String call(Integer integer) {//分组return integer%2==0?"偶数":"奇数";}}).subscribe(new Action1<GroupedObservable<String, Integer>>() {@Overridepublic void call(GroupedObservable<String, Integer> o) {o.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {Log.d("JG",o.getKey()+":"String()); //偶数:2,奇数:3,...}});}})
buffer: 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
Observable.just(2,3,5,6).buffer(3).subscribe(new Action1<List<Integer>>() {@Overridepublic void call(List<Integer> integers) {}})
window: 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。
Observable.just(2,3,5,6).window(3).subscribe(new Action1<Observable<Integer>>() {@Overridepublic void call(Observable<Integer> integerObservable) {integerObservable.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {}});}})
onErrorResumeNext: 当原始Observable在遇到错误时,使用备用Observable。。
Observable.just(1,"2",3).cast(Integer.class).onErrorResumeNext(Observable.just(1,2,3)).subscribe(integer -> Log.d("JG",String())) //1,2,3;
onErrorResumeNext
类似,区别在于onErrorResumeNext
可以处理所有的错误,onExceptionResumeNext只能处理异常。onErrorReturn: 当原始Observable在遇到错误时发射一个特定的数据。
Observable.just(1,"2",3).cast(Integer.class).onErrorReturn(new Func1<Throwable, Integer>() {@Overridepublic Integer call(Throwable throwable) {return 4;}}).subscribe(new Action1<Integer>() {@Overridepublic void call(Integer integer) {Log.d("JG",String());1,4}});
retry: 当原始Observable在遇到错误时进行重试。
Observable.just(1,"2",3).cast(Integer.class).retry(3).subscribe(integer -> Log.d("JG",String()),throwable -> Log.d("JG","onError"));//1,1,1,1,onError
retryWhen: 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,内部调用的是retry
。
Observable.just(1,"2",3).cast(Integer.class).retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {@Overridepublic Observable<Long> call(Observable<? extends Throwable> observable) {return Observable.timer(1, TimeUnit.SECONDS);}}).subscribe(integer -> Log.d("JG",String()),throwable -> Log.d("JG","onError"));//1,1
ConnectableObservable
与普通的Observable差不多,但是可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
Observable.publish()
将一个Observable转换为一个可连接的Observable
ConnectableObservable
,即使它们在Observable开始发射数据之后才订阅。
ConnectableObservable<Integer> co= Observable.just(1,2,3).publish();co .subscribe(integer -> Log.d("JG",String()) );co.connect();//此时开始发射数据
BlockingObservable
是一个阻塞的Observable。普通的Observable 转换为 BlockingObservable,可以使用 BlockingObservable.from( )
方法。内部通过CountDownLatch
实现了阻塞操作。
以下的操作符可以用于BlockingObservable,如果是普通的Observable,务必使用Blocking()转为阻塞Observable后使用,否则达不到预期的效果。
forEach: 对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。
Observable.just(2,3).wThread()).toBlocking().forEach(integer -> {Log.d("JG",String()+" "+Thread.currentThread().getName());Utils.sleep(500);});Log.d("JG",Thread.currentThread().getName());// 2 RxNewThreadScheduler-1// 3 RxNewThreadScheduler-1// main
materialize: 将Observable转换成一个通知列表。
Observable.just(1,2,3).materialize().subscribe(new Action1<Notification<Integer>>() {@Overridepublic void call(Notification<Integer> notification) {Log.d("JG",Kind()+" "Value());//OnNext 1//OnNext 2//OnNext 3//OnCompleted null}});
timestamp: 给Observable发射的每个数据项添加一个时间戳。
Observable.just(1,2,3).timestamp().subscribe(new Action1<Timestamped<Integer>>() {@Overridepublic void call(Timestamped<Integer> timestamped) {Log.d("JG",TimestampMillis()+" "Value());//1472627510548 1//1472627510549 2//1472627510549 3}});
timeInterval:给Observable发射的两个数据项间添加一个时间差,实现在OperatorTimeInterval
中
doOnEach: 注册一个动作,对Observable发射的每个数据项使用
Observable.just(2,3).doOnEach(new Action1<Notification<? super Integer>>() {@Overridepublic void call(Notification<? super Integer> notification) {Log.d("JG","--doOnEach--"String());}}).subscribe(integer -> Log.d("JG",String()));
//结果为: // --doOnEach--[rx.Notification@133c40b0 OnNext 2]
// 2// --doOnEach--[rx.Notification@133c40b0 OnNext 3]
// 3
// --doOnEach--[rx.Notification@df4db0e OnCompleted]
doOnCompleted: 注册一个动作,对正常完成的Observable使用
doOnTerminate:注册一个动作,对完成的Observable使用,无论是否发生错误
Observable.just(2,3).doOnTerminate(new Action0() {@Overridepublic void call() {Log.d("JG","--doOnTerminate--");}}).subscribe(integer -> Log.d("JG",String()));
// 2 , 3 , --doOnTerminate--
OperatorDoOnSubscribe
实现,OperatorDoOnUnsubscribe
实现,在call
中加入一个解绑动作。 finallyDo/doAfterTerminate: 注册一个动作,在Observable完成时使用
Observable.just(2,3).doAfterTerminate(new Action0() {@Overridepublic void call() {Log.d("JG","--doAfterTerminate--");}}).subscribe(integer -> Log.d("JG",String()));
//2,3, --doAfterTerminate--
delay: 延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。
delaySubscription: 延时处理订阅请求。实现在OnSubscribeDelaySubscription
中
using: 创建一个只在Observable生命周期存在的资源,当Observable终止时这个资源会被自动释放。
Observable.using(new Func0<File>() {//资源工厂@Overridepublic File call() {File file = new File(getCacheDir(), ");if(!ists()){try {Log.d("JG","--create--");ateNewFile();} catch (IOException e) {e.printStackTrace();}}return file;}}, new Func1<File, Observable<String>>() { //Observable@Overridepublic Observable<String> call(File file) {return Observable.ists() ? "exist" : "no exist");}}, new Action1<File>() {//释放资源动作@Overridepublic void call(File file) {if(file!=null&&ists()){Log.d("JG","--delete--");file.delete();}}}).subscribe(s -> Log.d("JG",s));//--create--//exist//--delete--
关于RxJava标准库的操作符已经介绍完毕,纯粹当个备忘录。如有错误之处,欢迎指出。
本文发布于:2024-01-31 11:04:26,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170667026528051.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |