RxJava - Buffering

RxJava - Buffering

Buffer gathers items emitted by an Observable into batches and emit the batch instead of emitting one item at a time

Example

public class RxBuffering {
    public static void main(String args[]) throws InterruptedException
    {
        Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4,
        5, 6, 7, 8, 9);
 
        integerObservable.subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
                .delay(2, TimeUnit.SECONDS, Schedulers.io())
        .buffer(3)
        .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onNext(List<Integer> integers) {
              
                for (Integer integer : integers) {
                    System.out.println("Items "+integer);
                }
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
                System.out.println("Items Completed ");
            }

           
        });
         Thread.sleep(3000);
    }
}

 

Output

Items 1
Items 2
Items 3
Items 4
Items 5
Items 6
Items 7
Items 8
Items 9
Items Completed