diff --git a/Cargo.lock b/Cargo.lock index 11aa6c4..b3d6e70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1395,6 +1395,25 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rmp" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" +dependencies = [ + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" +dependencies = [ + "rmp", + "serde", +] + [[package]] name = "rustix" version = "0.38.44" @@ -1464,6 +1483,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ "serde_core", + "serde_derive", ] [[package]] @@ -1649,6 +1669,16 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "syn" version = "2.0.117" @@ -1747,10 +1777,12 @@ version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ + "bytes", "libc", "mio", "pin-project-lite", "signal-hook-registry", + "socket2", "tokio-macros", "windows-sys 0.61.2", ] @@ -2243,7 +2275,9 @@ name = "weft-appd" version = "0.1.0" dependencies = [ "anyhow", + "rmp-serde", "sd-notify", + "serde", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/weft-appd/Cargo.toml b/crates/weft-appd/Cargo.toml index 6261af3..ee6783e 100644 --- a/crates/weft-appd/Cargo.toml +++ b/crates/weft-appd/Cargo.toml @@ -11,6 +11,8 @@ path = "src/main.rs" [dependencies] anyhow = "1.0" sd-notify = "0.4" -tokio = { version = "1", features = ["rt", "macros", "signal"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] } +serde = { version = "1", features = ["derive"] } +rmp-serde = "1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/weft-appd/src/ipc.rs b/crates/weft-appd/src/ipc.rs new file mode 100644 index 0000000..1899190 --- /dev/null +++ b/crates/weft-appd/src/ipc.rs @@ -0,0 +1,70 @@ +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum Request { + LaunchApp { app_id: String, surface_id: u64 }, + TerminateApp { session_id: u64 }, + QueryRunning, + QueryAppState { session_id: u64 }, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum Response { + LaunchAck { + session_id: u64, + }, + AppReady { + session_id: u64, + }, + RunningApps { + session_ids: Vec, + }, + AppState { + session_id: u64, + state: AppStateKind, + }, + Error { + code: u32, + message: String, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AppStateKind { + Starting, + Running, + Stopping, + Stopped, + NotFound, +} + +pub async fn read_frame( + reader: &mut (impl AsyncReadExt + Unpin), +) -> anyhow::Result> { + let mut len_buf = [0u8; 4]; + match reader.read_exact(&mut len_buf).await { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e.into()), + } + let len = u32::from_le_bytes(len_buf) as usize; + let mut body = vec![0u8; len]; + reader.read_exact(&mut body).await?; + let req = rmp_serde::from_slice(&body)?; + Ok(Some(req)) +} + +pub async fn write_frame( + writer: &mut (impl AsyncWriteExt + Unpin), + response: &Response, +) -> anyhow::Result<()> { + let body = rmp_serde::to_vec(response)?; + let len = (body.len() as u32).to_le_bytes(); + writer.write_all(&len).await?; + writer.write_all(&body).await?; + Ok(()) +} diff --git a/crates/weft-appd/src/main.rs b/crates/weft-appd/src/main.rs index 1287b2d..bc1714e 100644 --- a/crates/weft-appd/src/main.rs +++ b/crates/weft-appd/src/main.rs @@ -1,8 +1,47 @@ use std::path::PathBuf; +use std::sync::Arc; use anyhow::Context; +use tokio::io::AsyncWriteExt; +use tokio::sync::Mutex; -#[tokio::main(flavor = "current_thread")] +mod ipc; + +use ipc::{AppStateKind, Request, Response}; + +type Registry = Arc>; + +#[derive(Default)] +struct SessionRegistry { + next_id: u64, + sessions: std::collections::HashMap, +} + +impl SessionRegistry { + fn launch(&mut self, _app_id: &str) -> u64 { + self.next_id += 1; + let id = self.next_id; + self.sessions.insert(id, AppStateKind::Starting); + id + } + + fn terminate(&mut self, session_id: u64) -> bool { + self.sessions.remove(&session_id).is_some() + } + + fn running_ids(&self) -> Vec { + self.sessions.keys().copied().collect() + } + + fn state(&self, session_id: u64) -> AppStateKind { + self.sessions + .get(&session_id) + .cloned() + .unwrap_or(AppStateKind::NotFound) + } +} + +#[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt() .with_env_filter( @@ -16,27 +55,95 @@ async fn main() -> anyhow::Result<()> { async fn run() -> anyhow::Result<()> { let socket_path = appd_socket_path()?; - tracing::info!(path = %socket_path.display(), "weft-appd IPC socket"); - // Wave 5 skeleton entry point. - // - // Components to implement (see docu_dev/WEFT-OS-APPD-DESIGN.md): - // AppRegistry — in-memory map of running app sessions and state - // IpcServer — Unix socket at socket_path, serves servo-shell requests - // CompositorClient — Unix socket client to weft-compositor IPC server - // RuntimeSupervisor — spawns and monitors Wasmtime child processes - // CapabilityBroker — resolves manifest permissions to runtime handles - // ResourceController — configures cgroups via systemd transient units - // - // IPC transport: 4-byte LE length-prefixed MessagePack frames. - // Socket path: /run/user//weft/appd.sock (overridable via WEFT_APPD_SOCKET). - // - // sd_notify(READY=1) is sent after IpcServer is bound and CompositorClient - // has established its connection, satisfying Type=notify in weft-appd.service. - anyhow::bail!( - "weft-appd event loop not yet implemented; \ - see docu_dev/WEFT-OS-APPD-DESIGN.md" - ) + if let Some(parent) = socket_path.parent() { + std::fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?; + } + let _ = std::fs::remove_file(&socket_path); + + let listener = tokio::net::UnixListener::bind(&socket_path) + .with_context(|| format!("bind {}", socket_path.display()))?; + tracing::info!(path = %socket_path.display(), "IPC socket listening"); + + let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]); + + let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default())); + + let mut shutdown = std::pin::pin!(tokio::signal::ctrl_c()); + + loop { + tokio::select! { + result = listener.accept() => { + let (stream, _) = result.context("accept")?; + let reg = Arc::clone(®istry); + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, reg).await { + tracing::warn!(error = %e, "connection error"); + } + }); + } + _ = &mut shutdown => { + tracing::info!("shutting down"); + break; + } + } + } + + let _ = std::fs::remove_file(&socket_path); + Ok(()) +} + +async fn handle_connection( + stream: tokio::net::UnixStream, + registry: Registry, +) -> anyhow::Result<()> { + let (reader, writer) = tokio::io::split(stream); + let mut reader = tokio::io::BufReader::new(reader); + let mut writer = tokio::io::BufWriter::new(writer); + + while let Some(req) = ipc::read_frame(&mut reader).await? { + tracing::debug!(?req, "request"); + let resp = dispatch(req, ®istry).await; + ipc::write_frame(&mut writer, &resp).await?; + writer.flush().await?; + } + Ok(()) +} + +async fn dispatch(req: Request, registry: &Registry) -> Response { + match req { + Request::LaunchApp { + app_id, + surface_id: _, + } => { + let session_id = registry.lock().await.launch(&app_id); + tracing::info!(session_id, %app_id, "launched"); + Response::LaunchAck { session_id } + } + Request::TerminateApp { session_id } => { + let found = registry.lock().await.terminate(session_id); + if found { + tracing::info!(session_id, "terminated"); + Response::AppState { + session_id, + state: AppStateKind::Stopped, + } + } else { + Response::Error { + code: 1, + message: format!("session {session_id} not found"), + } + } + } + Request::QueryRunning => { + let session_ids = registry.lock().await.running_ids(); + Response::RunningApps { session_ids } + } + Request::QueryAppState { session_id } => { + let state = registry.lock().await.state(session_id); + Response::AppState { session_id, state } + } + } } fn appd_socket_path() -> anyhow::Result {