Conversation
mythical-fred
left a comment
There was a problem hiding this comment.
Two blockers: a variable-name swap that produces wrong profiling output, and behavior changes without tests. See inline.
| // Record when marker space was exhausted. The combination | ||
| // of `load` and `store` is not an atomic transaction, but | ||
| // it's good enough. | ||
| if MARKERS_EXHAUSTED.load().is_none() { | ||
| MARKERS_EXHAUSTED.store(Some(Timestamp::now())); | ||
| } |
There was a problem hiding this comment.
The load-then-store race is acknowledged and is fine for this use case. However, this marker-overflow path has no test. A unit test that fills the block capacity and verifies MARKERS_EXHAUSTED gets set (and that the resulting Capture::finish() annotations contain the overflow marker) would be valuable.
mihaibudiu
left a comment
There was a problem hiding this comment.
I think Leonid is indeed the right person to review this.
| /// An operator doesn't initially know its global node ID, but the global node | ||
| /// ID is useful for debugging and profiling. This type allows the name to | ||
| /// initially omit the ID but adds it when it becomes available. | ||
| pub struct OperatorName(ArcSwap<String>); |
There was a problem hiding this comment.
why not keep the name and the id separate, and just concatenate them on Display?
There was a problem hiding this comment.
I think that would work too.
There are a few reasons I went with the current approach:
- I wanted to minimize the space and time overheads (this is just a single pointer and a single atomic increment/decrement) because the names end up getting copied around a bit and most of the time they won't get used.
- From memory (I didn't go back and recheck), the names also end up getting used in places where there's already some other string type and it's not necessarily convenient to do a conversion at that point.
Really I wish that the global node id was available at construction instead of sometime later, it would simplify a lot here.
ryzhyk
left a comment
There was a problem hiding this comment.
Haven't read every line. Adding operator names to batches is a neat improvement.
mythical-fred
left a comment
There was a problem hiding this comment.
Re-APPROVE after the blocker fix in 8df3120 / 2b83ea4.
- spine_async.rs backpressure_wait: the swapped (initial_batches, name) destructuring is gone. New shape is much cleaner — single
let (name, initial_batches);initialised under the lock with an early-return for the already-drained case, then a tight wait loop, then unconditional Span recording. tooltip format string now reads correctly. - exchange.rs / sharded_accumulator.rs:
receiver_global_node_id.get().map_or("Exchange"/"ShardedAccumulator", ...)replaces the panicking.unwrap()infn name()— addresses the multihost CPU profiling crash directly. - samply: marker-overflow recording lands as in-profile markers +
const fn newcleanup; AtomicOptionTimestamp tightened. - sharded_accumulator tests:
test_confighelper +with_streaming_exchange(true)threaded through single-host and multihost test_circuit paths.
Non-blocking: still no unit test directly asserting the new fn name() fallback string for the unset-OnceCell branch, and the samply marker-overflow in-profile recording is exercised only at integration time. Both are reasonable to defer.
Sometimes `receiver_global_node_id` can be uninitialized on multihost pipelines. The reason isn't entirely clear to me. Signed-off-by: Ben Pfaff <blp@feldera.com>
DBSP doesn't use `arcstr` so there is no need to use the `size-of` feature for it. feldera-sqllib does use it, so enable it there. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
… out. Before, we just logged a message. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
This adds a bunch of useful features in CPU profiles. It fixes a bug that made multihost profiling panic. It also adds node ids to CPU profiling data related to spines, so that it is clear what operators cause exchanges, merges, and other work in spines.
Describe Manual Test Plan
I ran a pipeline and captured CPU profiles a few times and looked at the output.