From 6b428e5a476719640df67d709a2848b0bfc97d3a Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Wed, 11 Mar 2026 14:40:55 +0100 Subject: [PATCH] feat(appd): add compositor IPC client; send AppSurfaceCreated/Destroyed on session lifecycle Adds crates/weft-appd/src/compositor_client.rs: async Tokio client that connects to the compositor's Unix socket (/weft/compositor.sock or WEFT_COMPOSITOR_SOCKET), retrying every 2s on failure and 500ms on write error. Incoming CompositorToAppd frames are decoded and logged (SurfaceReady, ClientDisconnected). Wires compositor_tx into SessionRegistry. The supervise task now sends AppSurfaceCreated (with child PID) immediately after process spawn, and AppSurfaceDestroyed when the process exits. All three existing supervisor tests updated to pass None for compositor_tx. --- crates/weft-appd/Cargo.toml | 1 + crates/weft-appd/src/compositor_client.rs | 117 ++++++++++++++++++++++ crates/weft-appd/src/main.rs | 27 ++++- crates/weft-appd/src/runtime.rs | 20 ++++ 4 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 crates/weft-appd/src/compositor_client.rs diff --git a/crates/weft-appd/Cargo.toml b/crates/weft-appd/Cargo.toml index dd6d3db..eb784b6 100644 --- a/crates/weft-appd/Cargo.toml +++ b/crates/weft-appd/Cargo.toml @@ -11,6 +11,7 @@ path = "src/main.rs" [dependencies] anyhow = "1.0" sd-notify = "0.4" +weft-ipc-types = { path = "../weft-ipc-types" } toml = "0.8" tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/weft-appd/src/compositor_client.rs b/crates/weft-appd/src/compositor_client.rs new file mode 100644 index 0000000..3b97437 --- /dev/null +++ b/crates/weft-appd/src/compositor_client.rs @@ -0,0 +1,117 @@ +use std::path::PathBuf; + +use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc; +use weft_ipc_types::{ + AppdToCompositor, CompositorToAppd, MAX_FRAME_LEN, frame_decode, frame_encode, +}; + +pub type CompositorSender = mpsc::Sender; + +/// Resolve the compositor IPC socket path. +/// +/// Returns `None` if neither `WEFT_COMPOSITOR_SOCKET` nor `XDG_RUNTIME_DIR` is set, +/// meaning no compositor IPC is available in this environment. +pub fn socket_path() -> Option { + if let Ok(p) = std::env::var("WEFT_COMPOSITOR_SOCKET") { + return Some(PathBuf::from(p)); + } + if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") { + return Some(PathBuf::from(dir).join("weft").join("compositor.sock")); + } + None +} + +/// Spawn the compositor IPC client task and return a sender for outbound messages. +/// +/// The task connects to `socket_path`, retrying every 2 s on failure. If the +/// connection drops while sending, it waits 500 ms then reconnects. Incoming +/// `CompositorToAppd` frames are decoded and logged; no behavioural action is +/// taken yet (surface lifecycle hookup happens in a later task). +pub fn spawn(socket_path: PathBuf) -> CompositorSender { + let (tx, rx) = mpsc::channel::(32); + tokio::spawn(run_client(socket_path, rx)); + tx +} + +async fn run_client(socket_path: PathBuf, mut rx: mpsc::Receiver) { + loop { + let stream = loop { + match tokio::net::UnixStream::connect(&socket_path).await { + Ok(s) => { + tracing::info!(path = %socket_path.display(), "connected to compositor IPC"); + break s; + } + Err(_) => { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + } + }; + + let (read_half, mut write_half) = stream.into_split(); + + tokio::spawn(read_unix_incoming(read_half)); + + loop { + let Some(msg) = rx.recv().await else { + return; + }; + match frame_encode(&msg) { + Ok(frame) => { + if write_half.write_all(&frame).await.is_err() { + tracing::warn!("compositor IPC write failed; reconnecting"); + break; + } + } + Err(e) => tracing::warn!(?e, "compositor IPC encode error"), + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } +} + +async fn read_unix_incoming(mut reader: tokio::net::unix::OwnedReadHalf) { + use tokio::io::AsyncReadExt; + + let mut buf: Vec = Vec::new(); + let mut tmp = [0u8; 4096]; + loop { + match reader.read(&mut tmp).await { + Ok(0) | Err(_) => break, + Ok(n) => { + buf.extend_from_slice(&tmp[..n]); + loop { + if buf.len() < 4 { + break; + } + let declared_len = + u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize; + if declared_len > MAX_FRAME_LEN { + buf.clear(); + break; + } + if buf.len() < 4 + declared_len { + break; + } + let frame_end = 4 + declared_len; + if let Ok(msg) = frame_decode::(&buf[..frame_end]) { + handle_incoming(msg); + } + buf.drain(..frame_end); + } + } + } + } +} + +fn handle_incoming(msg: CompositorToAppd) { + match msg { + CompositorToAppd::SurfaceReady { session_id } => { + tracing::debug!(session_id, "SurfaceReady from compositor"); + } + CompositorToAppd::ClientDisconnected { pid } => { + tracing::debug!(pid, "ClientDisconnected from compositor"); + } + } +} diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index c4be56e..b4aa308 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -5,6 +5,7 @@ use anyhow::Context; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; +mod compositor_client; mod ipc; mod runtime; mod ws; @@ -23,6 +24,7 @@ struct SessionRegistry { sessions: std::collections::HashMap, broadcast: tokio::sync::broadcast::Sender, abort_senders: std::collections::HashMap>, + compositor_tx: Option, } impl Default for SessionRegistry { @@ -33,6 +35,7 @@ impl Default for SessionRegistry { sessions: std::collections::HashMap::new(), broadcast, abort_senders: std::collections::HashMap::new(), + compositor_tx: None, } } } @@ -126,6 +129,11 @@ async fn run() -> anyhow::Result<()> { let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default())); + if let Some(path) = compositor_client::socket_path() { + let tx = compositor_client::spawn(path); + registry.lock().await.compositor_tx = Some(tx); + } + let ws_port = ws_port(); let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?; let ws_listener = tokio::net::TcpListener::bind(ws_addr) @@ -231,10 +239,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response { let session_id = registry.lock().await.launch(&app_id); tracing::info!(session_id, %app_id, "launched"); let abort_rx = registry.lock().await.register_abort(session_id); + let compositor_tx = registry.lock().await.compositor_tx.clone(); let reg = Arc::clone(registry); let aid = app_id.clone(); tokio::spawn(async move { - if let Err(e) = runtime::supervise(session_id, &aid, reg, abort_rx).await { + if let Err(e) = + runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx).await + { tracing::warn!(session_id, error = %e, "runtime supervisor error"); } }); @@ -647,9 +658,15 @@ mod tests { let session_id = registry.lock().await.launch("test.app"); let abort_rx = registry.lock().await.register_abort(session_id); - runtime::supervise(session_id, "test.app", Arc::clone(®istry), abort_rx) - .await - .unwrap(); + runtime::supervise( + session_id, + "test.app", + Arc::clone(®istry), + abort_rx, + None, + ) + .await + .unwrap(); assert!(matches!( registry.lock().await.state(session_id), @@ -703,6 +720,7 @@ mod tests { "test.abort.startup", Arc::clone(®istry), abort_rx, + None, ) .await .unwrap(); @@ -748,6 +766,7 @@ mod tests { "test.spawn.fail", Arc::clone(®istry), abort_rx, + None, ) .await .unwrap(); diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs index 91330bc..01375f0 100644 --- a/crates/weft-appd/src/runtime.rs +++ b/crates/weft-appd/src/runtime.rs @@ -1,8 +1,10 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, BufReader}; +use weft_ipc_types::AppdToCompositor; use crate::Registry; +use crate::compositor_client::CompositorSender; use crate::ipc::{AppStateKind, Response}; const READY_TIMEOUT: Duration = Duration::from_secs(30); @@ -12,6 +14,7 @@ pub(crate) async fn supervise( app_id: &str, registry: Registry, abort_rx: tokio::sync::oneshot::Receiver<()>, + compositor_tx: Option, ) -> anyhow::Result<()> { let mut abort_rx = abort_rx; let bin = match std::env::var("WEFT_RUNTIME_BIN") { @@ -43,6 +46,17 @@ pub(crate) async fn supervise( } }; + if let Some(tx) = &compositor_tx { + let pid = child.id().unwrap_or(0); + let _ = tx + .send(AppdToCompositor::AppSurfaceCreated { + app_id: app_id.to_owned(), + session_id, + pid, + }) + .await; + } + let stdout = child.stdout.take().expect("stdout piped"); let stderr = child.stderr.take().expect("stderr piped"); @@ -103,6 +117,12 @@ pub(crate) async fn supervise( } } + if let Some(tx) = &compositor_tx { + let _ = tx + .send(AppdToCompositor::AppSurfaceDestroyed { session_id }) + .await; + } + { let mut reg = registry.lock().await; reg.set_state(session_id, AppStateKind::Stopped);