From 68e1f82ca70767d65c44ea22e44c79dc4d736a99 Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Wed, 11 Mar 2026 11:14:18 +0100 Subject: [PATCH] fix(appd): drain module stdout after READY signal to prevent pipe stall wait_for_ready() now returns the BufReader with the READY line already consumed. supervise() spawns drain_stdout() on that reader so any subsequent module output is forwarded to the trace log and the pipe buffer never fills up. Without this, a long-running Wasm module that writes to stdout after printing READY would eventually block waiting on a full pipe. --- crates/weft-appd/src/runtime.rs | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs index 1b9335a..b5ec290 100644 --- a/crates/weft-appd/src/runtime.rs +++ b/crates/weft-appd/src/runtime.rs @@ -37,7 +37,7 @@ pub(crate) async fn supervise( let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await; match ready_result { - Ok(Ok(())) => { + Ok(Ok(remaining_stdout)) => { registry .lock() .await @@ -47,6 +47,7 @@ pub(crate) async fn supervise( app_id: app_id.to_owned(), }); tracing::info!(session_id, %app_id, "app ready"); + tokio::spawn(drain_stdout(remaining_stdout, session_id)); } Ok(Err(e)) => { tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY"); @@ -82,14 +83,31 @@ pub(crate) async fn supervise( Ok(()) } -async fn wait_for_ready(stdout: tokio::process::ChildStdout) -> anyhow::Result<()> { - let mut lines = BufReader::new(stdout).lines(); - while let Some(line) = lines.next_line().await? { +async fn wait_for_ready( + stdout: tokio::process::ChildStdout, +) -> anyhow::Result> { + let mut reader = BufReader::new(stdout); + loop { + let mut line = String::new(); + let n = reader.read_line(&mut line).await?; + if n == 0 { + return Err(anyhow::anyhow!("stdout closed without READY signal")); + } if line.trim() == "READY" { - return Ok(()); + return Ok(reader); + } + } +} + +async fn drain_stdout(mut reader: BufReader, session_id: u64) { + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) | Err(_) => break, + Ok(_) => tracing::debug!(session_id, stdout = %line.trim_end(), "app stdout"), } } - Err(anyhow::anyhow!("stdout closed without READY signal")) } async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) {