WEFT_OS/crates/weft-appd/src/main.rs

802 lines
25 KiB
Rust
Raw Normal View History

use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
mod compositor_client;
mod ipc;
mod mount;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
mod runtime;
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
mod ws;
use ipc::{AppInfo, AppStateKind, Request, Response, SessionInfo};
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
pub(crate) type Registry = Arc<Mutex<SessionRegistry>>;
struct SessionEntry {
app_id: String,
state: AppStateKind,
}
struct SessionRegistry {
next_id: u64,
sessions: std::collections::HashMap<u64, SessionEntry>,
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
broadcast: tokio::sync::broadcast::Sender<Response>,
abort_senders: std::collections::HashMap<u64, tokio::sync::oneshot::Sender<()>>,
compositor_tx: Option<compositor_client::CompositorSender>,
ipc_socket: Option<PathBuf>,
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
}
impl Default for SessionRegistry {
fn default() -> Self {
let (broadcast, _) = tokio::sync::broadcast::channel(16);
Self {
next_id: 0,
sessions: std::collections::HashMap::new(),
broadcast,
abort_senders: std::collections::HashMap::new(),
compositor_tx: None,
ipc_socket: None,
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
}
}
}
impl SessionRegistry {
fn launch(&mut self, app_id: &str) -> u64 {
self.next_id += 1;
let id = self.next_id;
self.sessions.insert(
id,
SessionEntry {
app_id: app_id.to_owned(),
state: AppStateKind::Starting,
},
);
id
}
fn terminate(&mut self, session_id: u64) -> bool {
let found = self.sessions.remove(&session_id).is_some();
self.abort_senders.remove(&session_id);
found
}
pub(crate) fn register_abort(&mut self, session_id: u64) -> tokio::sync::oneshot::Receiver<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.abort_senders.insert(session_id, tx);
rx
}
fn running_sessions(&self) -> Vec<SessionInfo> {
self.sessions
.iter()
.filter(|(_, e)| !matches!(e.state, AppStateKind::Stopped))
.map(|(&session_id, e)| SessionInfo {
session_id,
app_id: e.app_id.clone(),
})
.collect()
}
fn state(&self, session_id: u64) -> AppStateKind {
self.sessions
.get(&session_id)
.map(|e| e.state.clone())
.unwrap_or(AppStateKind::NotFound)
}
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
pub(crate) fn set_state(&mut self, session_id: u64, state: AppStateKind) {
if let Some(entry) = self.sessions.get_mut(&session_id) {
entry.state = state;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
}
}
pub(crate) fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Response> {
self.broadcast.subscribe()
}
pub(crate) fn broadcast(&self) -> &tokio::sync::broadcast::Sender<Response> {
&self.broadcast
}
pub(crate) fn shutdown_all(&mut self) {
self.abort_senders.clear();
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
run().await
}
async fn run() -> anyhow::Result<()> {
let socket_path = appd_socket_path()?;
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
}
let _ = std::fs::remove_file(&socket_path);
let listener = tokio::net::UnixListener::bind(&socket_path)
.with_context(|| format!("bind {}", socket_path.display()))?;
tracing::info!(path = %socket_path.display(), "IPC socket listening");
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
registry.lock().await.ipc_socket = Some(socket_path.clone());
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
if let Some(path) = compositor_client::socket_path() {
let tx = compositor_client::spawn(path);
registry.lock().await.compositor_tx = Some(tx);
}
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
let ws_port = ws_port();
let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?;
let ws_listener = tokio::net::TcpListener::bind(ws_addr)
.await
.with_context(|| format!("bind WebSocket {ws_addr}"))?;
let ws_bound_port = ws_listener.local_addr()?.port();
tracing::info!(port = ws_bound_port, "WebSocket listener ready");
if let Err(e) = write_ws_port(ws_bound_port) {
tracing::warn!(error = %e, "could not write appd.wsport; servo-shell port discovery will fall back to default");
}
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]);
#[cfg(unix)]
let mut sigterm = {
use tokio::signal::unix::{SignalKind, signal};
signal(SignalKind::terminate()).context("SIGTERM handler")?
};
loop {
#[cfg(unix)]
let term = sigterm.recv();
#[cfg(not(unix))]
let term = std::future::pending::<Option<()>>();
tokio::select! {
result = listener.accept() => {
let (stream, _) = result.context("accept")?;
let reg = Arc::clone(&registry);
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, reg).await {
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
tracing::warn!(error = %e, "unix connection error");
}
});
}
result = ws_listener.accept() => {
let (stream, _) = result.context("ws accept")?;
let reg = Arc::clone(&registry);
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
let rx = registry.lock().await.subscribe();
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
tokio::spawn(async move {
if let Err(e) = ws::handle_ws_connection(stream, reg, rx).await {
tracing::warn!(error = %e, "ws connection error");
}
});
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("SIGINT received; shutting down");
break;
}
_ = term => {
tracing::info!("SIGTERM received; shutting down");
break;
}
}
}
registry.lock().await.shutdown_all();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let _ = std::fs::remove_file(&socket_path);
Ok(())
}
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
fn ws_port() -> u16 {
std::env::var("WEFT_APPD_WS_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(7410)
}
fn write_ws_port(port: u16) -> anyhow::Result<()> {
let runtime_dir = std::env::var("XDG_RUNTIME_DIR").context("XDG_RUNTIME_DIR not set")?;
let path = PathBuf::from(runtime_dir).join("weft/appd.wsport");
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
}
std::fs::write(&path, port.to_string()).with_context(|| format!("write {}", path.display()))?;
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
Ok(())
}
async fn handle_connection(
stream: tokio::net::UnixStream,
registry: Registry,
) -> anyhow::Result<()> {
let (reader, writer) = tokio::io::split(stream);
let mut reader = tokio::io::BufReader::new(reader);
let mut writer = tokio::io::BufWriter::new(writer);
while let Some(req) = ipc::read_frame(&mut reader).await? {
tracing::debug!(?req, "request");
let resp = dispatch(req, &registry).await;
ipc::write_frame(&mut writer, &resp).await?;
writer.flush().await?;
}
Ok(())
}
feat(appd): add WebSocket UI endpoint for Servo shell integration Implements the weft-appd WebSocket server that allows the system-ui.html page running inside Servo to send requests and receive push notifications without requiring custom SpiderMonkey bindings. ws.rs — WebSocket connection handler: - Accepts a tokio TcpStream, performs WebSocket handshake via tokio-tungstenite accept_async. - Reads JSON Text frames, deserializes as Request (serde_json), calls dispatch(), sends Response as JSON Text. - Subscribes to a broadcast::Receiver<Response> for server-push notifications (APP_READY, etc.); forwards to client via select!. - Handles close frames, partial errors, and lagged broadcast gracefully. main.rs — server changes: - broadcast::channel(16) created at startup; WebSocket handlers subscribe for push delivery. - TcpListener bound on 127.0.0.1:7410 (default) or WEFT_APPD_WS_PORT. - ws_port() / write_ws_port(): port written to XDG_RUNTIME_DIR/weft/appd.wsport for runtime discovery. - WS accept branch added to the main select! loop alongside Unix socket. ipc.rs — Response and AppStateKind now derive Clone (required by broadcast::Sender<Response>). system-ui.html — appd WebSocket client: - appdConnect(): opens ws://127.0.0.1:<port>/appd with exponential backoff reconnect (1s → 16s max). - On open: sends QUERY_RUNNING to populate taskbar with live sessions. - handleAppdMessage(): maps LAUNCH_ACK and RUNNING_APPS to taskbar entries; APP_READY shows a timed notification; APP_STATE::stopped removes the taskbar entry. - WEFT_APPD_WS_PORT window global overrides the default port. New deps: tokio-tungstenite 0.24, futures-util 0.3 (sink+std), serde_json 1.
2026-03-11 08:01:54 +00:00
pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response {
match req {
Request::LaunchApp {
app_id,
surface_id: _,
} => {
let session_id = registry.lock().await.launch(&app_id);
tracing::info!(session_id, %app_id, "launched");
let abort_rx = registry.lock().await.register_abort(session_id);
let compositor_tx = registry.lock().await.compositor_tx.clone();
let ipc_socket = registry.lock().await.ipc_socket.clone();
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
let reg = Arc::clone(registry);
let aid = app_id.clone();
tokio::spawn(async move {
if let Err(e) =
runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx, ipc_socket)
.await
{
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
tracing::warn!(session_id, error = %e, "runtime supervisor error");
}
});
Response::LaunchAck { session_id, app_id }
}
Request::TerminateApp { session_id } => {
let found = registry.lock().await.terminate(session_id);
if found {
tracing::info!(session_id, "terminated");
Response::AppState {
session_id,
state: AppStateKind::Stopped,
}
} else {
Response::Error {
code: 1,
message: format!("session {session_id} not found"),
}
}
}
Request::QueryRunning => {
let sessions = registry.lock().await.running_sessions();
Response::RunningApps { sessions }
}
Request::QueryAppState { session_id } => {
let state = registry.lock().await.state(session_id);
Response::AppState { session_id, state }
}
Request::QueryInstalledApps => {
let apps = scan_installed_apps();
Response::InstalledApps { apps }
}
}
}
pub(crate) fn app_store_roots() -> Vec<std::path::PathBuf> {
if let Ok(explicit) = std::env::var("WEFT_APP_STORE") {
return vec![std::path::PathBuf::from(explicit)];
}
let mut roots = Vec::new();
if let Ok(home) = std::env::var("HOME") {
roots.push(
std::path::PathBuf::from(home)
.join(".local")
.join("share")
.join("weft")
.join("apps"),
);
}
roots.push(std::path::PathBuf::from("/usr/share/weft/apps"));
roots
}
#[derive(serde::Deserialize)]
struct WappPackage {
id: String,
name: String,
version: String,
}
#[derive(serde::Deserialize)]
struct WappManifest {
package: WappPackage,
}
fn scan_installed_apps() -> Vec<AppInfo> {
let mut seen = std::collections::HashSet::new();
let mut apps = Vec::new();
for root in app_store_roots() {
let Ok(entries) = std::fs::read_dir(&root) else {
continue;
};
for entry in entries.flatten() {
let manifest_path = entry.path().join("wapp.toml");
let Ok(contents) = std::fs::read_to_string(&manifest_path) else {
continue;
};
let Ok(m) = toml::from_str::<WappManifest>(&contents) else {
continue;
};
if seen.insert(m.package.id.clone()) {
apps.push(AppInfo {
app_id: m.package.id,
name: m.package.name,
version: m.package.version,
});
}
}
}
apps
}
fn appd_socket_path() -> anyhow::Result<PathBuf> {
if let Ok(p) = std::env::var("WEFT_APPD_SOCKET") {
return Ok(PathBuf::from(p));
}
let runtime_dir = std::env::var("XDG_RUNTIME_DIR").context("XDG_RUNTIME_DIR not set")?;
Ok(PathBuf::from(runtime_dir).join("weft/appd.sock"))
}
#[cfg(test)]
mod tests {
use super::*;
use ipc::AppStateKind;
fn make_registry() -> Registry {
Arc::new(Mutex::new(SessionRegistry::default()))
}
#[test]
fn ws_port_defaults_to_7410() {
let prior = std::env::var("WEFT_APPD_WS_PORT").ok();
unsafe { std::env::remove_var("WEFT_APPD_WS_PORT") };
let port = ws_port();
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_APPD_WS_PORT", v),
None => {}
}
}
assert_eq!(port, 7410);
}
#[test]
fn ws_port_uses_env_override() {
let prior = std::env::var("WEFT_APPD_WS_PORT").ok();
unsafe { std::env::set_var("WEFT_APPD_WS_PORT", "9000") };
let port = ws_port();
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_APPD_WS_PORT", v),
None => std::env::remove_var("WEFT_APPD_WS_PORT"),
}
}
assert_eq!(port, 9000);
}
#[test]
fn appd_socket_path_uses_override_env() {
let prior = std::env::var("WEFT_APPD_SOCKET").ok();
unsafe { std::env::set_var("WEFT_APPD_SOCKET", "/tmp/custom.sock") };
let path = appd_socket_path().unwrap();
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_APPD_SOCKET", v),
None => std::env::remove_var("WEFT_APPD_SOCKET"),
}
}
assert_eq!(path, PathBuf::from("/tmp/custom.sock"));
}
#[test]
fn appd_socket_path_errors_without_xdg_and_no_override() {
let prior_sock = std::env::var("WEFT_APPD_SOCKET").ok();
let prior_xdg = std::env::var("XDG_RUNTIME_DIR").ok();
unsafe {
std::env::remove_var("WEFT_APPD_SOCKET");
std::env::remove_var("XDG_RUNTIME_DIR");
}
let result = appd_socket_path();
unsafe {
match prior_sock {
Some(v) => std::env::set_var("WEFT_APPD_SOCKET", v),
None => {}
}
match prior_xdg {
Some(v) => std::env::set_var("XDG_RUNTIME_DIR", v),
None => {}
}
}
assert!(result.is_err());
}
#[tokio::test]
async fn dispatch_launch_returns_ack() {
let reg = make_registry();
let resp = dispatch(
Request::LaunchApp {
app_id: "com.test.app".into(),
surface_id: 0,
},
&reg,
)
.await;
match resp {
Response::LaunchAck {
session_id,
ref app_id,
} => {
assert!(session_id > 0);
assert_eq!(app_id, "com.test.app");
}
_ => panic!("expected LaunchAck"),
}
}
#[tokio::test]
async fn dispatch_terminate_known_returns_stopped() {
let reg = make_registry();
let ack = dispatch(
Request::LaunchApp {
app_id: "app".into(),
surface_id: 0,
},
&reg,
)
.await;
let session_id = match ack {
Response::LaunchAck { session_id, .. } => session_id,
_ => panic!("expected LaunchAck"),
};
let resp = dispatch(Request::TerminateApp { session_id }, &reg).await;
assert!(matches!(
resp,
Response::AppState {
state: AppStateKind::Stopped,
..
}
));
}
#[tokio::test]
async fn dispatch_terminate_unknown_returns_error() {
let reg = make_registry();
let resp = dispatch(Request::TerminateApp { session_id: 999 }, &reg).await;
assert!(matches!(resp, Response::Error { .. }));
}
#[tokio::test]
async fn running_sessions_excludes_stopped() {
let reg = make_registry();
let session_id = reg.lock().await.launch("com.test.app");
reg.lock()
.await
.set_state(session_id, AppStateKind::Stopped);
let sessions = reg.lock().await.running_sessions();
assert!(sessions.is_empty());
}
#[tokio::test]
async fn dispatch_query_running_lists_active_sessions() {
let reg = make_registry();
dispatch(
Request::LaunchApp {
app_id: "a".into(),
surface_id: 0,
},
&reg,
)
.await;
dispatch(
Request::LaunchApp {
app_id: "b".into(),
surface_id: 0,
},
&reg,
)
.await;
let resp = dispatch(Request::QueryRunning, &reg).await;
match resp {
Response::RunningApps { sessions } => {
assert_eq!(sessions.len(), 2);
let mut ids: Vec<&str> = sessions.iter().map(|s| s.app_id.as_str()).collect();
ids.sort();
assert_eq!(ids, vec!["a", "b"]);
}
_ => panic!("expected RunningApps"),
}
}
#[tokio::test]
async fn dispatch_query_app_state_returns_starting() {
let reg = make_registry();
let ack = dispatch(
Request::LaunchApp {
app_id: "app".into(),
surface_id: 0,
},
&reg,
)
.await;
let session_id = match ack {
Response::LaunchAck { session_id, .. } => session_id,
_ => panic!(),
};
let resp = dispatch(Request::QueryAppState { session_id }, &reg).await;
assert!(matches!(
resp,
Response::AppState {
state: AppStateKind::Starting,
..
}
));
}
#[tokio::test]
async fn dispatch_query_app_state_unknown_returns_not_found() {
let reg = make_registry();
let resp = dispatch(Request::QueryAppState { session_id: 9999 }, &reg).await;
assert!(matches!(
resp,
Response::AppState {
state: AppStateKind::NotFound,
..
}
));
}
#[test]
fn registry_launch_increments_id() {
let mut reg = SessionRegistry::default();
let id1 = reg.launch("com.example.a");
let id2 = reg.launch("com.example.b");
assert!(id2 > id1);
}
#[test]
fn registry_terminate_known_session() {
let mut reg = SessionRegistry::default();
let id = reg.launch("com.example.app");
assert!(reg.terminate(id));
assert!(matches!(reg.state(id), AppStateKind::NotFound));
}
#[test]
fn registry_terminate_unknown_returns_false() {
let mut reg = SessionRegistry::default();
assert!(!reg.terminate(999));
}
#[test]
fn registry_running_sessions_reflects_live_sessions() {
let mut reg = SessionRegistry::default();
let id1 = reg.launch("com.example.a");
let id2 = reg.launch("com.example.b");
let mut sessions = reg.running_sessions();
sessions.sort_by_key(|s| s.session_id);
assert_eq!(sessions.len(), 2);
assert_eq!(sessions[0].session_id, id1);
assert_eq!(sessions[0].app_id, "com.example.a");
assert_eq!(sessions[1].session_id, id2);
assert_eq!(sessions[1].app_id, "com.example.b");
reg.terminate(id1);
assert_eq!(reg.running_sessions().len(), 1);
assert_eq!(reg.running_sessions()[0].session_id, id2);
}
#[test]
fn registry_state_not_found_for_unknown() {
let reg = SessionRegistry::default();
assert!(matches!(reg.state(42), AppStateKind::NotFound));
}
#[tokio::test]
async fn dispatch_query_installed_returns_installed_apps() {
let reg = make_registry();
let resp = dispatch(Request::QueryInstalledApps, &reg).await;
assert!(matches!(resp, Response::InstalledApps { .. }));
}
#[test]
fn scan_installed_apps_finds_valid_packages() {
use std::fs;
let store = std::env::temp_dir().join(format!("weft_appd_scan_{}", std::process::id()));
let app_dir = store.join("com.example.scanner");
fs::create_dir_all(&app_dir).unwrap();
fs::write(
app_dir.join("wapp.toml"),
"[package]\nid = \"com.example.scanner\"\nname = \"Scanner\"\nversion = \"1.0.0\"\n\
[runtime]\nmodule = \"app.wasm\"\n[ui]\nentry = \"ui/index.html\"\n",
)
.unwrap();
let prior = std::env::var("WEFT_APP_STORE").ok();
unsafe { std::env::set_var("WEFT_APP_STORE", &store) };
let apps = scan_installed_apps();
assert_eq!(apps.len(), 1);
assert_eq!(apps[0].app_id, "com.example.scanner");
assert_eq!(apps[0].name, "Scanner");
assert_eq!(apps[0].version, "1.0.0");
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_APP_STORE", v),
None => std::env::remove_var("WEFT_APP_STORE"),
}
}
let _ = fs::remove_dir_all(&store);
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn supervisor_transitions_through_ready_to_stopped() {
use std::os::unix::fs::PermissionsExt;
let script =
std::env::temp_dir().join(format!("weft_test_runtime_{}.sh", std::process::id()));
std::fs::write(&script, "#!/bin/sh\necho READY\n").unwrap();
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
// SAFETY: single-threaded test (flavor = "current_thread"); no concurrent env access.
unsafe { std::env::set_var("WEFT_RUNTIME_BIN", &script) };
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let mut rx = registry.lock().await.subscribe();
let session_id = registry.lock().await.launch("test.app");
let abort_rx = registry.lock().await.register_abort(session_id);
runtime::supervise(
session_id,
"test.app",
Arc::clone(&registry),
abort_rx,
None,
None,
)
.await
.unwrap();
assert!(matches!(
registry.lock().await.state(session_id),
AppStateKind::Stopped
));
let notification = rx.try_recv();
assert!(matches!(
notification,
Ok(Response::AppReady { session_id: sid, .. }) if sid == session_id
));
let stopped = rx.try_recv();
assert!(matches!(
stopped,
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id
));
let _ = std::fs::remove_file(&script);
// SAFETY: single-threaded test; restoring env to prior state.
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
None => std::env::remove_var("WEFT_RUNTIME_BIN"),
}
}
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn supervisor_abort_during_startup_broadcasts_stopped() {
use std::os::unix::fs::PermissionsExt;
let script =
std::env::temp_dir().join(format!("weft_test_sleep_{}.sh", std::process::id()));
std::fs::write(&script, "#!/bin/sh\nsleep 60\n").unwrap();
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
unsafe { std::env::set_var("WEFT_RUNTIME_BIN", &script) };
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let mut rx = registry.lock().await.subscribe();
let session_id = registry.lock().await.launch("test.abort.startup");
let abort_rx = registry.lock().await.register_abort(session_id);
registry.lock().await.terminate(session_id);
runtime::supervise(
session_id,
"test.abort.startup",
Arc::clone(&registry),
abort_rx,
None,
None,
)
.await
.unwrap();
assert!(matches!(
registry.lock().await.state(session_id),
AppStateKind::NotFound
));
let broadcast = rx.try_recv();
assert!(matches!(
broadcast,
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id
));
let _ = std::fs::remove_file(&script);
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
None => std::env::remove_var("WEFT_RUNTIME_BIN"),
}
}
}
#[cfg(unix)]
#[tokio::test(flavor = "current_thread")]
async fn supervisor_spawn_failure_broadcasts_stopped() {
let prior = std::env::var("WEFT_RUNTIME_BIN").ok();
unsafe {
std::env::set_var(
"WEFT_RUNTIME_BIN",
"/nonexistent/path/to/weft-runtime-does-not-exist",
)
};
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let mut rx = registry.lock().await.subscribe();
let session_id = registry.lock().await.launch("test.spawn.fail");
let abort_rx = registry.lock().await.register_abort(session_id);
runtime::supervise(
session_id,
"test.spawn.fail",
Arc::clone(&registry),
abort_rx,
None,
None,
)
.await
.unwrap();
assert!(matches!(
registry.lock().await.state(session_id),
AppStateKind::Stopped
));
let broadcast = rx.try_recv();
assert!(matches!(
broadcast,
Ok(Response::AppState { session_id: sid, state: AppStateKind::Stopped }) if sid == session_id
));
unsafe {
match prior {
Some(v) => std::env::set_var("WEFT_RUNTIME_BIN", v),
None => std::env::remove_var("WEFT_RUNTIME_BIN"),
}
}
}
}