mirror of
https://github.com/marcoallegretti/WEFT_OS.git
synced 2026-03-27 01:13:09 +00:00
feat(appd): add compositor IPC client; send AppSurfaceCreated/Destroyed on session lifecycle
Adds crates/weft-appd/src/compositor_client.rs: async Tokio client that connects to the compositor's Unix socket (/weft/compositor.sock or WEFT_COMPOSITOR_SOCKET), retrying every 2s on failure and 500ms on write error. Incoming CompositorToAppd frames are decoded and logged (SurfaceReady, ClientDisconnected). Wires compositor_tx into SessionRegistry. The supervise task now sends AppSurfaceCreated (with child PID) immediately after process spawn, and AppSurfaceDestroyed when the process exits. All three existing supervisor tests updated to pass None for compositor_tx.
This commit is contained in:
parent
69d29ee3a8
commit
6b428e5a47
4 changed files with 161 additions and 4 deletions
|
|
@ -11,6 +11,7 @@ path = "src/main.rs"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
sd-notify = "0.4"
|
sd-notify = "0.4"
|
||||||
|
weft-ipc-types = { path = "../weft-ipc-types" }
|
||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] }
|
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync", "process", "time"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|
|
||||||
117
crates/weft-appd/src/compositor_client.rs
Normal file
117
crates/weft-appd/src/compositor_client.rs
Normal file
|
|
@ -0,0 +1,117 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use weft_ipc_types::{
|
||||||
|
AppdToCompositor, CompositorToAppd, MAX_FRAME_LEN, frame_decode, frame_encode,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub type CompositorSender = mpsc::Sender<AppdToCompositor>;
|
||||||
|
|
||||||
|
/// Resolve the compositor IPC socket path.
|
||||||
|
///
|
||||||
|
/// Returns `None` if neither `WEFT_COMPOSITOR_SOCKET` nor `XDG_RUNTIME_DIR` is set,
|
||||||
|
/// meaning no compositor IPC is available in this environment.
|
||||||
|
pub fn socket_path() -> Option<PathBuf> {
|
||||||
|
if let Ok(p) = std::env::var("WEFT_COMPOSITOR_SOCKET") {
|
||||||
|
return Some(PathBuf::from(p));
|
||||||
|
}
|
||||||
|
if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") {
|
||||||
|
return Some(PathBuf::from(dir).join("weft").join("compositor.sock"));
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn the compositor IPC client task and return a sender for outbound messages.
|
||||||
|
///
|
||||||
|
/// The task connects to `socket_path`, retrying every 2 s on failure. If the
|
||||||
|
/// connection drops while sending, it waits 500 ms then reconnects. Incoming
|
||||||
|
/// `CompositorToAppd` frames are decoded and logged; no behavioural action is
|
||||||
|
/// taken yet (surface lifecycle hookup happens in a later task).
|
||||||
|
pub fn spawn(socket_path: PathBuf) -> CompositorSender {
|
||||||
|
let (tx, rx) = mpsc::channel::<AppdToCompositor>(32);
|
||||||
|
tokio::spawn(run_client(socket_path, rx));
|
||||||
|
tx
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_client(socket_path: PathBuf, mut rx: mpsc::Receiver<AppdToCompositor>) {
|
||||||
|
loop {
|
||||||
|
let stream = loop {
|
||||||
|
match tokio::net::UnixStream::connect(&socket_path).await {
|
||||||
|
Ok(s) => {
|
||||||
|
tracing::info!(path = %socket_path.display(), "connected to compositor IPC");
|
||||||
|
break s;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (read_half, mut write_half) = stream.into_split();
|
||||||
|
|
||||||
|
tokio::spawn(read_unix_incoming(read_half));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let Some(msg) = rx.recv().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
match frame_encode(&msg) {
|
||||||
|
Ok(frame) => {
|
||||||
|
if write_half.write_all(&frame).await.is_err() {
|
||||||
|
tracing::warn!("compositor IPC write failed; reconnecting");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!(?e, "compositor IPC encode error"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_unix_incoming(mut reader: tokio::net::unix::OwnedReadHalf) {
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
let mut buf: Vec<u8> = Vec::new();
|
||||||
|
let mut tmp = [0u8; 4096];
|
||||||
|
loop {
|
||||||
|
match reader.read(&mut tmp).await {
|
||||||
|
Ok(0) | Err(_) => break,
|
||||||
|
Ok(n) => {
|
||||||
|
buf.extend_from_slice(&tmp[..n]);
|
||||||
|
loop {
|
||||||
|
if buf.len() < 4 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let declared_len =
|
||||||
|
u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
|
||||||
|
if declared_len > MAX_FRAME_LEN {
|
||||||
|
buf.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if buf.len() < 4 + declared_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let frame_end = 4 + declared_len;
|
||||||
|
if let Ok(msg) = frame_decode::<CompositorToAppd>(&buf[..frame_end]) {
|
||||||
|
handle_incoming(msg);
|
||||||
|
}
|
||||||
|
buf.drain(..frame_end);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_incoming(msg: CompositorToAppd) {
|
||||||
|
match msg {
|
||||||
|
CompositorToAppd::SurfaceReady { session_id } => {
|
||||||
|
tracing::debug!(session_id, "SurfaceReady from compositor");
|
||||||
|
}
|
||||||
|
CompositorToAppd::ClientDisconnected { pid } => {
|
||||||
|
tracing::debug!(pid, "ClientDisconnected from compositor");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -5,6 +5,7 @@ use anyhow::Context;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
mod compositor_client;
|
||||||
mod ipc;
|
mod ipc;
|
||||||
mod runtime;
|
mod runtime;
|
||||||
mod ws;
|
mod ws;
|
||||||
|
|
@ -23,6 +24,7 @@ struct SessionRegistry {
|
||||||
sessions: std::collections::HashMap<u64, SessionEntry>,
|
sessions: std::collections::HashMap<u64, SessionEntry>,
|
||||||
broadcast: tokio::sync::broadcast::Sender<Response>,
|
broadcast: tokio::sync::broadcast::Sender<Response>,
|
||||||
abort_senders: std::collections::HashMap<u64, tokio::sync::oneshot::Sender<()>>,
|
abort_senders: std::collections::HashMap<u64, tokio::sync::oneshot::Sender<()>>,
|
||||||
|
compositor_tx: Option<compositor_client::CompositorSender>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for SessionRegistry {
|
impl Default for SessionRegistry {
|
||||||
|
|
@ -33,6 +35,7 @@ impl Default for SessionRegistry {
|
||||||
sessions: std::collections::HashMap::new(),
|
sessions: std::collections::HashMap::new(),
|
||||||
broadcast,
|
broadcast,
|
||||||
abort_senders: std::collections::HashMap::new(),
|
abort_senders: std::collections::HashMap::new(),
|
||||||
|
compositor_tx: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -126,6 +129,11 @@ async fn run() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
|
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
|
||||||
|
|
||||||
|
if let Some(path) = compositor_client::socket_path() {
|
||||||
|
let tx = compositor_client::spawn(path);
|
||||||
|
registry.lock().await.compositor_tx = Some(tx);
|
||||||
|
}
|
||||||
|
|
||||||
let ws_port = ws_port();
|
let ws_port = ws_port();
|
||||||
let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?;
|
let ws_addr: std::net::SocketAddr = format!("127.0.0.1:{ws_port}").parse()?;
|
||||||
let ws_listener = tokio::net::TcpListener::bind(ws_addr)
|
let ws_listener = tokio::net::TcpListener::bind(ws_addr)
|
||||||
|
|
@ -231,10 +239,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response {
|
||||||
let session_id = registry.lock().await.launch(&app_id);
|
let session_id = registry.lock().await.launch(&app_id);
|
||||||
tracing::info!(session_id, %app_id, "launched");
|
tracing::info!(session_id, %app_id, "launched");
|
||||||
let abort_rx = registry.lock().await.register_abort(session_id);
|
let abort_rx = registry.lock().await.register_abort(session_id);
|
||||||
|
let compositor_tx = registry.lock().await.compositor_tx.clone();
|
||||||
let reg = Arc::clone(registry);
|
let reg = Arc::clone(registry);
|
||||||
let aid = app_id.clone();
|
let aid = app_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = runtime::supervise(session_id, &aid, reg, abort_rx).await {
|
if let Err(e) =
|
||||||
|
runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx).await
|
||||||
|
{
|
||||||
tracing::warn!(session_id, error = %e, "runtime supervisor error");
|
tracing::warn!(session_id, error = %e, "runtime supervisor error");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -647,9 +658,15 @@ mod tests {
|
||||||
let session_id = registry.lock().await.launch("test.app");
|
let session_id = registry.lock().await.launch("test.app");
|
||||||
let abort_rx = registry.lock().await.register_abort(session_id);
|
let abort_rx = registry.lock().await.register_abort(session_id);
|
||||||
|
|
||||||
runtime::supervise(session_id, "test.app", Arc::clone(®istry), abort_rx)
|
runtime::supervise(
|
||||||
.await
|
session_id,
|
||||||
.unwrap();
|
"test.app",
|
||||||
|
Arc::clone(®istry),
|
||||||
|
abort_rx,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
registry.lock().await.state(session_id),
|
registry.lock().await.state(session_id),
|
||||||
|
|
@ -703,6 +720,7 @@ mod tests {
|
||||||
"test.abort.startup",
|
"test.abort.startup",
|
||||||
Arc::clone(®istry),
|
Arc::clone(®istry),
|
||||||
abort_rx,
|
abort_rx,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
@ -748,6 +766,7 @@ mod tests {
|
||||||
"test.spawn.fail",
|
"test.spawn.fail",
|
||||||
Arc::clone(®istry),
|
Arc::clone(®istry),
|
||||||
abort_rx,
|
abort_rx,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,10 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
|
use weft_ipc_types::AppdToCompositor;
|
||||||
|
|
||||||
use crate::Registry;
|
use crate::Registry;
|
||||||
|
use crate::compositor_client::CompositorSender;
|
||||||
use crate::ipc::{AppStateKind, Response};
|
use crate::ipc::{AppStateKind, Response};
|
||||||
|
|
||||||
const READY_TIMEOUT: Duration = Duration::from_secs(30);
|
const READY_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
@ -12,6 +14,7 @@ pub(crate) async fn supervise(
|
||||||
app_id: &str,
|
app_id: &str,
|
||||||
registry: Registry,
|
registry: Registry,
|
||||||
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
||||||
|
compositor_tx: Option<CompositorSender>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut abort_rx = abort_rx;
|
let mut abort_rx = abort_rx;
|
||||||
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
||||||
|
|
@ -43,6 +46,17 @@ pub(crate) async fn supervise(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(tx) = &compositor_tx {
|
||||||
|
let pid = child.id().unwrap_or(0);
|
||||||
|
let _ = tx
|
||||||
|
.send(AppdToCompositor::AppSurfaceCreated {
|
||||||
|
app_id: app_id.to_owned(),
|
||||||
|
session_id,
|
||||||
|
pid,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
let stdout = child.stdout.take().expect("stdout piped");
|
let stdout = child.stdout.take().expect("stdout piped");
|
||||||
let stderr = child.stderr.take().expect("stderr piped");
|
let stderr = child.stderr.take().expect("stderr piped");
|
||||||
|
|
||||||
|
|
@ -103,6 +117,12 @@ pub(crate) async fn supervise(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(tx) = &compositor_tx {
|
||||||
|
let _ = tx
|
||||||
|
.send(AppdToCompositor::AppSurfaceDestroyed { session_id })
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut reg = registry.lock().await;
|
let mut reg = registry.lock().await;
|
||||||
reg.set_state(session_id, AppStateKind::Stopped);
|
reg.set_state(session_id, AppStateKind::Stopped);
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue