mirror of
https://github.com/marcoallegretti/WEFT_OS.git
synced 2026-03-27 01:13:09 +00:00
feat(runtime): add --preopen and --ipc-socket CLI arguments
weft-runtime now parses optional flags after <app_id> <session_id>:
--preopen HOST::GUEST pre-opens a host directory at GUEST path in the
WASI filesystem (HOST::GUEST or HOST for same path)
--ipc-socket PATH sets WEFT_IPC_SOCKET env var inside the component
wasmtime-runtime path applies preopened dirs via cap_std and WasiCtxBuilder,
and injects WEFT_IPC_SOCKET when --ipc-socket is present. Stub path ignores
both flags.
weft-appd: SessionRegistry gains ipc_socket field (set to the appd Unix
socket path in run()), extracted alongside compositor_tx in dispatch(), and
forwarded to supervise() as ipc_socket_path. supervise() passes
--ipc-socket <path> to the spawned runtime when present.
cap-std added as optional dep under wasmtime-runtime feature.
This commit is contained in:
parent
e56daf6570
commit
b2ac279dc5
4 changed files with 80 additions and 16 deletions
|
|
@ -25,6 +25,7 @@ struct SessionRegistry {
|
||||||
broadcast: tokio::sync::broadcast::Sender<Response>,
|
broadcast: tokio::sync::broadcast::Sender<Response>,
|
||||||
abort_senders: std::collections::HashMap<u64, tokio::sync::oneshot::Sender<()>>,
|
abort_senders: std::collections::HashMap<u64, tokio::sync::oneshot::Sender<()>>,
|
||||||
compositor_tx: Option<compositor_client::CompositorSender>,
|
compositor_tx: Option<compositor_client::CompositorSender>,
|
||||||
|
ipc_socket: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for SessionRegistry {
|
impl Default for SessionRegistry {
|
||||||
|
|
@ -36,6 +37,7 @@ impl Default for SessionRegistry {
|
||||||
broadcast,
|
broadcast,
|
||||||
abort_senders: std::collections::HashMap::new(),
|
abort_senders: std::collections::HashMap::new(),
|
||||||
compositor_tx: None,
|
compositor_tx: None,
|
||||||
|
ipc_socket: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -128,6 +130,7 @@ async fn run() -> anyhow::Result<()> {
|
||||||
tracing::info!(path = %socket_path.display(), "IPC socket listening");
|
tracing::info!(path = %socket_path.display(), "IPC socket listening");
|
||||||
|
|
||||||
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
|
let registry: Registry = Arc::new(Mutex::new(SessionRegistry::default()));
|
||||||
|
registry.lock().await.ipc_socket = Some(socket_path.clone());
|
||||||
|
|
||||||
if let Some(path) = compositor_client::socket_path() {
|
if let Some(path) = compositor_client::socket_path() {
|
||||||
let tx = compositor_client::spawn(path);
|
let tx = compositor_client::spawn(path);
|
||||||
|
|
@ -240,11 +243,13 @@ pub(crate) async fn dispatch(req: Request, registry: &Registry) -> Response {
|
||||||
tracing::info!(session_id, %app_id, "launched");
|
tracing::info!(session_id, %app_id, "launched");
|
||||||
let abort_rx = registry.lock().await.register_abort(session_id);
|
let abort_rx = registry.lock().await.register_abort(session_id);
|
||||||
let compositor_tx = registry.lock().await.compositor_tx.clone();
|
let compositor_tx = registry.lock().await.compositor_tx.clone();
|
||||||
|
let ipc_socket = registry.lock().await.ipc_socket.clone();
|
||||||
let reg = Arc::clone(registry);
|
let reg = Arc::clone(registry);
|
||||||
let aid = app_id.clone();
|
let aid = app_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) =
|
if let Err(e) =
|
||||||
runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx).await
|
runtime::supervise(session_id, &aid, reg, abort_rx, compositor_tx, ipc_socket)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
tracing::warn!(session_id, error = %e, "runtime supervisor error");
|
tracing::warn!(session_id, error = %e, "runtime supervisor error");
|
||||||
}
|
}
|
||||||
|
|
@ -664,6 +669,7 @@ mod tests {
|
||||||
Arc::clone(®istry),
|
Arc::clone(®istry),
|
||||||
abort_rx,
|
abort_rx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
@ -721,6 +727,7 @@ mod tests {
|
||||||
Arc::clone(®istry),
|
Arc::clone(®istry),
|
||||||
abort_rx,
|
abort_rx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
@ -767,6 +774,7 @@ mod tests {
|
||||||
Arc::clone(®istry),
|
Arc::clone(®istry),
|
||||||
abort_rx,
|
abort_rx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ pub(crate) async fn supervise(
|
||||||
registry: Registry,
|
registry: Registry,
|
||||||
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
abort_rx: tokio::sync::oneshot::Receiver<()>,
|
||||||
compositor_tx: Option<CompositorSender>,
|
compositor_tx: Option<CompositorSender>,
|
||||||
|
ipc_socket_path: Option<std::path::PathBuf>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut abort_rx = abort_rx;
|
let mut abort_rx = abort_rx;
|
||||||
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
let bin = match std::env::var("WEFT_RUNTIME_BIN") {
|
||||||
|
|
@ -25,14 +26,18 @@ pub(crate) async fn supervise(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut child = match tokio::process::Command::new(&bin)
|
let mut cmd = tokio::process::Command::new(&bin);
|
||||||
.arg(app_id)
|
cmd.arg(app_id)
|
||||||
.arg(session_id.to_string())
|
.arg(session_id.to_string())
|
||||||
.stdout(std::process::Stdio::piped())
|
.stdout(std::process::Stdio::piped())
|
||||||
.stderr(std::process::Stdio::piped())
|
.stderr(std::process::Stdio::piped())
|
||||||
.stdin(std::process::Stdio::null())
|
.stdin(std::process::Stdio::null());
|
||||||
.spawn()
|
|
||||||
{
|
if let Some(ref sock) = ipc_socket_path {
|
||||||
|
cmd.arg("--ipc-socket").arg(sock);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut child = match cmd.spawn() {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(session_id, %app_id, error = %e, "failed to spawn runtime; marking session stopped");
|
tracing::warn!(session_id, %app_id, error = %e, "failed to spawn runtime; marking session stopped");
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ path = "src/main.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
wasmtime-runtime = ["dep:wasmtime", "dep:wasmtime-wasi"]
|
wasmtime-runtime = ["dep:wasmtime", "dep:wasmtime-wasi", "dep:cap-std"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
|
|
@ -18,3 +18,4 @@ tracing = "0.1"
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
wasmtime = { version = "30", optional = true }
|
wasmtime = { version = "30", optional = true }
|
||||||
wasmtime-wasi = { version = "30", optional = true }
|
wasmtime-wasi = { version = "30", optional = true }
|
||||||
|
cap-std = { version = "3", optional = true }
|
||||||
|
|
|
||||||
|
|
@ -12,13 +12,44 @@ fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let args: Vec<String> = std::env::args().collect();
|
let args: Vec<String> = std::env::args().collect();
|
||||||
if args.len() < 3 {
|
if args.len() < 3 {
|
||||||
anyhow::bail!("usage: weft-runtime <app_id> <session_id>");
|
anyhow::bail!(
|
||||||
|
"usage: weft-runtime <app_id> <session_id> \
|
||||||
|
[--preopen HOST::GUEST]... [--ipc-socket PATH]"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
let app_id = &args[1];
|
let app_id = &args[1];
|
||||||
let session_id: u64 = args[2]
|
let session_id: u64 = args[2]
|
||||||
.parse()
|
.parse()
|
||||||
.with_context(|| format!("invalid session_id: {}", args[2]))?;
|
.with_context(|| format!("invalid session_id: {}", args[2]))?;
|
||||||
|
|
||||||
|
let mut preopen: Vec<(String, String)> = Vec::new();
|
||||||
|
let mut ipc_socket: Option<String> = None;
|
||||||
|
|
||||||
|
let mut i = 3usize;
|
||||||
|
while i < args.len() {
|
||||||
|
match args[i].as_str() {
|
||||||
|
"--preopen" => {
|
||||||
|
i += 1;
|
||||||
|
let spec = args.get(i).context("--preopen requires an argument")?;
|
||||||
|
if let Some((host, guest)) = spec.split_once("::") {
|
||||||
|
preopen.push((host.to_string(), guest.to_string()));
|
||||||
|
} else {
|
||||||
|
preopen.push((spec.clone(), spec.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"--ipc-socket" => {
|
||||||
|
i += 1;
|
||||||
|
ipc_socket = Some(
|
||||||
|
args.get(i)
|
||||||
|
.context("--ipc-socket requires an argument")?
|
||||||
|
.clone(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
other => anyhow::bail!("unexpected argument: {other}"),
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(session_id, %app_id, "weft-runtime starting");
|
tracing::info!(session_id, %app_id, "weft-runtime starting");
|
||||||
|
|
||||||
let pkg_dir = resolve_package(app_id)?;
|
let pkg_dir = resolve_package(app_id)?;
|
||||||
|
|
@ -30,7 +61,7 @@ fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(session_id, %app_id, wasm = %wasm_path.display(), "executing module");
|
tracing::info!(session_id, %app_id, wasm = %wasm_path.display(), "executing module");
|
||||||
run_module(&wasm_path)?;
|
run_module(&wasm_path, &preopen, ipc_socket.as_deref())?;
|
||||||
|
|
||||||
tracing::info!(session_id, %app_id, "exiting");
|
tracing::info!(session_id, %app_id, "exiting");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
@ -48,19 +79,28 @@ fn resolve_package(app_id: &str) -> anyhow::Result<PathBuf> {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "wasmtime-runtime"))]
|
#[cfg(not(feature = "wasmtime-runtime"))]
|
||||||
fn run_module(_wasm_path: &std::path::Path) -> anyhow::Result<()> {
|
fn run_module(
|
||||||
|
_wasm_path: &std::path::Path,
|
||||||
|
_preopen: &[(String, String)],
|
||||||
|
_ipc_socket: Option<&str>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
println!("READY");
|
println!("READY");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "wasmtime-runtime")]
|
#[cfg(feature = "wasmtime-runtime")]
|
||||||
fn run_module(wasm_path: &std::path::Path) -> anyhow::Result<()> {
|
fn run_module(
|
||||||
|
wasm_path: &std::path::Path,
|
||||||
|
preopen: &[(String, String)],
|
||||||
|
ipc_socket: Option<&str>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
use cap_std::{ambient_authority, fs::Dir};
|
||||||
use wasmtime::{
|
use wasmtime::{
|
||||||
Config, Engine, Store,
|
Config, Engine, Store,
|
||||||
component::{Component, Linker},
|
component::{Component, Linker},
|
||||||
};
|
};
|
||||||
use wasmtime_wasi::{
|
use wasmtime_wasi::{
|
||||||
ResourceTable, WasiCtx, WasiCtxBuilder, WasiView, add_to_linker_sync,
|
DirPerms, FilePerms, ResourceTable, WasiCtx, WasiCtxBuilder, WasiView, add_to_linker_sync,
|
||||||
bindings::sync::Command,
|
bindings::sync::Command,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -88,10 +128,20 @@ fn run_module(wasm_path: &std::path::Path) -> anyhow::Result<()> {
|
||||||
let mut linker: Linker<State> = Linker::new(&engine);
|
let mut linker: Linker<State> = Linker::new(&engine);
|
||||||
add_to_linker_sync(&mut linker).context("add WASI to linker")?;
|
add_to_linker_sync(&mut linker).context("add WASI to linker")?;
|
||||||
|
|
||||||
let ctx = WasiCtxBuilder::new()
|
let mut ctx_builder = WasiCtxBuilder::new();
|
||||||
.inherit_stdout()
|
ctx_builder.inherit_stdout().inherit_stderr();
|
||||||
.inherit_stderr()
|
|
||||||
.build();
|
if let Some(socket_path) = ipc_socket {
|
||||||
|
ctx_builder.env("WEFT_IPC_SOCKET", socket_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (host_path, guest_path) in preopen {
|
||||||
|
let dir = Dir::open_ambient_dir(host_path, ambient_authority())
|
||||||
|
.with_context(|| format!("open preopen dir {host_path}"))?;
|
||||||
|
ctx_builder.preopened_dir(dir, DirPerms::all(), FilePerms::all(), guest_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
let ctx = ctx_builder.build();
|
||||||
let mut store = Store::new(
|
let mut store = Store::new(
|
||||||
&engine,
|
&engine,
|
||||||
State {
|
State {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue