fix(appd): stabilize tests and unix-gate sd-notify

This commit is contained in:
Marco Allegretti 2026-03-13 12:52:39 +01:00
parent 84f82fbadf
commit a46af65e6e
2 changed files with 77 additions and 43 deletions

View file

@ -21,3 +21,6 @@ futures-util = { version = "0.3", default-features = false, features = ["sink",
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
libc = "0.2" libc = "0.2"
[target.'cfg(unix)'.dependencies]
sd-notify = "0.4"

View file

@ -178,7 +178,10 @@ async fn run() -> anyhow::Result<()> {
tracing::warn!(error = %e, "could not write appd.wsport; servo-shell port discovery will fall back to default"); tracing::warn!(error = %e, "could not write appd.wsport; servo-shell port discovery will fall back to default");
} }
#[cfg(unix)]
{
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]); let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]);
}
if let Some(app_ids) = load_session() { if let Some(app_ids) = load_session() {
tracing::info!(count = app_ids.len(), "restoring previous session"); tracing::info!(count = app_ids.len(), "restoring previous session");
@ -332,6 +335,23 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response {
tracing::info!(session_id, %app_id, "launched"); tracing::info!(session_id, %app_id, "launched");
let abort_rx = registry.lock().await.register_abort(session_id); let abort_rx = registry.lock().await.register_abort(session_id);
let compositor_tx = registry.lock().await.compositor_tx.clone(); let compositor_tx = registry.lock().await.compositor_tx.clone();
if std::env::var("WEFT_RUNTIME_BIN").is_err() {
let _ = registry.lock().await.broadcast().send(Response::LaunchAck {
session_id,
app_id: app_id.clone(),
});
{
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
reg.remove_abort_sender(session_id);
}
let _ = registry.lock().await.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
return Response::LaunchAck { session_id, app_id };
}
let ipc_socket = session_ipc_socket_path(session_id); let ipc_socket = session_ipc_socket_path(session_id);
let broadcast = registry.lock().await.broadcast().clone(); let broadcast = registry.lock().await.broadcast().clone();
if let Some(ref sock_path) = ipc_socket if let Some(ref sock_path) = ipc_socket
@ -488,12 +508,19 @@ mod tests {
use super::*; use super::*;
use ipc::AppStateKind; use ipc::AppStateKind;
static ENV_LOCK: std::sync::OnceLock<tokio::sync::Mutex<()>> = std::sync::OnceLock::new();
fn env_lock() -> &'static tokio::sync::Mutex<()> {
ENV_LOCK.get_or_init(|| tokio::sync::Mutex::new(()))
}
fn make_registry() -> Registry { fn make_registry() -> Registry {
Arc::new(Mutex::new(SessionRegistry::default())) Arc::new(Mutex::new(SessionRegistry::default()))
} }
#[test] #[tokio::test]
fn ws_port_defaults_to_7410() { async fn ws_port_defaults_to_7410() {
let _env = env_lock().lock().await;
let prior = std::env::var("WEFT_APPD_WS_PORT").ok(); let prior = std::env::var("WEFT_APPD_WS_PORT").ok();
unsafe { std::env::remove_var("WEFT_APPD_WS_PORT") }; unsafe { std::env::remove_var("WEFT_APPD_WS_PORT") };
let port = ws_port(); let port = ws_port();
@ -505,8 +532,9 @@ mod tests {
assert_eq!(port, 7410); assert_eq!(port, 7410);
} }
#[test] #[tokio::test]
fn ws_port_uses_env_override() { async fn ws_port_uses_env_override() {
let _env = env_lock().lock().await;
let prior = std::env::var("WEFT_APPD_WS_PORT").ok(); let prior = std::env::var("WEFT_APPD_WS_PORT").ok();
unsafe { std::env::set_var("WEFT_APPD_WS_PORT", "9000") }; unsafe { std::env::set_var("WEFT_APPD_WS_PORT", "9000") };
let port = ws_port(); let port = ws_port();
@ -521,6 +549,7 @@ mod tests {
#[test] #[test]
fn appd_socket_path_uses_override_env() { fn appd_socket_path_uses_override_env() {
let _env = env_lock().blocking_lock();
let prior = std::env::var("WEFT_APPD_SOCKET").ok(); let prior = std::env::var("WEFT_APPD_SOCKET").ok();
unsafe { std::env::set_var("WEFT_APPD_SOCKET", "/tmp/custom.sock") }; unsafe { std::env::set_var("WEFT_APPD_SOCKET", "/tmp/custom.sock") };
let path = appd_socket_path().unwrap(); let path = appd_socket_path().unwrap();
@ -535,6 +564,7 @@ mod tests {
#[test] #[test]
fn appd_socket_path_errors_without_xdg_and_no_override() { fn appd_socket_path_errors_without_xdg_and_no_override() {
let _env = env_lock().blocking_lock();
let prior_sock = std::env::var("WEFT_APPD_SOCKET").ok(); let prior_sock = std::env::var("WEFT_APPD_SOCKET").ok();
let prior_xdg = std::env::var("XDG_RUNTIME_DIR").ok(); let prior_xdg = std::env::var("XDG_RUNTIME_DIR").ok();
unsafe { unsafe {
@ -613,8 +643,10 @@ mod tests {
assert!(matches!(resp, Response::Error { .. })); assert!(matches!(resp, Response::Error { .. }));
} }
#[tokio::test] #[tokio::test(flavor = "current_thread")]
async fn session_stops_when_runtime_bin_absent() { async fn session_stops_when_runtime_bin_absent() {
let _env = env_lock().lock().await;
let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
unsafe { std::env::remove_var("WEFT_RUNTIME_BIN") }; unsafe { std::env::remove_var("WEFT_RUNTIME_BIN") };
let reg = make_registry(); let reg = make_registry();
let ack = dispatch( let ack = dispatch(
@ -629,10 +661,24 @@ mod tests {
Response::LaunchAck { session_id, .. } => session_id, Response::LaunchAck { session_id, .. } => session_id,
_ => panic!("expected LaunchAck"), _ => panic!("expected LaunchAck"),
}; };
let mut stopped = false;
for _ in 0..50 {
tokio::task::yield_now().await; tokio::task::yield_now().await;
if matches!(reg.lock().await.state(session_id), AppStateKind::Stopped) {
stopped = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
let state = reg.lock().await.state(session_id); let state = reg.lock().await.state(session_id);
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
None => std::env::remove_var("WEFT_RUNTIME_BIN"),
}
}
assert!( assert!(
matches!(state, AppStateKind::Stopped), stopped && matches!(state, AppStateKind::Stopped),
"session should be Stopped when WEFT_RUNTIME_BIN is absent" "session should be Stopped when WEFT_RUNTIME_BIN is absent"
); );
} }
@ -651,22 +697,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn dispatch_query_running_lists_active_sessions() { async fn dispatch_query_running_lists_active_sessions() {
let reg = make_registry(); let reg = make_registry();
dispatch( reg.lock().await.launch("a");
Request::LaunchApp { reg.lock().await.launch("b");
app_id: "a".into(),
surface_id: 0,
},
&reg,
)
.await;
dispatch(
Request::LaunchApp {
app_id: "b".into(),
surface_id: 0,
},
&reg,
)
.await;
let resp = dispatch(Request::QueryRunning, &reg).await; let resp = dispatch(Request::QueryRunning, &reg).await;
match resp { match resp {
Response::RunningApps { sessions } => { Response::RunningApps { sessions } => {
@ -682,18 +714,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn dispatch_query_app_state_returns_starting() { async fn dispatch_query_app_state_returns_starting() {
let reg = make_registry(); let reg = make_registry();
let ack = dispatch( let session_id = reg.lock().await.launch("app");
Request::LaunchApp {
app_id: "app".into(),
surface_id: 0,
},
&reg,
)
.await;
let session_id = match ack {
Response::LaunchAck { session_id, .. } => session_id,
_ => panic!(),
};
let resp = dispatch(Request::QueryAppState { session_id }, &reg).await; let resp = dispatch(Request::QueryAppState { session_id }, &reg).await;
assert!(matches!( assert!(matches!(
resp, resp,
@ -772,6 +793,8 @@ mod tests {
#[test] #[test]
fn scan_installed_apps_finds_valid_packages() { fn scan_installed_apps_finds_valid_packages() {
use std::fs; use std::fs;
let _env = env_lock().blocking_lock();
let store = std::env::temp_dir().join(format!("weft_appd_scan_{}", std::process::id())); let store = std::env::temp_dir().join(format!("weft_appd_scan_{}", std::process::id()));
let app_dir = store.join("com.example.scanner"); let app_dir = store.join("com.example.scanner");
fs::create_dir_all(&app_dir).unwrap(); fs::create_dir_all(&app_dir).unwrap();
@ -805,9 +828,10 @@ mod tests {
async fn supervisor_transitions_through_ready_to_stopped() { async fn supervisor_transitions_through_ready_to_stopped() {
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
let _env = env_lock().lock().await;
let script = let script =
std::env::temp_dir().join(format!("weft_test_runtime_{}.sh", std::process::id())); std::env::temp_dir().join(format!("weft_test_runtime_{}.sh", std::process::id()));
std::fs::write(&script, "#!/bin/sh\necho READY\n").unwrap(); std::fs::write(&script, "#!/bin/sh\necho READY\nsleep 1\n").unwrap();
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap(); std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
let prior = std::env::var("WEFT_RUNTIME_BIN").ok(); let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
@ -839,16 +863,17 @@ mod tests {
AppStateKind::Stopped AppStateKind::Stopped
)); ));
let notification = rx.try_recv(); let notification =
tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await;
assert!(matches!( assert!(matches!(
notification, notification,
Ok(Response::AppReady { session_id: sid, .. }) if sid == session_id Ok(Ok(Response::AppReady { session_id: sid, .. })) if sid == session_id
)); ));
let stopped = rx.try_recv(); let stopped = tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await;
assert!(matches!( assert!(matches!(
stopped, stopped,
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id Ok(Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped })) if sid == session_id
)); ));
let _ = std::fs::remove_file(&script); let _ = std::fs::remove_file(&script);
@ -870,6 +895,7 @@ mod tests {
async fn supervisor_abort_during_startup_broadcasts_stopped() { async fn supervisor_abort_during_startup_broadcasts_stopped() {
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
let _env = env_lock().lock().await;
let script = let script =
std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id())); std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id()));
std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap(); std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap();
@ -919,6 +945,7 @@ mod tests {
#[cfg(unix)] #[cfg(unix)]
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn supervisor_spawn_failure_broadcasts_stopped() { async fn supervisor_spawn_failure_broadcasts_stopped() {
let _env = env_lock().lock().await;
let prior = std::env::var("WEFT_RUNTIME_BIN").ok(); let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
unsafe { unsafe {
std::env::set_var( std::env::set_var(
@ -948,10 +975,11 @@ mod tests {
AppStateKind::Stopped AppStateKind::Stopped
)); ));
let broadcast = rx.try_recv(); let broadcast =
tokio::time::timeout(std::time::Duration::from_millis(200), rx.recv()).await;
assert!(matches!( assert!(matches!(
broadcast, broadcast,
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id Ok(Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped })) if sid == session_id
)); ));
unsafe { unsafe {
@ -964,6 +992,7 @@ mod tests {
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn session_save_load_roundtrip() { async fn session_save_load_roundtrip() {
let _env = env_lock().lock().await;
let tmp = std::env::temp_dir().join(format!("weft_session_test_{}", std::process::id())); let tmp = std::env::temp_dir().join(format!("weft_session_test_{}", std::process::id()));
std::fs::create_dir_all(&tmp).unwrap(); std::fs::create_dir_all(&tmp).unwrap();
let prior = std::env::var("XDG_RUNTIME_DIR").ok(); let prior = std::env::var("XDG_RUNTIME_DIR").ok();
@ -993,6 +1022,7 @@ mod tests {
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn session_save_empty_load_returns_empty_vec() { async fn session_save_empty_load_returns_empty_vec() {
let _env = env_lock().lock().await;
let tmp = std::env::temp_dir().join(format!("weft_session_empty_{}", std::process::id())); let tmp = std::env::temp_dir().join(format!("weft_session_empty_{}", std::process::id()));
std::fs::create_dir_all(&tmp).unwrap(); std::fs::create_dir_all(&tmp).unwrap();
let prior = std::env::var("XDG_RUNTIME_DIR").ok(); let prior = std::env::var("XDG_RUNTIME_DIR").ok();
@ -1013,6 +1043,7 @@ mod tests {
#[test] #[test]
fn load_session_no_file_returns_none() { fn load_session_no_file_returns_none() {
let _env = env_lock().blocking_lock();
let tmp = std::env::temp_dir().join(format!("weft_session_missing_{}", std::process::id())); let tmp = std::env::temp_dir().join(format!("weft_session_missing_{}", std::process::id()));
let prior = std::env::var("XDG_RUNTIME_DIR").ok(); let prior = std::env::var("XDG_RUNTIME_DIR").ok();
unsafe { std::env::set_var("XDG_RUNTIME_DIR", &tmp) }; unsafe { std::env::set_var("XDG_RUNTIME_DIR", &tmp) };