mirror of
https://codeberg.org/likwid/likwid.git
synced 2026-02-10 05:23:09 +00:00
231 lines
7.8 KiB
Rust
231 lines
7.8 KiB
Rust
|
|
//! WASM runtime for plugin execution.
|
||
|
|
//!
|
||
|
|
//! Provides sandboxed execution of WASM plugins with resource limits,
|
||
|
|
//! timeout handling via epoch interruption, and fuel metering.
|
||
|
|
|
||
|
|
use std::sync::Arc;
|
||
|
|
use std::time::Duration;
|
||
|
|
|
||
|
|
use tokio::sync::oneshot;
|
||
|
|
use wasmtime::{Config, Engine, Linker, Module, Store, StoreLimitsBuilder};
|
||
|
|
|
||
|
|
use super::host_api::{HostState, HostStateWithLimits};
|
||
|
|
use crate::plugins::PluginError;
|
||
|
|
|
||
|
|
/// Default fuel limit for WASM execution (computational steps).
|
||
|
|
pub const DEFAULT_FUEL_LIMIT: u64 = 10_000_000;
|
||
|
|
/// Default timeout in milliseconds.
|
||
|
|
pub const DEFAULT_TIMEOUT_MS: u64 = 5000;
|
||
|
|
/// Default memory limit (16 MB).
|
||
|
|
pub const DEFAULT_MEMORY_LIMIT_BYTES: usize = 16 * 1024 * 1024;
|
||
|
|
/// Default table element limit.
|
||
|
|
pub const DEFAULT_TABLE_ELEMENTS: u32 = 10_000;
|
||
|
|
|
||
|
|
/// WASM runtime engine with epoch-based timeout support.
|
||
|
|
pub struct WasmRuntime {
|
||
|
|
engine: Arc<Engine>,
|
||
|
|
_epoch_ticker_shutdown: Option<oneshot::Sender<()>>,
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Execution limits for WASM plugins.
|
||
|
|
#[derive(Clone, Debug)]
|
||
|
|
pub struct ExecutionLimits {
|
||
|
|
pub fuel: u64,
|
||
|
|
pub timeout_ms: u64,
|
||
|
|
pub memory_bytes: usize,
|
||
|
|
pub table_elements: u32,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl Default for ExecutionLimits {
|
||
|
|
fn default() -> Self {
|
||
|
|
Self {
|
||
|
|
fuel: DEFAULT_FUEL_LIMIT,
|
||
|
|
timeout_ms: DEFAULT_TIMEOUT_MS,
|
||
|
|
memory_bytes: DEFAULT_MEMORY_LIMIT_BYTES,
|
||
|
|
table_elements: DEFAULT_TABLE_ELEMENTS,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
impl WasmRuntime {
|
||
|
|
/// Creates a new WASM runtime with epoch ticker for timeout handling.
|
||
|
|
pub fn new() -> Result<Self, PluginError> {
|
||
|
|
let mut config = Config::new();
|
||
|
|
config.async_support(true);
|
||
|
|
config.consume_fuel(true);
|
||
|
|
config.epoch_interruption(true);
|
||
|
|
|
||
|
|
let engine = Arc::new(
|
||
|
|
Engine::new(&config)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to create WASM engine: {e}")))?
|
||
|
|
);
|
||
|
|
|
||
|
|
// Spawn epoch ticker for timeout enforcement
|
||
|
|
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
|
||
|
|
let engine_clone = engine.clone();
|
||
|
|
|
||
|
|
tokio::spawn(async move {
|
||
|
|
let mut interval = tokio::time::interval(Duration::from_millis(10));
|
||
|
|
loop {
|
||
|
|
tokio::select! {
|
||
|
|
_ = interval.tick() => {
|
||
|
|
engine_clone.increment_epoch();
|
||
|
|
}
|
||
|
|
_ = &mut shutdown_rx => {
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
});
|
||
|
|
|
||
|
|
Ok(Self {
|
||
|
|
engine,
|
||
|
|
_epoch_ticker_shutdown: Some(shutdown_tx),
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Compiles WASM bytes into a reusable module.
|
||
|
|
pub fn compile(&self, wasm_bytes: &[u8]) -> Result<CompiledPlugin, PluginError> {
|
||
|
|
let module = Module::new(&self.engine, wasm_bytes)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to compile WASM module: {e}")))?;
|
||
|
|
|
||
|
|
Ok(CompiledPlugin {
|
||
|
|
engine: self.engine.clone(),
|
||
|
|
module,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// A compiled WASM plugin ready for instantiation.
|
||
|
|
pub struct CompiledPlugin {
|
||
|
|
engine: Arc<Engine>,
|
||
|
|
module: Module,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl CompiledPlugin {
|
||
|
|
pub fn engine(&self) -> &Engine {
|
||
|
|
&self.engine
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn module(&self) -> &Module {
|
||
|
|
&self.module
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// An instantiated WASM plugin ready for hook execution.
|
||
|
|
pub struct PluginInstance {
|
||
|
|
store: Store<HostStateWithLimits>,
|
||
|
|
instance: wasmtime::Instance,
|
||
|
|
}
|
||
|
|
|
||
|
|
impl PluginInstance {
|
||
|
|
/// Creates a new plugin instance with the given host state and limits.
|
||
|
|
pub async fn new(
|
||
|
|
compiled: &CompiledPlugin,
|
||
|
|
host_state: HostState,
|
||
|
|
limits: ExecutionLimits,
|
||
|
|
) -> Result<Self, PluginError> {
|
||
|
|
let store_limits = StoreLimitsBuilder::new()
|
||
|
|
.memory_size(limits.memory_bytes)
|
||
|
|
.table_elements(limits.table_elements)
|
||
|
|
.instances(10)
|
||
|
|
.tables(10)
|
||
|
|
.memories(1)
|
||
|
|
.build();
|
||
|
|
|
||
|
|
let state_with_limits = HostStateWithLimits {
|
||
|
|
inner: host_state,
|
||
|
|
limits: store_limits,
|
||
|
|
};
|
||
|
|
|
||
|
|
let mut store = Store::new(compiled.engine(), state_with_limits);
|
||
|
|
store.limiter(|state| &mut state.limits);
|
||
|
|
|
||
|
|
store.set_fuel(limits.fuel).map_err(|e| {
|
||
|
|
PluginError::Message(format!("Failed to set fuel limit: {e}"))
|
||
|
|
})?;
|
||
|
|
|
||
|
|
let epoch_deadline = (limits.timeout_ms / 10).max(1);
|
||
|
|
store.epoch_deadline_async_yield_and_update(epoch_deadline);
|
||
|
|
|
||
|
|
let mut linker: Linker<HostStateWithLimits> = Linker::new(compiled.engine());
|
||
|
|
super::host_api::register_host_functions(&mut linker)?;
|
||
|
|
|
||
|
|
let instance = linker
|
||
|
|
.instantiate_async(&mut store, compiled.module())
|
||
|
|
.await
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to instantiate WASM module: {e}")))?;
|
||
|
|
|
||
|
|
Ok(Self { store, instance })
|
||
|
|
}
|
||
|
|
|
||
|
|
pub async fn call_hook(
|
||
|
|
&mut self,
|
||
|
|
hook_name: &str,
|
||
|
|
payload_json: &str,
|
||
|
|
) -> Result<String, PluginError> {
|
||
|
|
let alloc = self
|
||
|
|
.instance
|
||
|
|
.get_typed_func::<u32, u32>(&mut self.store, "alloc")
|
||
|
|
.map_err(|e| PluginError::Message(format!("Plugin missing 'alloc' export: {e}")))?;
|
||
|
|
|
||
|
|
let dealloc = self
|
||
|
|
.instance
|
||
|
|
.get_typed_func::<(u32, u32), ()>(&mut self.store, "dealloc")
|
||
|
|
.map_err(|e| PluginError::Message(format!("Plugin missing 'dealloc' export: {e}")))?;
|
||
|
|
|
||
|
|
let handle_hook = self
|
||
|
|
.instance
|
||
|
|
.get_typed_func::<(u32, u32, u32, u32), u64>(&mut self.store, "handle_hook")
|
||
|
|
.map_err(|e| PluginError::Message(format!("Plugin missing 'handle_hook' export: {e}")))?;
|
||
|
|
|
||
|
|
let memory = self
|
||
|
|
.instance
|
||
|
|
.get_memory(&mut self.store, "memory")
|
||
|
|
.ok_or_else(|| PluginError::Message("Plugin missing 'memory' export".to_string()))?;
|
||
|
|
|
||
|
|
let hook_bytes = hook_name.as_bytes();
|
||
|
|
let hook_ptr = alloc.call_async(&mut self.store, hook_bytes.len() as u32).await
|
||
|
|
.map_err(|e| PluginError::Message(format!("alloc failed for hook name: {e}")))?;
|
||
|
|
memory.write(&mut self.store, hook_ptr as usize, hook_bytes)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to write hook name: {e}")))?;
|
||
|
|
|
||
|
|
let payload_bytes = payload_json.as_bytes();
|
||
|
|
let payload_ptr = alloc.call_async(&mut self.store, payload_bytes.len() as u32).await
|
||
|
|
.map_err(|e| PluginError::Message(format!("alloc failed for payload: {e}")))?;
|
||
|
|
memory.write(&mut self.store, payload_ptr as usize, payload_bytes)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to write payload: {e}")))?;
|
||
|
|
|
||
|
|
let result = handle_hook
|
||
|
|
.call_async(
|
||
|
|
&mut self.store,
|
||
|
|
(
|
||
|
|
hook_ptr,
|
||
|
|
hook_bytes.len() as u32,
|
||
|
|
payload_ptr,
|
||
|
|
payload_bytes.len() as u32,
|
||
|
|
),
|
||
|
|
)
|
||
|
|
.await
|
||
|
|
.map_err(|e| PluginError::Message(format!("handle_hook failed: {e}")))?;
|
||
|
|
|
||
|
|
let result_ptr = (result >> 32) as u32;
|
||
|
|
let result_len = (result & 0xFFFFFFFF) as u32;
|
||
|
|
|
||
|
|
let mut result_bytes = vec![0u8; result_len as usize];
|
||
|
|
memory.read(&self.store, result_ptr as usize, &mut result_bytes)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Failed to read result: {e}")))?;
|
||
|
|
|
||
|
|
dealloc.call_async(&mut self.store, (hook_ptr, hook_bytes.len() as u32)).await.ok();
|
||
|
|
dealloc.call_async(&mut self.store, (payload_ptr, payload_bytes.len() as u32)).await.ok();
|
||
|
|
dealloc.call_async(&mut self.store, (result_ptr, result_len)).await.ok();
|
||
|
|
|
||
|
|
String::from_utf8(result_bytes)
|
||
|
|
.map_err(|e| PluginError::Message(format!("Result is not valid UTF-8: {e}")))
|
||
|
|
}
|
||
|
|
|
||
|
|
pub fn get_fuel(&self) -> u64 {
|
||
|
|
self.store.get_fuel().unwrap_or(0)
|
||
|
|
}
|
||
|
|
}
|