likwid/backend/src/api/plugins.rs
Marco Allegretti d4bcba405b backend: modify 56 files
Verified changes:
- modify backend/src/api/analytics.rs
- modify backend/src/api/approvals.rs
- modify backend/src/api/auth.rs
- modify backend/src/api/comments.rs
- modify backend/src/api/communities.rs
- modify backend/src/api/conflicts.rs
- modify backend/src/api/delegation.rs
- modify backend/src/api/deliberation.rs
- modify backend/src/api/demo.rs
- modify backend/src/api/exports.rs
- modify backend/src/api/federation.rs
- modify backend/src/api/gitlab.rs
- modify backend/src/api/invitations.rs
- modify backend/src/api/lifecycle.rs
- modify backend/src/api/mod.rs
- modify backend/src/api/moderation.rs
- modify backend/src/api/moderation_ledger.rs
- modify backend/src/api/notifications.rs
- modify backend/src/api/permissions.rs
- modify backend/src/api/plugins.rs
- modify backend/src/api/proposals.rs
- modify backend/src/api/roles.rs
- modify backend/src/api/self_moderation.rs
- modify backend/src/api/settings.rs
- modify backend/src/api/users.rs
- modify backend/src/api/voting_config.rs
- modify backend/src/api/workflows.rs
- modify backend/src/auth/jwt.rs
- modify backend/src/auth/middleware.rs
- modify backend/src/auth/mod.rs
- modify backend/src/demo/mod.rs
- modify backend/src/main.rs
- modify backend/src/models/community.rs
- modify backend/src/models/mod.rs
- modify backend/src/models/proposal.rs
- modify backend/src/models/user.rs
- modify backend/src/plugins/builtin/conflict_resolution.rs
- modify backend/src/plugins/builtin/decision_workflows.rs
- modify backend/src/plugins/builtin/federation.rs
- modify backend/src/plugins/builtin/governance_analytics.rs
- modify backend/src/plugins/builtin/moderation_ledger.rs
- modify backend/src/plugins/builtin/proposal_lifecycle.rs
- modify backend/src/plugins/builtin/public_data_export.rs
- modify backend/src/plugins/builtin/self_moderation.rs
- modify backend/src/plugins/builtin/structured_deliberation.rs
- modify backend/src/plugins/hooks.rs
- modify backend/src/plugins/manager.rs
- modify backend/src/plugins/wasm/host_api.rs
- modify backend/src/plugins/wasm/plugin.rs
- modify backend/src/plugins/wasm/runtime.rs
- modify backend/src/rate_limit.rs
- modify backend/src/voting/mod.rs
- modify backend/src/voting/quadratic.rs
- modify backend/src/voting/ranked_choice.rs
- modify backend/src/voting/schulze.rs
- modify backend/src/voting/star.rs

Diffstat:
- 56 files changed, 2697 insertions(+), 1629 deletions(-)
2026-02-03 17:54:39 +01:00

1572 lines
51 KiB
Rust

