加入收藏 | 设为首页 | 会员中心 | 我要投稿 我爱资讯网 (https://www.52junxun.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

你真的了解RxJava的线程切换吗?

发布时间:2022-10-21 10:56:43 所属栏目:Unix 来源:
导读:  使用RxJava可以轻松地实现线程切换,所以在Android中常被用来替代AsyncTask、Handler等原生工具类。使用起来虽然简单,但如果不了解其背后的基本原理unix线程切换,很可能因为使用不当而写出bug。本文将带大家简
  使用RxJava可以轻松地实现线程切换,所以在Android中常被用来替代AsyncTask、Handler等原生工具类。使用起来虽然简单,但如果不了解其背后的基本原理unix线程切换,很可能因为使用不当而写出bug。本文将带大家简单了解一下RxJava线程切换的实现原理以及开发中的注意事项
 
  1. Basic UsageIf you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators to operate on particular Schedulers.
 
  通过Scheduler让操作符跑在指定线程,从而实现多线程调度specify the Scheduler on which an observer will observe this Observable
 
  指定一个观察者在哪个调度器上观察这个Observablespecify the Scheduler on which an Observable will operate
 
  指定Observable自身在哪个调度器上执行
 
  4核4线程和4核8线程_unix线程切换_4核4线程 4核8线程
 
  4核4线程和4核8线程_unix线程切换_4核4线程 4核8线程
 
  RxJava调用链中每个操作符都会创建一个新的Observable,操作符产生的新Observable都会向上层的Observable注册回调。 subscribeOn和observeOn的实现原理一样:
 
  RxJava自下而上建立订阅,而后自上而下发射数据,所以 subscribeOn 即使出现 observeOn 之后也能保证数据源运行的线程,因为订阅永远发生在前。
 
  2. subscribeOn2.1 实现原理
 
  通过源码了解一下subscribeOn实现线程切换的基本原理
 
  //ObservableSubscribeOn.java
  final class ObservableSubscribeOn extends Observable<T> {
      @Override
      public void subscribeActual(final Observer super T> s) {
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
          s.onSubscribe(parent);
          // 没有直接调用subscribe订阅,而是先进行了线程变换(scheduler.scheduleDirect)
          parent.setDisposable(
              scheduler.scheduleDirect(new SubscribeTask(parent)));
      }
      final class SubscribeTask implements Runnable {
          @Override
          public void run() {
              // run()会在指定的scheduler调用,向上游订阅时线程已经发生了变化
              // 所以保证了上游所运行的线程
              source.subscribe(parent);
          }
      }
      static final
      class SubscribeOnObserver<T> implements Observer<T>, Disposable {
          @Override
          public void onNext(T t) {
              // 收到数据后不进行线程变换
              actual.onNext(t);
          }
      }
  }
  2.2 subscribeOn只生效一次
 
  subscribeOn通过切换订阅线程,改变Observable.create所在线程,从而影响数据的发射线程。
 
  由于订阅过程自下而上,所以Observable.create只受最近一次subscribeOn影响,当调用链中有多个subscribeOn时只有第一个有效。其他subscibeOn仍然可以影响其上游的doOnSubscribe的执行线程。
 
  @Test
  fun test() {
      Observable.create<Unit> { emitter ->
          log("onSubscribe")
          emitter.onNext(Unit)
          emitter.onComplete()
      }.subscribeOn(namedScheduler("1 - subscribeOn"))
          .doOnSubscribe { log("1 - doOnSubscribe") }
          .subscribeOn(namedScheduler("2 - subscribeOn"))
          .doOnSubscribe { log("2 - doOnSubscribe") }
          .doOnNext { log("onNext") }
          .test().awaitTerminalEvent() // Wait until observable completes
   }
  4核4线程 4核8线程_4核4线程和4核8线程_unix线程切换
 
  2.3 正确理解subscribeOn的意义Even though we added .subscribeOn() that is not enough. SubscribeOn operator only switches the subscribing process to the desired thread, but that doesn’t mean the items will be emitted on that thread.
 
  subscribeOn用来决定订阅线程,但这并不意味着上游数据一定来自此线程
 
  @Test
  fun test() {
      val observable = Observable.create<Int> { emitter ->
          log("onSubscribe")
          thread(name = "Main thread", isDaemon = false) {
              log("1 - emitting"); emitter.onNext(1)
              log("2 - emitting"); emitter.onNext(2)
              log("3 - emitting"); emitter.onNext(3)
              emitter.onComplete()
          }
      }
      observable
          .subscribeOn(Schedulers.computation())
          .doOnNext { log("$it - after subscribeOn") }
          .test().awaitTerminalEvent() // Wait until observable completes
  }
  4核4线程 4核8线程_unix线程切换_4核4线程和4核8线程
 
  正确理解subscribeOn的含义有助于避免一些使用上的误区:
 
  对于PublishSubject无效
 
  @Test
  fun test() {
      val subject = PublishSubject.create<Int>()
      val observer1 = subject
          .subscribeOn(Schedulers.io())
          .doOnNext { log("$it - I want this happen on an IO thread") }
          .test()
      val observer2 = subject
          .subscribeOn(Schedulers.newThread())
          .doOnNext { log("$it - I want this happen on a new thread") }
          .test()
      sleep(10);
      subject.onNext(1)
      subject.onNext(2)
      subject.onNext(3)
      subject.onComplete()
      observer1.awaitTerminalEvent()
      observer2.awaitTerminalEvent()
  }
  4核4线程 4核8线程_unix线程切换_4核4线程和4核8线程
 
  对于PublishSubject来说,上游数据来自哪个线程是在onNext时决定的,所以对一个PublishSubject使用使用subscribeOn没有意义。
 
  对于Observable.just()无效
 
  通常subcribeOn可以决定Observable.create {...} 的执行线程,因此很多初学者容易犯的一个错误是在Observable.just(...)里做耗时任务,并误认为会跑在subscribeOn的线程:
 
  4核4线程 4核8线程_unix线程切换_4核4线程和4核8线程
 
  如上,readFromDb() 放在just中显然是不合适的。just()在当前线程立即执行,因此不受subscribeOn影响,应该修改如下:
 
  //Observable.defer
  Observable.defer { Observable.just(readFromDb()) }
      .subscribeOn(Schedulers.io())
      .subscribe { ... }
  //Observable.fromCallable
  Observable.fromCallable { readFromDb() }
      .subscribeOn(Schedulers.io())
      .subscribe { ... }
  使用flatMap处理并发
 
  subscribeOn决定的当前Observable的订阅线程,因此对于flatMap的使用要特别留心
 
  Observable.fromIterable(listOf("id1", "id2", "id3"))
      .flatMap { id -> loadData(id) }
      .subscribeOn(Schedulers.io())
      .observeOn(mainThread())
      .toList()
      .subscribe { result -> log(result) }
  如果我们希望多个loadData(id)并发执行,上述写法是错误的。
 
  subscribeOn决定了flatMap上游线程,flatMap返回多个Observable的订阅都是发生在此线程,多个loadData只能运行在单一线程,无法实现并行。
 
  想要达到并行执行效果,需要修改如下:
 
  Observable.fromIterable(listOf("id1", "id2", "id3"))
      .flatMap { id ->
          loadData(id)
              .subscribeOn(Schedulers.io())
      }
      .observeOn(mainThread())
      .toList()
      .subscribe { result -> log(result) }
  3.observeOn3.1 实现原理
 
  通过源码了解一下observeOn实现线程切换的基本原理
 
  //ObservableObserveOn.java
  final class ObservableObserveOn extends Observable<T> {
      @Override
      protected void subscribeActual(Observer super T> observer) {
          if (scheduler instanceof TrampolineScheduler) {
              source.subscribe(observer);
          } else {
              Scheduler.Worker w = scheduler.createWorker();
              // 直接向上游订阅数据,不进行线程切换,切换操作在Observer中进行
              source.subscribe(
                  new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
          }
      }
      static final class ObserveOnObserver<T> implements Observer<T>, Runnable {
          @Override
          public void onNext(T t) {
              if (done) {
                  return;
              }
              // 这里选把数据放到队列中,增加吞吐量,提高性能
              if (sourceMode != QueueDisposable.ASYNC) {
                  queue.offer(t);
              }
              // 在schedule方法里进行线程切换并把数据循环取出
              // 回调给下游,下游会在指定的线程中收到数据
              schedule();
          }
          void schedule() {
              if (this.getAndIncrement() == 0) {
                  //切换线程
                  this.worker.schedule(this);
              }
          }
      }
  }
  3.2 observeOn多次生效
 
  不同于subscribeOn,observeOn可以有多个而且每个都会生效
 
  3.3 连续发射多个item时能否保证串行?
 
  observeOn使用Scheduler调度线程后,下游是运行在单线程中还是多个线程中?能否保证下游数据的有序性?
 
  @Test
  fun test() {
      Observable.create<Int> { emitter ->
          repeat(10) {
              emitter.onNext(it)
          }
          emitter.onComplete()
      }.observeOn(Schedulers.io())
          .subscribe {
              log(" - $it")
          }
  }
  unix线程切换_4核4线程和4核8线程_4核4线程 4核8线程
 
  通过结果可以看到,即使经Scheduler调度之后,下游仍然运行在单一线程,可以保证数据在整个调用链上的有序性。
 
  那么为什么经过Scheduler调度后都跑在单一线程呢?
 
  4. Scheduler4.1 实现原理
 
  4核4线程和4核8线程_4核4线程 4核8线程_unix线程切换
 
  4核4线程和4核8线程_unix线程切换_4核4线程 4核8线程
 
  Scheculer并非直接调度Runnable,而是创建Worker,再由Worker来调度具体任务。
 
  subscribeOn中的SubscribeTask以及observeOn中的ObserveOnObserver都实现了Runnable,所以最终都是在Worker中执行。
 
  4.2 任务是由Worker调度的
 
  4核4线程 4核8线程_unix线程切换_4核4线程和4核8线程
 
  一个Scheduler可以创建多个 Worker,一个Worker可以管理多个Task(Runnable)
 
  Worker 的存在为了确保两件事:
 
  4.3 Worker何保证串行?
 
  非常简单,每个Worker只有一个线程
 
  unix线程切换_4核4线程 4核8线程_4核4线程和4核8线程
 
  4核4线程和4核8线程_unix线程切换_4核4线程 4核8线程
 
  现在可以解答疑问了:为什么observeOn经过Scheduerl调度后,仍然跑在单一线程?
 
  Scheduler为每个observeOn分配唯一Worker,因此observeOn的下游可以保证在单一线程串行执行。
 
  //ObservableObserveOn.java
  final class ObservableObserveOn extends Observable<T> {
      @Override
      protected void subscribeActual(Observer super T> observer) {
          if (scheduler instanceof TrampolineScheduler) {
              source.subscribe(observer);
          } else {
              Scheduler.Worker w = scheduler.createWorker();
              source.subscribe(
                  new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); //传入worker
          }
      }
    ...
  }
  如上,Worker作为ObserveOnObserver的成员变量被持有
 
  4.4 预置Schedulers
 
  如同Executors提供了多种ThreadPoolExecutor一样,Schedulers提供了多种预设的Scheduler
 
  4核4线程和4核8线程_4核4线程 4核8线程_unix线程切换
 
  //Sample of Schedulers.from
  fun namedScheduler(name: String): Scheduler {
      return Schedulers.from(
          Executors.newCachedThreadPool { Thread(it, name) }
      )
  }
  Thread-Safety5.1 RxJava操作符是否线程安全?
 
  @Test
  fun test() {
      val numberOfThreads = 1000
      val publishSubject = PublishSubject.create<Int>()
      val actuallyReceived = AtomicInteger()
      publishSubject
          .take(300).subscribe {
              actuallyReceived.incrementAndGet()
          }
      val latch = CountDownLatch(numberOfThreads)
      var threads = listOf<Thread>()
      (0..numberOfThreads).forEach {
          threads += thread(start = false) {
              publishSubject.onNext(it)
              latch.countDown()
          }
      }
      threads.forEach { it.start() }
      latch.await()
      val sum = actuallyReceived.get()
      check(sum == 300) { "$sum != 300" }
  }
  结果不符合预期,因为take不是线程安全的
 
  看一下take的源码
 
  public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
      final long limit;
      public ObservableTake(ObservableSource<T> source, long limit) {
          super(source);
          this.limit = limit;
      }
      protected void subscribeActual(Observer super T> observer) {
          this.source.subscribe(new ObservableTake.TakeObserver(observer, this.limit));
      }
      static final class TakeObserver<T> implements Observer<T>, Disposable {
          final Observer super T> downstream;
          boolean done;
          Disposable upstream;
          long remaining;
          TakeObserver(Observer super T> actual, long limit) {
              this.downstream = actual;
              this.remaining = limit;
          }
          public void onNext(T t) {
              if (!this.done && this.remaining-- > 0L) {
                  boolean stop = this.remaining == 0L;
                  this.downstream.onNext(t);
                  if (stop) {
                      this.onComplete();
                  }
              }
          }
      }
  }
  果然不出所料 remaining--没有加锁操作
 
  5.2 observableOn的线程安全
 
  那如果加上observableOn是不是就保证串行了呢,因为take可以跑在单一线程上了。
 
  @Test
  fun test() {
      repeat(10000) {
          val numberOfThreads = 1000
          val publishSubject = PublishSubject.create<Int>()
          val actuallyReceived = AtomicInteger()
          publishSubject
              .observeOn(Schedulers.io())
              .take(300).subscribe {
                  actuallyReceived.incrementAndGet()
              }
          val latch = CountDownLatch(numberOfThreads)
          var threads = listOf<Thread>()
          (0..numberOfThreads).forEach {
              threads += thread(start = false) {
                  publishSubject.onNext(it)
                  latch.countDown()
              }
          }
          threads.forEach { it.start() }
          latch.await()
          check(actuallyReceived.get() == 300)
      }
  }
  很遗憾,多次运行后发现依然有问题,因为observableOn本身也不是线程安全的,observableOn中使用的queue是一个非线程安全队列。
 
  4核4线程和4核8线程_4核4线程 4核8线程_unix线程切换
 
  4核4线程和4核8线程_4核4线程 4核8线程_unix线程切换
 
  5.3 The Observable Contract
 
  Rx在对Observable的定义中已经明确告诉我们了:
 
  Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
 
  reactivex.io/documentati…
 
  作为结论,RxJava的操作符默认并非线程安全的。
 
  但是对于接收多个Observable的操作符,例如 merge()、combineLatest()、zip()等 是线程安全的,所以即使多个Observable来自不线程时,也不需要考虑线程安全问题。
 

(编辑:我爱资讯网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章