【Android】写给小白的RxJava教程(四)

阅读: 评论:0

【Android】写给小白的RxJava教程(四)

【Android】写给小白的RxJava教程(四)

简介:大三学生党一枚!主攻Android开发,对于Web和后端均有了解。
个人语录取乎其上,得乎其中,取乎其中,得乎其下,以顶级态度写好一篇的博客。

RxJava教程(四)

  • 一.再谈操作符
      • 1.1 zip操作符
      • 1.2 zip带来的问题
        • 1.2.1 如果两个Observable发射事件的数量不同会产生什么?
        • 1.2.2 如何避免OOM?
  • 二.Flowable
  • 三.总结

这是 RxJava系列的最后一篇,要想完全掌握 RxJava使用,只有先掌握其重要的方法,在具体场景使用到的时候,再查询其他的方法,节省学习成本。

一.再谈操作符

1.1 zip操作符

zip操作符其实在Python语言中也有体现,组合两个列表。同样的,在这里的功能是组合两个Observable! 打个比方吧,两条河流经过某处交汇最后形成一条河流,也就是把两个Observable组合成一个新的Observable,我们来看一个例子:

 private void LogByZip() {Observable observable1  = ate(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);}});Observable observable2 = ate(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {e.onNext(4);e.onNext(5);e.onNext(6);}});Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer+integer2;}}).subscribe(new Observer<Integer>(){@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}

可见observable1observable2的发射事件,比如observable1发射的第一个事件是1,observable2发射的第一个事件是4他们两个组合起来相加,观察者收到的就是5!
就这么简单吗?是的,但是还有几点需要注意,zip也会带来一些问题!

1.2 zip带来的问题

1.2.1 如果两个Observable发射事件的数量不同会产生什么?

两个Observable组合,如果第一个Observable发射五个事件,第二个Observable发射四个事件,那么观察者能收到几个事件呢?对上面的代码稍作修改

  private void LogByZip() {Observable observable1  = ate(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {Log.d("observable1", "subscribe: "+1);e.onNext(1);Log.d("observable1", "subscribe: "+2);e.onNext(2);Log.d("observable1", "subscribe: "+3);e.onNext(3);Log.d("observable1", "subscribe: "+4);e.onNext(4);Log.d("observable1", "subscribe: "+5);e.onNext(5);}});Observable observable2 = ate(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {Log.d("observable2", "subscribe: "+4);e.onNext(4);Log.d("observable2", "subscribe: "+5);e.onNext(5);Log.d("observable2", "subscribe: "+6);e.onNext(6);Log.d("observable2", "subscribe: "+7);e.onNext(7);}});Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer+integer2;}}).subscribe(new Observer<Integer>(){@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}

看看打印结果:
Observable1

observable2

组合后的Observable

由此可得出结论,如果两个Observable组合,那么观察者接受到的事件数目以两个Observable中发射事件较少的那个数目为准。

此时我们可以引出另外一个问题,observable1发射完第一个事件以后,会继续发送还是等待和Observable2发射的第一个事件结合完再发出第二个事件呢?我们来做一个实验!让Observable2每次发送完事件以后睡眠2s.看一下日志是如何打印的

 Observable observable2 = ate(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {Log.d("observable2", "subscribe: "+4);e.onNext(4);Thread.sleep(2000);Log.d("observable2", "subscribe: "+5);e.onNext(5);Thread.sleep(2000);Log.d("observable2", "subscribe: "+6);e.onNext(6);Thread.sleep(2000);Log.d("observable2", "subscribe: "+7);e.onNext(7);Thread.sleep(2000);}});

Observable1

Observable2


观察者打印的日志和Observable效果一样,每隔2s打印一个。那实验结果显而易见了,Observable1不会等待和Observable2结合完再发出第二个事件,而是直接把事件一次性都发送出去!

那么问题就又来了,如何存储Observable1发出的所有事件呢?如果不存储就会丢失,那我们就需要存储好这些事件。就相当于Observable1一次性把事件都放进一个容器里面,并满足先进先出的数据结构,对没错就是队列!用队列把事件存储起来。

那么问题又又又来了!!!
请试想一下,这个容器是无限大的嘛?如果Observable1每秒发送1000个事件,observable2每秒发送一个事件,那么Observable1中还剩余999个未处理的,速度严重不对称,会导致OOM.因为所有的事件都是存放在队列中的,并且这个队列还不限制大小,那就有可能会造成OOM了!如何解决呢?

1.2.2 如何避免OOM?

为了防止OOM,我们可以使用三种方式
1.降低过快的发射速度,可以让Observable1发射完以后,睡眠一段时间!缺点,影响性能
2.可以在Observable1事件所在的队列中取一部分处理,其他的丢弃,缺点,会导致事件丢失。
3.使用Flowable

还需要再补充同步异步的概念!请看下面的代码

  private void syncRxJava(){ate(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {for (int i = 0; ; i++) {   //无限循环发事件                                              e.onNext(i+"hello");}}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {Log.d(TAG,"onNext:"+s);Thread.sleep(2000);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}

虽然在Observable中无限的发出事件,但是不会造成OOM,原因是什么呢?
因为他们两个处在同一个线程中,调用 就相当于调用Observer中的onNext(),所以事件都会被及时处理。这就是同步,天然的屏障!但是如果是运行在不同的线程中,那就会造成速度不同,Observable不停的发,但是处理者处理每个时间要停顿两秒,很明显,处理者就会忙不过来,造成容器中存储的事件数量快速增加,最后造成OOM

二.Flowable

FlowableObservable差不多,使用方式也差不多。

private void EasyUse() {Flowable<Integer> upstream = ate(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "emit 1");Next(1);Log.d(TAG, "emit 2");Next(2);Log.d(TAG, "emit 3");Next(3);Log.d(TAG, "emit complete");Complete();}}, BackpressureStrategy.ERROR); //增加了一个参数Subscriber<Integer> downstream = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");s.request(Long.MAX_VALUE);  //注意这句代码}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: " + integer);}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};upstream.subscribe(downstream);}

区别呢??????????

引用别人的一段话来说明Flowable的优点以及在代码中注释的

因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打…所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

附上这位大神的博客地址RxJava教程

三.总结

用四篇博客,为小白讲解了RxJava的基本使用方法,其实RxJava远不止如此,与Retrofit结合可以发挥出更好的实力。希望以后会写一篇两者结合的博客!

先别走,我有一个资源学习群要推荐给你,它是白嫖党的乐园,小白的天堂!

别再犹豫,一起来学习!

本文发布于:2024-02-04 19:29:11,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170714772558799.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:教程   Android   RxJava
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23