mirror of
https://github.com/marcoallegretti/WEFT_OS.git
synced 2026-03-27 01:13:09 +00:00
fix(appd): drain module stdout after READY signal to prevent pipe stall
wait_for_ready() now returns the BufReader<ChildStdout> with the READY line already consumed. supervise() spawns drain_stdout() on that reader so any subsequent module output is forwarded to the trace log and the pipe buffer never fills up. Without this, a long-running Wasm module that writes to stdout after printing READY would eventually block waiting on a full pipe.
This commit is contained in:
parent
a409b954ab
commit
68e1f82ca7
1 changed files with 24 additions and 6 deletions
|
|
@ -37,7 +37,7 @@ pub(crate) async fn supervise(
|
||||||
let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await;
|
let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await;
|
||||||
|
|
||||||
match ready_result {
|
match ready_result {
|
||||||
Ok(Ok(())) => {
|
Ok(Ok(remaining_stdout)) => {
|
||||||
registry
|
registry
|
||||||
.lock()
|
.lock()
|
||||||
.await
|
.await
|
||||||
|
|
@ -47,6 +47,7 @@ pub(crate) async fn supervise(
|
||||||
app_id: app_id.to_owned(),
|
app_id: app_id.to_owned(),
|
||||||
});
|
});
|
||||||
tracing::info!(session_id, %app_id, "app ready");
|
tracing::info!(session_id, %app_id, "app ready");
|
||||||
|
tokio::spawn(drain_stdout(remaining_stdout, session_id));
|
||||||
}
|
}
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
|
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
|
||||||
|
|
@ -82,14 +83,31 @@ pub(crate) async fn supervise(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_ready(stdout: tokio::process::ChildStdout) -> anyhow::Result<()> {
|
async fn wait_for_ready(
|
||||||
let mut lines = BufReader::new(stdout).lines();
|
stdout: tokio::process::ChildStdout,
|
||||||
while let Some(line) = lines.next_line().await? {
|
) -> anyhow::Result<BufReader<tokio::process::ChildStdout>> {
|
||||||
|
let mut reader = BufReader::new(stdout);
|
||||||
|
loop {
|
||||||
|
let mut line = String::new();
|
||||||
|
let n = reader.read_line(&mut line).await?;
|
||||||
|
if n == 0 {
|
||||||
|
return Err(anyhow::anyhow!("stdout closed without READY signal"));
|
||||||
|
}
|
||||||
if line.trim() == "READY" {
|
if line.trim() == "READY" {
|
||||||
return Ok(());
|
return Ok(reader);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drain_stdout(mut reader: BufReader<tokio::process::ChildStdout>, session_id: u64) {
|
||||||
|
let mut line = String::new();
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
match reader.read_line(&mut line).await {
|
||||||
|
Ok(0) | Err(_) => break,
|
||||||
|
Ok(_) => tracing::debug!(session_id, stdout = %line.trim_end(), "app stdout"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(anyhow::anyhow!("stdout closed without READY signal"))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) {
|
async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue