use axum::{ extract::{Path, State}, http::StatusCode, routing::{get, post, put}, Extension, Json, Router, }; use base64::{engine::general_purpose, Engine as _}; use chrono::{DateTime, Utc}; use ed25519_dalek::{Signature, VerifyingKey}; use jsonschema::{Draft, JSONSchema}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use sqlx::PgPool; use sqlx::Row; use std::net::IpAddr; use std::sync::Arc; use uuid::Uuid; use sha2::{Digest, Sha256}; use reqwest::Url; use crate::auth::AuthUser; use crate::plugins::PluginManager; use crate::plugins::wasm::host_api::PluginManifest; #[derive(Debug, Serialize)] pub struct CommunityPluginInfo { pub name: String, pub version: String, pub description: Option, pub is_core: bool, pub global_is_active: bool, pub community_is_active: bool, pub settings: Value, pub settings_schema: Option, } async fn get_plugin_policy( auth: AuthUser, Path(community_id): Path, State(pool): State, ) -> Result, (StatusCode, String)> { let membership = sqlx::query!( "SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2", auth.user_id, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; match membership { Some(m) if m.role == "admin" || m.role == "moderator" => {} _ => return Err((StatusCode::FORBIDDEN, "Must be admin or moderator".to_string())), } let row = sqlx::query!( r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?; Ok(Json(PluginPolicyResponse { trust_policy: parse_trust_policy(&row.settings), install_sources: parse_install_sources(&row.settings), allow_outbound_http: parse_bool(&row.settings, "plugin_allow_outbound_http", false), http_egress_allowlist: parse_string_list(&row.settings, "plugin_http_egress_allowlist"), registry_allowlist: parse_string_list(&row.settings, "plugin_registry_allowlist"), allow_background_jobs: parse_bool(&row.settings, "plugin_allow_background_jobs", false), trusted_publishers: parse_string_list(&row.settings, "plugin_trusted_publishers"), })) } async fn update_plugin_policy( auth: AuthUser, Path(community_id): Path, State(pool): State, Json(req): Json, ) -> Result, (StatusCode, String)> { let membership = sqlx::query!( "SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2", auth.user_id, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; match membership { Some(m) if m.role == "admin" || m.role == "moderator" => {} _ => return Err((StatusCode::FORBIDDEN, "Must be admin or moderator".to_string())), } let current = sqlx::query!( r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?; let mut patch = serde_json::Map::new(); let mut event_patch = serde_json::Map::new(); if let Some(tp) = req.trust_policy { let v = trust_policy_str(tp).to_string(); patch.insert("plugin_trust_policy".to_string(), Value::String(v.clone())); event_patch.insert("trust_policy".to_string(), Value::String(v)); } if let Some(sources) = &req.install_sources { let v = install_sources_json(sources); patch.insert("plugin_install_sources".to_string(), v.clone()); event_patch.insert("install_sources".to_string(), v); } if let Some(v) = req.allow_outbound_http { patch.insert("plugin_allow_outbound_http".to_string(), Value::Bool(v)); event_patch.insert("allow_outbound_http".to_string(), Value::Bool(v)); } if let Some(list) = &req.http_egress_allowlist { let mut out = list.clone(); out.sort(); out.dedup(); let v = Value::Array(out.into_iter().map(Value::String).collect()); patch.insert("plugin_http_egress_allowlist".to_string(), v.clone()); event_patch.insert("http_egress_allowlist".to_string(), v); } if let Some(list) = &req.registry_allowlist { let mut out = list.clone(); out.sort(); out.dedup(); let v = Value::Array(out.into_iter().map(Value::String).collect()); patch.insert("plugin_registry_allowlist".to_string(), v.clone()); event_patch.insert("registry_allowlist".to_string(), v); } if let Some(v) = req.allow_background_jobs { patch.insert("plugin_allow_background_jobs".to_string(), Value::Bool(v)); event_patch.insert("allow_background_jobs".to_string(), Value::Bool(v)); } if let Some(list) = &req.trusted_publishers { let mut out = list.clone(); out.sort(); out.dedup(); let v = Value::Array(out.into_iter().map(Value::String).collect()); patch.insert("plugin_trusted_publishers".to_string(), v.clone()); event_patch.insert("trusted_publishers".to_string(), v); } if patch.is_empty() { return Ok(Json(PluginPolicyResponse { trust_policy: parse_trust_policy(¤t.settings), install_sources: parse_install_sources(¤t.settings), allow_outbound_http: parse_bool(¤t.settings, "plugin_allow_outbound_http", false), http_egress_allowlist: parse_string_list(¤t.settings, "plugin_http_egress_allowlist"), registry_allowlist: parse_string_list(¤t.settings, "plugin_registry_allowlist"), allow_background_jobs: parse_bool(¤t.settings, "plugin_allow_background_jobs", false), trusted_publishers: parse_string_list(¤t.settings, "plugin_trusted_publishers"), })); } let patch_value = Value::Object(patch); let row = sqlx::query!( r#" UPDATE communities SET settings = settings || $2::jsonb, updated_at = NOW() WHERE id = $1 AND is_active = true RETURNING settings as "settings!: serde_json::Value" "#, community_id, patch_value ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?; let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, NULL, 'plugin.policy_updated', $3)"#, community_id, auth.user_id, Value::Object(event_patch) ) .execute(&pool) .await; Ok(Json(PluginPolicyResponse { trust_policy: parse_trust_policy(&row.settings), install_sources: parse_install_sources(&row.settings), allow_outbound_http: parse_bool(&row.settings, "plugin_allow_outbound_http", false), http_egress_allowlist: parse_string_list(&row.settings, "plugin_http_egress_allowlist"), registry_allowlist: parse_string_list(&row.settings, "plugin_registry_allowlist"), allow_background_jobs: parse_bool(&row.settings, "plugin_allow_background_jobs", false), trusted_publishers: parse_string_list(&row.settings, "plugin_trusted_publishers"), })) } #[derive(Debug, Deserialize)] pub struct UpdateCommunityPluginRequest { pub is_active: Option, pub settings: Option, } #[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum PluginTrustPolicy { SignedOnly, UnsignedAllowed, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum PluginInstallSource { Upload, Registry, } #[derive(Debug, Serialize)] pub struct PluginPolicyResponse { pub trust_policy: PluginTrustPolicy, pub install_sources: Vec, pub allow_outbound_http: bool, pub http_egress_allowlist: Vec, pub registry_allowlist: Vec, pub allow_background_jobs: bool, pub trusted_publishers: Vec, } #[derive(Debug, Deserialize)] pub struct UpdatePluginPolicyRequest { pub trust_policy: Option, pub install_sources: Option>, pub allow_outbound_http: Option, pub http_egress_allowlist: Option>, pub registry_allowlist: Option>, pub allow_background_jobs: Option, pub trusted_publishers: Option>, } #[derive(Debug, Deserialize)] pub struct UploadPluginPackageRequest { pub name: String, pub version: String, pub description: Option, pub publisher: Option, pub manifest: Value, pub wasm_base64: String, pub signature_base64: Option, } #[derive(Debug, Deserialize)] pub struct InstallRegistryPluginPackageRequest { pub url: String, } #[derive(Debug, Serialize)] pub struct CommunityPluginPackageInfo { pub package_id: Uuid, pub name: String, pub version: String, pub description: Option, pub publisher: String, pub source: String, pub registry_url: Option, pub wasm_sha256: String, pub manifest: Value, pub settings: Value, pub signature_present: bool, pub installed_at: DateTime, pub is_active: bool, } #[derive(Debug, Deserialize)] pub struct UpdateCommunityPluginPackageRequest { pub is_active: Option, pub settings: Option, } fn redacted_settings_keys(settings: &Value) -> Vec { let Some(obj) = settings.as_object() else { return Vec::new(); }; let mut keys: Vec = obj .keys() .map(|k| { let lower = k.to_ascii_lowercase(); if lower.contains("secret") || lower.contains("token") || lower.contains("password") || lower.ends_with("_key") || lower.ends_with("_token") { "".to_string() } else { k.to_string() } }) .collect(); keys.sort(); keys.dedup(); keys } pub fn router(pool: PgPool) -> Router { Router::new() .route( "/api/communities/{community_id}/plugins", get(list_community_plugins), ) .route( "/api/communities/{community_id}/plugins/{plugin_name}", put(update_community_plugin), ) .route( "/api/communities/{community_id}/plugin-policy", get(get_plugin_policy).put(update_plugin_policy), ) .route( "/api/communities/{community_id}/plugin-packages", get(list_community_plugin_packages).post(upload_plugin_package), ) .route( "/api/communities/{community_id}/plugin-packages/{package_id}", put(update_community_plugin_package), ) .route( "/api/communities/{community_id}/plugin-packages/install-registry", post(install_registry_plugin_package), ) .with_state(pool) } fn redact_settings_values(settings: &Value) -> Value { let Some(obj) = settings.as_object() else { return settings.clone(); }; let mut out = serde_json::Map::new(); for (k, v) in obj { let lower = k.to_ascii_lowercase(); if lower.contains("secret") || lower.contains("token") || lower.contains("password") || lower.ends_with("_key") || lower.ends_with("_token") { out.insert(k.clone(), Value::String("".to_string())); } else { out.insert(k.clone(), v.clone()); } } Value::Object(out) } fn decode_base64(s: &str) -> Result, (StatusCode, String)> { general_purpose::STANDARD .decode(s) .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64".to_string())) } fn sha256_hex(bytes: &[u8]) -> String { let mut hasher = Sha256::new(); hasher.update(bytes); let out = hasher.finalize(); format!("{:x}", out) } fn verify_signature_if_required( trust_policy: PluginTrustPolicy, trusted_publishers: &[String], publisher: &str, signature_base64: Option<&str>, wasm_sha256: &str, ) -> Result>, (StatusCode, String)> { let Some(sig_b64) = signature_base64 else { if matches!(trust_policy, PluginTrustPolicy::SignedOnly) { return Err((StatusCode::FORBIDDEN, "Plugin must be signed".to_string())); } return Ok(None); }; let sig_bytes = decode_base64(sig_b64)?; let sig_arr: [u8; 64] = sig_bytes .as_slice() .try_into() .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid signature".to_string()))?; let sig = Signature::from_bytes(&sig_arr); if matches!(trust_policy, PluginTrustPolicy::SignedOnly) { if trusted_publishers.is_empty() { return Err((StatusCode::FORBIDDEN, "No trusted publishers configured".to_string())); } if !trusted_publishers.iter().any(|p| p == publisher) { return Err((StatusCode::FORBIDDEN, "Publisher not trusted".to_string())); } } let key_bytes = decode_base64(publisher) .map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?; let key_arr: [u8; 32] = key_bytes .as_slice() .try_into() .map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?; let key = VerifyingKey::from_bytes(&key_arr) .map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?; key.verify_strict(wasm_sha256.as_bytes(), &sig) .map_err(|_| (StatusCode::FORBIDDEN, "Invalid signature".to_string()))?; Ok(Some(sig_arr.to_vec())) } fn enforce_registry_allowlist(url: &Url, allowlist: &[String]) -> Result<(), (StatusCode, String)> { let host = url .host_str() .ok_or((StatusCode::BAD_REQUEST, "Registry URL must include host".to_string()))?; if let Ok(ip) = host.parse::() { let is_disallowed = match ip { IpAddr::V4(v4) => v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified(), IpAddr::V6(v6) => { v6.is_loopback() || v6.is_unique_local() || v6.is_unicast_link_local() || v6.is_unspecified() } }; if is_disallowed { return Err((StatusCode::FORBIDDEN, "Registry host is not allowed".to_string())); } } if host.eq_ignore_ascii_case("localhost") { return Err((StatusCode::FORBIDDEN, "Registry host is not allowed".to_string())); } if !allowlist.is_empty() && !allowlist.iter().any(|h| h == host) { return Err((StatusCode::FORBIDDEN, "Registry host not in allowlist".to_string())); } Ok(()) } async fn ensure_admin_or_moderator( pool: &PgPool, user_id: Uuid, community_id: Uuid, ) -> Result<(), (StatusCode, String)> { let membership = sqlx::query!( "SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2", user_id, community_id ) .fetch_optional(pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; match membership { Some(m) if m.role == "admin" || m.role == "moderator" => Ok(()), _ => Err((StatusCode::FORBIDDEN, "Must be admin or moderator".to_string())), } } async fn load_community_settings( pool: &PgPool, community_id: Uuid, ) -> Result { let row = sqlx::query!( r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#, community_id ) .fetch_optional(pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?; Ok(row.settings) } async fn list_community_plugin_packages( auth: AuthUser, Path(community_id): Path, State(pool): State, ) -> Result>, (StatusCode, String)> { ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?; let rows = sqlx::query( r#" SELECT pp.id as package_id, pp.name, pp.version, pp.description, COALESCE(pp.publisher, '') as publisher, pp.source, pp.registry_url, pp.wasm_sha256, pp.manifest, (pp.signature IS NOT NULL) as signature_present, COALESCE(cpp.settings, '{}'::jsonb) as settings, cpp.installed_at, cpp.is_active FROM community_plugin_packages cpp JOIN plugin_packages pp ON pp.id = cpp.package_id WHERE cpp.community_id = $1 ORDER BY cpp.installed_at DESC "#, ) .bind(community_id) .fetch_all(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let mut out: Vec = Vec::new(); for r in rows { out.push(CommunityPluginPackageInfo { package_id: r .try_get::("package_id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, name: r .try_get::("name") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, version: r .try_get::("version") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, description: r .try_get::, _>("description") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, publisher: r .try_get::("publisher") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, source: r .try_get::("source") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, registry_url: r .try_get::, _>("registry_url") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, wasm_sha256: r .try_get::("wasm_sha256") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, manifest: r .try_get::("manifest") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, settings: redact_settings_values( &r.try_get::("settings") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, ), signature_present: r .try_get::("signature_present") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, installed_at: r .try_get::, _>("installed_at") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, is_active: r .try_get::("is_active") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?, }); } Ok(Json(out)) } async fn update_community_plugin_package( auth: AuthUser, Path((community_id, package_id)): Path<(Uuid, Uuid)>, State(pool): State, Extension(plugins): Extension>, Json(req): Json, ) -> Result, (StatusCode, String)> { if req.is_active.is_none() && req.settings.is_none() { return Err(( StatusCode::BAD_REQUEST, "Must provide is_active and/or settings".to_string(), )); } ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?; let pkg = sqlx::query!( r#"SELECT id, name, version, description, COALESCE(publisher,'') as publisher, source, registry_url, wasm_sha256, manifest as "manifest!: serde_json::Value", (signature IS NOT NULL) as "signature_present!: bool" FROM plugin_packages WHERE id = $1"#, package_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Package not found".to_string()))?; let link_row = sqlx::query( r#"SELECT is_active, COALESCE(settings,'{}'::jsonb) as settings, installed_at FROM community_plugin_packages WHERE community_id = $1 AND package_id = $2"#, ) .bind(community_id) .bind(package_id) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Package not installed for community".to_string()))?; let link_is_active: bool = link_row .try_get("is_active") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let link_settings: Value = link_row .try_get("settings") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _link_installed_at: DateTime = link_row .try_get("installed_at") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let manifest: PluginManifest = serde_json::from_value(pkg.manifest.clone()) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Stored manifest invalid: {e}")))?; if let Some(settings) = &req.settings { if let Some(schema) = &manifest.settings_schema { if !schema.is_null() { let compiled = JSONSchema::options() .with_draft(Draft::Draft7) .compile(schema) .map_err(|e| { (StatusCode::INTERNAL_SERVER_ERROR, format!("Invalid settings schema: {e}")) })?; if !compiled.is_valid(settings) { let errors = compiled.validate(settings).err().unwrap(); let msgs: Vec = errors.take(5).map(|e| e.to_string()).collect(); return Err(( StatusCode::BAD_REQUEST, format!("Invalid settings: {}", msgs.join("; ")), )); } } } } let new_is_active = req.is_active.unwrap_or(link_is_active); let new_settings = req.settings.clone().unwrap_or(link_settings.clone()); let row = sqlx::query( r#"UPDATE community_plugin_packages SET is_active = $3, settings = $4 WHERE community_id = $1 AND package_id = $2 RETURNING installed_at"#, ) .bind(community_id) .bind(package_id) .bind(new_is_active) .bind(new_settings.clone()) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let updated_installed_at: DateTime = row .try_get("installed_at") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; // If activating, deactivate any other active versions of this plugin name in THIS community, // and invoke their lifecycle hooks. if new_is_active { let rows = sqlx::query( r#" SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings FROM community_plugin_packages cpp JOIN plugin_packages pp ON pp.id = cpp.package_id WHERE cpp.community_id = $1 AND cpp.is_active = true AND pp.name = $2 AND cpp.package_id <> $3 "#, ) .bind(community_id) .bind(&pkg.name) .bind(package_id) .fetch_all(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; for r in rows { let other_package_id: Uuid = r .try_get("package_id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let other_settings: Value = r .try_get("settings") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _ = sqlx::query( "UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2", ) .bind(community_id) .bind(other_package_id) .execute(&pool) .await; plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), other_package_id, true, other_settings.clone(), false, other_settings, ) .await; } } plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), package_id, link_is_active, link_settings.clone(), new_is_active, new_settings.clone(), ) .await; if req.settings.is_some() { let keys = redacted_settings_keys(req.settings.as_ref().unwrap()); let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, NULL, 'plugin.package_settings_updated', $3)"#, community_id, auth.user_id, json!({"package_id": package_id, "name": pkg.name, "version": pkg.version, "keys": keys}) ) .execute(&pool) .await; } if req.is_active.is_some() { let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, NULL, $3, $4)"#, community_id, auth.user_id, if new_is_active { "plugin.package_activated" } else { "plugin.package_deactivated" }, json!({"package_id": package_id, "name": pkg.name, "version": pkg.version}) ) .execute(&pool) .await; } Ok(Json(CommunityPluginPackageInfo { package_id, name: pkg.name, version: pkg.version, description: pkg.description, publisher: pkg.publisher.unwrap_or_default(), source: pkg.source, registry_url: pkg.registry_url, wasm_sha256: pkg.wasm_sha256, manifest: pkg.manifest, settings: redact_settings_values(&new_settings), signature_present: pkg.signature_present, installed_at: updated_installed_at, is_active: new_is_active, })) } async fn upload_plugin_package( auth: AuthUser, Path(community_id): Path, State(pool): State, Extension(plugins): Extension>, Json(req): Json, ) -> Result, (StatusCode, String)> { ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?; let settings = load_community_settings(&pool, community_id).await?; let trust_policy = parse_trust_policy(&settings); let sources = parse_install_sources(&settings); let trusted_publishers = parse_string_list(&settings, "plugin_trusted_publishers"); if !sources.contains(&PluginInstallSource::Upload) { return Err((StatusCode::FORBIDDEN, "Upload installs are disabled by policy".to_string())); } let publisher = req.publisher.unwrap_or_default(); let wasm_bytes = decode_base64(&req.wasm_base64)?; let wasm_sha256 = sha256_hex(&wasm_bytes); let parsed_manifest: PluginManifest = serde_json::from_value(req.manifest.clone()) .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid manifest: {e}")))?; if parsed_manifest.name != req.name { return Err(( StatusCode::BAD_REQUEST, "Manifest name must match request name".to_string(), )); } if parsed_manifest.version != req.version { return Err(( StatusCode::BAD_REQUEST, "Manifest version must match request version".to_string(), )); } let signature_bytes = verify_signature_if_required( trust_policy, &trusted_publishers, &publisher, req.signature_base64.as_deref(), &wasm_sha256, )?; let package_id = sqlx::query( r#" INSERT INTO plugin_packages (name, version, description, publisher, source, registry_url, wasm_sha256, wasm_bytes, manifest, signature) VALUES ($1, $2, $3, $4, 'upload', NULL, $5, $6, $7, $8) ON CONFLICT (name, version, publisher, wasm_sha256) DO UPDATE SET description = EXCLUDED.description, manifest = EXCLUDED.manifest, signature = EXCLUDED.signature RETURNING id "#, ) .bind(&req.name) .bind(&req.version) .bind(&req.description) .bind(&publisher) .bind(&wasm_sha256) .bind(&wasm_bytes) .bind(&req.manifest) .bind(&signature_bytes) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .try_get::("id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let row = sqlx::query( r#" INSERT INTO community_plugin_packages (community_id, package_id, installed_by, is_active) VALUES ($1, $2, $3, true) ON CONFLICT (community_id, package_id) DO UPDATE SET is_active = EXCLUDED.is_active RETURNING installed_at, is_active "#, ) .bind(community_id) .bind(package_id) .bind(auth.user_id) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; // Deactivate other active versions of this plugin in THIS community. let rows = sqlx::query( r#" SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings FROM community_plugin_packages cpp JOIN plugin_packages pp ON pp.id = cpp.package_id WHERE cpp.community_id = $1 AND cpp.is_active = true AND pp.name = $2 AND cpp.package_id <> $3 "#, ) .bind(community_id) .bind(&req.name) .bind(package_id) .fetch_all(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; for r in rows { let other_package_id: Uuid = r .try_get("package_id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let other_settings: Value = r .try_get("settings") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _ = sqlx::query( "UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2", ) .bind(community_id) .bind(other_package_id) .execute(&pool) .await; plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), other_package_id, true, other_settings.clone(), false, other_settings, ) .await; } // Activate lifecycle hook for the newly uploaded package. plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), package_id, false, json!({}), true, json!({}), ) .await; let installed_at = row .try_get::, _>("installed_at") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let is_active = row .try_get::("is_active") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, NULL, 'plugin.package_uploaded', $3)"#, community_id, auth.user_id, json!({"name": req.name, "version": req.version, "publisher": publisher, "sha256": wasm_sha256}) ) .execute(&pool) .await; Ok(Json(CommunityPluginPackageInfo { package_id, name: req.name, version: req.version, description: req.description, publisher, source: "upload".to_string(), registry_url: None, wasm_sha256, manifest: req.manifest, settings: json!({}), signature_present: signature_bytes.is_some(), installed_at, is_active, })) } async fn install_registry_plugin_package( auth: AuthUser, Path(community_id): Path, State(pool): State, Extension(plugins): Extension>, Json(req): Json, ) -> Result, (StatusCode, String)> { ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?; let settings = load_community_settings(&pool, community_id).await?; let trust_policy = parse_trust_policy(&settings); let sources = parse_install_sources(&settings); let trusted_publishers = parse_string_list(&settings, "plugin_trusted_publishers"); let registry_allowlist = parse_string_list(&settings, "plugin_registry_allowlist"); if !sources.contains(&PluginInstallSource::Registry) { return Err((StatusCode::FORBIDDEN, "Registry installs are disabled by policy".to_string())); } let url = Url::parse(&req.url) .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid registry URL".to_string()))?; match url.scheme() { "https" | "http" => {} _ => return Err((StatusCode::BAD_REQUEST, "Invalid registry URL scheme".to_string())), } enforce_registry_allowlist(&url, ®istry_allowlist)?; let res = reqwest::get(url.clone()) .await .map_err(|e| (StatusCode::BAD_GATEWAY, e.to_string()))?; if !res.status().is_success() { return Err((StatusCode::BAD_GATEWAY, "Registry fetch failed".to_string())); } let bundle: UploadPluginPackageRequest = res .json() .await .map_err(|_| (StatusCode::BAD_GATEWAY, "Invalid registry response".to_string()))?; let parsed_manifest: PluginManifest = serde_json::from_value(bundle.manifest.clone()) .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Registry returned invalid manifest: {e}")))?; if parsed_manifest.name != bundle.name { return Err(( StatusCode::BAD_GATEWAY, "Registry manifest name mismatch".to_string(), )); } if parsed_manifest.version != bundle.version { return Err(( StatusCode::BAD_GATEWAY, "Registry manifest version mismatch".to_string(), )); } let publisher = bundle.publisher.unwrap_or_default(); let wasm_bytes = decode_base64(&bundle.wasm_base64)?; let wasm_sha256 = sha256_hex(&wasm_bytes); let signature_bytes = verify_signature_if_required( trust_policy, &trusted_publishers, &publisher, bundle.signature_base64.as_deref(), &wasm_sha256, )?; let package_id = sqlx::query( r#" INSERT INTO plugin_packages (name, version, description, publisher, source, registry_url, wasm_sha256, wasm_bytes, manifest, signature) VALUES ($1, $2, $3, $4, 'registry', $5, $6, $7, $8, $9) ON CONFLICT (name, version, publisher, wasm_sha256) DO UPDATE SET description = EXCLUDED.description, manifest = EXCLUDED.manifest, signature = EXCLUDED.signature, registry_url = EXCLUDED.registry_url RETURNING id "#, ) .bind(&bundle.name) .bind(&bundle.version) .bind(&bundle.description) .bind(&publisher) .bind(url.as_str()) .bind(&wasm_sha256) .bind(&wasm_bytes) .bind(&bundle.manifest) .bind(&signature_bytes) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .try_get::("id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let row = sqlx::query( r#" INSERT INTO community_plugin_packages (community_id, package_id, installed_by, is_active) VALUES ($1, $2, $3, true) ON CONFLICT (community_id, package_id) DO UPDATE SET is_active = EXCLUDED.is_active RETURNING installed_at, is_active "#, ) .bind(community_id) .bind(package_id) .bind(auth.user_id) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; // Deactivate other active versions of this plugin in THIS community. let rows = sqlx::query( r#" SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings FROM community_plugin_packages cpp JOIN plugin_packages pp ON pp.id = cpp.package_id WHERE cpp.community_id = $1 AND cpp.is_active = true AND pp.name = $2 AND cpp.package_id <> $3 "#, ) .bind(community_id) .bind(&bundle.name) .bind(package_id) .fetch_all(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; for r in rows { let other_package_id: Uuid = r .try_get("package_id") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let other_settings: Value = r .try_get("settings") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _ = sqlx::query( "UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2", ) .bind(community_id) .bind(other_package_id) .execute(&pool) .await; plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), other_package_id, true, other_settings.clone(), false, other_settings, ) .await; } // Activate lifecycle hook for the newly installed package. plugins .handle_community_plugin_package_change( community_id, Some(auth.user_id), package_id, false, json!({}), true, json!({}), ) .await; let installed_at = row .try_get::, _>("installed_at") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let is_active = row .try_get::("is_active") .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, NULL, 'plugin.package_installed', $3)"#, community_id, auth.user_id, json!({"name": bundle.name, "version": bundle.version, "publisher": publisher, "sha256": wasm_sha256, "registry_url": url.as_str()}) ) .execute(&pool) .await; Ok(Json(CommunityPluginPackageInfo { package_id, name: bundle.name, version: bundle.version, description: bundle.description, publisher, source: "registry".to_string(), registry_url: Some(url.as_str().to_string()), wasm_sha256, manifest: bundle.manifest, settings: json!({}), signature_present: signature_bytes.is_some(), installed_at, is_active, })) } fn parse_trust_policy(settings: &Value) -> PluginTrustPolicy { match settings .get("plugin_trust_policy") .and_then(|v| v.as_str()) { Some("unsigned_allowed") => PluginTrustPolicy::UnsignedAllowed, _ => PluginTrustPolicy::SignedOnly, } } fn trust_policy_str(policy: PluginTrustPolicy) -> &'static str { match policy { PluginTrustPolicy::SignedOnly => "signed_only", PluginTrustPolicy::UnsignedAllowed => "unsigned_allowed", } } fn parse_install_sources(settings: &Value) -> Vec { let Some(arr) = settings.get("plugin_install_sources").and_then(|v| v.as_array()) else { return vec![PluginInstallSource::Upload, PluginInstallSource::Registry]; }; let mut sources: Vec = Vec::new(); for v in arr { match v.as_str() { Some("upload") => sources.push(PluginInstallSource::Upload), Some("registry") => sources.push(PluginInstallSource::Registry), _ => {} } } sources.sort_by_key(|s| match s { PluginInstallSource::Upload => 0, PluginInstallSource::Registry => 1, }); sources.dedup(); if sources.is_empty() { vec![PluginInstallSource::Upload, PluginInstallSource::Registry] } else { sources } } fn install_sources_json(sources: &[PluginInstallSource]) -> Value { let mut v: Vec = sources .iter() .map(|s| match s { PluginInstallSource::Upload => Value::String("upload".to_string()), PluginInstallSource::Registry => Value::String("registry".to_string()), }) .collect(); v.sort_by(|a, b| a.as_str().unwrap_or("").cmp(b.as_str().unwrap_or(""))); v.dedup(); Value::Array(v) } fn parse_bool(settings: &Value, key: &str, default: bool) -> bool { settings.get(key).and_then(|v| v.as_bool()).unwrap_or(default) } fn parse_string_list(settings: &Value, key: &str) -> Vec { let Some(arr) = settings.get(key).and_then(|v| v.as_array()) else { return Vec::new(); }; let mut out: Vec = arr .iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) .collect(); out.sort(); out.dedup(); out } async fn list_community_plugins( auth: AuthUser, Path(community_id): Path, State(pool): State, ) -> Result>, (StatusCode, String)> { let membership = sqlx::query!( "SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2", auth.user_id, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; match membership { Some(m) if m.role == "admin" || m.role == "moderator" => {} _ => return Err((StatusCode::FORBIDDEN, "Must be admin or moderator".to_string())), } let rows = sqlx::query!( r#" SELECT p.name, p.version, p.description, p.is_core, p.is_active as global_is_active, COALESCE(cp.is_active, false) as "community_is_active!", COALESCE(cp.settings, '{}'::jsonb) as "settings!: serde_json::Value", p.settings_schema as "settings_schema: serde_json::Value" FROM plugins p LEFT JOIN community_plugins cp ON cp.plugin_id = p.id AND cp.community_id = $1 ORDER BY p.is_core DESC, p.name ASC "#, community_id ) .fetch_all(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(Json( rows.into_iter() .map(|r| CommunityPluginInfo { name: r.name, version: r.version, description: r.description, is_core: r.is_core, global_is_active: r.global_is_active, community_is_active: r.community_is_active, settings: r.settings, settings_schema: r.settings_schema, }) .collect(), )) } async fn update_community_plugin( auth: AuthUser, Path((community_id, plugin_name)): Path<(Uuid, String)>, State(pool): State, Extension(plugins): Extension>, Json(req): Json, ) -> Result, (StatusCode, String)> { if req.is_active.is_none() && req.settings.is_none() { return Err(( StatusCode::BAD_REQUEST, "Must provide is_active and/or settings".to_string(), )); } let membership = sqlx::query!( "SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2", auth.user_id, community_id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; match membership { Some(m) if m.role == "admin" || m.role == "moderator" => {} _ => return Err((StatusCode::FORBIDDEN, "Must be admin or moderator".to_string())), } let plugin = sqlx::query!( "SELECT id, name, version, description, is_core, is_active, settings_schema FROM plugins WHERE name = $1", plugin_name ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .ok_or((StatusCode::NOT_FOUND, "Plugin not found".to_string()))?; if !plugin.is_active { return Err(( StatusCode::CONFLICT, "Plugin is disabled globally".to_string(), )); } if plugin.is_core { if let Some(false) = req.is_active { return Err(( StatusCode::FORBIDDEN, "Core plugins cannot be deactivated".to_string(), )); } } if let Some(settings) = &req.settings { if let Some(schema) = &plugin.settings_schema { let compiled = JSONSchema::options() .with_draft(Draft::Draft7) .compile(schema) .map_err(|e| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("Invalid settings schema for plugin {}: {}", plugin.name, e), ) })?; if !compiled.is_valid(settings) { let errors = compiled.validate(settings).err().unwrap(); let msgs: Vec = errors.take(5).map(|e| e.to_string()).collect(); return Err(( StatusCode::BAD_REQUEST, format!("Invalid settings: {}", msgs.join("; ")), )); } } } let old = sqlx::query!( r#" SELECT is_active, settings as "settings!: serde_json::Value" FROM community_plugins WHERE community_id = $1 AND plugin_id = $2 "#, community_id, plugin.id ) .fetch_optional(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let old_is_active = old.as_ref().map(|r| r.is_active).unwrap_or(false); let old_settings = old .as_ref() .map(|r| r.settings.clone()) .unwrap_or_else(|| json!({})); let settings_to_apply = req.settings.clone(); let is_active_to_apply = req.is_active; let row = sqlx::query!( r#" INSERT INTO community_plugins (community_id, plugin_id, settings, is_active) VALUES ($1, $2, COALESCE($3, '{}'::jsonb), COALESCE($4, true)) ON CONFLICT (community_id, plugin_id) DO UPDATE SET settings = COALESCE($3, community_plugins.settings), is_active = COALESCE($4, community_plugins.is_active), activated_at = CASE WHEN COALESCE($4, community_plugins.is_active) = true AND community_plugins.is_active = false THEN NOW() ELSE community_plugins.activated_at END RETURNING is_active as community_is_active, settings as "settings!: serde_json::Value" "#, community_id, plugin.id, settings_to_apply, is_active_to_apply ) .fetch_one(&pool) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let event_type = match is_active_to_apply { Some(true) => Some("plugin.activated"), Some(false) => Some("plugin.deactivated"), None => None, }; if let Some(event_type) = event_type { let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, $3, $4, $5)"#, community_id, auth.user_id, plugin.name, event_type, serde_json::json!({}) ) .execute(&pool) .await; } if req.settings.is_some() { let keys = redacted_settings_keys(req.settings.as_ref().unwrap()); let _ = sqlx::query!( r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload) VALUES ($1, $2, $3, 'plugin.settings_updated', $4)"#, community_id, auth.user_id, plugin.name, serde_json::json!({"keys": keys}) ) .execute(&pool) .await; } plugins .handle_community_plugin_change( community_id, Some(auth.user_id), &plugin.name, old_is_active, old_settings, row.community_is_active, row.settings.clone(), ) .await; Ok(Json(CommunityPluginInfo { name: plugin.name, version: plugin.version, description: plugin.description, is_core: plugin.is_core, global_is_active: plugin.is_active, community_is_active: row.community_is_active, settings: row.settings, settings_schema: plugin.settings_schema, })) }