feat(appd): implement runtime supervisor with process spawning and READY signal

runtime.rs — process lifecycle manager:
- supervise(session_id, app_id, registry): spawns the weft-runtime child
  process identified by WEFT_RUNTIME_BIN env var. If unset, logs debug
  and returns immediately (no-op until runtime binary is available).
- Child process invoked as: <WEFT_RUNTIME_BIN> <app_id> <session_id>
  with stdout/stderr piped, stdin closed.
- wait_for_ready(): reads stdout line-by-line; returns Ok(()) on first
  line matching 'READY'; returns Err if stdout closes without it.
- 30-second READY_TIMEOUT via tokio::time::timeout; on expiry, kills
  the child and transitions session to Stopped.
- On success: sets session state to Running, broadcasts AppReady to all
  connected WebSocket clients via registry broadcast channel.
- drain_stderr(): async task that forwards child stderr lines to tracing
  at WARN level for observability.
- On process exit: sets session state to Stopped regardless of exit code.

main.rs — wiring:
- SessionRegistry now owns broadcast::Sender<Response>; Default creates
  the channel internally. Added set_state(), subscribe(), broadcast()
  methods. Removed standalone broadcast_tx from run(); WS handlers
  subscribe via registry.lock().await.subscribe().
- dispatch::LaunchApp spawns a tokio task calling runtime::supervise
  immediately after creating the session. supervise is a no-op when
  WEFT_RUNTIME_BIN is unset, so existing tests are unaffected.

Cargo.toml: added tokio 'process' and 'time' features.
This commit is contained in:
Marco Allegretti 2026-03-11 09:17:20 +01:00
parent 668063c34b
commit 86d0011016
3 changed files with 128 additions and 4 deletions

View file

@ -11,7 +11,7 @@ path = "src/main.rs"
[dependencies]
anyhow = "1.0"
sd-notify = "0.4"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] }
serde = { version = "1", features = ["derive"] }
rmp-serde = "1"
serde_json = "1"

View file

@ -6,16 +6,28 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
mod ipc;
mod runtime;
mod ws;
use ipc::{AppStateKind, Request, Response};
pub(crate) type Registry = Arc<Mutex<SessionRegistry>>;
#[derive(Default)]
struct SessionRegistry {
next_id: u64,
sessions: std::collections::HashMap<u64, AppStateKind>,
broadcast: tokio::sync::broadcast::Sender<Response>,
}
impl Default for SessionRegistry {
fn default() -> Self {
let (broadcast, _) = tokio::sync::broadcast::channel(16);
Self {
next_id: 0,
sessions: std::collections::HashMap::new(),
broadcast,
}
}
}
impl SessionRegistry {
@ -40,6 +52,20 @@ impl SessionRegistry {
.cloned()
.unwrap_or(AppStateKind::NotFound)
}
pub(crate) fn set_state(&mut self, session_id: u64, state: AppStateKind) {
if let Some(entry) = self.sessions.get_mut(&session_id) {
*entry = state;
}
}
pub(crate) fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Response> {
self.broadcast.subscribe()
}
pub(crate) fn broadcast(&self) -> &tokio::sync::broadcast::Sender<Response> {
&self.broadcast
}
}
#[tokio::main]
@ -67,7 +93,6 @@ async fn run() -> anyhow::Result<()> {
tracing::info!(path = %socket_path.display(), "IPC socket listening");
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let (broadcast_tx, _) = tokio::sync::broadcast::channel::<Response>(16);
let ws_port = ws_port();
let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?;
@ -95,7 +120,7 @@ async fn run() -> anyhow::Result<()> {
result = ws_listener.accept() => {
let (stream, _) = result.context("ws accept")?;
let reg = Arc::clone(&registry);
let rx = broadcast_tx.subscribe();
let rx = registry.lock().await.subscribe();
tokio::spawn(async move {
if let Err(e) = ws::handle_ws_connection(stream, reg, rx).await {
tracing::warn!(error = %e, "ws connection error");
@ -155,6 +180,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response {
} => {
let session_id = registry.lock().await.launch(&app_id);
tracing::info!(session_id, %app_id, "launched");
let reg = Arc::clone(registry);
let aid = app_id.clone();
tokio::spawn(async move {
if let Err(e) = runtime::supervise(session_id, &aid, reg).await {
tracing::warn!(session_id, error = %e, "runtime supervisor error");
}
});
Response::LaunchAck { session_id }
}
Request::TerminateApp { session_id } => {

View file

@ -0,0 +1,92 @@
use std::time::Duration;
use anyhow::Context;
use tokio::io::{AsyncBufReadExt, BufReader};
use crate::Registry;
use crate::ipc::{AppStateKind, Response};
const READY_TIMEOUT: Duration = Duration::from_secs(30);
pub(crate) async fn supervise(
session_id: u64,
app_id: &str,
registry: Registry,
) -> anyhow::Result<()> {
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
Ok(b) => b,
Err(_) => {
tracing::debug!(session_id, %app_id, "WEFT_RUNTIME_BIN not set; skipping process spawn");
return Ok(());
}
};
let mut child = tokio::process::Command::new(&bin)
.arg(app_id)
.arg(session_id.to_string())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::null())
.spawn()
.with_context(|| format!("spawn {bin}"))?;
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let ready_result = tokio::time::timeout(READY_TIMEOUT, wait_for_ready(stdout)).await;
match ready_result {
Ok(Ok(())) => {
registry
.lock()
.await
.set_state(session_id, AppStateKind::Running);
let _ = registry
.lock()
.await
.broadcast()
.send(Response::AppReady { session_id });
tracing::info!(session_id, %app_id, "app ready");
}
Ok(Err(e)) => {
tracing::warn!(session_id, %app_id, error = %e, "stdout read error before READY");
}
Err(_elapsed) => {
tracing::warn!(session_id, %app_id, "READY timeout after 30s; killing process");
let _ = child.kill().await;
registry
.lock()
.await
.set_state(session_id, AppStateKind::Stopped);
return Ok(());
}
}
tokio::spawn(drain_stderr(stderr, session_id));
let status = child.wait().await?;
tracing::info!(session_id, %app_id, exit_status = ?status, "process exited");
registry
.lock()
.await
.set_state(session_id, AppStateKind::Stopped);
Ok(())
}
async fn wait_for_ready(stdout: tokio::process::ChildStdout) -> anyhow::Result<()> {
let mut lines = BufReader::new(stdout).lines();
while let Some(line) = lines.next_line().await? {
if line.trim() == "READY" {
return Ok(());
}
}
Err(anyhow::anyhow!("stdout closed without READY signal"))
}
async fn drain_stderr(stderr: tokio::process::ChildStderr, session_id: u64) {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
tracing::warn!(session_id, stderr = %line, "app stderr");
}
}