//! WASM plugin implementation. //! //! Wraps compiled WASM modules and implements the Plugin trait for //! integration with the hook system. use std::sync::Arc; use async_trait::async_trait; use serde_json::{json, Value}; use sqlx::PgPool; use uuid::Uuid; use super::host_api::{Capability, HostState, PluginManifest, CAP_EMIT_EVENTS, CAP_KV_STORE, CAP_OUTBOUND_HTTP, CAP_SETTINGS}; use super::runtime::{CompiledPlugin, ExecutionLimits, PluginInstance}; use crate::plugins::hooks::{HookContext, PluginError}; use crate::plugins::manager::{Plugin, PluginMetadata, PluginScope, PluginSystem}; /// A WASM-based plugin that can be loaded dynamically. pub struct WasmPlugin { package_id: Uuid, manifest: PluginManifest, compiled: Arc, limits: ExecutionLimits, } impl WasmPlugin { /// Creates a new WASM plugin from a manifest and compiled module. pub fn new( package_id: Uuid, manifest: PluginManifest, compiled: Arc, ) -> Self { Self { package_id, manifest, compiled, limits: ExecutionLimits::default(), } } /// Sets custom execution limits for this plugin. #[allow(dead_code)] // API for future use pub fn with_limits(mut self, limits: ExecutionLimits) -> Self { self.limits = limits; self } async fn capabilities_for(&self, pool: &PgPool, ctx: &HookContext) -> Result, PluginError> { let mut out: Vec = Vec::new(); // Community policy influences outbound HTTP. let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { let row = sqlx::query!( r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, cid ) .fetch_optional(pool) .await?; if let Some(row) = row { let allow_http = row .settings .get("plugin_allow_outbound_http") .and_then(|v: &serde_json::Value| v.as_bool()) .unwrap_or(false); let allowlist: Vec = row .settings .get("plugin_http_egress_allowlist") .and_then(|v: &serde_json::Value| v.as_array()) .map(|arr: &Vec| { arr.iter() .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) .collect() }) .unwrap_or_default(); (allow_http, allowlist) } else { (false, Vec::new()) } } else { (false, Vec::new()) }; for cap in &self.manifest.capabilities { match cap.as_str() { CAP_OUTBOUND_HTTP => { let allowed = allow_http && !allowlist.is_empty(); out.push(Capability { name: cap.clone(), allowed, config: serde_json::json!({"allowlist": allowlist}), }); } CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { out.push(Capability { name: cap.clone(), allowed: true, config: serde_json::json!({}), }); } _ => { // Unknown capability is denied by default. out.push(Capability { name: cap.clone(), allowed: false, config: serde_json::json!({}), }); } } } Ok(out) } async fn create_instance(&self, ctx: &HookContext) -> Result { let capabilities = self.capabilities_for(&ctx.pool, ctx).await?; let host_state = HostState::new( self.manifest.name.clone(), ctx.community_id, ctx.actor_user_id, ctx.pool.clone(), self.package_id, capabilities, ); PluginInstance::new(&self.compiled, host_state, self.limits.clone()).await } } #[async_trait] impl Plugin for WasmPlugin { fn metadata(&self) -> PluginMetadata { PluginMetadata { name: Box::leak(self.manifest.name.clone().into_boxed_str()), version: Box::leak(self.manifest.version.clone().into_boxed_str()), description: Box::leak(self.manifest.description.clone().into_boxed_str()), is_core: false, scope: PluginScope::Community, default_enabled: false, settings_schema: self.manifest.settings_schema.clone(), } } fn register(&self, system: &mut PluginSystem) { let plugin_name = self.manifest.name.clone(); let package_id = self.package_id; let handler_plugin_id = format!("wasm:{}", package_id); let manifest_capabilities = self.manifest.capabilities.clone(); let compiled = self.compiled.clone(); let limits = self.limits.clone(); for hook in &self.manifest.hooks { let hook_name = hook.clone(); let hook_name_ref = hook_name.clone(); let plugin_name_clone = plugin_name.clone(); let handler_plugin_id_clone = handler_plugin_id.clone(); let compiled_clone = compiled.clone(); let limits_clone = limits.clone(); let manifest_capabilities_for_hook = manifest_capabilities.clone(); system.add_action( &hook_name_ref, handler_plugin_id_clone.clone(), 50, Arc::new(move |ctx: HookContext, payload: Value| { let hook = hook_name.clone(); let plugin = plugin_name_clone.clone(); let package_id = package_id; let manifest_capabilities = manifest_capabilities_for_hook.clone(); let compiled = compiled_clone.clone(); let lim = limits_clone.clone(); Box::pin(async move { let (allow_http, allowlist) = if let Some(cid) = ctx.community_id { let row = sqlx::query!( r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1"#, cid ) .fetch_optional(&ctx.pool) .await?; if let Some(row) = row { let allow_http = row .settings .get("plugin_allow_outbound_http") .and_then(|v: &serde_json::Value| v.as_bool()) .unwrap_or(false); let allowlist: Vec = row .settings .get("plugin_http_egress_allowlist") .and_then(|v: &serde_json::Value| v.as_array()) .map(|arr: &Vec| { arr.iter() .filter_map(|v: &serde_json::Value| v.as_str().map(|s: &str| s.to_string())) .collect() }) .unwrap_or_default(); (allow_http, allowlist) } else { (false, Vec::new()) } } else { (false, Vec::new()) }; let mut capabilities: Vec = Vec::new(); for cap in &manifest_capabilities { match cap.as_str() { CAP_OUTBOUND_HTTP => { let allowed = allow_http && !allowlist.is_empty(); capabilities.push(Capability { name: cap.clone(), allowed, config: serde_json::json!({"allowlist": allowlist.clone()}), }); } CAP_SETTINGS | CAP_KV_STORE | CAP_EMIT_EVENTS => { capabilities.push(Capability { name: cap.clone(), allowed: true, config: serde_json::json!({}), }); } _ => capabilities.push(Capability { name: cap.clone(), allowed: false, config: serde_json::json!({}), }), } } let host_state = HostState::new( plugin.clone(), ctx.community_id, ctx.actor_user_id, ctx.pool.clone(), package_id, capabilities, ); let mut instance = PluginInstance::new(&compiled, host_state, lim).await?; let payload_json = serde_json::to_string(&payload) .map_err(|e| PluginError::Message(format!("Failed to serialize payload: {e}")))?; let _result = instance.call_hook(&hook, &payload_json).await?; let remaining_fuel = instance.get_fuel(); tracing::debug!( plugin = %plugin, hook = %hook, fuel_remaining = remaining_fuel, "WASM plugin hook completed" ); Ok(()) }) }), ); } } async fn activate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { let mut instance = self.create_instance(&ctx).await?; let payload = json!({ "event": "activate", "settings": settings }); let payload_json = serde_json::to_string(&payload) .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; instance.call_hook("lifecycle.activate", &payload_json).await.ok(); Ok(()) } async fn deactivate(&self, ctx: HookContext, settings: Value) -> Result<(), PluginError> { let mut instance = self.create_instance(&ctx).await?; let payload = json!({ "event": "deactivate", "settings": settings }); let payload_json = serde_json::to_string(&payload) .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; instance.call_hook("lifecycle.deactivate", &payload_json).await.ok(); Ok(()) } async fn settings_updated( &self, ctx: HookContext, old_settings: Value, new_settings: Value, ) -> Result<(), PluginError> { let mut instance = self.create_instance(&ctx).await?; let payload = json!({ "event": "settings_updated", "old_settings": old_settings, "new_settings": new_settings }); let payload_json = serde_json::to_string(&payload) .map_err(|e| PluginError::Message(format!("Failed to serialize: {e}")))?; instance.call_hook("lifecycle.settings_updated", &payload_json).await.ok(); Ok(()) } }