use async_trait::async_trait; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use sqlx::PgPool; use std::sync::Arc; use uuid::Uuid; use crate::plugins::{ hooks::HookContext, manager::PluginSystem, Plugin, PluginError, PluginMetadata, PluginScope, }; pub struct FederationPlugin; impl FederationPlugin { pub fn new() -> Self { Self } } #[async_trait] impl Plugin for FederationPlugin { fn metadata(&self) -> PluginMetadata { PluginMetadata { name: "federation", version: "1.0.0", description: "Multi-community federation and cross-instance collaboration", is_core: false, scope: PluginScope::Global, default_enabled: false, settings_schema: Some(json!({ "type": "object", "properties": { "allow_incoming_requests": {"type": "boolean", "default": true}, "min_trust_level": {"type": "integer", "default": 2}, "sync_interval_minutes": {"type": "integer", "default": 15} } })), } } fn register(&self, system: &mut PluginSystem) { // Hook: Sync federated proposals periodically system.add_action( "cron.every_15_minutes", "federation".to_string(), 50, Arc::new(|ctx: HookContext, _payload: Value| { Box::pin(async move { FederationService::sync_all_federations(&ctx.pool).await?; Ok(()) }) }), ); // Hook: Share proposal when created in federated community system.add_action( "proposal.created", "federation".to_string(), 100, Arc::new(|ctx: HookContext, payload: Value| { Box::pin(async move { if let Some(proposal_id) = payload .get("proposal_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()) { FederationService::share_proposal_if_federated(&ctx.pool, proposal_id) .await?; } Ok(()) }) }), ); // Hook: Broadcast decision results system.add_action( "proposal.decided", "federation".to_string(), 100, Arc::new(|ctx: HookContext, payload: Value| { Box::pin(async move { if let Some(proposal_id) = payload .get("proposal_id") .and_then(|v| v.as_str()) .and_then(|s| Uuid::parse_str(s).ok()) { FederationService::broadcast_decision(&ctx.pool, proposal_id).await?; } Ok(()) }) }), ); } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FederatedInstance { pub id: Uuid, pub instance_url: String, pub instance_name: String, pub status: String, pub trust_level: i32, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommunityFederation { pub id: Uuid, pub local_community_id: Uuid, pub remote_instance_id: Uuid, pub remote_community_id: Uuid, pub remote_community_name: Option, pub status: String, } pub struct FederationService; impl FederationService { /// Register a new federated instance pub async fn register_instance( pool: &PgPool, url: &str, name: &str, description: Option<&str>, public_key: Option<&str>, ) -> Result { let instance_id: Uuid = sqlx::query_scalar("SELECT register_federated_instance($1, $2, $3, $4)") .bind(url) .bind(name) .bind(description) .bind(public_key) .fetch_one(pool) .await?; Ok(instance_id) } /// Create a federation between communities pub async fn create_federation( pool: &PgPool, local_community_id: Uuid, remote_instance_id: Uuid, remote_community_id: Uuid, remote_community_name: &str, sync_direction: &str, ) -> Result { let federation_id: Uuid = sqlx::query_scalar( "SELECT create_community_federation($1, $2, $3, $4, $5::sync_direction)", ) .bind(local_community_id) .bind(remote_instance_id) .bind(remote_community_id) .bind(remote_community_name) .bind(sync_direction) .fetch_one(pool) .await?; Ok(federation_id) } /// Approve a federation locally pub async fn approve_federation( pool: &PgPool, federation_id: Uuid, _approved_by: Uuid, ) -> Result<(), PluginError> { sqlx::query!( r#"UPDATE community_federations SET approved_locally = true, status = CASE WHEN approved_remotely THEN 'active'::federation_status ELSE status END, updated_at = NOW() WHERE id = $1"#, federation_id ) .execute(pool) .await?; sqlx::query!( r#"INSERT INTO federation_sync_log (federation_id, operation_type, direction, success) VALUES ($1, 'local_approval', 'push', true)"#, federation_id ) .execute(pool) .await?; Ok(()) } /// Sync all active federations pub async fn sync_all_federations(pool: &PgPool) -> Result { let federations = sqlx::query!( r#"SELECT cf.id, cf.local_community_id, cf.remote_instance_id, fi.instance_url, cf.sync_direction::text AS "sync_direction!" FROM community_federations cf JOIN federated_instances fi ON fi.id = cf.remote_instance_id WHERE cf.status = 'active' AND fi.status = 'active'"# ) .fetch_all(pool) .await?; let mut synced = 0; for fed in federations { // In a real implementation, this would make HTTP requests to remote instances // For now, just log the sync attempt let start = std::time::Instant::now(); sqlx::query( r#"INSERT INTO federation_sync_log (federation_id, instance_id, operation_type, direction, success, duration_ms) VALUES ($1, $2, 'scheduled_sync', $3::sync_direction, true, $4)"#, ) .bind(fed.id) .bind(fed.remote_instance_id) .bind(&fed.sync_direction) .bind(start.elapsed().as_millis() as i32) .execute(pool) .await?; sqlx::query!( "UPDATE federated_instances SET last_sync_at = NOW(), total_syncs = total_syncs + 1 WHERE id = $1", fed.remote_instance_id ) .execute(pool) .await?; synced += 1; } Ok(synced) } /// Share a proposal to federated communities pub async fn share_proposal_if_federated( pool: &PgPool, proposal_id: Uuid, ) -> Result<(), PluginError> { // Get the proposal's community let proposal = sqlx::query!( "SELECT community_id FROM proposals WHERE id = $1", proposal_id ) .fetch_optional(pool) .await?; let Some(proposal) = proposal else { return Ok(()); }; // Check for active federations that sync proposals let federations = sqlx::query!( r#"SELECT id, remote_instance_id, remote_community_id FROM community_federations WHERE local_community_id = $1 AND status = 'active' AND sync_proposals = true AND sync_direction IN ('push', 'bidirectional')"#, proposal.community_id ) .fetch_all(pool) .await?; for fed in federations { // Create federated proposal record sqlx::query!( r#"INSERT INTO federated_proposals (federation_id, local_proposal_id, remote_proposal_id, is_origin_local) VALUES ($1, $2, $2, true) ON CONFLICT DO NOTHING"#, fed.id, proposal_id ) .execute(pool) .await?; } Ok(()) } /// Broadcast decision to federated communities pub async fn broadcast_decision(pool: &PgPool, proposal_id: Uuid) -> Result<(), PluginError> { // Find federated proposal let federated = sqlx::query!( r#"SELECT fp.id, fp.federation_id FROM federated_proposals fp WHERE fp.local_proposal_id = $1"#, proposal_id ) .fetch_all(pool) .await?; for fp in federated { // Get local vote results let results = sqlx::query!( r#"SELECT COUNT(*)::int AS total_votes FROM votes WHERE proposal_id = $1"#, proposal_id ) .fetch_one(pool) .await?; // Create or update federated decision sqlx::query!( r#"INSERT INTO federated_decisions (federated_proposal_id, decision_type, outcome, total_votes, is_final) VALUES ($1, 'vote', 'pending', $2, false) ON CONFLICT DO NOTHING"#, fp.id, results.total_votes.unwrap_or(0) ) .execute(pool) .await?; } Ok(()) } /// Get federation statistics for a community pub async fn get_stats(pool: &PgPool, community_id: Uuid) -> Result { let stats = sqlx::query!("SELECT * FROM get_federation_stats($1)", community_id) .fetch_one(pool) .await?; Ok(json!({ "total_federations": stats.total_federations, "active_federations": stats.active_federations, "federated_proposals": stats.federated_proposals, "total_syncs": stats.total_syncs, "last_sync": stats.last_sync })) } /// Get all instances pub async fn get_instances(pool: &PgPool) -> Result, PluginError> { let instances = sqlx::query_as!( FederatedInstance, r#"SELECT id, instance_url, instance_name, status::text AS "status!", trust_level FROM federated_instances ORDER BY instance_name"# ) .fetch_all(pool) .await?; Ok(instances) } /// Get federations for a community pub async fn get_community_federations( pool: &PgPool, community_id: Uuid, ) -> Result, PluginError> { let federations = sqlx::query_as!( CommunityFederation, r#"SELECT id, local_community_id, remote_instance_id, remote_community_id, remote_community_name, status::text AS "status!" FROM community_federations WHERE local_community_id = $1"#, community_id ) .fetch_all(pool) .await?; Ok(federations) } /// Submit a federation request (handles incoming requests from other instances) pub async fn request_federation( pool: &PgPool, from_instance_url: &str, from_community_name: &str, to_community_id: Uuid, message: Option<&str>, ) -> Result { let request_id = sqlx::query_scalar!( r#"INSERT INTO federation_requests (from_instance_url, from_community_name, to_community_id, request_message) VALUES ($1, $2, $3, $4) RETURNING id"#, from_instance_url, from_community_name, to_community_id, message ) .fetch_one(pool) .await?; Ok(request_id) } /// Update instance trust level pub async fn set_trust_level( pool: &PgPool, instance_id: Uuid, trust_level: i32, ) -> Result<(), PluginError> { sqlx::query!( "UPDATE federated_instances SET trust_level = $2, updated_at = NOW() WHERE id = $1", instance_id, trust_level.clamp(1, 5) ) .execute(pool) .await?; Ok(()) } }