最简单易懂的RxJava2.0学习教程之RxJava2的线程调度(二)_简简单单_zz的博客-CSDN博客
使用RxJava的时候,在没有切换线程的情况下, 上游==被观察者(observable)和下游==观察者(observer)是工作在同一个线程中的,即都在主线程中。
话不多说上代码:
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emitter=1");
emitter.onNext(1);
}
});
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribe(observer);
}
运行结果:
从结果中可以看到,Observable和Observer都是在主线程中。这样肯定是不行的,因为我们在实际开发中,是不允许在主线程中进行耗时操作的,比如网络请求。我们肯定是要在子线程中网络请求,然后在主线程更新UI。那应该怎么做呢?
首先,我们需要先改变上游Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变下游Observer的线程, 让它在主线程接收事件,然后做相应的处理. 通过RxJava内置的线程调度器就可以实现这个需求了。 代码如下:
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
Log.d(TAG, "emitter=1");
emitter.onNext(1);
}
});
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
}
运行结果:
可以看到,发送事件的线程改变了, 是在一个叫RxNewThreadScheduler-1的线程中发送的事件, 而下游在主线程中接收事件, 我们仅仅添加了下边两行代码就实现了这个需求。
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程. 多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn()只有第一次的有效。 多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次。
Android RxJava2(五shi)功能操作符_好好学习 天天向上-CSDN博客
subscribeOn()方法:
public final Observable subscribeOn(Scheduler scheduler)
作用: 指定被观察者的线程,有一点需要注意就是如果多次调用此方法,只有第一次有效。
代码:
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
Log.e("---","threadName:"+Thread.currentThread().getName());
e.onNext(1);
}
})
// .subscribeOn(Schedulers.newThread())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer value) {
Log.e("---","onNext:"+value);
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
如果不指定被观察者运行线程,打印结果:
06-07 22:05:22.422 32737-32737/ E/---: onSubscribe
06-07 22:05:22.422 32737-32737/ E/---: threadName:main
06-07 22:05:22.422 32737-32737/ E/---: onNext:1
如果调用subscribeOn(Schedulers.newThread()),打印结果:
06-07 22:08:00.349 3772-3772/ E/---: onSubscribe
06-07 22:08:00.352 3772-3812/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:08:00.352 3772-3812 E/---: onNext:1
observerOn()
方法:
public final Observable observeOn(Scheduler scheduler)
作用: 指定观察者的线程,每指定一次就会生效一次。代码:
Observable.just(1).observeOn(Schedulers.newThread())
.map(new Function() {
@Override
public Integer apply(Integer integer) throws Exception {
Log.e("---","threadName:"+Thread.currentThread().getName());
return integer * 2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---","onSubscribe");
}
@Override
public void onNext(Integer value) {
Log.e("---","onNext:"+value);
Log.e("---","threadName:"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e("---","onError");
}
@Override
public void onComplete() {
Log.e("---","onComplete");
}
});
打印结果:
06-07 22:21:10.473 15136-15136/ E/---: onSubscribe
06-07 22:21:10.474 15136-15163/ E/---: threadName:RxNewThreadScheduler-1
06-07 22:21:10.492 15136-15136/ E/---: onNext:2
06-07 22:21:10.492 15136-15136/ E/---: threadName:main
06-07 22:21:10.492 15136-15136/ E/---: onComplete
二、RxJava2的线程选择:
Schedulers.io() :代表io操作的线程, 通常用于网络,读写文件等io密集型的操作 Schedulers.computation() :代表CPU计算密集型的操作, 例如需要大量计算的操作 Schedulers.newThread() :代表一个常规的新线程 AndroidSchedulers.mainThread() :代表Android的主线程
RxJava内置的Scheduler足够满足我们平时开发的需求, RxJava内部使用的是线程池来维护这些线程, 效率也比较高。