Confused by behavior of publish+flatten #2775

@dvtomas

RxScala 0.23.1 (that means RxJava 1.0.4)

import rx.lang.scala.Observable

val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)

val roundRobinSource = o1.publish(po1 => o2.publish(po2 => o3.publish(po3 => {
  def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
  Observable.just(oneRound, oneRound, oneRound, oneRound, oneRound).flatten
})))
roundRobinSource.subscribe(println, println)

gives me

1
10
100
1
20
200
1
30
300
1
1

That just bemuses me. Why am I getting an infinite stream of ones for the first observable, while the others seem OK? And I'm not even sure that the behavior of the others is what I'd expect, given that the documentation says publish "Returns a rx.lang.scala.observables.ConnectableObservable, which waits until the connect function is called before it begins emitting items from this rx.lang.scala.Observable to those rx.lang.scala.Observers that have subscribed to." It probably is OK; I just can't deduce from that documentation that publish somehow keeps track internally of which items from the published observable have already been emitted and which have not, even when it is resubscribed to multiple times.

Can someone enlighten me on this, please?
