RxJava - Using CompositeDisposable

RxJava - Using CompositeDisposable

A Disposable is returned as a result from a subscription so that we can control when to unsubscribe so that the
underlying Observable can stop emitting events. Failure to do so can cause unwanted memory leaks
that are hard to trace

In Many scenarios we need to use multiple disposable in the applications. to handle all these we need to use CompositeDisposables to manage the resources. It implements Disposable and then holds a collection of disposables

Example

package rxjava;

import io.reactivex.Completable;
import io.reactivex.CompletableObserver;
import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.operators.completable.CompletableToObservable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import static javafx.scene.input.KeyCode.T;

/**
 *
 * @author User
 */
public class RxMaybe {
    
    public static void main(String[]args) throws InterruptedException
    {
//      
        
         CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello Diposable")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
        
    }
}

 

 

Output

Hello Diposable
Hi