Skip to content

OperatorMerge does not request enough #1941

@davidmoten

Description

@davidmoten

Merging of async sources using the backpressure path can result in a stalled stream because not enough items are requested of the InnerSubscribers.

The test below fails 6 out of 10 runs.

The situation is this:

Given a merge of two slowish (1/ms) asynchronous sources that downstream has a take(N) where N >256, then 6 out 10 runs OperatorMerge requests 128 (RxRingBuffer.SIZE) emissions from each source but the operator fails to request more than 256 and the stream stalls.

        @Test
    public void testMergeKeepsRequesting() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        Observable.range(1, 2)
        // produce many integers per second
                .flatMap(new Func1<Integer, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(final Integer number) {
                        return Observable.range(1, Integer.MAX_VALUE)
                        // pause a bit
                                .doOnNext(pauseForMs(1))
                                // buffer on backpressure
                                .onBackpressureBuffer()
                                // do in parallel
                                .subscribeOn(Schedulers.computation());
                    }

                })
                // take a number bigger than 2* RxRingBuffer.SIZE (used by
                // OperatorMerge)
                .take(RxRingBuffer.SIZE * 2 + 1)
                // log count
                .doOnNext(printCount())
                // release latch
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        latch.countDown();
                    }
                }).subscribe();
        assertTrue(latch.await(1, TimeUnit.SECONDS));
    }

        private static Action1<Integer> printCount() {
        return new Action1<Integer>() {
            long count;

            @Override
            public void call(Integer t1) {
                count++;
                System.out.println("count=" + count);
            }
        };
    }

    private static Action1<Integer> pauseForMs(final long time) {
        return new Action1<Integer>() {
            @Override
            public void call(Integer s) {
                try {
                    Thread.sleep(time);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions