您当前的位置: 首页 >  Java
  • 0浏览

    0关注

    674博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

面试:Rxjava线程调度

沙漠一只雕得儿得儿 发布时间:2021-12-13 12:06:21 ,浏览量:0

RxJava2线程调度使用:

最简单易懂的RxJava2.0学习教程之RxJava2的线程调度(二)_简简单单_zz的博客-CSDN博客

使用RxJava的时候,在没有切换线程的情况下, 上游==被观察者(observable)和下游==观察者(observer)是工作在同一个线程中的,即都在主线程中。

话不多说上代码:

Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
                Log.d(TAG, "emitter=1");
                emitter.onNext(1);
            }
        });

        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribe(observer);                                                           
}

运行结果:

这里写图片描述

 从结果中可以看到,Observable和Observer都是在主线程中。这样肯定是不行的,因为我们在实际开发中,是不允许在主线程中进行耗时操作的,比如网络请求。我们肯定是要在子线程中网络请求,然后在主线程更新UI。那应该怎么做呢?

首先,我们需要先改变上游Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变下游Observer的线程, 让它在主线程接收事件,然后做相应的处理. 通过RxJava内置的线程调度器就可以实现这个需求了。 代码如下:

Observable observable = Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
                Log.d(TAG, "emitter=1");
                emitter.onNext(1);
            }
        });

        Observer observer = new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);                                                             
}

运行结果:

这里写图片描述

可以看到,发送事件的线程改变了, 是在一个叫RxNewThreadScheduler-1的线程中发送的事件, 而下游在主线程中接收事件, 我们仅仅添加了下边两行代码就实现了这个需求。

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())

 简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程. 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn()只有第一次的有效。 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次。  

Android RxJava2(五shi)功能操作符_好好学习 天天向上-CSDN博客

subscribeOn()

方法:

public final Observable subscribeOn(Scheduler scheduler)

作用: 指定被观察者的线程,有一点需要注意就是如果多次调用此方法,只有第一次有效。

代码:

        Observable.create(new ObservableOnSubscribe() {
            @Override
            public void subscribe(ObservableEmitter e) throws Exception {
                Log.e("---","threadName:"+Thread.currentThread().getName());
                e.onNext(1);
            }
        })
//                .subscribeOn(Schedulers.newThread())
                .subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

如果不指定被观察者运行线程,打印结果:

06-07 22:05:22.422 32737-32737/ E/---: onSubscribe
06-07 22:05:22.422 32737-32737/ E/---: threadName:main
06-07 22:05:22.422 32737-32737/ E/---: onNext:1

如果调用subscribeOn(Schedulers.newThread()),打印结果:

06-07 22:08:00.349 3772-3772/ E/---: onSubscribe
06-07 22:08:00.352 3772-3812/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:08:00.352 3772-3812 E/---: onNext:1
observerOn()

方法:

public final Observable observeOn(Scheduler scheduler)

作用: 指定观察者的线程,每指定一次就会生效一次。代码:

        Observable.just(1).observeOn(Schedulers.newThread())
                .map(new Function() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        Log.e("---","threadName:"+Thread.currentThread().getName());
                        return integer * 2;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("---","onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.e("---","onNext:"+value);
                Log.e("---","threadName:"+Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                Log.e("---","onError");
            }

            @Override
            public void onComplete() {
                Log.e("---","onComplete");
            }
        });

打印结果:

06-07 22:21:10.473 15136-15136/ E/---: onSubscribe
06-07 22:21:10.474 15136-15163/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:21:10.492 15136-15136/ E/---: onNext:2
06-07 22:21:10.492 15136-15136/ E/---: threadName:main
06-07 22:21:10.492 15136-15136/ E/---: onComplete
二、RxJava2的线程选择:

Schedulers.io() :代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 Schedulers.computation() :代表CPU计算密集型的操作, 例如需要大量计算的操作 Schedulers.newThread() :代表一个常规的新线程 AndroidSchedulers.mainThread() :代表Android的主线程

RxJava内置的Scheduler足够满足我们平时开发的需求, RxJava内部使用的是线程池来维护这些线程, 效率也比较高。 

关注
打赏
1657159701
查看更多评论
立即登录/注册

微信扫码登录

0.0388s