WEFT_OS/crates/weft-appd/src/main.rs

204 lines
5.8 KiB
Rust
Raw Normal View History

use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
mod ipc;
use ipc::{AppStateKind, Request, Response};
type Registry = Arc<Mutex<SessionRegistry>>;
#[derive(Default)]
struct SessionRegistry {
next_id: u64,
sessions: std::collections::HashMap<u64, AppStateKind>,
}
impl SessionRegistry {
fn launch(&mut self, _app_id: &str) -> u64 {
self.next_id += 1;
let id = self.next_id;
self.sessions.insert(id, AppStateKind::Starting);
id
}
fn terminate(&mut self, session_id: u64) -> bool {
self.sessions.remove(&session_id).is_some()
}
fn running_ids(&self) -> Vec<u64> {
self.sessions.keys().copied().collect()
}
fn state(&self, session_id: u64) -> AppStateKind {
self.sessions
.get(&session_id)
.cloned()
.unwrap_or(AppStateKind::NotFound)
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
run().await
}
async fn run() -> anyhow::Result<()> {
let socket_path = appd_socket_path()?;
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent).with_context(|| format!("create {}", parent.display()))?;
}
let _ = std::fs::remove_file(&socket_path);
let listener = tokio::net::UnixListener::bind(&socket_path)
.with_context(|| format!("bind {}", socket_path.display()))?;
tracing::info!(path = %socket_path.display(), "IPC socket listening");
let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]);
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
let mut shutdown = std::pin::pin!(tokio::signal::ctrl_c());
loop {
tokio::select! {
result = listener.accept() => {
let (stream, _) = result.context("accept")?;
let reg = Arc::clone(&registry);
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, reg).await {
tracing::warn!(error = %e, "connection error");
}
});
}
_ = &mut shutdown => {
tracing::info!("shutting down");
break;
}
}
}
let _ = std::fs::remove_file(&socket_path);
Ok(())
}
async fn handle_connection(
stream: tokio::net::UnixStream,
registry: Registry,
) -> anyhow::Result<()> {
let (reader, writer) = tokio::io::split(stream);
let mut reader = tokio::io::BufReader::new(reader);
let mut writer = tokio::io::BufWriter::new(writer);
while let Some(req) = ipc::read_frame(&mut reader).await? {
tracing::debug!(?req, "request");
let resp = dispatch(req, &registry).await;
ipc::write_frame(&mut writer, &resp).await?;
writer.flush().await?;
}
Ok(())
}
async fn dispatch(req: Request, registry: &Registry) -> Response {
match req {
Request::LaunchApp {
app_id,
surface_id: _,
} => {
let session_id = registry.lock().await.launch(&app_id);
tracing::info!(session_id, %app_id, "launched");
Response::LaunchAck { session_id }
}
Request::TerminateApp { session_id } => {
let found = registry.lock().await.terminate(session_id);
if found {
tracing::info!(session_id, "terminated");
Response::AppState {
session_id,
state: AppStateKind::Stopped,
}
} else {
Response::Error {
code: 1,
message: format!("session {session_id} not found"),
}
}
}
Request::QueryRunning => {
let session_ids = registry.lock().await.running_ids();
Response::RunningApps { session_ids }
}
Request::QueryAppState { session_id } => {
let state = registry.lock().await.state(session_id);
Response::AppState { session_id, state }
}
}
}
fn appd_socket_path() -> anyhow::Result<PathBuf> {
if let Ok(p) = std::env::var("WEFT_APPD_SOCKET") {
return Ok(PathBuf::from(p));
}
let runtime_dir = std::env::var("XDG_RUNTIME_DIR").context("XDG_RUNTIME_DIR not set")?;
Ok(PathBuf::from(runtime_dir).join("weft/appd.sock"))
}
#[cfg(test)]
mod tests {
use super::*;
use ipc::AppStateKind;
#[test]
fn registry_launch_increments_id() {
let mut reg = SessionRegistry::default();
let id1 = reg.launch("com.example.a");
let id2 = reg.launch("com.example.b");
assert!(id2 > id1);
}
#[test]
fn registry_terminate_known_session() {
let mut reg = SessionRegistry::default();
let id = reg.launch("com.example.app");
assert!(reg.terminate(id));
assert!(matches!(reg.state(id), AppStateKind::NotFound));
}
#[test]
fn registry_terminate_unknown_returns_false() {
let mut reg = SessionRegistry::default();
assert!(!reg.terminate(999));
}
#[test]
fn registry_running_ids_reflects_live_sessions() {
let mut reg = SessionRegistry::default();
let id1 = reg.launch("a");
let id2 = reg.launch("b");
let mut ids = reg.running_ids();
ids.sort();
assert_eq!(ids, vec![id1, id2]);
reg.terminate(id1);
assert_eq!(reg.running_ids(), vec![id2]);
}
#[test]
fn registry_state_not_found_for_unknown() {
let reg = SessionRegistry::default();
assert!(matches!(reg.state(42), AppStateKind::NotFound));
}
}