Abort-aware startup supervision for weft-appd.

runtime.rs: `supervise` now races the READY wait against the abort receiver;
when the abort side wins it kills the child, marks the session `Stopped`,
broadcasts that state and returns `Ok(())`.
main.rs: adds a unix-only test that terminates a session before supervising it
and expects a `Stopped` broadcast.

NOTE(review): the post-image blob id on the main.rs `index` line was not
regenerated after the test was annotated; refresh it with `git diff` if needed.

diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs
index c533c7d..897654e 100644
--- a/crates/weft-appd/src/main.rs
+++ b/crates/weft-appd/src/main.rs
@@ -599,6 +599,66 @@ mod tests {
         }
     }
 
+    /// Verifies that terminating a session before its runtime reports READY makes
+    /// `supervise` return with a `Stopped` broadcast and the session `NotFound`.
+    #[cfg(unix)]
+    #[tokio::test(flavor = "current_thread")]
+    async fn supervisor_abort_during_startup_broadcasts_stopped() {
+        use std::os::unix::fs::PermissionsExt;
+
+        // Stand-in runtime: sleeps without printing anything, so READY never arrives.
+        let script =
+            std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id()));
+        std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap();
+        std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
+
+        let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
+        // SAFETY: assumes no other thread reads or writes the process environment
+        // while this test runs. NOTE(review): confirm env-dependent tests are serialized.
+        unsafe { std::env::set_var("WEFT_RUNTIME_BIN", &script) };
+
+        let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
+        let mut rx = registry.lock().await.subscribe();
+        let session_id = registry.lock().await.launch("test.abort.startup");
+        let abort_rx = registry.lock().await.register_abort(session_id);
+
+        // Terminate up front; presumably this fires the abort registered above, so
+        // `supervise` observes it while still waiting for READY (TODO confirm).
+        registry.lock().await.terminate(session_id);
+
+        let outcome = runtime::supervise(
+            session_id,
+            "test.abort.startup",
+            Arc::clone(&registry),
+            abort_rx,
+        )
+        .await;
+
+        // Clean up before asserting so a failed assertion cannot leak the script
+        // or the overridden variable into other tests in this process.
+        let _ = std::fs::remove_file(&script);
+        // SAFETY: same assumption as the `set_var` call above.
+        unsafe {
+            match prior {
+                Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
+                None => std::env::remove_var("WEFT_RUNTIME_BIN"),
+            }
+        }
+
+        outcome.unwrap();
+
+        assert!(matches!(
+            registry.lock().await.state(session_id),
+            AppStateKind::NotFound
+        ));
+
+        let broadcast = rx.try_recv();
+        assert!(matches!(
+            broadcast,
+            Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id
+        ));
+    }
+
     #[cfg(unix)]
     #[tokio::test(flavor = "current_thread")]
     async fn supervisor_spawn_failure_broadcasts_stopped() {
diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs
index 5269123..91330bc 100644
--- a/crates/weft-appd/src/runtime.rs
+++ b/crates/weft-appd/src/runtime.rs
@@ -13,6 +13,7 @@ pub(crate) async fn supervise(
     registry: Registry,
     abort_rx: tokio::sync::oneshot::Receiver<()>,
 ) -> anyhow::Result<()> {
+    let mut abort_rx = abort_rx;
     let bin = match std::env::var("WEFT_RUNTIME_BIN") {
         Ok(b) => b,
         Err(_) => {
@@ -45,10 +46,13 @@ pub(crate) async fn supervise(
     let stdout = child.stdout.take().expect("stdout piped");
     let stderr = child.stderr.take().expect("stderr piped");
 
-    let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await;
+    let ready_result = tokio::select! {
+        r = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)) => Some(r),
+        _ = &mut abort_rx => None,
+    };
 
     match ready_result {
-        Ok(Ok(remaining_stdout)) => {
+        Some(Ok(Ok(remaining_stdout))) => {
             registry
                 .lock()
                 .await
@@ -60,10 +64,10 @@ pub(crate) async fn supervise(
             tracing::info!(session_id, %app_id, "app ready");
             tokio::spawn(drain_stdout(remaining_stdout, session_id));
         }
-        Ok(Err(e)) => {
+        Some(Ok(Err(e))) => {
             tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
         }
-        Err(_elapsed) => {
+        Some(Err(_elapsed)) => {
             tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
             let _ = child.kill().await;
             let mut reg = registry.lock().await;
@@ -74,6 +78,17 @@ pub(crate) async fn supervise(
             });
             return Ok(());
         }
+        None => {
+            tracing::info!(session_id, %app_id, "abort during startup; killing process");
+            let _ = child.kill().await;
+            let mut reg = registry.lock().await;
+            reg.set_state(session_id, AppStateKind::Stopped);
+            let _ = reg.broadcast().send(Response::AppState {
+                session_id,
+                state: AppStateKind::Stopped,
+            });
+            return Ok(());
+        }
     }
 
     tokio::spawn(drain_stderr(stderr, session_id));