From ca2cc38d4d729131052c5fd0cf92c4154830fe04 Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Wed, 11 Mar 2026 14:29:22 +0100 Subject: [PATCH] feat(compositor): add appd IPC server (Unix socket, length-prefixed MessagePack framing) Adds weft-compositor/src/appd_ipc.rs: WeftAppdIpc state, setup() registers a calloop UnixListener source. Accepted connections are registered as edge-triggered read sources. Incoming AppdToCompositor frames are decoded and dispatched; AppSurfaceCreated records pid->session mapping in pending_pids for later wl_surface association. Wires into both the DRM and Winit backends. Socket path: /weft/compositor.sock or WEFT_COMPOSITOR_SOCKET override. --- crates/weft-compositor/Cargo.toml | 1 + crates/weft-compositor/src/appd_ipc.rs | 227 ++++++++++++++++++++ crates/weft-compositor/src/backend/drm.rs | 6 + crates/weft-compositor/src/backend/winit.rs | 9 + crates/weft-compositor/src/main.rs | 1 + crates/weft-compositor/src/state.rs | 8 + 6 files changed, 252 insertions(+) create mode 100644 crates/weft-compositor/src/appd_ipc.rs diff --git a/crates/weft-compositor/Cargo.toml b/crates/weft-compositor/Cargo.toml index 5f1406b..3db3c76 100644 --- a/crates/weft-compositor/Cargo.toml +++ b/crates/weft-compositor/Cargo.toml @@ -21,6 +21,7 @@ smithay = { version = "0.7", default-features = false, features = [ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } anyhow = "1" +weft-ipc-types = { path = "../weft-ipc-types" } wayland-scanner = "0.31" wayland-server = "0.31" wayland-backend = "0.3" diff --git a/crates/weft-compositor/src/appd_ipc.rs b/crates/weft-compositor/src/appd_ipc.rs new file mode 100644 index 0000000..4fdadef --- /dev/null +++ b/crates/weft-compositor/src/appd_ipc.rs @@ -0,0 +1,227 @@ +#[cfg(unix)] +use std::{ + collections::HashMap, + io::{self, Read, Write}, + os::unix::net::{UnixListener, UnixStream}, + path::PathBuf, +}; + +#[cfg(unix)] +use smithay::reexports::calloop::{Interest, Mode, PostAction, generic::Generic}; + +#[cfg(unix)] +use weft_ipc_types::{ + AppdToCompositor, CompositorToAppd, MAX_FRAME_LEN, frame_decode, frame_encode, +}; + +#[cfg(unix)] +use crate::state::WeftCompositorState; + +#[cfg(unix)] +pub struct WeftAppdIpc { + pub socket_path: PathBuf, + read_buf: Vec, + write_stream: Option, + /// pid → (app_id, session_id) for Wayland clients whose surface has not yet arrived. + pub pending_pids: HashMap, +} + +#[cfg(unix)] +impl WeftAppdIpc { + pub fn new(socket_path: PathBuf) -> Self { + Self { + socket_path, + read_buf: Vec::new(), + write_stream: None, + pending_pids: HashMap::new(), + } + } + + #[allow(dead_code)] + pub fn send(&mut self, msg: &CompositorToAppd) { + let Some(stream) = &mut self.write_stream else { + return; + }; + match frame_encode(msg) { + Ok(frame) => { + if stream.write_all(&frame).is_err() { + self.write_stream = None; + } + } + Err(e) => tracing::warn!(?e, "failed to encode compositor IPC message"), + } + } + + fn try_decode_frames(&mut self) -> Vec { + let mut out = Vec::new(); + loop { + if self.read_buf.len() < 4 { + break; + } + let declared_len = u32::from_le_bytes([ + self.read_buf[0], + self.read_buf[1], + self.read_buf[2], + self.read_buf[3], + ]) as usize; + if declared_len > MAX_FRAME_LEN { + tracing::warn!(declared_len, "appd IPC frame too large; disconnecting"); + self.write_stream = None; + self.read_buf.clear(); + break; + } + if self.read_buf.len() < 4 + declared_len { + break; + } + let frame_end = 4 + declared_len; + match frame_decode::(&self.read_buf[..frame_end]) { + Ok(msg) => out.push(msg), + Err(e) => tracing::warn!(?e, "appd IPC frame decode error"), + } + self.read_buf.drain(..frame_end); + } + out + } + + pub fn on_read(&mut self, stream: &mut UnixStream) -> (Vec, bool) { + let mut buf = [0u8; 8192]; + let mut eof = false; + loop { + match stream.read(&mut buf) { + Ok(0) => { + eof = true; + break; + } + Ok(n) => self.read_buf.extend_from_slice(&buf[..n]), + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => { + tracing::warn!(?e, "appd IPC stream read error"); + eof = true; + break; + } + } + } + let messages = self.try_decode_frames(); + if eof { + self.write_stream = None; + self.read_buf.clear(); + } + (messages, eof) + } +} + +#[cfg(unix)] +fn handle_message(state: &mut WeftCompositorState, msg: AppdToCompositor) { + match msg { + AppdToCompositor::AppSurfaceCreated { + app_id, + session_id, + pid, + } => { + tracing::debug!(app_id, session_id, pid, "AppSurfaceCreated"); + if let Some(ipc) = &mut state.appd_ipc { + ipc.pending_pids.insert(pid, (app_id, session_id)); + } + } + AppdToCompositor::AppSurfaceDestroyed { session_id } => { + tracing::debug!(session_id, "AppSurfaceDestroyed"); + } + AppdToCompositor::AppFocusRequest { session_id } => { + tracing::debug!(session_id, "AppFocusRequest"); + } + } +} + +#[cfg(unix)] +pub fn compositor_socket_path() -> PathBuf { + if let Ok(p) = std::env::var("WEFT_COMPOSITOR_SOCKET") { + return PathBuf::from(p); + } + if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") { + return PathBuf::from(dir).join("weft").join("compositor.sock"); + } + PathBuf::from("/tmp/weft-compositor.sock") +} + +#[cfg(unix)] +pub fn setup(state: &mut WeftCompositorState) -> anyhow::Result<()> { + use anyhow::Context; + + let socket_path = { + let ipc = state + .appd_ipc + .as_ref() + .context("appd_ipc not initialised")?; + ipc.socket_path.clone() + }; + + if socket_path.exists() { + std::fs::remove_file(&socket_path).ok(); + } + if let Some(parent) = socket_path.parent() { + std::fs::create_dir_all(parent).context("create compositor IPC socket directory")?; + } + + let listener = UnixListener::bind(&socket_path) + .with_context(|| format!("bind compositor IPC socket at {}", socket_path.display()))?; + listener.set_nonblocking(true)?; + + tracing::info!(path = %socket_path.display(), "compositor IPC socket open"); + + let handle = state.loop_handle.clone(); + state + .loop_handle + .insert_source( + Generic::new(listener, Interest::READ, Mode::Level), + move |_, listener, state| { + loop { + match listener.accept() { + Ok((stream, _addr)) => { + tracing::info!("weft-appd connected to compositor IPC"); + let write_clone = match stream.try_clone() { + Ok(c) => c, + Err(e) => { + tracing::warn!(?e, "try_clone failed for appd IPC stream"); + continue; + } + }; + if let Some(ipc) = &mut state.appd_ipc { + ipc.write_stream = Some(write_clone); + ipc.read_buf.clear(); + } + stream.set_nonblocking(true).ok(); + let _ = handle.insert_source( + Generic::new(stream, Interest::READ, Mode::Edge), + |_, stream, state| { + // Safety: calloop wraps the fd in NoIoDrop to prevent + // accidental drops; get_mut gives the inner stream. + let inner: &mut UnixStream = unsafe { stream.get_mut() }; + let (messages, eof) = match state.appd_ipc.as_mut() { + Some(ipc) => ipc.on_read(inner), + None => return Ok(PostAction::Remove), + }; + for msg in messages { + handle_message(state, msg); + } + if eof { + tracing::info!( + "weft-appd disconnected from compositor IPC" + ); + Ok(PostAction::Remove) + } else { + Ok(PostAction::Continue) + } + }, + ); + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => tracing::warn!(?e, "accept error on compositor IPC socket"), + } + } + Ok(PostAction::Continue) + }, + ) + .map_err(|e| anyhow::anyhow!("insert compositor IPC listener source: {e}"))?; + + Ok(()) +} diff --git a/crates/weft-compositor/src/backend/drm.rs b/crates/weft-compositor/src/backend/drm.rs index c930243..567c125 100644 --- a/crates/weft-compositor/src/backend/drm.rs +++ b/crates/weft-compositor/src/backend/drm.rs @@ -62,6 +62,7 @@ use smithay_drm_extras::drm_scanner::{DrmScanEvent, DrmScanner}; #[cfg(target_os = "linux")] use crate::{ + appd_ipc::{self, WeftAppdIpc}, input, state::{WeftClientState, WeftCompositorState}, }; @@ -243,6 +244,11 @@ pub fn run() -> anyhow::Result<()> { let mut state = WeftCompositorState::new(display_handle.clone(), loop_signal, loop_handle, seat_name); + state.appd_ipc = Some(WeftAppdIpc::new(appd_ipc::compositor_socket_path())); + if let Err(e) = appd_ipc::setup(&mut state) { + tracing::warn!(?e, "compositor IPC setup failed"); + } + state.drm = Some(WeftDrmData { session, primary_gpu: primary_gpu_node, diff --git a/crates/weft-compositor/src/backend/winit.rs b/crates/weft-compositor/src/backend/winit.rs index b0b1278..c515c18 100644 --- a/crates/weft-compositor/src/backend/winit.rs +++ b/crates/weft-compositor/src/backend/winit.rs @@ -15,6 +15,7 @@ use smithay::{ }; use crate::{ + appd_ipc::{self, WeftAppdIpc}, input, state::{WeftClientState, WeftCompositorState}, }; @@ -92,6 +93,14 @@ pub fn run() -> anyhow::Result<()> { ); state.space.map_output(&output, (0, 0)); + #[cfg(unix)] + { + state.appd_ipc = Some(WeftAppdIpc::new(appd_ipc::compositor_socket_path())); + if let Err(e) = appd_ipc::setup(&mut state) { + tracing::warn!(?e, "compositor IPC setup failed"); + } + } + let mut damage_tracker = OutputDamageTracker::from_output(&output); let start_time = std::time::Instant::now(); diff --git a/crates/weft-compositor/src/main.rs b/crates/weft-compositor/src/main.rs index 6f4412a..84334c4 100644 --- a/crates/weft-compositor/src/main.rs +++ b/crates/weft-compositor/src/main.rs @@ -1,5 +1,6 @@ use tracing_subscriber::EnvFilter; +mod appd_ipc; mod backend; mod input; mod protocols; diff --git a/crates/weft-compositor/src/state.rs b/crates/weft-compositor/src/state.rs index 5adaa77..6aeaf75 100644 --- a/crates/weft-compositor/src/state.rs +++ b/crates/weft-compositor/src/state.rs @@ -1,3 +1,5 @@ +#[cfg(unix)] +use crate::appd_ipc::WeftAppdIpc; #[cfg(target_os = "linux")] use crate::backend::drm_device::WeftDrmData; use crate::protocols::{ @@ -96,6 +98,10 @@ pub struct WeftCompositorState { // WEFT compositor–shell protocol global. pub weft_shell_state: WeftShellState, + // IPC channel with weft-appd (compositor is the server, appd is the client). + #[cfg(unix)] + pub appd_ipc: Option, + #[cfg(target_os = "linux")] pub drm: Option, } @@ -153,6 +159,8 @@ impl WeftCompositorState { cursor_image_status: CursorImageStatus::Hidden, dmabuf_global: None, running: true, + #[cfg(unix)] + appd_ipc: None, #[cfg(target_os = "linux")] drm: None, }