Skip to content
Prev Previous commit
Next Next commit
Update as per @akarnokd's suggestion
  • Loading branch information
zsxwing committed Dec 12, 2014
commit e309e12b65d46c80000d48337750c3a4784bf39f
30 changes: 11 additions & 19 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe");
volatile int wipForUnsubscribe = 0;
boolean isUnsubscribed = false;
volatile int wipForUnsubscribe = 1;

public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Expand All @@ -95,15 +94,7 @@ public GroupBySubscriber(

@Override
public void call() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.getAndIncrement(self) == 0) {
if (groups.isEmpty()) {
isUnsubscribed = true;
}
} else {
// someone is putting, so groups is not empty
}
WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self);
if (isUnsubscribed) {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
self.unsubscribe();
}
}
Expand Down Expand Up @@ -278,17 +269,15 @@ public void onNext(T t) {
});

GroupState<K, T> putIfAbsent;
while (true) {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.getAndIncrement(this) == 0) {
if (isUnsubscribed) {
WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this);
return null;
}
for (;;) {
int wip = wipForUnsubscribe;
if (wip <= 0) {
return null;
}
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) {
putIfAbsent = groups.putIfAbsent(key, groupState);
WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this);
break;
}
WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this);
}
if (putIfAbsent != null) {
// this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
Expand Down Expand Up @@ -381,6 +370,9 @@ private void drainIfPossible(GroupState<K, T> groupState) {
}

private void completeInner() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) {
unsubscribe();
}
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
if (groups.isEmpty() && (terminated == 1 || child.isUnsubscribed())) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel some redundancy here. Maybe it is worth reviewing the other counters and state indicators.

// completionEmitted ensures we only emit onCompleted once
Expand Down