diff --git a/crates/weft-appd/Cargo.toml b/crates/weft-appd/Cargo.toml index dd6d3db..eb784b6 100644 --- a/crates/weft-appd/Cargo.toml +++ b/crates/weft-appd/Cargo.toml @@ -11,6 +11,7 @@ path = "src/main.rs" [dependencies] anyhow = "1.0" sd-notify = "0.4" +weft-ipc-types = { path = "../weft-ipc-types" } toml = "0.8" tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] } serde = { version = "1", features = ["derive"] } diff --git a/crates/weft-appd/src/compositor_client.rs b/crates/weft-appd/src/compositor_client.rs new file mode 100644 index 0000000..3b97437 --- /dev/null +++ b/crates/weft-appd/src/compositor_client.rs @@ -0,0 +1,117 @@ +use std::path::PathBuf; + +use tokio::io::AsyncWriteExt; +use tokio::sync::mpsc; +use weft_ipc_types::{ + AppdToCompositor, CompositorToAppd, MAX_FRAME_LEN, frame_decode, frame_encode, +}; + +pub type CompositorSender = mpsc::Sender; + +/// Resolve the compositor IPC socket path. +/// +/// Returns `None` if neither `WEFT_COMPOSITOR_SOCKET` nor `XDG_RUNTIME_DIR` is set, +/// meaning no compositor IPC is available in this environment. +pub fn socket_path() -> Option { + if let Ok(p) = std::env::var("WEFT_COMPOSITOR_SOCKET") { + return Some(PathBuf::from(p)); + } + if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") { + return Some(PathBuf::from(dir).join("weft").join("compositor.sock")); + } + None +} + +/// Spawn the compositor IPC client task and return a sender for outbound messages. +/// +/// The task connects to `socket_path`, retrying every 2 s on failure. If the +/// connection drops while sending, it waits 500 ms then reconnects. Incoming +/// `CompositorToAppd` frames are decoded and logged; no behavioural action is +/// taken yet (surface lifecycle hookup happens in a later task). +pub fn spawn(socket_path: PathBuf) -> CompositorSender { + let (tx, rx) = mpsc::channel::(32); + tokio::spawn(run_client(socket_path, rx)); + tx +} + +async fn run_client(socket_path: PathBuf, mut rx: mpsc::Receiver) { + loop { + let stream = loop { + match tokio::net::UnixStream::connect(&socket_path).await { + Ok(s) => { + tracing::info!(path = %socket_path.display(), "connected to compositor IPC"); + break s; + } + Err(_) => { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + } + } + }; + + let (read_half, mut write_half) = stream.into_split(); + + tokio::spawn(read_unix_incoming(read_half)); + + loop { + let Some(msg) = rx.recv().await else { + return; + }; + match frame_encode(&msg) { + Ok(frame) => { + if write_half.write_all(&frame).await.is_err() { + tracing::warn!("compositor IPC write failed; reconnecting"); + break; + } + } + Err(e) => tracing::warn!(?e, "compositor IPC encode error"), + } + } + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } +} + +async fn read_unix_incoming(mut reader: tokio::net::unix::OwnedReadHalf) { + use tokio::io::AsyncReadExt; + + let mut buf: Vec = Vec::new(); + let mut tmp = [0u8; 4096]; + loop { + match reader.read(&mut tmp).await { + Ok(0) | Err(_) => break, + Ok(n) => { + buf.extend_from_slice(&tmp[..n]); + loop { + if buf.len() < 4 { + break; + } + let declared_len = + u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize; + if declared_len > MAX_FRAME_LEN { + buf.clear(); + break; + } + if buf.len() < 4 + declared_len { + break; + } + let frame_end = 4 + declared_len; + if let Ok(msg) = frame_decode::(&buf[..frame_end]) { + handle_incoming(msg); + } + buf.drain(..frame_end); + } + } + } + } +} + +fn handle_incoming(msg: CompositorToAppd) { + match msg { + CompositorToAppd::SurfaceReady { session_id } => { + tracing::debug!(session_id, "SurfaceReady from compositor"); + } + CompositorToAppd::ClientDisconnected { pid } => { + tracing::debug!(pid, "ClientDisconnected from compositor"); + } + } +} diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index c4be56e..b4aa308 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -5,6 +5,7 @@ use anyhow::Context; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; +mod compositor_client; mod ipc; mod runtime; mod ws; @@ -23,6 +24,7 @@ struct SessionRegistry { sessions: std::collections::HashMap, broadcast: tokio::sync::broadcast::Sender, abort_senders: std::collections::HashMap>, + compositor_tx: Option, } impl Default for SessionRegistry { @@ -33,6 +35,7 @@ impl Default for SessionRegistry { sessions: std::collections::HashMap::new(), broadcast, abort_senders: std::collections::HashMap::new(), + compositor_tx: None, } } } @@ -126,6 +129,11 @@ async fn run() -> anyhow::Result<()> { let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default())); + if let Some(path) = compositor_client::socket_path() { + let tx = compositor_client::spawn(path); + registry.lock().await.compositor_tx = Some(tx); + } + let ws_port = ws_port(); let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?; let ws_listener = tokio::net::TcpListener::bind(ws_addr) @@ -231,10 +239,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response { let session_id = registry.lock().await.launch(&app_id); tracing::info!(session_id, %app_id, "launched"); let abort_rx = registry.lock().await.register_abort(session_id); + let compositor_tx = registry.lock().await.compositor_tx.clone(); let reg = Arc::clone(registry); let aid = app_id.clone(); tokio::spawn(async move { - if let Err(e) = runtime::supervise(session_id, &aid, reg, abort_rx).await { + if let Err(e) = + runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx).await + { tracing::warn!(session_id, error = %e, "runtime supervisor error"); } }); @@ -647,9 +658,15 @@ mod tests { let session_id = registry.lock().await.launch("test.app"); let abort_rx = registry.lock().await.register_abort(session_id); - runtime::supervise(session_id, "test.app", Arc::clone(®istry), abort_rx) - .await - .unwrap(); + runtime::supervise( + session_id, + "test.app", + Arc::clone(®istry), + abort_rx, + None, + ) + .await + .unwrap(); assert!(matches!( registry.lock().await.state(session_id), @@ -703,6 +720,7 @@ mod tests { "test.abort.startup", Arc::clone(®istry), abort_rx, + None, ) .await .unwrap(); @@ -748,6 +766,7 @@ mod tests { "test.spawn.fail", Arc::clone(®istry), abort_rx, + None, ) .await .unwrap(); diff --git a/crates/weft-appd/src/runtime.rs b/crates/weft-appd/src/runtime.rs index 91330bc..01375f0 100644 --- a/crates/weft-appd/src/runtime.rs +++ b/crates/weft-appd/src/runtime.rs @@ -1,8 +1,10 @@ use std::time::Duration; use tokio::io::{AsyncBufReadExt, BufReader}; +use weft_ipc_types::AppdToCompositor; use crate::Registry; +use crate::compositor_client::CompositorSender; use crate::ipc::{AppStateKind, Response}; const READY_TIMEOUT: Duration = Duration::from_secs(30); @@ -12,6 +14,7 @@ pub(crate) async fn supervise( app_id: &str, registry: Registry, abort_rx: tokio::sync::oneshot::Receiver<()>, + compositor_tx: Option, ) -> anyhow::Result<()> { let mut abort_rx = abort_rx; let bin = match std::env::var("WEFT_RUNTIME_BIN") { @@ -43,6 +46,17 @@ pub(crate) async fn supervise( } }; + if let Some(tx) = &compositor_tx { + let pid = child.id().unwrap_or(0); + let _ = tx + .send(AppdToCompositor::AppSurfaceCreated { + app_id: app_id.to_owned(), + session_id, + pid, + }) + .await; + } + let stdout = child.stdout.take().expect("stdout piped"); let stderr = child.stderr.take().expect("stderr piped"); @@ -103,6 +117,12 @@ pub(crate) async fn supervise( } } + if let Some(tx) = &compositor_tx { + let _ = tx + .send(AppdToCompositor::AppSurfaceDestroyed { session_id }) + .await; + } + { let mut reg = registry.lock().await; reg.set_state(session_id, AppStateKind::Stopped);