From a46af65e6e95bc93c3a9f644cb7834cdeb7b1376 Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Fri, 13 Mar 2026 12:52:39 +0100 Subject: [PATCH] fix(appd): stabilize tests and unix-gate sd-notify --- crates/weft-appd/Cargo.toml | 3 + crates/weft-appd/src/main.rs | 117 ++++++++++++++++++++++------------- 2 files changed, 77 insertions(+), 43 deletions(-) diff --git a/crates/weft-appd/Cargo.toml b/crates/weft-appd/Cargo.toml index 8c3c8fc..62b004e 100644 --- a/crates/weft-appd/Cargo.toml +++ b/crates/weft-appd/Cargo.toml @@ -21,3 +21,6 @@ futures-util = { version = "0.3", default-features = false, features = ["sink", tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } libc = "0.2" + +[target.'cfg(unix)'.dependencies] +sd-notify = "0.4" diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index f72d659..d194538 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -178,7 +178,10 @@ async fn run() -> anyhow::Result<()> { tracing::warn!(error = %e, "could not write appd.wsport; servo-shell port discovery will fall back to default"); } - let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]); + #[cfg(unix)] + { + let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]); + } if let Some(app_ids) = load_session() { tracing::info!(count = app_ids.len(), "restoring previous session"); @@ -332,6 +335,23 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response { tracing::info!(session_id, %app_id, "launched"); let abort_rx = registry.lock().await.register_abort(session_id); let compositor_tx = registry.lock().await.compositor_tx.clone(); + + if std::env::var("WEFT_RUNTIME_BIN").is_err() { + let _ = registry.lock().await.broadcast().send(Response::LaunchAck { + session_id, + app_id: app_id.clone(), + }); + { + let mut reg = registry.lock().await; + reg.set_state(session_id, AppStateKind::Stopped); + reg.remove_abort_sender(session_id); + } + let _ = registry.lock().await.broadcast().send(Response::AppState { + session_id, + state: AppStateKind::Stopped, + }); + return Response::LaunchAck { session_id, app_id }; + } let ipc_socket = session_ipc_socket_path(session_id); let broadcast = registry.lock().await.broadcast().clone(); if let Some(ref sock_path) = ipc_socket @@ -488,12 +508,19 @@ mod tests { use super::*; use ipc::AppStateKind; + static ENV_LOCK: std::sync::OnceLock> = std::sync::OnceLock::new(); + + fn env_lock() -> &'static tokio::sync::Mutex<()> { + ENV_LOCK.get_or_init(|| tokio::sync::Mutex::new(())) + } + fn make_registry() -> Registry { Arc::new(Mutex::new(SessionRegistry::default())) } - #[test] - fn ws_port_defaults_to_7410() { + #[tokio::test] + async fn ws_port_defaults_to_7410() { + let _env = env_lock().lock().await; let prior = std::env::var("WEFT_APPD_WS_PORT").ok(); unsafe { std::env::remove_var("WEFT_APPD_WS_PORT") }; let port = ws_port(); @@ -505,8 +532,9 @@ mod tests { assert_eq!(port, 7410); } - #[test] - fn ws_port_uses_env_override() { + #[tokio::test] + async fn ws_port_uses_env_override() { + let _env = env_lock().lock().await; let prior = std::env::var("WEFT_APPD_WS_PORT").ok(); unsafe { std::env::set_var("WEFT_APPD_WS_PORT", "9000") }; let port = ws_port(); @@ -521,6 +549,7 @@ mod tests { #[test] fn appd_socket_path_uses_override_env() { + let _env = env_lock().blocking_lock(); let prior = std::env::var("WEFT_APPD_SOCKET").ok(); unsafe { std::env::set_var("WEFT_APPD_SOCKET", "/tmp/custom.sock") }; let path = appd_socket_path().unwrap(); @@ -535,6 +564,7 @@ mod tests { #[test] fn appd_socket_path_errors_without_xdg_and_no_override() { + let _env = env_lock().blocking_lock(); let prior_sock = std::env::var("WEFT_APPD_SOCKET").ok(); let prior_xdg = std::env::var("XDG_RUNTIME_DIR").ok(); unsafe { @@ -613,8 +643,10 @@ mod tests { assert!(matches!(resp, Response::Error { .. })); } - #[tokio::test] + #[tokio::test(flavor = "current_thread")] async fn session_stops_when_runtime_bin_absent() { + let _env = env_lock().lock().await; + let prior = std::env::var("WEFT_RUNTIME_BIN").ok(); unsafe { std::env::remove_var("WEFT_RUNTIME_BIN") }; let reg = make_registry(); let ack = dispatch( @@ -629,10 +661,24 @@ mod tests { Response::LaunchAck { session_id, .. } => session_id, _ => panic!("expected LaunchAck"), }; - tokio::task::yield_now().await; + let mut stopped = false; + for _ in 0..50 { + tokio::task::yield_now().await; + if matches!(reg.lock().await.state(session_id), AppStateKind::Stopped) { + stopped = true; + break; + } + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } let state = reg.lock().await.state(session_id); + unsafe { + match prior { + Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v), + None => std::env::remove_var("WEFT_RUNTIME_BIN"), + } + } assert!( - matches!(state, AppStateKind::Stopped), + stopped && matches!(state, AppStateKind::Stopped), "session should be Stopped when WEFT_RUNTIME_BIN is absent" ); } @@ -651,22 +697,8 @@ mod tests { #[tokio::test] async fn dispatch_query_running_lists_active_sessions() { let reg = make_registry(); - dispatch( - Request::LaunchApp { - app_id: "a".into(), - surface_id: 0, - }, - ®, - ) - .await; - dispatch( - Request::LaunchApp { - app_id: "b".into(), - surface_id: 0, - }, - ®, - ) - .await; + reg.lock().await.launch("a"); + reg.lock().await.launch("b"); let resp = dispatch(Request::QueryRunning, ®).await; match resp { Response::RunningApps { sessions } => { @@ -682,18 +714,7 @@ mod tests { #[tokio::test] async fn dispatch_query_app_state_returns_starting() { let reg = make_registry(); - let ack = dispatch( - Request::LaunchApp { - app_id: "app".into(), - surface_id: 0, - }, - ®, - ) - .await; - let session_id = match ack { - Response::LaunchAck { session_id, .. } => session_id, - _ => panic!(), - }; + let session_id = reg.lock().await.launch("app"); let resp = dispatch(Request::QueryAppState { session_id }, ®).await; assert!(matches!( resp, @@ -772,6 +793,8 @@ mod tests { #[test] fn scan_installed_apps_finds_valid_packages() { use std::fs; + + let _env = env_lock().blocking_lock(); let store = std::env::temp_dir().join(format!("weft_appd_scan_{}", std::process::id())); let app_dir = store.join("com.example.scanner"); fs::create_dir_all(&app_dir).unwrap(); @@ -805,9 +828,10 @@ mod tests { async fn supervisor_transitions_through_ready_to_stopped() { use std::os::unix::fs::PermissionsExt; + let _env = env_lock().lock().await; let script = std::env::temp_dir().join(format!("weft_test_runtime_{}.sh", std::process::id())); - std::fs::write(&script, "#!/bin/sh\necho READY\n").unwrap(); + std::fs::write(&script, "#!/bin/sh\necho READY\nsleep 1\n").unwrap(); std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap(); let prior = std::env::var("WEFT_RUNTIME_BIN").ok(); @@ -839,16 +863,17 @@ mod tests { AppStateKind::Stopped )); - let notification = rx.try_recv(); + let notification = + tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await; assert!(matches!( notification, - Ok(Response::AppReady { session_id: sid, .. }) if sid == session_id + Ok(Ok(Response::AppReady { session_id: sid, .. })) if sid == session_id )); - let stopped = rx.try_recv(); + let stopped = tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await; assert!(matches!( stopped, - Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id + Ok(Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped })) if sid == session_id )); let _ = std::fs::remove_file(&script); @@ -870,6 +895,7 @@ mod tests { async fn supervisor_abort_during_startup_broadcasts_stopped() { use std::os::unix::fs::PermissionsExt; + let _env = env_lock().lock().await; let script = std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id())); std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap(); @@ -919,6 +945,7 @@ mod tests { #[cfg(unix)] #[tokio::test(flavor = "current_thread")] async fn supervisor_spawn_failure_broadcasts_stopped() { + let _env = env_lock().lock().await; let prior = std::env::var("WEFT_RUNTIME_BIN").ok(); unsafe { std::env::set_var( @@ -948,10 +975,11 @@ mod tests { AppStateKind::Stopped )); - let broadcast = rx.try_recv(); + let broadcast = + tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await; assert!(matches!( broadcast, - Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id + Ok(Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped })) if sid == session_id )); unsafe { @@ -964,6 +992,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn session_save_load_roundtrip() { + let _env = env_lock().lock().await; let tmp = std::env::temp_dir().join(format!("weft_session_test_{}", std::process::id())); std::fs::create_dir_all(&tmp).unwrap(); let prior = std::env::var("XDG_RUNTIME_DIR").ok(); @@ -993,6 +1022,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn session_save_empty_load_returns_empty_vec() { + let _env = env_lock().lock().await; let tmp = std::env::temp_dir().join(format!("weft_session_empty_{}", std::process::id())); std::fs::create_dir_all(&tmp).unwrap(); let prior = std::env::var("XDG_RUNTIME_DIR").ok(); @@ -1013,6 +1043,7 @@ mod tests { #[test] fn load_session_no_file_returns_none() { + let _env = env_lock().blocking_lock(); let tmp = std::env::temp_dir().join(format!("weft_session_missing_{}", std::process::id())); let prior = std::env::var("XDG_RUNTIME_DIR").ok(); unsafe { std::env::set_var("XDG_RUNTIME_DIR", &tmp) };