From 1e4ced9a39a8e3194980527215add9b1f218f62d Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Wed, 11 Mar 2026 09:37:09 +0100 Subject: [PATCH] feat(appd): implement TerminateApp process signaling via abort channel SessionRegistry now tracks a oneshot abort sender per active session: - abort_senders: HashMap> field added. - register_abort(session_id): creates the channel, stores the sender, returns the receiver to the supervise task. - terminate(): removes the session state AND drops the abort sender, closing the channel and triggering the receiver in supervise. runtime::supervise() now accepts abort_rx: oneshot::Receiver<()>: - After the READY signal is received, the process-wait loop uses tokio::select! on child.wait() vs abort_rx. - On abort: logs intent, calls child.kill(), then sets state Stopped. - On natural exit: logs exit status, sets state Stopped. dispatch::LaunchApp: calls register_abort immediately after launch, passes the receiver to the spawned supervise task. Integration test updated to pass the abort receiver. --- crates/weft-appd/src/main.rs | 18 +++++++++++++++--- crates/weft-appd/src/runtime.rs | 13 +++++++++++-- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index 93aa2c5..3928f95 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -17,6 +17,7 @@ struct SessionRegistry { next_id: u64, sessions: std::collections::HashMap, broadcast: tokio::sync::broadcast::Sender, + abort_senders: std::collections::HashMap>, } impl Default for SessionRegistry { @@ -26,6 +27,7 @@ impl Default for SessionRegistry { next_id: 0, sessions: std::collections::HashMap::new(), broadcast, + abort_senders: std::collections::HashMap::new(), } } } @@ -39,7 +41,15 @@ impl SessionRegistry { } fn terminate(&mut self, session_id: u64) -> bool { - self.sessions.remove(&session_id).is_some() + let found = self.sessions.remove(&session_id).is_some(); + self.abort_senders.remove(&session_id); + found + } + + pub(crate) fn register_abort(&mut self, session_id: u64) -> tokio::sync::oneshot::Receiver<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.abort_senders.insert(session_id, tx); + rx } fn running_ids(&self) -> Vec { @@ -180,10 +190,11 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response { } => { let session_id = registry.lock().await.launch(&app_id); tracing::info!(session_id, %app_id, "launched"); + let abort_rx = registry.lock().await.register_abort(session_id); let reg = Arc::clone(registry); let aid = app_id.clone(); tokio::spawn(async move { - if let Err(e) = runtime::supervise(session_id, &aid, reg).await { + if let Err(e) = runtime::supervise(session_id, &aid, reg, abort_rx).await { tracing::warn!(session_id, error = %e, "runtime supervisor error"); } }); @@ -391,8 +402,9 @@ mod tests { let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default())); let mut rx = registry.lock().await.subscribe(); let session_id = registry.lock().await.launch("test.app"); + let abort_rx = registry.lock().await.register_abort(session_id); - runtime::supervise(session_id, "test.app", Arc::clone(®istry)) + runtime::supervise(session_id, "test.app", Arc::clone(®istry), abort_rx) .await .unwrap(); diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs index c95f745..6121b71 100644 --- a/crates/weft-appd/src/runtime.rs +++ b/crates/weft-appd/src/runtime.rs @@ -12,6 +12,7 @@ pub(crate) async fn supervise( session_id: u64, app_id: &str, registry: Registry, + abort_rx: tokio::sync::oneshot::Receiver<()>, ) -> anyhow::Result<()> { let bin = match std::env::var("WEFT_RUNTIME_BIN") { Ok(b) => b, @@ -64,8 +65,16 @@ pub(crate) async fn supervise( tokio::spawn(drain_stderr(stderr, session_id)); - let status = child.wait().await?; - tracing::info!(session_id, %app_id, exit_status = ?status, "process exited"); + tokio::select! { + status = child.wait() => { + tracing::info!(session_id, %app_id, exit_status = ?status, "process exited"); + } + _ = abort_rx => { + tracing::info!(session_id, %app_id, "abort received; sending SIGTERM"); + let _ = child.kill().await; + } + } + registry .lock() .await