diff --git a/crates/weft-appd/Cargo.toml b/crates/weft-appd/Cargo.toml index 51c17df..7a71dd0 100644 --- a/crates/weft-appd/Cargo.toml +++ b/crates/weft-appd/Cargo.toml @@ -11,7 +11,7 @@ path = "src/main.rs" [dependencies] anyhow = "1.0" sd-notify = "0.4" -tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] } serde = { version = "1", features = ["derive"] } rmp-serde = "1" serde_json = "1" diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index eedd70d..0d9d486 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -6,16 +6,28 @@ use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; mod ipc; +mod runtime; mod ws; use ipc::{AppStateKind, Request, Response}; pub(crate) type Registry = Arc>; -#[derive(Default)] struct SessionRegistry { next_id: u64, sessions: std::collections::HashMap, + broadcast: tokio::sync::broadcast::Sender, +} + +impl Default for SessionRegistry { + fn default() -> Self { + let (broadcast, _) = tokio::sync::broadcast::channel(16); + Self { + next_id: 0, + sessions: std::collections::HashMap::new(), + broadcast, + } + } } impl SessionRegistry { @@ -40,6 +52,20 @@ impl SessionRegistry { .cloned() .unwrap_or(AppStateKind::NotFound) } + + pub(crate) fn set_state(&mut self, session_id: u64, state: AppStateKind) { + if let Some(entry) = self.sessions.get_mut(&session_id) { + *entry = state; + } + } + + pub(crate) fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.broadcast.subscribe() + } + + pub(crate) fn broadcast(&self) -> &tokio::sync::broadcast::Sender { + &self.broadcast + } } #[tokio::main] @@ -67,7 +93,6 @@ async fn run() -> anyhow::Result<()> { tracing::info!(path = %socket_path.display(), "IPC socket listening"); let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default())); - let (broadcast_tx, _) = tokio::sync::broadcast::channel::(16); let ws_port = ws_port(); let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?; @@ -95,7 +120,7 @@ async fn run() -> anyhow::Result<()> { result = ws_listener.accept() => { let (stream, _) = result.context("ws accept")?; let reg = Arc::clone(®istry); - let rx = broadcast_tx.subscribe(); + let rx = registry.lock().await.subscribe(); tokio::spawn(async move { if let Err(e) = ws::handle_ws_connection(stream, reg, rx).await { tracing::warn!(error = %e, "ws connection error"); @@ -155,6 +180,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response { } => { let session_id = registry.lock().await.launch(&app_id); tracing::info!(session_id, %app_id, "launched"); + let reg = Arc::clone(registry); + let aid = app_id.clone(); + tokio::spawn(async move { + if let Err(e) = runtime::supervise(session_id, &aid, reg).await { + tracing::warn!(session_id, error = %e, "runtime supervisor error"); + } + }); Response::LaunchAck { session_id } } Request::TerminateApp { session_id } => { diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs new file mode 100644 index 0000000..c95f745 --- /dev/null +++ b/crates/weft-appd/src/runtime.rs @@ -0,0 +1,92 @@ +use std::time::Duration; + +use anyhow::Context; +use tokio::io::{AsyncBufReadExt, BufReader}; + +use crate::Registry; +use crate::ipc::{AppStateKind, Response}; + +const READY_TIMEOUT: Duration = Duration::from_secs(30); + +pub(crate) async fn supervise( + session_id: u64, + app_id: &str, + registry: Registry, +) -> anyhow::Result<()> { + let bin = match std::env::var("WEFT_RUNTIME_BIN") { + Ok(b) => b, + Err(_) => { + tracing::debug!(session_id, %app_id, "WEFT_RUNTIME_BIN not set; skipping process spawn"); + return Ok(()); + } + }; + + let mut child = tokio::process::Command::new(&bin) + .arg(app_id) + .arg(session_id.to_string()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .stdin(std::process::Stdio::null()) + .spawn() + .with_context(|| format!("spawn {bin}"))?; + + let stdout = child.stdout.take().expect("stdout piped"); + let stderr = child.stderr.take().expect("stderr piped"); + + let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await; + + match ready_result { + Ok(Ok(())) => { + registry + .lock() + .await + .set_state(session_id, AppStateKind::Running); + let _ = registry + .lock() + .await + .broadcast() + .send(Response::AppReady { session_id }); + tracing::info!(session_id, %app_id, "app ready"); + } + Ok(Err(e)) => { + tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY"); + } + Err(_elapsed) => { + tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process"); + let _ = child.kill().await; + registry + .lock() + .await + .set_state(session_id, AppStateKind::Stopped); + return Ok(()); + } + } + + tokio::spawn(drain_stderr(stderr, session_id)); + + let status = child.wait().await?; + tracing::info!(session_id, %app_id, exit_status = ?status, "process exited"); + registry + .lock() + .await + .set_state(session_id, AppStateKind::Stopped); + + Ok(()) +} + +async fn wait_for_ready(stdout: tokio::process::ChildStdout) -> anyhow::Result<()> { + let mut lines = BufReader::new(stdout).lines(); + while let Some(line) = lines.next_line().await? { + if line.trim() == "READY" { + return Ok(()); + } + } + Err(anyhow::anyhow!("stdout closed without READY signal")) +} + +async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) { + let mut lines = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = lines.next_line().await { + tracing::warn!(session_id, stderr = %line, "app stderr"); + } +}