RxJava - NewThread Scheduler

The NewThread scheduler spawns a new thread for each active observable. It can be used to offload a time-consuming operation from the main thread onto another thread. Because it creates a thread whenever one is required, use it with care: thread creation is a costly operation, and it can have a drastic effect in a mobile environment if the number of observables is high.
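To make the one-thread-per-unit-of-work idea concrete, here is a minimal sketch in plain Java that uses java.lang.Thread directly instead of RxJava. It is not how RxJava implements the scheduler. The class NewThreadPerTaskSketch, the helper runEachOnNewThread, and the sketch-worker-N thread names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual sketch only: plain java.lang.Thread, not RxJava's internal implementation.
class NewThreadPerTaskSketch {

    // Hypothetical helper: handles each item on a freshly created thread and
    // returns the name of the thread that handled each item, in input order.
    static List<String> runEachOnNewThread(List<String> items) throws InterruptedException {
        String[] handledBy = new String[items.size()];
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < items.size(); i++) {
            final int index = i;
            // A brand-new thread per item: no pooling, no reuse.
            Thread t = new Thread(
                () -> handledBy[index] = Thread.currentThread().getName(),
                "sketch-worker-" + (i + 1));
            threads.add(t);
            t.start();
        }

        // join() waits for each worker and makes its write visible to this thread.
        for (Thread t : threads) {
            t.join();
        }
        return Arrays.asList(handledBy);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> items = Arrays.asList("Rx", "Java", "Scheduler");
        List<String> names = runEachOnNewThread(items);
        for (int i = 0; i < items.size(); i++) {
            System.out.println(items.get(i) + " handled by " + names.get(i));
        }
    }
}
```

In this sketch every item pays for one thread creation and teardown, which is why the approach becomes expensive as the number of concurrent tasks grows.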

Example

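import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Random;

// Imports assume RxJava 2.x; RxJava 3.x uses the io.reactivex.rxjava3 package prefix.
public class RxSchedulers {

   public static void main(String[] args) throws InterruptedException {
      Observable.just("Rx", "Java", "Scheduler")
         // Each inner observable is subscribed on its own new thread.
         .flatMap(v -> getLengthWithDelay(v)
            .doOnNext(s -> System.out.println("Processing Thread "
               + Thread.currentThread().getName()))
            .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread "
            + Thread.currentThread().getName()
            + ", Item length " + length));

      // Keep the main thread alive so the worker threads can finish.
      Thread.sleep(10000);
   }

   // Note: the sleep runs on the calling thread, before the observable is created;
   // only the emission of the length happens on the new thread.
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         // Simulate work lasting 0, 1, or 2 seconds.
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         // Restore the interrupt flag and signal the error downstream;
         // returning null from a flatMap mapper would fail at runtime.
         Thread.currentThread().interrupt();
         return Observable.error(e);
      }
   }
}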

 

Output

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 2
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 4
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 9