From 3aba16a8e8e290c343ac794f189a93875f83585b Mon Sep 17 00:00:00 2001 From: Marco Allegretti Date: Mon, 2 Feb 2026 11:33:24 +0100 Subject: [PATCH] wasm: dedupe capability resolution --- backend/src/plugins/wasm/plugin.rs | 569 +++++++++++++---------------- 1 file changed, 261 insertions(+), 308 deletions(-) diff --git a/backend/src/plugins/wasm/plugin.rs b/backend/src/plugins/wasm/plugin.rs index a04ad9a..05dc095 100644 --- a/backend/src/plugins/wasm/plugin.rs +++ b/backend/src/plugins/wasm/plugin.rs @@ -1,308 +1,261 @@ -//! WASM plugin implementation. -//! -//! Wraps compiled WASM modules and implements the Plugin trait for -//! integration with the hook system. - -use std::sync::Arc; - -use async_trait::async_trait; -use serde_json::{json, Value}; -use sqlx::PgPool; -use uuid::Uuid; - -use super::host_api::{Capability, HostState, PluginManifest, CAP_EMIT_EVENTS, CAP_KV_STORE, CAP_OUTBOUND_HTTP, CAP_SETTINGS}; -use super::runtime::{CompiledPlugin, ExecutionLimits, PluginInstance}; -use crate::plugins::hooks::{HookContext, PluginError}; -use crate::plugins::manager::{Plugin, PluginMetadata, PluginScope, PluginSystem}; - -/// A WASM-based plugin that can be loaded dynamically. -pub struct WasmPlugin { - package_id: Uuid, - manifest: PluginManifest, - compiled: Arc, - limits: ExecutionLimits, -} - -impl WasmPlugin { - /// Creates a new WASM plugin from a manifest and compiled module. - pub fn new( - package_id: Uuid, - manifest: PluginManifest, - compiled: Arc, - ) -> Self { - Self { - package_id, - manifest, - compiled, - limits: ExecutionLimits::default(), - } - } - - /// Sets custom execution limits for this plugin. - #[allow(dead_code)] // API for future use - pub fn with_limits(mut self, limits: ExecutionLimits) -> Self { - self.limits = limits; - self - } - - async fn capabilities_for(&self, pool: &PgPool, ctx: &HookContext) -> Result, PluginError> { - let mut out: Vec = Vec::new(); - - // Community policy influences outbound HTTP. - let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { - let row = sqlx::query!( - r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, - cid - ) - .fetch_optional(pool) - .await?; - - if let Some(row) = row { - let allow_http = row - .settings - .get("plugin_allow_outbound_http") - .and_then(|v: &serde_json::Value| v.as_bool()) - .unwrap_or(false); - - let allowlist: Vec = row - .settings - .get("plugin_http_egress_allowlist") - .and_then(|v: &serde_json::Value| v.as_array()) - .map(|arr: &Vec| { - arr.iter() - .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) - .collect() - }) - .unwrap_or_default(); - - (allow_http, allowlist) - } else { - (false, Vec::new()) - } - } else { - (false, Vec::new()) - }; - - for cap in &self.manifest.capabilities { - match cap.as_str() { - CAP_OUTBOUND_HTTP => { - let allowed = allow_http && !allowlist.is_empty(); - out.push(Capability { - name: cap.clone(), - allowed, - config: serde_json::json!({"allowlist": allowlist}), - }); - } - CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { - out.push(Capability { - name: cap.clone(), - allowed: true, - config: serde_json::json!({}), - }); - } - _ => { - // Unknown capability is denied by default. - out.push(Capability { - name: cap.clone(), - allowed: false, - config: serde_json::json!({}), - }); - } - } - } - - Ok(out) - } - - async fn create_instance(&self, ctx: &HookContext) -> Result { - let capabilities = self.capabilities_for(&ctx.pool, ctx).await?; - let host_state = HostState::new( - self.manifest.name.clone(), - ctx.community_id, - ctx.actor_user_id, - ctx.pool.clone(), - self.package_id, - capabilities, - ); - - PluginInstance::new(&self.compiled, host_state, self.limits.clone()).await - } -} - -#[async_trait] -impl Plugin for WasmPlugin { - fn metadata(&self) -> PluginMetadata { - PluginMetadata { - name: Box::leak(self.manifest.name.clone().into_boxed_str()), - version: Box::leak(self.manifest.version.clone().into_boxed_str()), - description: Box::leak(self.manifest.description.clone().into_boxed_str()), - is_core: false, - scope: PluginScope::Community, - default_enabled: false, - settings_schema: self.manifest.settings_schema.clone(), - } - } - - fn register(&self, system: &mut PluginSystem) { - let plugin_name = self.manifest.name.clone(); - let package_id = self.package_id; - let handler_plugin_id = format!("wasm:{}", package_id); - let manifest_capabilities = self.manifest.capabilities.clone(); - let compiled = self.compiled.clone(); - let limits = self.limits.clone(); - - for hook in &self.manifest.hooks { - let hook_name = hook.clone(); - let hook_name_ref = hook_name.clone(); - let plugin_name_clone = plugin_name.clone(); - let handler_plugin_id_clone = handler_plugin_id.clone(); - let compiled_clone = compiled.clone(); - let limits_clone = limits.clone(); - let manifest_capabilities_for_hook = manifest_capabilities.clone(); - - system.add_action( - &hook_name_ref, - handler_plugin_id_clone.clone(), - 50, - Arc::new(move |ctx: HookContext, payload: Value| { - let hook = hook_name.clone(); - let plugin = plugin_name_clone.clone(); - let package_id = package_id; - let manifest_capabilities = manifest_capabilities_for_hook.clone(); - let compiled = compiled_clone.clone(); - let lim = limits_clone.clone(); - - Box::pin(async move { - let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { - let row = sqlx::query!( - r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, - cid - ) - .fetch_optional(&ctx.pool) - .await?; - - if let Some(row) = row { - let allow_http = row - .settings - .get("plugin_allow_outbound_http") - .and_then(|v: &serde_json::Value| v.as_bool()) - .unwrap_or(false); - - let allowlist: Vec = row - .settings - .get("plugin_http_egress_allowlist") - .and_then(|v: &serde_json::Value| v.as_array()) - .map(|arr: &Vec| { - arr.iter() - .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) - .collect() - }) - .unwrap_or_default(); - - (allow_http, allowlist) - } else { - (false, Vec::new()) - } - } else { - (false, Vec::new()) - }; - - let mut capabilities: Vec = Vec::new(); - for cap in &manifest_capabilities { - match cap.as_str() { - CAP_OUTBOUND_HTTP => { - let allowed = allow_http && !allowlist.is_empty(); - capabilities.push(Capability { - name: cap.clone(), - allowed, - config: serde_json::json!({"allowlist": allowlist.clone()}), - }); - } - CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { - capabilities.push(Capability { - name: cap.clone(), - allowed: true, - config: serde_json::json!({}), - }); - } - _ => capabilities.push(Capability { - name: cap.clone(), - allowed: false, - config: serde_json::json!({}), - }), - } - } - - let host_state = HostState::new( - plugin.clone(), - ctx.community_id, - ctx.actor_user_id, - ctx.pool.clone(), - package_id, - capabilities, - ); - - let mut instance = PluginInstance::new(&compiled, host_state, lim).await?; - - let payload_json = serde_json::to_string(&payload) - .map_err(|e| PluginError::Message(format!("Failed to serialize payload: {e}")))?; - - let _result = instance.call_hook(&hook, &payload_json).await?; - - let remaining_fuel = instance.get_fuel(); - tracing::debug!( - plugin = %plugin, - hook = %hook, - fuel_remaining = remaining_fuel, - "WASM plugin hook completed" - ); - - Ok(()) - }) - }), - ); - } - } - - async fn activate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { - let mut instance = self.create_instance(&ctx).await?; - let payload = json!({ - "event": "activate", - "settings": settings - }); - let payload_json = serde_json::to_string(&payload) - .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; - instance.call_hook("lifecycle.activate", &payload_json).await.ok(); - Ok(()) - } - - async fn deactivate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { - let mut instance = self.create_instance(&ctx).await?; - let payload = json!({ - "event": "deactivate", - "settings": settings - }); - let payload_json = serde_json::to_string(&payload) - .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; - instance.call_hook("lifecycle.deactivate", &payload_json).await.ok(); - Ok(()) - } - - async fn settings_updated( - &self, - ctx: HookContext, - old_settings: Value, - new_settings: Value, - ) -> Result<(), PluginError> { - let mut instance = self.create_instance(&ctx).await?; - let payload = json!({ - "event": "settings_updated", - "old_settings": old_settings, - "new_settings": new_settings - }); - let payload_json = serde_json::to_string(&payload) - .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; - instance.call_hook("lifecycle.settings_updated", &payload_json).await.ok(); - Ok(()) - } -} +//! WASM plugin implementation. +//! +//! Wraps compiled WASM modules and implements the Plugin trait for +//! integration with the hook system. + +use std::sync::Arc; + +use async_trait::async_trait; +use serde_json::{json, Value}; +use sqlx::PgPool; +use uuid::Uuid; + +use super::host_api::{Capability, HostState, PluginManifest, CAP_EMIT_EVENTS, CAP_KV_STORE, CAP_OUTBOUND_HTTP, CAP_SETTINGS}; +use super::runtime::{CompiledPlugin, ExecutionLimits, PluginInstance}; +use crate::plugins::hooks::{HookContext, PluginError}; +use crate::plugins::manager::{Plugin, PluginMetadata, PluginScope, PluginSystem}; + +/// A WASM-based plugin that can be loaded dynamically. +pub struct WasmPlugin { + package_id: Uuid, + manifest: PluginManifest, + compiled: Arc, + limits: ExecutionLimits, +} + +async fn capabilities_for_manifest( + pool: &PgPool, + community_id: Option, + manifest_capabilities: &[String], +) -> Result, PluginError> { + let mut out: Vec = Vec::new(); + + let (allow_http, allowlist) = if let Some(cid) = community_id { + let row = sqlx::query!( + r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, + cid + ) + .fetch_optional(pool) + .await?; + + if let Some(row) = row { + let allow_http = row + .settings + .get("plugin_allow_outbound_http") + .and_then(|v: &serde_json::Value| v.as_bool()) + .unwrap_or(false); + + let allowlist: Vec = row + .settings + .get("plugin_http_egress_allowlist") + .and_then(|v: &serde_json::Value| v.as_array()) + .map(|arr: &Vec| { + arr.iter() + .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) + .collect() + }) + .unwrap_or_default(); + + (allow_http, allowlist) + } else { + (false, Vec::new()) + } + } else { + (false, Vec::new()) + }; + + for cap in manifest_capabilities { + match cap.as_str() { + CAP_OUTBOUND_HTTP => { + let allowed = allow_http && !allowlist.is_empty(); + out.push(Capability { + name: cap.clone(), + allowed, + config: serde_json::json!({"allowlist": allowlist}), + }); + } + CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { + out.push(Capability { + name: cap.clone(), + allowed: true, + config: serde_json::json!({}), + }); + } + _ => { + out.push(Capability { + name: cap.clone(), + allowed: false, + config: serde_json::json!({}), + }); + } + } + } + + Ok(out) +} + +impl WasmPlugin { + /// Creates a new WASM plugin from a manifest and compiled module. + pub fn new( + package_id: Uuid, + manifest: PluginManifest, + compiled: Arc, + ) -> Self { + Self { + package_id, + manifest, + compiled, + limits: ExecutionLimits::default(), + } + } + + /// Sets custom execution limits for this plugin. + #[allow(dead_code)] // API for future use + pub fn with_limits(mut self, limits: ExecutionLimits) -> Self { + self.limits = limits; + self + } + + async fn capabilities_for(&self, pool: &PgPool, ctx: &HookContext) -> Result, PluginError> { + capabilities_for_manifest(pool, ctx.community_id, &self.manifest.capabilities).await + } + + async fn create_instance(&self, ctx: &HookContext) -> Result { + let capabilities = self.capabilities_for(&ctx.pool, ctx).await?; + let host_state = HostState::new( + self.manifest.name.clone(), + ctx.community_id, + ctx.actor_user_id, + ctx.pool.clone(), + self.package_id, + capabilities, + ); + + PluginInstance::new(&self.compiled, host_state, self.limits.clone()).await + } +} + +#[async_trait] +impl Plugin for WasmPlugin { + fn metadata(&self) -> PluginMetadata { + PluginMetadata { + name: Box::leak(self.manifest.name.clone().into_boxed_str()), + version: Box::leak(self.manifest.version.clone().into_boxed_str()), + description: Box::leak(self.manifest.description.clone().into_boxed_str()), + is_core: false, + scope: PluginScope::Community, + default_enabled: false, + settings_schema: self.manifest.settings_schema.clone(), + } + } + + fn register(&self, system: &mut PluginSystem) { + let plugin_name = self.manifest.name.clone(); + let package_id = self.package_id; + let handler_plugin_id = format!("wasm:{}", package_id); + let manifest_capabilities = self.manifest.capabilities.clone(); + let compiled = self.compiled.clone(); + let limits = self.limits.clone(); + + for hook in &self.manifest.hooks { + let hook_name = hook.clone(); + let hook_name_ref = hook_name.clone(); + let plugin_name_clone = plugin_name.clone(); + let handler_plugin_id_clone = handler_plugin_id.clone(); + let compiled_clone = compiled.clone(); + let limits_clone = limits.clone(); + let manifest_capabilities_for_hook = manifest_capabilities.clone(); + + system.add_action( + &hook_name_ref, + handler_plugin_id_clone.clone(), + 50, + Arc::new(move |ctx: HookContext, payload: Value| { + let hook = hook_name.clone(); + let plugin = plugin_name_clone.clone(); + let package_id = package_id; + let manifest_capabilities = manifest_capabilities_for_hook.clone(); + let compiled = compiled_clone.clone(); + let lim = limits_clone.clone(); + + Box::pin(async move { + let capabilities = capabilities_for_manifest( + &ctx.pool, + ctx.community_id, + &manifest_capabilities, + ) + .await?; + + let host_state = HostState::new( + plugin.clone(), + ctx.community_id, + ctx.actor_user_id, + ctx.pool.clone(), + package_id, + capabilities, + ); + + let mut instance = PluginInstance::new(&compiled, host_state, lim).await?; + + let payload_json = serde_json::to_string(&payload) + .map_err(|e| PluginError::Message(format!("Failed to serialize payload: {e}")))?; + + let _result = instance.call_hook(&hook, &payload_json).await?; + + let remaining_fuel = instance.get_fuel(); + tracing::debug!( + plugin = %plugin, + hook = %hook, + fuel_remaining = remaining_fuel, + "WASM plugin hook completed" + ); + + Ok(()) + }) + }), + ); + } + } + + async fn activate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { + let mut instance = self.create_instance(&ctx).await?; + let payload = json!({ + "event": "activate", + "settings": settings + }); + let payload_json = serde_json::to_string(&payload) + .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; + instance.call_hook("lifecycle.activate", &payload_json).await.ok(); + Ok(()) + } + + async fn deactivate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { + let mut instance = self.create_instance(&ctx).await?; + let payload = json!({ + "event": "deactivate", + "settings": settings + }); + let payload_json = serde_json::to_string(&payload) + .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; + instance.call_hook("lifecycle.deactivate", &payload_json).await.ok(); + Ok(()) + } + + async fn settings_updated( + &self, + ctx: HookContext, + old_settings: Value, + new_settings: Value, + ) -> Result<(), PluginError> { + let mut instance = self.create_instance(&ctx).await?; + let payload = json!({ + "event": "settings_updated", + "old_settings": old_settings, + "new_settings": new_settings + }); + let payload_json = serde_json::to_string(&payload) + .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; + instance.call_hook("lifecycle.settings_updated", &payload_json).await.ok(); + Ok(()) + } +}