wasm: dedupe capability resolution

This commit is contained in:
Marco Allegretti 2026-02-02 11:33:24 +01:00
parent 91d2f79740
commit 3aba16a8e8

View file

@ -1,308 +1,261 @@
//! WASM plugin implementation. //! WASM plugin implementation.
//! //!
//! Wraps compiled WASM modules and implements the Plugin trait for //! Wraps compiled WASM modules and implements the Plugin trait for
//! integration with the hook system. //! integration with the hook system.
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use serde_json::{json, Value}; use serde_json::{json, Value};
use sqlx::PgPool; use sqlx::PgPool;
use uuid::Uuid; use uuid::Uuid;
use super::host_api::{Capability, HostState, PluginManifest, CAP_EMIT_EVENTS, CAP_KV_STORE, CAP_OUTBOUND_HTTP, CAP_SETTINGS}; use super::host_api::{Capability, HostState, PluginManifest, CAP_EMIT_EVENTS, CAP_KV_STORE, CAP_OUTBOUND_HTTP, CAP_SETTINGS};
use super::runtime::{CompiledPlugin, ExecutionLimits, PluginInstance}; use super::runtime::{CompiledPlugin, ExecutionLimits, PluginInstance};
use crate::plugins::hooks::{HookContext, PluginError}; use crate::plugins::hooks::{HookContext, PluginError};
use crate::plugins::manager::{Plugin, PluginMetadata, PluginScope, PluginSystem}; use crate::plugins::manager::{Plugin, PluginMetadata, PluginScope, PluginSystem};
/// A WASM-based plugin that can be loaded dynamically. /// A WASM-based plugin that can be loaded dynamically.
pub struct WasmPlugin { pub struct WasmPlugin {
package_id: Uuid, package_id: Uuid,
manifest: PluginManifest, manifest: PluginManifest,
compiled: Arc<CompiledPlugin>, compiled: Arc<CompiledPlugin>,
limits: ExecutionLimits, limits: ExecutionLimits,
} }
impl WasmPlugin { async fn capabilities_for_manifest(
/// Creates a new WASM plugin from a manifest and compiled module. pool: &PgPool,
pub fn new( community_id: Option<Uuid>,
package_id: Uuid, manifest_capabilities: &[String],
manifest: PluginManifest, ) -> Result<Vec<Capability>, PluginError> {
compiled: Arc<CompiledPlugin>, let mut out: Vec<Capability> = Vec::new();
) -> Self {
Self { let (allow_http, allowlist) = if let Some(cid) = community_id {
package_id, let row = sqlx::query!(
manifest, r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#,
compiled, cid
limits: ExecutionLimits::default(), )
} .fetch_optional(pool)
} .await?;
/// Sets custom execution limits for this plugin. if let Some(row) = row {
#[allow(dead_code)] // API for future use let allow_http = row
pub fn with_limits(mut self, limits: ExecutionLimits) -> Self { .settings
self.limits = limits; .get("plugin_allow_outbound_http")
self .and_then(|v: &serde_json::Value| v.as_bool())
} .unwrap_or(false);
async fn capabilities_for(&self, pool: &PgPool, ctx: &HookContext) -> Result<Vec<Capability>, PluginError> { let allowlist: Vec<String> = row
let mut out: Vec<Capability> = Vec::new(); .settings
.get("plugin_http_egress_allowlist")
// Community policy influences outbound HTTP. .and_then(|v: &serde_json::Value| v.as_array())
let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { .map(|arr: &Vec<serde_json::Value>| {
let row = sqlx::query!( arr.iter()
r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string()))
cid .collect()
) })
.fetch_optional(pool) .unwrap_or_default();
.await?;
(allow_http, allowlist)
if let Some(row) = row { } else {
let allow_http = row (false, Vec::new())
.settings }
.get("plugin_allow_outbound_http") } else {
.and_then(|v: &serde_json::Value| v.as_bool()) (false, Vec::new())
.unwrap_or(false); };
let allowlist: Vec<String> = row for cap in manifest_capabilities {
.settings match cap.as_str() {
.get("plugin_http_egress_allowlist") CAP_OUTBOUND_HTTP => {
.and_then(|v: &serde_json::Value| v.as_array()) let allowed = allow_http && !allowlist.is_empty();
.map(|arr: &Vec<serde_json::Value>| { out.push(Capability {
arr.iter() name: cap.clone(),
.filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) allowed,
.collect() config: serde_json::json!({"allowlist": allowlist}),
}) });
.unwrap_or_default(); }
CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => {
(allow_http, allowlist) out.push(Capability {
} else { name: cap.clone(),
(false, Vec::new()) allowed: true,
} config: serde_json::json!({}),
} else { });
(false, Vec::new()) }
}; _ => {
out.push(Capability {
for cap in &self.manifest.capabilities { name: cap.clone(),
match cap.as_str() { allowed: false,
CAP_OUTBOUND_HTTP => { config: serde_json::json!({}),
let allowed = allow_http && !allowlist.is_empty(); });
out.push(Capability { }
name: cap.clone(), }
allowed, }
config: serde_json::json!({"allowlist": allowlist}),
}); Ok(out)
} }
CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => {
out.push(Capability { impl WasmPlugin {
name: cap.clone(), /// Creates a new WASM plugin from a manifest and compiled module.
allowed: true, pub fn new(
config: serde_json::json!({}), package_id: Uuid,
}); manifest: PluginManifest,
} compiled: Arc<CompiledPlugin>,
_ => { ) -> Self {
// Unknown capability is denied by default. Self {
out.push(Capability { package_id,
name: cap.clone(), manifest,
allowed: false, compiled,
config: serde_json::json!({}), limits: ExecutionLimits::default(),
}); }
} }
}
} /// Sets custom execution limits for this plugin.
#[allow(dead_code)] // API for future use
Ok(out) pub fn with_limits(mut self, limits: ExecutionLimits) -> Self {
} self.limits = limits;
self
async fn create_instance(&self, ctx: &HookContext) -> Result<PluginInstance, PluginError> { }
let capabilities = self.capabilities_for(&ctx.pool, ctx).await?;
let host_state = HostState::new( async fn capabilities_for(&self, pool: &PgPool, ctx: &HookContext) -> Result<Vec<Capability>, PluginError> {
self.manifest.name.clone(), capabilities_for_manifest(pool, ctx.community_id, &self.manifest.capabilities).await
ctx.community_id, }
ctx.actor_user_id,
ctx.pool.clone(), async fn create_instance(&self, ctx: &HookContext) -> Result<PluginInstance, PluginError> {
self.package_id, let capabilities = self.capabilities_for(&ctx.pool, ctx).await?;
capabilities, let host_state = HostState::new(
); self.manifest.name.clone(),
ctx.community_id,
PluginInstance::new(&self.compiled, host_state, self.limits.clone()).await ctx.actor_user_id,
} ctx.pool.clone(),
} self.package_id,
capabilities,
#[async_trait] );
impl Plugin for WasmPlugin {
fn metadata(&self) -> PluginMetadata { PluginInstance::new(&self.compiled, host_state, self.limits.clone()).await
PluginMetadata { }
name: Box::leak(self.manifest.name.clone().into_boxed_str()), }
version: Box::leak(self.manifest.version.clone().into_boxed_str()),
description: Box::leak(self.manifest.description.clone().into_boxed_str()), #[async_trait]
is_core: false, impl Plugin for WasmPlugin {
scope: PluginScope::Community, fn metadata(&self) -> PluginMetadata {
default_enabled: false, PluginMetadata {
settings_schema: self.manifest.settings_schema.clone(), name: Box::leak(self.manifest.name.clone().into_boxed_str()),
} version: Box::leak(self.manifest.version.clone().into_boxed_str()),
} description: Box::leak(self.manifest.description.clone().into_boxed_str()),
is_core: false,
fn register(&self, system: &mut PluginSystem) { scope: PluginScope::Community,
let plugin_name = self.manifest.name.clone(); default_enabled: false,
let package_id = self.package_id; settings_schema: self.manifest.settings_schema.clone(),
let handler_plugin_id = format!("wasm:{}", package_id); }
let manifest_capabilities = self.manifest.capabilities.clone(); }
let compiled = self.compiled.clone();
let limits = self.limits.clone(); fn register(&self, system: &mut PluginSystem) {
let plugin_name = self.manifest.name.clone();
for hook in &self.manifest.hooks { let package_id = self.package_id;
let hook_name = hook.clone(); let handler_plugin_id = format!("wasm:{}", package_id);
let hook_name_ref = hook_name.clone(); let manifest_capabilities = self.manifest.capabilities.clone();
let plugin_name_clone = plugin_name.clone(); let compiled = self.compiled.clone();
let handler_plugin_id_clone = handler_plugin_id.clone(); let limits = self.limits.clone();
let compiled_clone = compiled.clone();
let limits_clone = limits.clone(); for hook in &self.manifest.hooks {
let manifest_capabilities_for_hook = manifest_capabilities.clone(); let hook_name = hook.clone();
let hook_name_ref = hook_name.clone();
system.add_action( let plugin_name_clone = plugin_name.clone();
&hook_name_ref, let handler_plugin_id_clone = handler_plugin_id.clone();
handler_plugin_id_clone.clone(), let compiled_clone = compiled.clone();
50, let limits_clone = limits.clone();
Arc::new(move |ctx: HookContext, payload: Value| { let manifest_capabilities_for_hook = manifest_capabilities.clone();
let hook = hook_name.clone();
let plugin = plugin_name_clone.clone(); system.add_action(
let package_id = package_id; &hook_name_ref,
let manifest_capabilities = manifest_capabilities_for_hook.clone(); handler_plugin_id_clone.clone(),
let compiled = compiled_clone.clone(); 50,
let lim = limits_clone.clone(); Arc::new(move |ctx: HookContext, payload: Value| {
let hook = hook_name.clone();
Box::pin(async move { let plugin = plugin_name_clone.clone();
let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { let package_id = package_id;
let row = sqlx::query!( let manifest_capabilities = manifest_capabilities_for_hook.clone();
r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, let compiled = compiled_clone.clone();
cid let lim = limits_clone.clone();
)
.fetch_optional(&ctx.pool) Box::pin(async move {
.await?; let capabilities = capabilities_for_manifest(
&ctx.pool,
if let Some(row) = row { ctx.community_id,
let allow_http = row &manifest_capabilities,
.settings )
.get("plugin_allow_outbound_http") .await?;
.and_then(|v: &serde_json::Value| v.as_bool())
.unwrap_or(false); let host_state = HostState::new(
plugin.clone(),
let allowlist: Vec<String> = row ctx.community_id,
.settings ctx.actor_user_id,
.get("plugin_http_egress_allowlist") ctx.pool.clone(),
.and_then(|v: &serde_json::Value| v.as_array()) package_id,
.map(|arr: &Vec<serde_json::Value>| { capabilities,
arr.iter() );
.filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string()))
.collect() let mut instance = PluginInstance::new(&compiled, host_state, lim).await?;
})
.unwrap_or_default(); let payload_json = serde_json::to_string(&payload)
.map_err(|e| PluginError::Message(format!("Failed to serialize payload: {e}")))?;
(allow_http, allowlist)
} else { let _result = instance.call_hook(&hook, &payload_json).await?;
(false, Vec::new())
} let remaining_fuel = instance.get_fuel();
} else { tracing::debug!(
(false, Vec::new()) plugin = %plugin,
}; hook = %hook,
fuel_remaining = remaining_fuel,
let mut capabilities: Vec<Capability> = Vec::new(); "WASM plugin hook completed"
for cap in &manifest_capabilities { );
match cap.as_str() {
CAP_OUTBOUND_HTTP => { Ok(())
let allowed = allow_http && !allowlist.is_empty(); })
capabilities.push(Capability { }),
name: cap.clone(), );
allowed, }
config: serde_json::json!({"allowlist": allowlist.clone()}), }
});
} async fn activate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> {
CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { let mut instance = self.create_instance(&ctx).await?;
capabilities.push(Capability { let payload = json!({
name: cap.clone(), "event": "activate",
allowed: true, "settings": settings
config: serde_json::json!({}), });
}); let payload_json = serde_json::to_string(&payload)
} .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
_ => capabilities.push(Capability { instance.call_hook("lifecycle.activate", &payload_json).await.ok();
name: cap.clone(), Ok(())
allowed: false, }
config: serde_json::json!({}),
}), async fn deactivate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> {
} let mut instance = self.create_instance(&ctx).await?;
} let payload = json!({
"event": "deactivate",
let host_state = HostState::new( "settings": settings
plugin.clone(), });
ctx.community_id, let payload_json = serde_json::to_string(&payload)
ctx.actor_user_id, .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
ctx.pool.clone(), instance.call_hook("lifecycle.deactivate", &payload_json).await.ok();
package_id, Ok(())
capabilities, }
);
async fn settings_updated(
let mut instance = PluginInstance::new(&compiled, host_state, lim).await?; &self,
ctx: HookContext,
let payload_json = serde_json::to_string(&payload) old_settings: Value,
.map_err(|e| PluginError::Message(format!("Failed to serialize payload: {e}")))?; new_settings: Value,
) -> Result<(), PluginError> {
let _result = instance.call_hook(&hook, &payload_json).await?; let mut instance = self.create_instance(&ctx).await?;
let payload = json!({
let remaining_fuel = instance.get_fuel(); "event": "settings_updated",
tracing::debug!( "old_settings": old_settings,
plugin = %plugin, "new_settings": new_settings
hook = %hook, });
fuel_remaining = remaining_fuel, let payload_json = serde_json::to_string(&payload)
"WASM plugin hook completed" .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
); instance.call_hook("lifecycle.settings_updated", &payload_json).await.ok();
Ok(())
Ok(()) }
}) }
}),
);
}
}
async fn activate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> {
let mut instance = self.create_instance(&ctx).await?;
let payload = json!({
"event": "activate",
"settings": settings
});
let payload_json = serde_json::to_string(&payload)
.map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
instance.call_hook("lifecycle.activate", &payload_json).await.ok();
Ok(())
}
async fn deactivate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> {
let mut instance = self.create_instance(&ctx).await?;
let payload = json!({
"event": "deactivate",
"settings": settings
});
let payload_json = serde_json::to_string(&payload)
.map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
instance.call_hook("lifecycle.deactivate", &payload_json).await.ok();
Ok(())
}
async fn settings_updated(
&self,
ctx: HookContext,
old_settings: Value,
new_settings: Value,
) -> Result<(), PluginError> {
let mut instance = self.create_instance(&ctx).await?;
let payload = json!({
"event": "settings_updated",
"old_settings": old_settings,
"new_settings": new_settings
});
let payload_json = serde_json::to_string(&payload)
.map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?;
instance.call_hook("lifecycle.settings_updated", &payload_json).await.ok();
Ok(())
}
}