feat(appd): implement IPC server with Unix socket and MessagePack framing

Replaces the skeleton bail with a functional IPC server.

ipc.rs — transport layer:
- Request enum: LaunchApp, TerminateApp, QueryRunning, QueryAppState.
  Serialized with serde MessagePack (rmp-serde, SCREAMING_SNAKE_CASE
  type tag).
- Response enum: LaunchAck, AppReady, RunningApps, AppState, Error.
- AppStateKind: Starting, Running, Stopping, Stopped, NotFound.
- read_frame / write_frame: async 4-byte LE length-prefixed codec over
  any AsyncRead / AsyncWrite.

main.rs — server:
- SessionRegistry: in-memory HashMap<session_id, AppStateKind> with
  monotonic ID counter; launch / terminate / running_ids / state.
- run(): creates socket parent directory, removes stale socket, binds
  UnixListener, sends sd_notify READY=1, then accept-loops with
  ctrl-c / SIGTERM shutdown. Cleans up socket on exit.
- handle_connection(): splits stream into BufReader/BufWriter, reads
  request frames, calls dispatch, writes response frames.
- dispatch(): maps Request variants to SessionRegistry operations;
  returns typed Response. Wasmtime spawning and compositor client
  deferred to later implementation.

New deps: serde (derive), rmp-serde, tokio net/io-util/sync/rt-multi-thread.
This commit is contained in:
Marco Allegretti 2026-03-11 08:25:55 +01:00
parent 3abc83f9ed
commit 538eccd4c6
4 changed files with 235 additions and 22 deletions

34
Cargo.lock generated
View file

