Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 6 additions & 39 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ private void handleNewSource(Observable<? extends T> t) {
InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
i.sindex = childrenSubscribers.add(i);
t.unsafeSubscribe(i);
if (!isUnsubscribed())
if (!isUnsubscribed()) {
request(1);
}
}

private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
Expand Down Expand Up @@ -382,19 +383,8 @@ private int drainScalarValueQueue() {
public Boolean call(InnerSubscriber<T> s) {
if (s.q != null) {
long r = mergeProducer.requested;
int emitted = 0;
emitted += s.drainQueue();
int emitted = s.drainQueue();
if (emitted > 0) {
/*
* `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
* but `emitted` can ONLY be touched by the thread holding the `emitLock` which we're currently inside.
*
* Entering and leaving the emitLock flushes all values so this is visible to us.
*/
emitted += s.emitted;
// TODO we may want to store this in s.emitted and only request if above batch
// reset this since we have requested them all
s.emitted = 0;
s.requestMore(emitted);
}
if (emitted == r) {
Expand Down Expand Up @@ -542,9 +532,6 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_TERMINATED = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "terminated");

private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
/* protected by emitLock */
int emitted = 0;
final int THRESHOLD = (int) (q.capacity() * 0.7);

public InnerSubscriber(MergeSubscriber<T> parent, MergeProducer<T> producer) {
this.parentSubscriber = parent;
Expand Down Expand Up @@ -618,6 +605,7 @@ private void emit(T t, boolean complete) {
* putting in the queue, it attempts to get the lock. We are optimizing for the non-contended case.
*/
if (parentSubscriber.getEmitLock()) {
long emitted = 0;
enqueue = false;
try {
// drain the queue if there is anything in it before emitting the current value
Expand Down Expand Up @@ -660,30 +648,9 @@ private void emit(T t, boolean complete) {
} finally {
drain = parentSubscriber.releaseEmitLock();
}
if (emitted > THRESHOLD) {
// this is for batching requests when we're in a use case that isn't queueing, always fast-pathing the onNext
/**
* <pre> {@code
* Without this batching:
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5060743.715 100445.513 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 36606.582 1610.582 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 38.476 0.973 ops/s
*
* With this batching:
*
* Benchmark (size) Mode Samples Score Score error Units
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 5367945.738 262740.137 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 62703.930 8496.036 ops/s
* r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 72.711 3.746 ops/s
*} </pre>
*/
// request upstream what we just emitted
if(emitted > 0) {
request(emitted);
// we are modifying this outside of the emit lock ... but this can be considered a "lazySet"
// and it will be flushed before anything else touches it because the emitLock will be obtained
// before any other usage of it
emitted = 0;
}
}
if (enqueue) {
Expand Down
92 changes: 91 additions & 1 deletion src/test/java/rx/internal/operators/OperatorMergeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -40,9 +41,14 @@
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.*;
import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -1105,4 +1111,88 @@ public void shouldNotReceivedDelayedErrorWhileThereAreStillNormalEmissionsInTheQ
subscriber.assertReceivedOnNext(asList(1, 2, 3, 4));
assertEquals(asList(exception), subscriber.getOnErrorEvents());
}

@Test
public void testMergeKeepsRequesting() throws InterruptedException {
//for (int i = 0; i < 5000; i++) {
//System.out.println(i + ".......................................................................");
final CountDownLatch latch = new CountDownLatch(1);
final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<String>();

Observable.range(1, 2)
// produce many integers per second
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(final Integer number) {
return Observable.range(1, Integer.MAX_VALUE)
.doOnRequest(new Action1<Long>() {

@Override
public void call(Long n) {
messages.add(">>>>>>>> A requested[" + number + "]: " + n);
}

})
// pause a bit
.doOnNext(pauseForMs(3))
// buffer on backpressure
.onBackpressureBuffer()
// do in parallel
.subscribeOn(Schedulers.computation())
.doOnRequest(new Action1<Long>() {

@Override
public void call(Long n) {
messages.add(">>>>>>>> B requested[" + number + "]: " + n);
}

});
}

})
// take a number bigger than 2* RxRingBuffer.SIZE (used by OperatorMerge)
.take(RxRingBuffer.SIZE * 2 + 1)
// log count
.doOnNext(printCount())
// release latch
.doOnCompleted(new Action0() {
@Override
public void call() {
latch.countDown();
}
}).subscribe();
boolean a = latch.await(2, TimeUnit.SECONDS);
if (!a) {
for (String s : messages) {
System.out.println("DEBUG => " + s);
}
}
assertTrue(a);
//}
}

private static Action1<Integer> printCount() {
return new Action1<Integer>() {
long count;

@Override
public void call(Integer t1) {
count++;
System.out.println("count=" + count);
}
};
}

private static Action1<Integer> pauseForMs(final long time) {
return new Action1<Integer>() {
@Override
public void call(Integer s) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
}
}