Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 59 additions & 50 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ mod checkpoint;
mod error;
mod journal;
mod pipeline_diff;
#[cfg(target_os = "macos")]
mod samply_spawn;
mod stats;
mod sync;
mod validate;
Expand Down Expand Up @@ -1145,8 +1147,6 @@ impl Controller {
.to_str()
.context("failed to convert path to samply profile to str")?;

let mut cmd = tokio::process::Command::new("samply");

// Calculate a maximum memory consumption for markers.
//
// In experiments, a busy worker thread can emit over 1,000 markers per
Expand All @@ -1161,57 +1161,66 @@ impl Controller {
.with_memory_limit(Some(memory_limit))
.start()
.await;
let mut child = cmd
.args([
"record",
"-p",
&std::process::id().to_string(),
"-o",
profile_file,
"--save-only",
"--presymbolicate",
])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("failed to spawn samply process")?;

let child_pid = child.id().context("failed to get samply process id")?;

// Workaround as samply's `--duration` flag doesn't seem to work.
// See: https://github.com/mstange/samply/issues/716
//
// As the duration flag doesn't work, we have to send a SIGINT to
// tell samply to stop recording.
//
// If samply returns before the specified duration, it is likely due
// to an error, and in such cases, we want to report it immediately.
tokio::select! {
_ = child.wait() => {}
_ = tokio::time::sleep(Duration::from_secs(duration)) => {
// Send SIGINT to the samply process to stop recording.
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(child_pid as i32),
nix::sys::signal::Signal::SIGINT,
)
.context("failed to send SIGINT to samply process")?;
}
}
let annotations = capture.finish();
let output = child
.wait_with_output()
.await
.context("failed when waiting for samply process")?;

if !output.status.success() {
anyhow::bail!(
"samply process failed with status: `{}`, samply stdout: `{}`, samply stderr: `{}`",
output.status,
String::from_utf8_lossy(&output.stdout).trim(),
String::from_utf8_lossy(&output.stderr).trim(),
);
#[cfg(target_os = "macos")]
samply_spawn::run_detached_samply_record(std::process::id(), profile_file, duration)
.await?;

#[cfg(all(unix, not(target_os = "macos")))]
{
let mut cmd = tokio::process::Command::new("samply");
let mut child = cmd
.args([
"record",
"-p",
&std::process::id().to_string(),
"-o",
profile_file,
"--save-only",
"--presymbolicate",
])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("failed to spawn samply process")?;

let child_pid = child.id().context("failed to get samply process id")?;

// Workaround as samply's `--duration` flag doesn't seem to work.
// See: https://github.com/mstange/samply/issues/716
//
// As the duration flag doesn't work, we have to send a SIGINT to
// tell samply to stop recording.
//
// If samply returns before the specified duration, it is likely due
// to an error, and in such cases, we want to report it immediately.
tokio::select! {
_ = child.wait() => {}
_ = tokio::time::sleep(Duration::from_secs(duration)) => {
// Send SIGINT to the samply process to stop recording.
nix::sys::signal::kill(
nix::unistd::Pid::from_raw(child_pid as i32),
nix::sys::signal::Signal::SIGINT,
)
.context("failed to send SIGINT to samply process")?;
}
}
let output = child
.wait_with_output()
.await
.context("failed when waiting for samply process")?;

if !output.status.success() {
anyhow::bail!(
"samply process failed with status: `{}`, samply stdout: `{}`, samply stderr: `{}`",
output.status,
String::from_utf8_lossy(&output.stdout).trim(),
String::from_utf8_lossy(&output.stderr).trim(),
);
}
}

let annotations = capture.finish();
let buf = tokio::fs::read(profile_file)
.await
.with_context(|| format!("failed to read samply profile file `{profile_file}`"))?;
Expand Down
142 changes: 142 additions & 0 deletions crates/adapters/src/controller/samply_spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//! macOS-specific samply process spawning.
//!
//! On macOS, samply must not be a direct child of the process it profiles
//! (the process deadlocks when trying to run samply on itself).
//! Spawning through a subshell that exits immediately reparents samply to
//! launchd, which allows `task_for_pid` attach to succeed.

use anyhow::{Context, Error as AnyError, bail};
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use std::time::Duration;
use tokio::process::Command;
use tracing::info;

/// Quotes `s` for safe embedding in a `/bin/sh -c` script.
fn sh_quote(s: &str) -> String {
format!("'{}'", s.replace('\'', "'\\''"))
}

/// Returns `true` if a process with `pid` is still running.
async fn process_exists(pid: u32) -> bool {
tokio::task::spawn_blocking(move || kill(Pid::from_raw(pid as i32), None).is_ok())
.await
.unwrap_or(false)
}
Comment on lines +22 to +25

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spawn_blocking doesn't hurt but I'm surprised that it's useful. The kill system call shouldn't block AFAIK.


/// Blocks until the process `pid` exits.
async fn wait_for_process_exit_unbounded(pid: u32) {
while process_exists(pid).await {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: 100ms × N polls for process liveness is fine in practice, but on macOS kill(pid, 0) returns Ok for zombies too. If the subshell exits before samply does, samply gets reparented to launchd and you're polling launchd-owned PID which is fine — just worth a one-line comment that this is intentionally PID-not-pgid based.


/// Blocks until the process `pid` exits or `timeout` elapses.
async fn wait_for_process_exit(pid: u32, timeout: Duration) -> bool {
let deadline = tokio::time::Instant::now() + timeout;
while tokio::time::Instant::now() < deadline {
if !process_exists(pid).await {
return true;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
false
}
Comment on lines +36 to +44

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be reduced to something like tokio::time::timeout(timeout, wait_for_process_exit_unbounded(pid)).await.map_or(false, |_| true)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Current approach is OK too.)


/// Spawns samply in a detached subshell to profile `target_pid`, records for
/// up to `duration` seconds, then returns.
///
/// Samply stdout/stderr are written to a temporary log file. On failure the log
/// contents are included in the error.
pub async fn run_detached_samply_record(
target_pid: u32,
profile_file: &str,
duration: u64,
) -> Result<(), AnyError> {
let log_file = tempfile::Builder::new()
.prefix("samply_log_")
.suffix(".log")
.rand_bytes(10)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these bytes?

.tempfile()
.context("failed to create tempfile for samply log")?;
let log_path = log_file
.path()
.to_str()
.context("failed to convert samply log path to str")?;

// The subshell (`( )`) is used to ensure that the samply process is detached from the current process.
let sh_cmd = format!(
"( samply record -p {target_pid} -o {} --save-only --presymbolicate > {} 2>&1 & echo $! )",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The additional subshell (the "()" around the shell commands) looks weird, but I assume it's necessary?

sh_quote(profile_file),
sh_quote(log_path),
);

let launcher = Command::new("sh")
.arg("-c")
.arg(&sh_cmd)
.output()
.await
.context("failed to spawn detached samply process")?;

let samply_pid = parse_samply_pid(&launcher.stdout).with_context(|| {
format!(
"failed to parse samply pid from launcher output `{}`; launcher status: {}, \
launcher stderr: `{}`",
String::from_utf8_lossy(&launcher.stdout).trim(),
launcher.status,
String::from_utf8_lossy(&launcher.stderr).trim(),
)
})?;

info!(samply_pid, target_pid, "started detached samply profiler");

tokio::select! {
_ = wait_for_process_exit_unbounded(samply_pid) => {}
_ = tokio::time::sleep(Duration::from_secs(duration)) => {
kill(Pid::from_raw(samply_pid as i32), Signal::SIGINT)
.context("failed to send SIGINT to samply process")?;
}
}

if !wait_for_process_exit(samply_pid, Duration::from_secs(30)).await {
let _ = kill(Pid::from_raw(samply_pid as i32), Signal::SIGKILL);
wait_for_process_exit(samply_pid, Duration::from_secs(5)).await;
}

let log = tokio::fs::read_to_string(log_path)
.await
.unwrap_or_default();

let profile = tokio::fs::read(profile_file).await.unwrap_or_default();
if profile.is_empty() {
bail!("samply profile is empty; samply log: `{}`", log.trim());
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.contains("Error:") is a fragile failure signal — it relies on samply's specific log formatting and false-positives if the profiled program prints Error: to stdout (it doesn't here, but only because we redirect samply's own pipes). Since you already check profile.is_empty() above, this branch is mostly a belt-and-suspenders extra. Either drop it or capture samply's exit status (the launcher subshell could write $! and later wait $!; echo $? to a second tempfile).

if log.contains("Error:") {
bail!("samply process failed; samply log: `{}`", log.trim());
}

Ok(())
}

fn parse_samply_pid(stdout: &[u8]) -> Result<u32, AnyError> {
let stdout = String::from_utf8_lossy(stdout);
let pid_str = stdout.trim();
if pid_str.is_empty() {
bail!("launcher returned empty pid");
}
pid_str
.parse::<u32>()
.with_context(|| format!("invalid pid `{pid_str}`"))
}

#[cfg(test)]
mod tests {
use super::sh_quote;

#[test]
fn sh_quote_escapes_single_quotes() {
assert_eq!(sh_quote("/tmp/foo"), "'/tmp/foo'");
assert_eq!(sh_quote("/tmp/a'b"), "'/tmp/a'\\''b'");
}
}
12 changes: 11 additions & 1 deletion python/feldera/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ def commit_transaction(
transaction_id: Optional[int] = None,
wait: bool = True,
timeout_s: Optional[float] = None,
poll_interval_s: float = 0.5,
Comment thread
ryzhyk marked this conversation as resolved.
):
"""
Commit the currently active transaction.
Expand All @@ -745,13 +746,22 @@ def commit_transaction(
:param timeout_s: Maximum time (in seconds) to wait for the transaction to commit when `wait` is True.
If None, the function will wait indefinitely.

:param poll_interval_s: Polling interval at which to check while waiting for the
transaction to commit (default is every 0.5 seconds). Not used if `wait=False`.

:raises RuntimeError: If there is currently no transaction in progress.
:raises ValueError: If the provided `transaction_id` does not match the current transaction.
:raises TimeoutError: If the transaction does not commit within the specified timeout (when `wait` is True).
:raises FelderaAPIError: If the pipeline fails to commit a transaction.
"""

self.client.commit_transaction(self.name, transaction_id, wait, timeout_s)
self.client.commit_transaction(
self.name,
transaction_id,
wait,
timeout_s,
poll_interval_s=poll_interval_s,
)

def transaction_status(self) -> TransactionStatus:
"""
Expand Down
11 changes: 9 additions & 2 deletions python/feldera/rest/feldera_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ def commit_transaction(
transaction_id: Optional[int] = None,
wait: bool = True,
timeout_s: Optional[float] = None,
poll_interval_s: float = 0.5,
):
"""
Commits the currently active transaction.
Expand All @@ -855,6 +856,9 @@ def commit_transaction(
:param timeout_s: Maximum time (in seconds) to wait for the transaction to commit when `wait` is True.
If None, the function will wait indefinitely.

:param poll_interval_s: Polling interval at which to check while waiting for the
transaction to commit (default is every 0.5 seconds). Not used if `wait=False`.

:raises RuntimeError: If there is currently no transaction in progress.
:raises ValueError: If the provided `transaction_id` does not match the current transaction.
:raises TimeoutError: If the transaction does not commit within the specified timeout (when `wait` is True).
Expand Down Expand Up @@ -896,8 +900,11 @@ def commit_transaction(
if stats["global_metrics"]["transaction_id"] != transaction_id:
return

logging.debug("commit hasn't completed, waiting for 1 more second")
time.sleep(1.0)
logging.debug(
"commit hasn't completed, waiting for %.1f more seconds",
poll_interval_s,
)
time.sleep(poll_interval_s)

def checkpoint_pipeline(self, pipeline_name: str) -> int:
"""
Expand Down
Loading