From e02ce5722da1ce7474b398a722c40efc0945f815 Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Fri, 13 Mar 2026 14:05:31 +0100 Subject: [PATCH] fix(appd,runtime): harden session lifecycle and IPC mutex safety - supervise: in the stdout-read-error-before-READY case, immediately kill the child process, tear down the file portal, mark the session Stopped, and return; previously the function fell through to child.wait() leaving the session in Starting state with no guaranteed cleanup path - supervise: restructure app_shell binding as a match expression so the compiler can verify the initial None value is not silently discarded - weft-runtime: replace Mutex::lock().unwrap() with unwrap_or_else in the weft:app/ipc send, recv, and connect host functions so a poisoned mutex does not panic inside the Wasmtime host-call context --- crates/weft-appd/src/runtime.rs | 19 ++++++++++++++----- crates/weft-runtime/src/main.rs | 6 +++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs index 3958e7b..8f9aa8d 100644 --- a/crates/weft-appd/src/runtime.rs +++ b/crates/weft-appd/src/runtime.rs @@ -313,8 +313,7 @@ pub(crate) async fn supervise( _ = &mut abort_rx => None, }; - let mut app_shell: Option = None; - match ready_result { + let app_shell = match ready_result { Some(Ok(Ok(remaining_stdout))) => { registry .lock() @@ -326,10 +325,20 @@ pub(crate) async fn supervise( }); tracing::info!(session_id, %app_id, "app ready"); tokio::spawn(drain_stdout(remaining_stdout, session_id)); - app_shell = spawn_app_shell(session_id, app_id).await; + spawn_app_shell(session_id, app_id).await } Some(Ok(Err(e))) => { - tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY"); + tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY; killing process"); + let _ = child.kill().await; + kill_portal(portal).await; + let mut reg = registry.lock().await; + reg.set_state(session_id, AppStateKind::Stopped); + reg.remove_abort_sender(session_id); + let _ = reg.broadcast().send(Response::AppState { + session_id, + state: AppStateKind::Stopped, + }); + return Ok(()); } Some(Err(_elapsed)) => { tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process"); @@ -356,7 +365,7 @@ pub(crate) async fn supervise( }); return Ok(()); } - } + }; tokio::spawn(drain_stderr(stderr, session_id)); diff --git a/crates/weft-runtime/src/main.rs b/crates/weft-runtime/src/main.rs index 3841805..bb74bfc 100644 --- a/crates/weft-runtime/src/main.rs +++ b/crates/weft-runtime/src/main.rs @@ -212,7 +212,7 @@ fn run_module( move |_: wasmtime::StoreContextMut<'_, State>, (payload,): (String,)| -> wasmtime::Result<(Result<(), String>,)> { - let mut guard = ipc_send.lock().unwrap(); + let mut guard = ipc_send.lock().unwrap_or_else(|p| p.into_inner()); match guard.as_mut() { Some(ipc) => Ok((ipc.send(&payload),)), None => Ok((Err("IPC not connected".to_owned()),)), @@ -227,7 +227,7 @@ fn run_module( move |_: wasmtime::StoreContextMut<'_, State>, ()| -> wasmtime::Result<(Option,)> { - let mut guard = ipc_recv.lock().unwrap(); + let mut guard = ipc_recv.lock().unwrap_or_else(|p| p.into_inner()); Ok((guard.as_mut().and_then(|ipc| ipc.recv()),)) }, ) @@ -299,7 +299,7 @@ fn run_module( if let Some(socket_path) = ipc_socket { ctx_builder.env("WEFT_IPC_SOCKET", socket_path); if let Some(ipc) = IpcState::connect(socket_path) { - *ipc_state.lock().unwrap() = Some(ipc); + *ipc_state.lock().unwrap_or_else(|p| p.into_inner()) = Some(ipc); } else { tracing::warn!("weft:app/ipc: could not connect to IPC socket {socket_path}"); }