RxJava - Conditional Operators


This section discusses conditional operators. They are simple to understand and give you conditional flow and control over Observables.

The following operators evaluate one or more Observables, or the items they emit.

Sr.No. Operator & Description

1. All: Checks whether all emitted items meet a given criterion.
2. Amb: Given multiple Observables, emits all items from only the Observable that emits first.
3. Contains: Checks whether an Observable emits a particular item.
4. DefaultIfEmpty: Emits a default item if the Observable does not emit anything.
5. SequenceEqual: Checks whether two Observables emit the same sequence of items.
6. SkipUntil: Discards items emitted by the first Observable until a second Observable emits an item.
7. SkipWhile: Discards items emitted by an Observable until a given condition becomes false.
8. TakeUntil: Discards items emitted by an Observable after a second Observable emits an item or terminates.
9. TakeWhile: Discards items emitted by an Observable after a specified condition becomes false.
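
SkipWhile and TakeWhile are easy to mix up, so here is a minimal sketch of their semantics in plain Java, with no RxJava dependency. It models an Observable as a List and is only an illustration, not how RxJava implements the operators; the class name WhileSemantics and both helper methods are made up for this sketch. The point to notice is that the condition is checked only until it fails for the first time: a later item that satisfies it again is neither skipped by skipWhile nor taken by takeWhile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper class: a plain-Java model of the operator semantics only.
class WhileSemantics {

    // Models skipWhile: drop the leading items that satisfy the predicate,
    // then pass every remaining item through unchanged.
    static <T> List<T> skipWhile(List<T> source, Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : source) {
            if (skipping && predicate.test(item)) {
                continue; // still inside the leading run that satisfies the predicate
            }
            skipping = false; // the gate opens once and never closes again
            out.add(item);
        }
        return out;
    }

    // Models takeWhile: pass items through while the predicate holds,
    // and stop at the first item that fails it.
    static <T> List<T> takeWhile(List<T> source, Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                break; // the sequence ends here, later items are never inspected
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 5, 1, 7);
        System.out.println(skipWhile(items, x -> x < 3)); // [5, 1, 7]
        System.out.println(takeWhile(items, x -> x < 3)); // [1, 2]
    }
}
```

With RxJava itself, the equivalent calls would be skipWhile(...) and takeWhile(...) on an Observable, each taking the same kind of predicate.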

 

Examples

All

This operator determines whether all items emitted by an Observable meet some criteria.

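import io.reactivex.Observable;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;

// Imports assume RxJava 2.x; in RxJava 3.x the packages start with
// io.reactivex.rxjava3 (for example io.reactivex.rxjava3.core.Observable).
public class RxConditional {

    public static void main(String[] args) {
        Observable.just("Hello", "RXJava", "Learning", "Framework")
                // all() returns a Single<Boolean>, so the result arrives in onSuccess
                .all(item -> item.length() > 4)
                .subscribe(new SingleObserver<Boolean>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        // nothing to set up for this example
                    }

                    @Override
                    public void onSuccess(Boolean aBoolean) {
                        // true only if every item is longer than 4 characters
                        System.out.println("onNext: " + aBoolean);
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }
                });
    }
}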

 

Output

onNext: true
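
The run above only shows the passing case. As a rough, dependency-free sketch of what All evaluates, the plain-Java model below treats an Observable as a List; the class name AllSemantics and its all helper are hypothetical and exist only for this illustration. It also shows two cases the example does not: one failing item makes the result false, and an empty source yields true because no item violates the condition.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper class: models what All computes, not how RxJava computes it.
class AllSemantics {

    // Returns false as soon as one item fails the predicate, true otherwise.
    static <T> boolean all(List<T> source, Predicate<T> predicate) {
        for (T item : source) {
            if (!predicate.test(item)) {
                return false; // one counterexample is enough
            }
        }
        return true; // also reached when the source is empty
    }

    public static void main(String[] args) {
        Predicate<String> longerThanFour = s -> s.length() > 4;

        // Same items as the RxJava example: every item is longer than 4 characters
        System.out.println(all(Arrays.asList("Hello", "RXJava", "Learning", "Framework"), longerThanFour)); // true

        // "Rx" has only 2 characters, so the check fails
        System.out.println(all(Arrays.asList("Hello", "Rx"), longerThanFour)); // false

        // No items means nothing violates the condition
        System.out.println(all(Collections.<String>emptyList(), longerThanFour)); // true
    }
}
```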

 

 

Amb

Given multiple Observables, this operator emits all the items of whichever Observable emits first and ignores the others.

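import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;

import java.util.concurrent.TimeUnit;

// Imports assume RxJava 2.x; in RxJava 3.x the packages start with io.reactivex.rxjava3.
public class RxConditional {

    // Thread.sleep can throw InterruptedException, so main has to declare it
    public static void main(String[] args) throws InterruptedException {

        // Emits 10..50 after a 4 second delay
        Observable<Integer> observable1 = Observable.timer(4, TimeUnit.SECONDS)
                .flatMap(new Function<Long, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Long aLong) throws Exception {
                        return Observable.just(10, 20, 30, 40, 50);
                    }
                });

        // Emits 100..500 after a 3 second delay, so it emits first
        Observable<Integer> observable2 = Observable.timer(3, TimeUnit.SECONDS)
                .flatMap(new Function<Long, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Long aLong) throws Exception {
                        return Observable.just(100, 200, 300, 400, 500);
                    }
                });

        Observable
                .ambArray(observable1, observable2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        // nothing to set up for this example
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        // nothing to do on completion
                    }
                });

        // timer() fires on a background thread; keep main alive until the items arrive
        Thread.sleep(5000);
    }
}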

 

Output (observable2 fires after 3 seconds, ahead of observable1, so only its items are emitted)

onNext: 100
onNext: 200
onNext: 300
onNext: 400
onNext: 500
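
To make the selection rule explicit, here is a small plain-Java sketch of the Amb decision, written without RxJava. It is a simplified model under stated assumptions: each source is reduced to the delay before its first emission plus its list of items, and the names AmbSemantics, TimedSource and amb are hypothetical. Ties go to the earlier source in the list, which is a choice made for this model only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class: models which source Amb mirrors, nothing more.
class AmbSemantics {

    // Stand-in for an Observable: the delay before its first item, and its items.
    static final class TimedSource {
        final long firstEmissionMillis;
        final List<Integer> items;

        TimedSource(long firstEmissionMillis, List<Integer> items) {
            this.firstEmissionMillis = firstEmissionMillis;
            this.items = items;
        }
    }

    // The source that emits first wins; every other source is ignored entirely.
    // Assumes at least one source is passed in.
    static List<Integer> amb(List<TimedSource> sources) {
        TimedSource winner = sources.get(0);
        for (TimedSource candidate : sources) {
            if (candidate.firstEmissionMillis < winner.firstEmissionMillis) {
                winner = candidate;
            }
        }
        return winner.items;
    }

    public static void main(String[] args) {
        // Same delays and values as the RxJava example
        TimedSource observable1 = new TimedSource(4000, Arrays.asList(10, 20, 30, 40, 50));
        TimedSource observable2 = new TimedSource(3000, Arrays.asList(100, 200, 300, 400, 500));

        System.out.println(amb(Arrays.asList(observable1, observable2))); // [100, 200, 300, 400, 500]
    }
}
```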

 

DefaultIfEmpty

This operator emits the items from the source Observable, or a single default item if the source Observable completes without emitting anything.

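import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// Imports assume RxJava 2.x; in RxJava 3.x the packages start with io.reactivex.rxjava3.
public class RxConditional {

    public static void main(String[] args) {

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) {
                // Random number from 0 to 9; only even numbers are emitted
                int num = (int) (Math.random() * 10);
                if (num % 2 == 0) {
                    emitter.onNext(num);
                }
                // For an odd number the source completes without emitting anything
                emitter.onComplete();
            }
        })
                // Falls back to -10 when the source turns out to be empty
                .defaultIfEmpty(-10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        // nothing to set up for this example
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        // nothing to do on completion
                    }
                });
    }
}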

 

Output (when the random number is odd; when it is even, that number is printed instead)

onNext: -10
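
SkipUntil and TakeUntil

These two operators from the list at the top depend on a second Observable rather than on a condition. The sketch below models that in plain Java without RxJava: each item carries the time it was emitted, and the second Observable is reduced to the time at which it fires. The names UntilSemantics, Timed, skipUntil and takeUntil are hypothetical, and treating an item emitted at exactly the trigger time as coming after the trigger is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class: a timeline model of the two operators, not RxJava code.
class UntilSemantics {

    // An item together with the time (in milliseconds) at which it was emitted.
    static final class Timed {
        final long atMillis;
        final String value;

        Timed(long atMillis, String value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Models skipUntil: items emitted before the trigger fires are discarded.
    static List<String> skipUntil(List<Timed> source, long triggerAtMillis) {
        List<String> out = new ArrayList<>();
        for (Timed item : source) {
            if (item.atMillis >= triggerAtMillis) {
                out.add(item.value);
            }
        }
        return out;
    }

    // Models takeUntil: items are passed through until the trigger fires.
    // Assumes the source list is ordered by emission time.
    static List<String> takeUntil(List<Timed> source, long triggerAtMillis) {
        List<String> out = new ArrayList<>();
        for (Timed item : source) {
            if (item.atMillis >= triggerAtMillis) {
                break; // the trigger has fired, the sequence ends here
            }
            out.add(item.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Timed> source = Arrays.asList(
                new Timed(100, "a"), new Timed(200, "b"),
                new Timed(300, "c"), new Timed(400, "d"));

        // The second Observable fires at 250 ms, between "b" and "c"
        System.out.println(skipUntil(source, 250)); // [c, d]
        System.out.println(takeUntil(source, 250)); // [a, b]
    }
}
```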