use axum::{
extract::{Path, State},
http::StatusCode,
routing::{get, post, put},
Extension, Json, Router,
};
use base64::{engine::general_purpose, Engine as _};
use chrono::{DateTime, Utc};
use ed25519_dalek::{Signature, VerifyingKey};
use jsonschema::{Draft, JSONSchema};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use sha2::{Digest, Sha256};
use sqlx::PgPool;
use sqlx::Row;
use std::net::IpAddr;
use std::sync::Arc;
use uuid::Uuid;
use crate::auth::AuthUser;
use crate::plugins::wasm::host_api::PluginManifest;
use crate::plugins::PluginManager;
#[derive(Debug, Serialize)]
pub struct CommunityPluginInfo {
pub name: String,
pub version: String,
pub description: Option<String>,
pub is_core: bool,
pub global_is_active: bool,
pub community_is_active: bool,
pub settings: Value,
pub settings_schema: Option<Value>,
}
async fn get_plugin_policy(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
) -> Result<Json<PluginPolicyResponse>, (StatusCode, String)> {
let membership = sqlx::query!(
"SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2",
auth.user_id,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match membership {
Some(m) if m.role == "admin" || m.role == "moderator" => {}
_ => {
return Err((
StatusCode::FORBIDDEN,
"Must be admin or moderator".to_string(),
))
}
}
let row = sqlx::query!(
r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?;
Ok(Json(PluginPolicyResponse {
trust_policy: parse_trust_policy(&row.settings),
install_sources: parse_install_sources(&row.settings),
allow_outbound_http: parse_bool(&row.settings, "plugin_allow_outbound_http", false),
http_egress_allowlist: parse_string_list(&row.settings, "plugin_http_egress_allowlist"),
registry_allowlist: parse_string_list(&row.settings, "plugin_registry_allowlist"),
allow_background_jobs: parse_bool(&row.settings, "plugin_allow_background_jobs", false),
trusted_publishers: parse_string_list(&row.settings, "plugin_trusted_publishers"),
}))
}
async fn update_plugin_policy(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
Json(req): Json<UpdatePluginPolicyRequest>,
) -> Result<Json<PluginPolicyResponse>, (StatusCode, String)> {
let membership = sqlx::query!(
"SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2",
auth.user_id,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match membership {
Some(m) if m.role == "admin" || m.role == "moderator" => {}
_ => {
return Err((
StatusCode::FORBIDDEN,
"Must be admin or moderator".to_string(),
))
}
}
let current = sqlx::query!(
r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?;
let mut patch = serde_json::Map::new();
let mut event_patch = serde_json::Map::new();
if let Some(tp) = req.trust_policy {
let v = trust_policy_str(tp).to_string();
patch.insert("plugin_trust_policy".to_string(), Value::String(v.clone()));
event_patch.insert("trust_policy".to_string(), Value::String(v));
}
if let Some(sources) = &req.install_sources {
let v = install_sources_json(sources);
patch.insert("plugin_install_sources".to_string(), v.clone());
event_patch.insert("install_sources".to_string(), v);
}
if let Some(v) = req.allow_outbound_http {
patch.insert("plugin_allow_outbound_http".to_string(), Value::Bool(v));
event_patch.insert("allow_outbound_http".to_string(), Value::Bool(v));
}
if let Some(list) = &req.http_egress_allowlist {
let mut out = list.clone();
out.sort();
out.dedup();
let v = Value::Array(out.into_iter().map(Value::String).collect());
patch.insert("plugin_http_egress_allowlist".to_string(), v.clone());
event_patch.insert("http_egress_allowlist".to_string(), v);
}
if let Some(list) = &req.registry_allowlist {
let mut out = list.clone();
out.sort();
out.dedup();
let v = Value::Array(out.into_iter().map(Value::String).collect());
patch.insert("plugin_registry_allowlist".to_string(), v.clone());
event_patch.insert("registry_allowlist".to_string(), v);
}
if let Some(v) = req.allow_background_jobs {
patch.insert("plugin_allow_background_jobs".to_string(), Value::Bool(v));
event_patch.insert("allow_background_jobs".to_string(), Value::Bool(v));
}
if let Some(list) = &req.trusted_publishers {
let mut out = list.clone();
out.sort();
out.dedup();
let v = Value::Array(out.into_iter().map(Value::String).collect());
patch.insert("plugin_trusted_publishers".to_string(), v.clone());
event_patch.insert("trusted_publishers".to_string(), v);
}
if patch.is_empty() {
return Ok(Json(PluginPolicyResponse {
trust_policy: parse_trust_policy(&current.settings),
install_sources: parse_install_sources(&current.settings),
allow_outbound_http: parse_bool(&current.settings, "plugin_allow_outbound_http", false),
http_egress_allowlist: parse_string_list(
&current.settings,
"plugin_http_egress_allowlist",
),
registry_allowlist: parse_string_list(&current.settings, "plugin_registry_allowlist"),
allow_background_jobs: parse_bool(
&current.settings,
"plugin_allow_background_jobs",
false,
),
trusted_publishers: parse_string_list(&current.settings, "plugin_trusted_publishers"),
}));
}
let patch_value = Value::Object(patch);
let row = sqlx::query!(
r#"
UPDATE communities
SET settings = settings || $2::jsonb,
updated_at = NOW()
WHERE id = $1 AND is_active = true
RETURNING settings as "settings!: serde_json::Value"
"#,
community_id,
patch_value
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?;
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, NULL, 'plugin.policy_updated', $3)"#,
community_id,
auth.user_id,
Value::Object(event_patch)
)
.execute(&pool)
.await;
Ok(Json(PluginPolicyResponse {
trust_policy: parse_trust_policy(&row.settings),
install_sources: parse_install_sources(&row.settings),
allow_outbound_http: parse_bool(&row.settings, "plugin_allow_outbound_http", false),
http_egress_allowlist: parse_string_list(&row.settings, "plugin_http_egress_allowlist"),
registry_allowlist: parse_string_list(&row.settings, "plugin_registry_allowlist"),
allow_background_jobs: parse_bool(&row.settings, "plugin_allow_background_jobs", false),
trusted_publishers: parse_string_list(&row.settings, "plugin_trusted_publishers"),
}))
}
#[derive(Debug, Deserialize)]
pub struct UpdateCommunityPluginRequest {
pub is_active: Option<bool>,
pub settings: Option<Value>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PluginTrustPolicy {
SignedOnly,
UnsignedAllowed,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum PluginInstallSource {
Upload,
Registry,
}
#[derive(Debug, Serialize)]
pub struct PluginPolicyResponse {
pub trust_policy: PluginTrustPolicy,
pub install_sources: Vec<PluginInstallSource>,
pub allow_outbound_http: bool,
pub http_egress_allowlist: Vec<String>,
pub registry_allowlist: Vec<String>,
pub allow_background_jobs: bool,
pub trusted_publishers: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct UpdatePluginPolicyRequest {
pub trust_policy: Option<PluginTrustPolicy>,
pub install_sources: Option<Vec<PluginInstallSource>>,
pub allow_outbound_http: Option<bool>,
pub http_egress_allowlist: Option<Vec<String>>,
pub registry_allowlist: Option<Vec<String>>,
pub allow_background_jobs: Option<bool>,
pub trusted_publishers: Option<Vec<String>>,
}
#[derive(Debug, Deserialize)]
pub struct UploadPluginPackageRequest {
pub name: String,
pub version: String,
pub description: Option<String>,
pub publisher: Option<String>,
pub manifest: Value,
pub wasm_base64: String,
pub signature_base64: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct InstallRegistryPluginPackageRequest {
pub url: String,
}
#[derive(Debug, Serialize)]
pub struct CommunityPluginPackageInfo {
pub package_id: Uuid,
pub name: String,
pub version: String,
pub description: Option<String>,
pub publisher: String,
pub source: String,
pub registry_url: Option<String>,
pub wasm_sha256: String,
pub manifest: Value,
pub settings: Value,
pub signature_present: bool,
pub installed_at: DateTime<Utc>,
pub is_active: bool,
}
#[derive(Debug, Deserialize)]
pub struct UpdateCommunityPluginPackageRequest {
pub is_active: Option<bool>,
pub settings: Option<Value>,
}
fn redacted_settings_keys(settings: &Value) -> Vec<String> {
let Some(obj) = settings.as_object() else {
return Vec::new();
};
let mut keys: Vec<String> = obj
.keys()
.map(|k| {
let lower = k.to_ascii_lowercase();
if lower.contains("secret")
|| lower.contains("token")
|| lower.contains("password")
|| lower.ends_with("_key")
|| lower.ends_with("_token")
{
"<redacted>".to_string()
} else {
k.to_string()
}
})
.collect();
keys.sort();
keys.dedup();
keys
}
pub fn router(pool: PgPool) -> Router {
Router::new()
.route(
"/api/communities/{community_id}/plugins",
get(list_community_plugins),
)
.route(
"/api/communities/{community_id}/plugins/{plugin_name}",
put(update_community_plugin),
)
.route(
"/api/communities/{community_id}/plugin-policy",
get(get_plugin_policy).put(update_plugin_policy),
)
.route(
"/api/communities/{community_id}/plugin-packages",
get(list_community_plugin_packages).post(upload_plugin_package),
)
.route(
"/api/communities/{community_id}/plugin-packages/{package_id}",
put(update_community_plugin_package),
)
.route(
"/api/communities/{community_id}/plugin-packages/install-registry",
post(install_registry_plugin_package),
)
.with_state(pool)
}
fn redact_settings_values(settings: &Value) -> Value {
let Some(obj) = settings.as_object() else {
return settings.clone();
};
let mut out = serde_json::Map::new();
for (k, v) in obj {
let lower = k.to_ascii_lowercase();
if lower.contains("secret")
|| lower.contains("token")
|| lower.contains("password")
|| lower.ends_with("_key")
|| lower.ends_with("_token")
{
out.insert(k.clone(), Value::String("<redacted>".to_string()));
} else {
out.insert(k.clone(), v.clone());
}
}
Value::Object(out)
}
fn decode_base64(s: &str) -> Result<Vec<u8>, (StatusCode, String)> {
general_purpose::STANDARD
.decode(s)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid base64".to_string()))
}
fn sha256_hex(bytes: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(bytes);
let out = hasher.finalize();
format!("{:x}", out)
}
fn verify_signature_if_required(
trust_policy: PluginTrustPolicy,
trusted_publishers: &[String],
publisher: &str,
signature_base64: Option<&str>,
wasm_sha256: &str,
) -> Result<Option<Vec<u8>>, (StatusCode, String)> {
let Some(sig_b64) = signature_base64 else {
if matches!(trust_policy, PluginTrustPolicy::SignedOnly) {
return Err((StatusCode::FORBIDDEN, "Plugin must be signed".to_string()));
}
return Ok(None);
};
let sig_bytes = decode_base64(sig_b64)?;
let sig_arr: [u8; 64] = sig_bytes
.as_slice()
.try_into()
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid signature".to_string()))?;
let sig = Signature::from_bytes(&sig_arr);
if matches!(trust_policy, PluginTrustPolicy::SignedOnly) {
if trusted_publishers.is_empty() {
return Err((
StatusCode::FORBIDDEN,
"No trusted publishers configured".to_string(),
));
}
if !trusted_publishers.iter().any(|p| p == publisher) {
return Err((StatusCode::FORBIDDEN, "Publisher not trusted".to_string()));
}
}
let key_bytes = decode_base64(publisher)
.map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?;
let key_arr: [u8; 32] = key_bytes
.as_slice()
.try_into()
.map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?;
let key = VerifyingKey::from_bytes(&key_arr)
.map_err(|_| (StatusCode::FORBIDDEN, "Invalid publisher key".to_string()))?;
key.verify_strict(wasm_sha256.as_bytes(), &sig)
.map_err(|_| (StatusCode::FORBIDDEN, "Invalid signature".to_string()))?;
Ok(Some(sig_arr.to_vec()))
}
fn enforce_registry_allowlist(url: &Url, allowlist: &[String]) -> Result<(), (StatusCode, String)> {
let host = url.host_str().ok_or((
StatusCode::BAD_REQUEST,
"Registry URL must include host".to_string(),
))?;
if let Ok(ip) = host.parse::<IpAddr>() {
let is_disallowed = match ip {
IpAddr::V4(v4) => {
v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
}
IpAddr::V6(v6) => {
v6.is_loopback()
|| v6.is_unique_local()
|| v6.is_unicast_link_local()
|| v6.is_unspecified()
}
};
if is_disallowed {
return Err((
StatusCode::FORBIDDEN,
"Registry host is not allowed".to_string(),
));
}
}
if host.eq_ignore_ascii_case("localhost") {
return Err((
StatusCode::FORBIDDEN,
"Registry host is not allowed".to_string(),
));
}
if !allowlist.is_empty() && !allowlist.iter().any(|h| h == host) {
return Err((
StatusCode::FORBIDDEN,
"Registry host not in allowlist".to_string(),
));
}
Ok(())
}
async fn ensure_admin_or_moderator(
pool: &PgPool,
user_id: Uuid,
community_id: Uuid,
) -> Result<(), (StatusCode, String)> {
let membership = sqlx::query!(
"SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2",
user_id,
community_id
)
.fetch_optional(pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match membership {
Some(m) if m.role == "admin" || m.role == "moderator" => Ok(()),
_ => Err((
StatusCode::FORBIDDEN,
"Must be admin or moderator".to_string(),
)),
}
}
async fn load_community_settings(
pool: &PgPool,
community_id: Uuid,
) -> Result<Value, (StatusCode, String)> {
let row = sqlx::query!(
r#"SELECT settings as "settings!: serde_json::Value" FROM communities WHERE id = $1 AND is_active = true"#,
community_id
)
.fetch_optional(pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Community not found".to_string()))?;
Ok(row.settings)
}
async fn list_community_plugin_packages(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
) -> Result<Json<Vec<CommunityPluginPackageInfo>>, (StatusCode, String)> {
ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?;
let rows = sqlx::query(
r#"
SELECT
pp.id as package_id,
pp.name,
pp.version,
pp.description,
COALESCE(pp.publisher, '') as publisher,
pp.source,
pp.registry_url,
pp.wasm_sha256,
pp.manifest,
(pp.signature IS NOT NULL) as signature_present,
COALESCE(cpp.settings, '{}'::jsonb) as settings,
cpp.installed_at,
cpp.is_active
FROM community_plugin_packages cpp
JOIN plugin_packages pp ON pp.id = cpp.package_id
WHERE cpp.community_id = $1
ORDER BY cpp.installed_at DESC
"#,
)
.bind(community_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let mut out: Vec<CommunityPluginPackageInfo> = Vec::new();
for r in rows {
out.push(CommunityPluginPackageInfo {
package_id: r
.try_get::<Uuid, _>("package_id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
name: r
.try_get::<String, _>("name")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
version: r
.try_get::<String, _>("version")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
description: r
.try_get::<Option<String>, _>("description")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
publisher: r
.try_get::<String, _>("publisher")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
source: r
.try_get::<String, _>("source")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
registry_url: r
.try_get::<Option<String>, _>("registry_url")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
wasm_sha256: r
.try_get::<String, _>("wasm_sha256")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
manifest: r
.try_get::<Value, _>("manifest")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
settings: redact_settings_values(
&r.try_get::<Value, _>("settings")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
),
signature_present: r
.try_get::<bool, _>("signature_present")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
installed_at: r
.try_get::<DateTime<Utc>, _>("installed_at")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
is_active: r
.try_get::<bool, _>("is_active")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?,
});
}
Ok(Json(out))
}
async fn update_community_plugin_package(
auth: AuthUser,
Path((community_id, package_id)): Path<(Uuid, Uuid)>,
State(pool): State<PgPool>,
Extension(plugins): Extension<Arc<PluginManager>>,
Json(req): Json<UpdateCommunityPluginPackageRequest>,
) -> Result<Json<CommunityPluginPackageInfo>, (StatusCode, String)> {
if req.is_active.is_none() && req.settings.is_none() {
return Err((
StatusCode::BAD_REQUEST,
"Must provide is_active and/or settings".to_string(),
));
}
ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?;
let pkg = sqlx::query!(
r#"SELECT id, name, version, description, COALESCE(publisher,'') as publisher, source, registry_url, wasm_sha256,
manifest as "manifest!: serde_json::Value", (signature IS NOT NULL) as "signature_present!: bool"
FROM plugin_packages
WHERE id = $1"#,
package_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Package not found".to_string()))?;
let link_row = sqlx::query(
r#"SELECT is_active, COALESCE(settings,'{}'::jsonb) as settings, installed_at
FROM community_plugin_packages
WHERE community_id = $1 AND package_id = $2"#,
)
.bind(community_id)
.bind(package_id)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((
StatusCode::NOT_FOUND,
"Package not installed for community".to_string(),
))?;
let link_is_active: bool = link_row
.try_get("is_active")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let link_settings: Value = link_row
.try_get("settings")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _link_installed_at: DateTime<Utc> = link_row
.try_get("installed_at")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let manifest: PluginManifest = serde_json::from_value(pkg.manifest.clone()).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Stored manifest invalid: {e}"),
)
})?;
if let Some(settings) = &req.settings {
if let Some(schema) = &manifest.settings_schema {
if !schema.is_null() {
let compiled = JSONSchema::options()
.with_draft(Draft::Draft7)
.compile(schema)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Invalid settings schema: {e}"),
)
})?;
if !compiled.is_valid(settings) {
let msgs: Vec<String> = match compiled.validate(settings) {
Ok(()) => vec!["Invalid settings".to_string()],
Err(errors) => errors.take(5).map(|e| e.to_string()).collect(),
};
return Err((
StatusCode::BAD_REQUEST,
format!("Invalid settings: {}", msgs.join("; ")),
));
}
}
}
}
let new_is_active = req.is_active.unwrap_or(link_is_active);
let new_settings = req.settings.clone().unwrap_or(link_settings.clone());
let row = sqlx::query(
r#"UPDATE community_plugin_packages
SET is_active = $3,
settings = $4
WHERE community_id = $1 AND package_id = $2
RETURNING installed_at"#,
)
.bind(community_id)
.bind(package_id)
.bind(new_is_active)
.bind(new_settings.clone())
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let updated_installed_at: DateTime<Utc> = row
.try_get("installed_at")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
// If activating, deactivate any other active versions of this plugin name in THIS community,
// and invoke their lifecycle hooks.
if new_is_active {
let rows = sqlx::query(
r#"
SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings
FROM community_plugin_packages cpp
JOIN plugin_packages pp ON pp.id = cpp.package_id
WHERE cpp.community_id = $1
AND cpp.is_active = true
AND pp.name = $2
AND cpp.package_id <> $3
"#,
)
.bind(community_id)
.bind(&pkg.name)
.bind(package_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
for r in rows {
let other_package_id: Uuid = r
.try_get("package_id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let other_settings: Value = r
.try_get("settings")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = sqlx::query(
"UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2",
)
.bind(community_id)
.bind(other_package_id)
.execute(&pool)
.await;
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
other_package_id,
true,
other_settings.clone(),
false,
other_settings,
)
.await;
}
}
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
package_id,
link_is_active,
link_settings.clone(),
new_is_active,
new_settings.clone(),
)
.await;
if let Some(settings) = req.settings.as_ref() {
let keys = redacted_settings_keys(settings);
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, NULL, 'plugin.package_settings_updated', $3)"#,
community_id,
auth.user_id,
json!({"package_id": package_id, "name": pkg.name, "version": pkg.version, "keys": keys})
)
.execute(&pool)
.await;
}
if req.is_active.is_some() {
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, NULL, $3, $4)"#,
community_id,
auth.user_id,
if new_is_active { "plugin.package_activated" } else { "plugin.package_deactivated" },
json!({"package_id": package_id, "name": pkg.name, "version": pkg.version})
)
.execute(&pool)
.await;
}
Ok(Json(CommunityPluginPackageInfo {
package_id,
name: pkg.name,
version: pkg.version,
description: pkg.description,
publisher: pkg.publisher.unwrap_or_default(),
source: pkg.source,
registry_url: pkg.registry_url,
wasm_sha256: pkg.wasm_sha256,
manifest: pkg.manifest,
settings: redact_settings_values(&new_settings),
signature_present: pkg.signature_present,
installed_at: updated_installed_at,
is_active: new_is_active,
}))
}
async fn upload_plugin_package(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
Extension(plugins): Extension<Arc<PluginManager>>,
Json(req): Json<UploadPluginPackageRequest>,
) -> Result<Json<CommunityPluginPackageInfo>, (StatusCode, String)> {
ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?;
let settings = load_community_settings(&pool, community_id).await?;
let trust_policy = parse_trust_policy(&settings);
let sources = parse_install_sources(&settings);
let trusted_publishers = parse_string_list(&settings, "plugin_trusted_publishers");
if !sources.contains(&PluginInstallSource::Upload) {
return Err((
StatusCode::FORBIDDEN,
"Upload installs are disabled by policy".to_string(),
));
}
let publisher = req.publisher.unwrap_or_default();
let wasm_bytes = decode_base64(&req.wasm_base64)?;
let wasm_sha256 = sha256_hex(&wasm_bytes);
let parsed_manifest: PluginManifest = serde_json::from_value(req.manifest.clone())
.map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid manifest: {e}")))?;
if parsed_manifest.name != req.name {
return Err((
StatusCode::BAD_REQUEST,
"Manifest name must match request name".to_string(),
));
}
if parsed_manifest.version != req.version {
return Err((
StatusCode::BAD_REQUEST,
"Manifest version must match request version".to_string(),
));
}
let signature_bytes = verify_signature_if_required(
trust_policy,
&trusted_publishers,
&publisher,
req.signature_base64.as_deref(),
&wasm_sha256,
)?;
let package_id = sqlx::query(
r#"
INSERT INTO plugin_packages
(name, version, description, publisher, source, registry_url, wasm_sha256, wasm_bytes, manifest, signature)
VALUES
($1, $2, $3, $4, 'upload', NULL, $5, $6, $7, $8)
ON CONFLICT (name, version, publisher, wasm_sha256)
DO UPDATE SET
description = EXCLUDED.description,
manifest = EXCLUDED.manifest,
signature = EXCLUDED.signature
RETURNING id
"#,
)
.bind(&req.name)
.bind(&req.version)
.bind(&req.description)
.bind(&publisher)
.bind(&wasm_sha256)
.bind(&wasm_bytes)
.bind(&req.manifest)
.bind(&signature_bytes)
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.try_get::<Uuid, _>("id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let row = sqlx::query(
r#"
INSERT INTO community_plugin_packages (community_id, package_id, installed_by, is_active)
VALUES ($1, $2, $3, true)
ON CONFLICT (community_id, package_id)
DO UPDATE SET is_active = EXCLUDED.is_active
RETURNING installed_at, is_active
"#,
)
.bind(community_id)
.bind(package_id)
.bind(auth.user_id)
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
// Deactivate other active versions of this plugin in THIS community.
let rows = sqlx::query(
r#"
SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings
FROM community_plugin_packages cpp
JOIN plugin_packages pp ON pp.id = cpp.package_id
WHERE cpp.community_id = $1
AND cpp.is_active = true
AND pp.name = $2
AND cpp.package_id <> $3
"#,
)
.bind(community_id)
.bind(&req.name)
.bind(package_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
for r in rows {
let other_package_id: Uuid = r
.try_get("package_id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let other_settings: Value = r
.try_get("settings")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = sqlx::query(
"UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2",
)
.bind(community_id)
.bind(other_package_id)
.execute(&pool)
.await;
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
other_package_id,
true,
other_settings.clone(),
false,
other_settings,
)
.await;
}
// Activate lifecycle hook for the newly uploaded package.
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
package_id,
false,
json!({}),
true,
json!({}),
)
.await;
let installed_at = row
.try_get::<DateTime<Utc>, _>("installed_at")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let is_active = row
.try_get::<bool, _>("is_active")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, NULL, 'plugin.package_uploaded', $3)"#,
community_id,
auth.user_id,
json!({"name": req.name, "version": req.version, "publisher": publisher, "sha256": wasm_sha256})
)
.execute(&pool)
.await;
Ok(Json(CommunityPluginPackageInfo {
package_id,
name: req.name,
version: req.version,
description: req.description,
publisher,
source: "upload".to_string(),
registry_url: None,
wasm_sha256,
manifest: req.manifest,
settings: json!({}),
signature_present: signature_bytes.is_some(),
installed_at,
is_active,
}))
}
async fn install_registry_plugin_package(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
Extension(plugins): Extension<Arc<PluginManager>>,
Json(req): Json<InstallRegistryPluginPackageRequest>,
) -> Result<Json<CommunityPluginPackageInfo>, (StatusCode, String)> {
ensure_admin_or_moderator(&pool, auth.user_id, community_id).await?;
let settings = load_community_settings(&pool, community_id).await?;
let trust_policy = parse_trust_policy(&settings);
let sources = parse_install_sources(&settings);
let trusted_publishers = parse_string_list(&settings, "plugin_trusted_publishers");
let registry_allowlist = parse_string_list(&settings, "plugin_registry_allowlist");
if !sources.contains(&PluginInstallSource::Registry) {
return Err((
StatusCode::FORBIDDEN,
"Registry installs are disabled by policy".to_string(),
));
}
let url = Url::parse(&req.url)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid registry URL".to_string()))?;
match url.scheme() {
"https" | "http" => {}
_ => {
return Err((
StatusCode::BAD_REQUEST,
"Invalid registry URL scheme".to_string(),
))
}
}
enforce_registry_allowlist(&url, &registry_allowlist)?;
let res = reqwest::get(url.clone())
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, e.to_string()))?;
if !res.status().is_success() {
return Err((StatusCode::BAD_GATEWAY, "Registry fetch failed".to_string()));
}
let bundle: UploadPluginPackageRequest = res.json().await.map_err(|_| {
(
StatusCode::BAD_GATEWAY,
"Invalid registry response".to_string(),
)
})?;
let parsed_manifest: PluginManifest =
serde_json::from_value(bundle.manifest.clone()).map_err(|e| {
(
StatusCode::BAD_GATEWAY,
format!("Registry returned invalid manifest: {e}"),
)
})?;
if parsed_manifest.name != bundle.name {
return Err((
StatusCode::BAD_GATEWAY,
"Registry manifest name mismatch".to_string(),
));
}
if parsed_manifest.version != bundle.version {
return Err((
StatusCode::BAD_GATEWAY,
"Registry manifest version mismatch".to_string(),
));
}
let publisher = bundle.publisher.unwrap_or_default();
let wasm_bytes = decode_base64(&bundle.wasm_base64)?;
let wasm_sha256 = sha256_hex(&wasm_bytes);
let signature_bytes = verify_signature_if_required(
trust_policy,
&trusted_publishers,
&publisher,
bundle.signature_base64.as_deref(),
&wasm_sha256,
)?;
let package_id = sqlx::query(
r#"
INSERT INTO plugin_packages
(name, version, description, publisher, source, registry_url, wasm_sha256, wasm_bytes, manifest, signature)
VALUES
($1, $2, $3, $4, 'registry', $5, $6, $7, $8, $9)
ON CONFLICT (name, version, publisher, wasm_sha256)
DO UPDATE SET
description = EXCLUDED.description,
manifest = EXCLUDED.manifest,
signature = EXCLUDED.signature,
registry_url = EXCLUDED.registry_url
RETURNING id
"#,
)
.bind(&bundle.name)
.bind(&bundle.version)
.bind(&bundle.description)
.bind(&publisher)
.bind(url.as_str())
.bind(&wasm_sha256)
.bind(&wasm_bytes)
.bind(&bundle.manifest)
.bind(&signature_bytes)
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.try_get::<Uuid, _>("id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let row = sqlx::query(
r#"
INSERT INTO community_plugin_packages (community_id, package_id, installed_by, is_active)
VALUES ($1, $2, $3, true)
ON CONFLICT (community_id, package_id)
DO UPDATE SET is_active = EXCLUDED.is_active
RETURNING installed_at, is_active
"#,
)
.bind(community_id)
.bind(package_id)
.bind(auth.user_id)
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
// Deactivate other active versions of this plugin in THIS community.
let rows = sqlx::query(
r#"
SELECT cpp.package_id, COALESCE(cpp.settings,'{}'::jsonb) as settings
FROM community_plugin_packages cpp
JOIN plugin_packages pp ON pp.id = cpp.package_id
WHERE cpp.community_id = $1
AND cpp.is_active = true
AND pp.name = $2
AND cpp.package_id <> $3
"#,
)
.bind(community_id)
.bind(&bundle.name)
.bind(package_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
for r in rows {
let other_package_id: Uuid = r
.try_get("package_id")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let other_settings: Value = r
.try_get("settings")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = sqlx::query(
"UPDATE community_plugin_packages SET is_active = false WHERE community_id = $1 AND package_id = $2",
)
.bind(community_id)
.bind(other_package_id)
.execute(&pool)
.await;
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
other_package_id,
true,
other_settings.clone(),
false,
other_settings,
)
.await;
}
// Activate lifecycle hook for the newly installed package.
plugins
.handle_community_plugin_package_change(
community_id,
Some(auth.user_id),
package_id,
false,
json!({}),
true,
json!({}),
)
.await;
let installed_at = row
.try_get::<DateTime<Utc>, _>("installed_at")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let is_active = row
.try_get::<bool, _>("is_active")
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, NULL, 'plugin.package_installed', $3)"#,
community_id,
auth.user_id,
json!({"name": bundle.name, "version": bundle.version, "publisher": publisher, "sha256": wasm_sha256, "registry_url": url.as_str()})
)
.execute(&pool)
.await;
Ok(Json(CommunityPluginPackageInfo {
package_id,
name: bundle.name,
version: bundle.version,
description: bundle.description,
publisher,
source: "registry".to_string(),
registry_url: Some(url.as_str().to_string()),
wasm_sha256,
manifest: bundle.manifest,
settings: json!({}),
signature_present: signature_bytes.is_some(),
installed_at,
is_active,
}))
}
fn parse_trust_policy(settings: &Value) -> PluginTrustPolicy {
match settings.get("plugin_trust_policy").and_then(|v| v.as_str()) {
Some("unsigned_allowed") => PluginTrustPolicy::UnsignedAllowed,
_ => PluginTrustPolicy::SignedOnly,
}
}
fn trust_policy_str(policy: PluginTrustPolicy) -> &'static str {
match policy {
PluginTrustPolicy::SignedOnly => "signed_only",
PluginTrustPolicy::UnsignedAllowed => "unsigned_allowed",
}
}
fn parse_install_sources(settings: &Value) -> Vec<PluginInstallSource> {
let Some(arr) = settings
.get("plugin_install_sources")
.and_then(|v| v.as_array())
else {
return vec![PluginInstallSource::Upload, PluginInstallSource::Registry];
};
let mut sources: Vec<PluginInstallSource> = Vec::new();
for v in arr {
match v.as_str() {
Some("upload") => sources.push(PluginInstallSource::Upload),
Some("registry") => sources.push(PluginInstallSource::Registry),
_ => {}
}
}
sources.sort_by_key(|s| match s {
PluginInstallSource::Upload => 0,
PluginInstallSource::Registry => 1,
});
sources.dedup();
if sources.is_empty() {
vec![PluginInstallSource::Upload, PluginInstallSource::Registry]
} else {
sources
}
}
fn install_sources_json(sources: &[PluginInstallSource]) -> Value {
let mut v: Vec<Value> = sources
.iter()
.map(|s| match s {
PluginInstallSource::Upload => Value::String("upload".to_string()),
PluginInstallSource::Registry => Value::String("registry".to_string()),
})
.collect();
v.sort_by(|a, b| a.as_str().unwrap_or("").cmp(b.as_str().unwrap_or("")));
v.dedup();
Value::Array(v)
}
fn parse_bool(settings: &Value, key: &str, default: bool) -> bool {
settings
.get(key)
.and_then(|v| v.as_bool())
.unwrap_or(default)
}
fn parse_string_list(settings: &Value, key: &str) -> Vec<String> {
let Some(arr) = settings.get(key).and_then(|v| v.as_array()) else {
return Vec::new();
};
let mut out: Vec<String> = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
out.sort();
out.dedup();
out
}
async fn list_community_plugins(
auth: AuthUser,
Path(community_id): Path<Uuid>,
State(pool): State<PgPool>,
) -> Result<Json<Vec<CommunityPluginInfo>>, (StatusCode, String)> {
let membership = sqlx::query!(
"SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2",
auth.user_id,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match membership {
Some(m) if m.role == "admin" || m.role == "moderator" => {}
_ => {
return Err((
StatusCode::FORBIDDEN,
"Must be admin or moderator".to_string(),
))
}
}
let rows = sqlx::query!(
r#"
SELECT
p.name,
p.version,
p.description,
p.is_core,
p.is_active as global_is_active,
COALESCE(cp.is_active, false) as "community_is_active!",
COALESCE(cp.settings, '{}'::jsonb) as "settings!: serde_json::Value",
p.settings_schema as "settings_schema: serde_json::Value"
FROM plugins p
LEFT JOIN community_plugins cp
ON cp.plugin_id = p.id AND cp.community_id = $1
ORDER BY p.is_core DESC, p.name ASC
"#,
community_id
)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(
rows.into_iter()
.map(|r| CommunityPluginInfo {
name: r.name,
version: r.version,
description: r.description,
is_core: r.is_core,
global_is_active: r.global_is_active,
community_is_active: r.community_is_active,
settings: r.settings,
settings_schema: r.settings_schema,
})
.collect(),
))
}
async fn update_community_plugin(
auth: AuthUser,
Path((community_id, plugin_name)): Path<(Uuid, String)>,
State(pool): State<PgPool>,
Extension(plugins): Extension<Arc<PluginManager>>,
Json(req): Json<UpdateCommunityPluginRequest>,
) -> Result<Json<CommunityPluginInfo>, (StatusCode, String)> {
if req.is_active.is_none() && req.settings.is_none() {
return Err((
StatusCode::BAD_REQUEST,
"Must provide is_active and/or settings".to_string(),
));
}
let membership = sqlx::query!(
"SELECT role FROM community_members WHERE user_id = $1 AND community_id = $2",
auth.user_id,
community_id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
match membership {
Some(m) if m.role == "admin" || m.role == "moderator" => {}
_ => {
return Err((
StatusCode::FORBIDDEN,
"Must be admin or moderator".to_string(),
))
}
}
let plugin = sqlx::query!(
"SELECT id, name, version, description, is_core, is_active, settings_schema FROM plugins WHERE name = $1",
plugin_name
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Plugin not found".to_string()))?;
if !plugin.is_active {
return Err((
StatusCode::CONFLICT,
"Plugin is disabled globally".to_string(),
));
}
if plugin.is_core {
if let Some(false) = req.is_active {
return Err((
StatusCode::FORBIDDEN,
"Core plugins cannot be deactivated".to_string(),
));
}
}
if let Some(settings) = &req.settings {
if let Some(schema) = &plugin.settings_schema {
let compiled = JSONSchema::options()
.with_draft(Draft::Draft7)
.compile(schema)
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Invalid settings schema for plugin {}: {}", plugin.name, e),
)
})?;
if !compiled.is_valid(settings) {
let msgs: Vec<String> = match compiled.validate(settings) {
Ok(()) => vec!["Invalid settings".to_string()],
Err(errors) => errors.take(5).map(|e| e.to_string()).collect(),
};
return Err((
StatusCode::BAD_REQUEST,
format!("Invalid settings: {}", msgs.join("; ")),
));
}
}
}
let old = sqlx::query!(
r#"
SELECT
is_active,
settings as "settings!: serde_json::Value"
FROM community_plugins
WHERE community_id = $1 AND plugin_id = $2
"#,
community_id,
plugin.id
)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let old_is_active = old.as_ref().map(|r| r.is_active).unwrap_or(false);
let old_settings = old
.as_ref()
.map(|r| r.settings.clone())
.unwrap_or_else(|| json!({}));
let settings_to_apply = req.settings.clone();
let is_active_to_apply = req.is_active;
let row = sqlx::query!(
r#"
INSERT INTO community_plugins (community_id, plugin_id, settings, is_active)
VALUES ($1, $2, COALESCE($3, '{}'::jsonb), COALESCE($4, true))
ON CONFLICT (community_id, plugin_id)
DO UPDATE SET
settings = COALESCE($3, community_plugins.settings),
is_active = COALESCE($4, community_plugins.is_active),
activated_at = CASE
WHEN COALESCE($4, community_plugins.is_active) = true AND community_plugins.is_active = false THEN NOW()
ELSE community_plugins.activated_at
END
RETURNING is_active as community_is_active,
settings as "settings!: serde_json::Value"
"#,
community_id,
plugin.id,
settings_to_apply,
is_active_to_apply
)
.fetch_one(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let event_type = match is_active_to_apply {
Some(true) => Some("plugin.activated"),
Some(false) => Some("plugin.deactivated"),
None => None,
};
if let Some(event_type) = event_type {
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, $3, $4, $5)"#,
community_id,
auth.user_id,
plugin.name,
event_type,
serde_json::json!({})
)
.execute(&pool)
.await;
}
if let Some(settings) = req.settings.as_ref() {
let keys = redacted_settings_keys(settings);
let _ = sqlx::query!(
r#"INSERT INTO public_events (community_id, actor_user_id, plugin_name, event_type, payload)
VALUES ($1, $2, $3, 'plugin.settings_updated', $4)"#,
community_id,
auth.user_id,
plugin.name,
serde_json::json!({"keys": keys})
)
.execute(&pool)
.await;
}
plugins
.handle_community_plugin_change(
community_id,
Some(auth.user_id),
&plugin.name,
old_is_active,
old_settings,
row.community_is_active,
row.settings.clone(),
)
.await;
Ok(Json(CommunityPluginInfo {
name: plugin.name,
version: plugin.version,
description: plugin.description,
is_core: plugin.is_core,
global_is_active: plugin.is_active,
community_is_active: row.community_is_active,
settings: row.settings,
settings_schema: plugin.settings_schema,
}))
}