WEFT_OS/crates/weft-appd/src/runtime.rs

285 lines
8.6 KiB
Rust
Raw Normal View History

use std::path::PathBuf;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use weft_ipc_types::AppdToCompositor;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
use crate::Registry;
use crate::compositor_client::CompositorSender;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
use crate::ipc::{AppStateKind, Response};
const READY_TIMEOUT: Duration = Duration::from_secs(30);
fn systemd_cgroup_available() -> bool {
if std::env::var("WEFT_DISABLE_CGROUP").is_ok() {
return false;
}
let Ok(runtime_dir) = std::env::var("XDG_RUNTIME_DIR") else {
return false;
};
std::path::Path::new(&runtime_dir)
.join("systemd/private")
.exists()
}
fn resolve_preopens(app_id: &str) -> Vec<(String, String)> {
#[derive(serde::Deserialize)]
struct Pkg {
capabilities: Option<Vec<String>>,
}
#[derive(serde::Deserialize)]
struct M {
package: Pkg,
}
let pkg_dir = crate::app_store_roots().into_iter().find_map(|root| {
let dir = root.join(app_id);
if dir.join("wapp.toml").exists() {
Some(dir)
} else {
None
}
});
let caps = match pkg_dir {
None => return Vec::new(),
Some(dir) => {
let Ok(text) = std::fs::read_to_string(dir.join("wapp.toml")) else {
return Vec::new();
};
match toml::from_str::<M>(&text) {
Ok(m) => m.package.capabilities.unwrap_or_default(),
Err(_) => return Vec::new(),
}
}
};
let home = match std::env::var("HOME") {
Ok(h) => PathBuf::from(h),
Err(_) => return Vec::new(),
};
let mut preopens = Vec::new();
for cap in &caps {
match cap.as_str() {
"fs:rw:app-data" | "fs:read:app-data" => {
let data_dir = home
.join(".local/share/weft/apps")
.join(app_id)
.join("data");
let _ = std::fs::create_dir_all(&data_dir);
preopens.push((data_dir.to_string_lossy().into_owned(), "/data".to_string()));
}
"fs:rw:xdg-documents" | "fs:read:xdg-documents" => {
let docs = home.join("Documents");
if docs.exists() {
preopens.push((
docs.to_string_lossy().into_owned(),
"/xdg/documents".to_string(),
));
}
}
other => {
tracing::debug!(capability = other, "not mapped to preopen; skipped");
}
}
}
preopens
}
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
pub(crate) async fn supervise(
session_id: u64,
app_id: &str,
registry: Registry,
abort_rx: tokio::sync::oneshot::Receiver<()>,
compositor_tx: Option<CompositorSender>,
ipc_socket_path: Option<std::path::PathBuf>,
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
) -> anyhow::Result<()> {
let mut abort_rx = abort_rx;
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
Ok(b) => b,
Err(_) => {
tracing::debug!(session_id, %app_id, "WEFT_RUNTIME_BIN not set; skipping process spawn");
return Ok(());
}
};
let (mount_orch, store_override) =
crate::mount::MountOrchestrator::mount_if_needed(app_id, session_id);
let mut cmd = if systemd_cgroup_available() {
let mut c = tokio::process::Command::new("systemd-run");
c.args([
"--user",
"--scope",
"--wait",
"--collect",
"--slice=weft-apps.slice",
"-p",
"CPUQuota=200%",
"-p",
"MemoryMax=512M",
"--",
&bin,
]);
c
} else {
tokio::process::Command::new(&bin)
};
cmd.arg(app_id)
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
.arg(session_id.to_string())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::null());
if let Some(ref sock) = ipc_socket_path {
cmd.arg("--ipc-socket").arg(sock);
}
if let Some(ref root) = store_override {
cmd.env("WEFT_APP_STORE", root);
}
for (host, guest) in resolve_preopens(app_id) {
cmd.arg("--preopen").arg(format!("{host}::{guest}"));
}
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
tracing::warn!(session_id, %app_id, error = %e, "failed to spawn runtime; marking session stopped");
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
let _ = reg.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
return Ok(());
}
};
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
if let Some(tx) = &compositor_tx {
let pid = child.id().unwrap_or(0);
let _ = tx
.send(AppdToCompositor::AppSurfaceCreated {
app_id: app_id.to_owned(),
session_id,
pid,
})
.await;
}
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let ready_result = tokio::select! {
r = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)) => Some(r),
_ = &mut abort_rx => None,
};
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
match ready_result {
Some(Ok(Ok(remaining_stdout))) => {
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
registry
.lock()
.await
.set_state(session_id, AppStateKind::Running);
let _ = registry.lock().await.broadcast().send(Response::AppReady {
session_id,
app_id: app_id.to_owned(),
});
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
tracing::info!(session_id, %app_id, "app ready");
tokio::spawn(drain_stdout(remaining_stdout, session_id));
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
}
Some(Ok(Err(e))) => {
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
}
Some(Err(_elapsed)) => {
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
let _ = child.kill().await;
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
let _ = reg.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
return Ok(());
}
None => {
tracing::info!(session_id, %app_id, "abort during startup; killing process");
let _ = child.kill().await;
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
let _ = reg.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
return Ok(());
}
}
tokio::spawn(drain_stderr(stderr, session_id));
tokio::select! {
status = child.wait() => {
tracing::info!(session_id, %app_id, exit_status = ?status, "process exited");
}
_ = abort_rx => {
tracing::info!(session_id, %app_id, "abort received; sending SIGTERM");
let _ = child.kill().await;
}
}
if let Some(tx) = &compositor_tx {
let _ = tx
.send(AppdToCompositor::AppSurfaceDestroyed { session_id })
.await;
}
mount_orch.umount();
{
let mut reg = registry.lock().await;
reg.set_state(session_id, AppStateKind::Stopped);
let _ = reg.broadcast().send(Response::AppState {
session_id,
state: AppStateKind::Stopped,
});
}
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
Ok(())
}
async fn wait_for_ready(
stdout: tokio::process::ChildStdout,
) -> anyhow::Result<BufReader<tokio::process::ChildStdout>> {
let mut reader = BufReader::new(stdout);
loop {
let mut line = String::new();
let n = reader.read_line(&mut line).await?;
if n == 0 {
return Err(anyhow::anyhow!("stdout closed without READY signal"));
}
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
if line.trim() == "READY" {
return Ok(reader);
}
}
}
async fn drain_stdout(mut reader: BufReader<tokio::process::ChildStdout>, session_id: u64) {
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) | Err(_) => break,
Ok(_) => tracing::debug!(session_id, stdout = %line.trim_end(), "app stdout"),
feat(appd): implement runtime supervisor with process spawning and READY signal runtime.rs — process lifecycle manager: - supervise(session_id, app_id, registry): spawns the weft-runtime child process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug and returns immediately (no-op until runtime binary is available). - Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id> with stdout/stderr piped, stdin closed. - wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first line matching 'READY'; returns Err if stdout closes without it. - 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills the child and transitions session to Stopped. - On success: sets session state to Running, broadcasts AppReady to all connected WebSocket clients via registry broadcast channel. - drain_stderr(): async task that forwards child stderr lines to tracing at WARN level for observability. - On process exit: sets session state to Stopped regardless of exit code. main.rs — wiring: - SessionRegistry now owns broadcast::Sender<Response>; Default creates the channel internally. Added set_state(), subscribe(), broadcast() methods. Removed standalone broadcast_tx from run(); WS handlers subscribe via registry.lock().await.subscribe(). - dispatch::LaunchApp spawns a tokio task calling runtime::supervise immediately after creating the session. supervise is a no-op when WEFT_RUNTIME_BIN is unset, so existing tests are unaffected. Cargo.toml: added tokio 'process' and 'time' features.
2026-03-11 08:17:20 +00:00
}
}
}
async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::warn!(session_id, stderr = %line, "app stderr");
}
}