fix(appd,runtime): harden session lifecycle and IPC mutex safety

- supervise: in the stdout-read-error-before-READY case, immediately
  kill the child process, tear down the file portal, mark the session
  Stopped, and return; previously the function fell through to child.wait()
  leaving the session in Starting state with no guaranteed cleanup path
- supervise: restructure app_shell binding as a match expression so the
  compiler can verify the initial None value is not silently discarded
- weft-runtime: replace Mutex::lock().unwrap() with unwrap_or_else in
  the weft:app/ipc send, recv, and connect host functions so a poisoned
  mutex does not panic inside the Wasmtime host-call context
This commit is contained in:
Marco Allegretti 2026-03-13 14:05:31 +01:00
parent a098b3e93d
commit e02ce5722d
2 changed files with 17 additions and 8 deletions

View file

@ -313,8 +313,7 @@ pub(crate) async fn supervise(
_ = &mut abort_rx => None, _ = &mut abort_rx => None,
}; };
let mut app_shell: Option<tokio::process::Child> = None; let app_shell = match ready_result {
match ready_result {
Some(Ok(Ok(remaining_stdout))) => { Some(Ok(Ok(remaining_stdout))) => {
registry registry
.lock() .lock()
@ -326,10 +325,20 @@ pub(crate) async fn supervise(
}); });
tracing::info!(session_id, %app_id, "app ready"); tracing::info!(session_id, %app_id, "app ready");
tokio::spawn(drain_stdout(remaining_stdout, session_id)); tokio::spawn(drain_stdout(remaining_stdout, session_id));
app_shell = spawn_app_shell(session_id, app_id).await; spawn_app_shell(session_id, app_id).await
} }
Some(Ok(Err(e))) => { Some(Ok(Err(e))) => {
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY"); tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY; killing process");
let _ = child.kill().await;
kill_portal(portal).await;
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
reg.remove_abort_sender(session_id);
let _ = reg.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
return Ok(());
} }
Some(Err(_elapsed)) => { Some(Err(_elapsed)) => {
tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process"); tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
@ -356,7 +365,7 @@ pub(crate) async fn supervise(
}); });
return Ok(()); return Ok(());
} }
} };
tokio::spawn(drain_stderr(stderr, session_id)); tokio::spawn(drain_stderr(stderr, session_id));

View file

@ -212,7 +212,7 @@ fn run_module(
move |_: wasmtime::StoreContextMut<'_, State>, move |_: wasmtime::StoreContextMut<'_, State>,
(payload,): (String,)| (payload,): (String,)|
-> wasmtime::Result<(Result<(), String>,)> { -> wasmtime::Result<(Result<(), String>,)> {
let mut guard = ipc_send.lock().unwrap(); let mut guard = ipc_send.lock().unwrap_or_else(|p| p.into_inner());
match guard.as_mut() { match guard.as_mut() {
Some(ipc) => Ok((ipc.send(&payload),)), Some(ipc) => Ok((ipc.send(&payload),)),
None => Ok((Err("IPC not connected".to_owned()),)), None => Ok((Err("IPC not connected".to_owned()),)),
@ -227,7 +227,7 @@ fn run_module(
move |_: wasmtime::StoreContextMut<'_, State>, move |_: wasmtime::StoreContextMut<'_, State>,
()| ()|
-> wasmtime::Result<(Option<String>,)> { -> wasmtime::Result<(Option<String>,)> {
let mut guard = ipc_recv.lock().unwrap(); let mut guard = ipc_recv.lock().unwrap_or_else(|p| p.into_inner());
Ok((guard.as_mut().and_then(|ipc| ipc.recv()),)) Ok((guard.as_mut().and_then(|ipc| ipc.recv()),))
}, },
) )
@ -299,7 +299,7 @@ fn run_module(
if let Some(socket_path) = ipc_socket { if let Some(socket_path) = ipc_socket {
ctx_builder.env("WEFT_IPC_SOCKET", socket_path); ctx_builder.env("WEFT_IPC_SOCKET", socket_path);
if let Some(ipc) = IpcState::connect(socket_path) { if let Some(ipc) = IpcState::connect(socket_path) {
*ipc_state.lock().unwrap() = Some(ipc); *ipc_state.lock().unwrap_or_else(|p| p.into_inner()) = Some(ipc);
} else { } else {
tracing::warn!("weft:app/ipc: could not connect to IPC socket {socket_path}"); tracing::warn!("weft:app/ipc: could not connect to IPC socket {socket_path}");
} }