@ -1395,6 +1395,25 @@ version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]]
name = "rmp"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c"
dependencies = [
"num-traits",
]
[[package]]
name = "rmp-serde"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155"
dependencies = [
"rmp",
"serde",
]
[[package]]
name = "rustix"
version = "0.38.44"
@ -1464,6 +1483,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
]
[[package]]
@ -1649,6 +1669,16 @@ dependencies = [
"serde",
]
[[package]]
name = "socket2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e"
dependencies = [
"libc",
"windows-sys 0.61.2",
]
[[package]]
name = "syn"
version = "2.0.117"
@ -1747,10 +1777,12 @@ version = "1.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d"
dependencies = [
"bytes",
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
]
@ -2243,7 +2275,9 @@ name = "weft-appd"
version = "0.1.0"
dependencies = [
"anyhow",
"rmp-serde",
"sd-notify",
"serde",
"tokio",
"tracing",
"tracing-subscriber",

View file

@ -11,6 +11,8 @@ path = "src/main.rs"
[dependencies]
anyhow = "1.0"
sd-notify = "0.4"
tokio = { version = "1", features = ["rt", "macros", "signal"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "signal", "sync"] }
serde = { version = "1", features = ["derive"] }
rmp-serde = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View file

@ -0,0 +1,70 @@
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Request {
LaunchApp { app_id: String, surface_id: u64 },
TerminateApp { session_id: u64 },
QueryRunning,
QueryAppState { session_id: u64 },
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Response {
LaunchAck {
session_id: u64,
},
AppReady {
session_id: u64,
},
RunningApps {
session_ids: Vec<u64>,
},
AppState {
session_id: u64,
state: AppStateKind,
},
Error {
code: u32,
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AppStateKind {
Starting,
Running,
Stopping,
Stopped,
NotFound,
}
pub async fn read_frame(
reader: &mut (impl AsyncReadExt + Unpin),
) -> anyhow::Result<Option<Request>> {
let mut len_buf = [0u8; 4];
match reader.read_exact(&mut len_buf).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => return Err(e.into()),
}
let len = u32::from_le_bytes(len_buf) as usize;
let mut body = vec![0u8; len];
reader.read_exact(&mut body).await?;
let req = rmp_serde::from_slice(&body)?;
Ok(Some(req))
}
pub async fn write_frame(
writer: &mut (impl AsyncWriteExt + Unpin),
response: &Response,
) -> anyhow::Result<()> {
let body = rmp_serde::to_vec(response)?;
let len = (body.len() as u32).to_le_bytes();
writer.write_all(&len).await?;
writer.write_all(&body).await?;
Ok(())
}

View file

@ -1,8 +1,47 @@
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
#[tokio::main(flavor = "current_thread")]
mod ipc;
use ipc::{AppStateKind, Request, Response};
type Registry = Arc<Mutex<SessionRegistry>>;
#[derive(Default)]
struct SessionRegistry {
next_id: u64,
sessions: std::collections::HashMap<u64, AppStateKind>,
}
impl SessionRegistry {
fn launch(&mut self, _app_id: &str) -> u64 {
self.next_id += 1;
let id = self.next_id;
self.sessions.insert(id, AppStateKind::Starting);
id
}
fn terminate(&mut self, session_id: u64) -> bool {
self.sessions.remove(&session_id).is_some()
}
fn running_ids(&self) -> Vec<u64> {
self.sessions.keys().copied().collect()
}
fn state(&self, session_id: u64) -> AppStateKind {
self.sessions
.get(&session_id)
.cloned()
.unwrap_or(AppStateKind::NotFound)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
@ -16,27 +55,95 @@ async fn main() -> anyhow::Result<()> {
async fn run() -> anyhow::Result<()> {
let socket_path = appd_socket_path()?;
tracing::info!(path = %socket_path.display(), "weft-appd IPC socket");
// Wave 5 skeleton entry point.
//
// Components to implement (see docu_dev/WEFT-OS-APPD-DESIGN.md):
// AppRegistry — in-memory map of running app sessions and state
// IpcServer — Unix socket at socket_path, serves servo-shell requests
// CompositorClient — Unix socket client to weft-compositor IPC server
// RuntimeSupervisor — spawns and monitors Wasmtime child processes
// CapabilityBroker — resolves manifest permissions to runtime handles
// ResourceController — configures cgroups via systemd transient units
//
// IPC transport: 4-byte LE length-prefixed MessagePack frames.
// Socket path: /run/user/<uid>/weft/appd.sock (overridable via WEFT_APPD_SOCKET).
//
// sd_notify(READY=1) is sent after IpcServer is bound and CompositorClient
// has established its connection, satisfying Type=notify in weft-appd.service.
anyhow::bail!(
"weft-appd event loop not yet implemented; \
see docu_dev/WEFT-OS-APPD-DESIGN.md"
)
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
}
let _ = std::fs::remove_file(&socket_path);
let listener = tokio::net::UnixListener::bind(&socket_path)
.with_context(|| format!("bind {}", socket_path.display()))?;
tracing::info!(path = %socket_path.display(), "IPC socket listening");
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]);
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let mut shutdown = std::pin::pin!(tokio::signal::ctrl_c());
loop {
tokio::select! {
result = listener.accept() => {
let (stream, _) = result.context("accept")?;
let reg = Arc::clone(&registry);
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, reg).await {
tracing::warn!(error = %e, "connection error");
}
});
}
_ = &mut shutdown => {
tracing::info!("shutting down");
break;
}
}
}
let _ = std::fs::remove_file(&socket_path);
Ok(())
}
async fn handle_connection(
stream: tokio::net::UnixStream,
registry: Registry,
) -> anyhow::Result<()> {
let (reader, writer) = tokio::io::split(stream);
let mut reader = tokio::io::BufReader::new(reader);
let mut writer = tokio::io::BufWriter::new(writer);
while let Some(req) = ipc::read_frame(&mut reader).await? {
tracing::debug!(?req, "request");
let resp = dispatch(req, &registry).await;
ipc::write_frame(&mut writer, &resp).await?;
writer.flush().await?;
}
Ok(())
}
async fn dispatch(req: Request, registry: &Registry) -> Response {
match req {
Request::LaunchApp {
app_id,
surface_id: _,
} => {
let session_id = registry.lock().await.launch(&app_id);
tracing::info!(session_id, %app_id, "launched");
Response::LaunchAck { session_id }
}
Request::TerminateApp { session_id } => {
let found = registry.lock().await.terminate(session_id);
if found {
tracing::info!(session_id, "terminated");
Response::AppState {
session_id,
state: AppStateKind::Stopped,
}
} else {
Response::Error {
code: 1,
message: format!("session {session_id} not found"),
}
}
}
Request::QueryRunning => {
let session_ids = registry.lock().await.running_ids();
Response::RunningApps { session_ids }
}
Request::QueryAppState { session_id } => {
let state = registry.lock().await.state(session_id);
Response::AppState { session_id, state }
}
}
}
fn appd_socket_path() -> anyhow::Result<PathBuf> {