mirror of
https://codeberg.org/likwid/likwid.git
synced 2026-06-25 15:37:42 +00:00
411 lines
12 KiB
Rust
411 lines
12 KiB
Rust
|
|
use async_trait::async_trait;
|
||
|
|
use serde::{Deserialize, Serialize};
|
||
|
|
use serde_json::{json, Value};
|
||
|
|
use sqlx::PgPool;
|
||
|
|
use std::sync::Arc;
|
||
|
|
use uuid::Uuid;
|
||
|
|
|
||
|
|
use crate::plugins::{
|
||
|
|
hooks::HookContext,
|
||
|
|
manager::PluginSystem,
|
||
|
|
Plugin, PluginError, PluginMetadata, PluginScope,
|
||
|
|
};
|
||
|
|
|
||
|
|
pub struct FederationPlugin;
|
||
|
|
|
||
|
|
impl FederationPlugin {
|
||
|
|
pub fn new() -> Self {
|
||
|
|
Self
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[async_trait]
|
||
|
|
impl Plugin for FederationPlugin {
|
||
|
|
fn metadata(&self) -> PluginMetadata {
|
||
|
|
PluginMetadata {
|
||
|
|
name: "federation",
|
||
|
|
version: "1.0.0",
|
||
|
|
description: "Multi-community federation and cross-instance collaboration",
|
||
|
|
is_core: false,
|
||
|
|
scope: PluginScope::Global,
|
||
|
|
default_enabled: false,
|
||
|
|
settings_schema: Some(json!({
|
||
|
|
"type": "object",
|
||
|
|
"properties": {
|
||
|
|
"allow_incoming_requests": {"type": "boolean", "default": true},
|
||
|
|
"min_trust_level": {"type": "integer", "default": 2},
|
||
|
|
"sync_interval_minutes": {"type": "integer", "default": 15}
|
||
|
|
}
|
||
|
|
})),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
fn register(&self, system: &mut PluginSystem) {
|
||
|
|
// Hook: Sync federated proposals periodically
|
||
|
|
system.add_action(
|
||
|
|
"cron.every_15_minutes",
|
||
|
|
"federation".to_string(),
|
||
|
|
50,
|
||
|
|
Arc::new(|ctx: HookContext, _payload: Value| {
|
||
|
|
Box::pin(async move {
|
||
|
|
FederationService::sync_all_federations(&ctx.pool).await?;
|
||
|
|
Ok(())
|
||
|
|
})
|
||
|
|
}),
|
||
|
|
);
|
||
|
|
|
||
|
|
// Hook: Share proposal when created in federated community
|
||
|
|
system.add_action(
|
||
|
|
"proposal.created",
|
||
|
|
"federation".to_string(),
|
||
|
|
100,
|
||
|
|
Arc::new(|ctx: HookContext, payload: Value| {
|
||
|
|
Box::pin(async move {
|
||
|
|
if let Some(proposal_id) = payload.get("proposal_id")
|
||
|
|
.and_then(|v| v.as_str())
|
||
|
|
.and_then(|s| Uuid::parse_str(s).ok())
|
||
|
|
{
|
||
|
|
FederationService::share_proposal_if_federated(&ctx.pool, proposal_id).await?;
|
||
|
|
}
|
||
|
|
Ok(())
|
||
|
|
})
|
||
|
|
}),
|
||
|
|
);
|
||
|
|
|
||
|
|
// Hook: Broadcast decision results
|
||
|
|
system.add_action(
|
||
|
|
"proposal.decided",
|
||
|
|
"federation".to_string(),
|
||
|
|
100,
|
||
|
|
Arc::new(|ctx: HookContext, payload: Value| {
|
||
|
|
Box::pin(async move {
|
||
|
|
if let Some(proposal_id) = payload.get("proposal_id")
|
||
|
|
.and_then(|v| v.as_str())
|
||
|
|
.and_then(|s| Uuid::parse_str(s).ok())
|
||
|
|
{
|
||
|
|
FederationService::broadcast_decision(&ctx.pool, proposal_id).await?;
|
||
|
|
}
|
||
|
|
Ok(())
|
||
|
|
})
|
||
|
|
}),
|
||
|
|
);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
|
|
pub struct FederatedInstance {
|
||
|
|
pub id: Uuid,
|
||
|
|
pub instance_url: String,
|
||
|
|
pub instance_name: String,
|
||
|
|
pub status: String,
|
||
|
|
pub trust_level: i32,
|
||
|
|
}
|
||
|
|
|
||
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
|
|
pub struct CommunityFederation {
|
||
|
|
pub id: Uuid,
|
||
|
|
pub local_community_id: Uuid,
|
||
|
|
pub remote_instance_id: Uuid,
|
||
|
|
pub remote_community_id: Uuid,
|
||
|
|
pub remote_community_name: Option<String>,
|
||
|
|
pub status: String,
|
||
|
|
}
|
||
|
|
|
||
|
|
pub struct FederationService;
|
||
|
|
|
||
|
|
impl FederationService {
|
||
|
|
/// Register a new federated instance
|
||
|
|
pub async fn register_instance(
|
||
|
|
pool: &PgPool,
|
||
|
|
url: &str,
|
||
|
|
name: &str,
|
||
|
|
description: Option<&str>,
|
||
|
|
public_key: Option<&str>,
|
||
|
|
) -> Result<Uuid, PluginError> {
|
||
|
|
let instance_id: Uuid = sqlx::query_scalar(
|
||
|
|
"SELECT register_federated_instance($1, $2, $3, $4)"
|
||
|
|
)
|
||
|
|
.bind(url)
|
||
|
|
.bind(name)
|
||
|
|
.bind(description)
|
||
|
|
.bind(public_key)
|
||
|
|
.fetch_one(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(instance_id)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Create a federation between communities
|
||
|
|
pub async fn create_federation(
|
||
|
|
pool: &PgPool,
|
||
|
|
local_community_id: Uuid,
|
||
|
|
remote_instance_id: Uuid,
|
||
|
|
remote_community_id: Uuid,
|
||
|
|
remote_community_name: &str,
|
||
|
|
sync_direction: &str,
|
||
|
|
) -> Result<Uuid, PluginError> {
|
||
|
|
let federation_id: Uuid = sqlx::query_scalar(
|
||
|
|
"SELECT create_community_federation($1, $2, $3, $4, $5::sync_direction)"
|
||
|
|
)
|
||
|
|
.bind(local_community_id)
|
||
|
|
.bind(remote_instance_id)
|
||
|
|
.bind(remote_community_id)
|
||
|
|
.bind(remote_community_name)
|
||
|
|
.bind(sync_direction)
|
||
|
|
.fetch_one(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(federation_id)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Approve a federation locally
|
||
|
|
pub async fn approve_federation(
|
||
|
|
pool: &PgPool,
|
||
|
|
federation_id: Uuid,
|
||
|
|
_approved_by: Uuid,
|
||
|
|
) -> Result<(), PluginError> {
|
||
|
|
sqlx::query!(
|
||
|
|
r#"UPDATE community_federations SET
|
||
|
|
approved_locally = true,
|
||
|
|
status = CASE WHEN approved_remotely THEN 'active'::federation_status ELSE status END,
|
||
|
|
updated_at = NOW()
|
||
|
|
WHERE id = $1"#,
|
||
|
|
federation_id
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
sqlx::query!(
|
||
|
|
r#"INSERT INTO federation_sync_log (federation_id, operation_type, direction, success)
|
||
|
|
VALUES ($1, 'local_approval', 'push', true)"#,
|
||
|
|
federation_id
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Sync all active federations
|
||
|
|
pub async fn sync_all_federations(pool: &PgPool) -> Result<i32, PluginError> {
|
||
|
|
let federations = sqlx::query!(
|
||
|
|
r#"SELECT cf.id, cf.local_community_id, cf.remote_instance_id,
|
||
|
|
fi.instance_url, cf.sync_direction::text AS "sync_direction!"
|
||
|
|
FROM community_federations cf
|
||
|
|
JOIN federated_instances fi ON fi.id = cf.remote_instance_id
|
||
|
|
WHERE cf.status = 'active' AND fi.status = 'active'"#
|
||
|
|
)
|
||
|
|
.fetch_all(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
let mut synced = 0;
|
||
|
|
for fed in federations {
|
||
|
|
// In a real implementation, this would make HTTP requests to remote instances
|
||
|
|
// For now, just log the sync attempt
|
||
|
|
let start = std::time::Instant::now();
|
||
|
|
|
||
|
|
sqlx::query(
|
||
|
|
r#"INSERT INTO federation_sync_log
|
||
|
|
(federation_id, instance_id, operation_type, direction, success, duration_ms)
|
||
|
|
VALUES ($1, $2, 'scheduled_sync', $3::sync_direction, true, $4)"#
|
||
|
|
)
|
||
|
|
.bind(fed.id)
|
||
|
|
.bind(fed.remote_instance_id)
|
||
|
|
.bind(&fed.sync_direction)
|
||
|
|
.bind(start.elapsed().as_millis() as i32)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
sqlx::query!(
|
||
|
|
"UPDATE federated_instances SET last_sync_at = NOW(), total_syncs = total_syncs + 1 WHERE id = $1",
|
||
|
|
fed.remote_instance_id
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
synced += 1;
|
||
|
|
}
|
||
|
|
|
||
|
|
Ok(synced)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Share a proposal to federated communities
|
||
|
|
pub async fn share_proposal_if_federated(
|
||
|
|
pool: &PgPool,
|
||
|
|
proposal_id: Uuid,
|
||
|
|
) -> Result<(), PluginError> {
|
||
|
|
// Get the proposal's community
|
||
|
|
let proposal = sqlx::query!(
|
||
|
|
"SELECT community_id FROM proposals WHERE id = $1",
|
||
|
|
proposal_id
|
||
|
|
)
|
||
|
|
.fetch_optional(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
let Some(proposal) = proposal else {
|
||
|
|
return Ok(());
|
||
|
|
};
|
||
|
|
|
||
|
|
// Check for active federations that sync proposals
|
||
|
|
let federations = sqlx::query!(
|
||
|
|
r#"SELECT id, remote_instance_id, remote_community_id
|
||
|
|
FROM community_federations
|
||
|
|
WHERE local_community_id = $1
|
||
|
|
AND status = 'active'
|
||
|
|
AND sync_proposals = true
|
||
|
|
AND sync_direction IN ('push', 'bidirectional')"#,
|
||
|
|
proposal.community_id
|
||
|
|
)
|
||
|
|
.fetch_all(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
for fed in federations {
|
||
|
|
// Create federated proposal record
|
||
|
|
sqlx::query!(
|
||
|
|
r#"INSERT INTO federated_proposals
|
||
|
|
(federation_id, local_proposal_id, remote_proposal_id, is_origin_local)
|
||
|
|
VALUES ($1, $2, $2, true)
|
||
|
|
ON CONFLICT DO NOTHING"#,
|
||
|
|
fed.id,
|
||
|
|
proposal_id
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
}
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Broadcast decision to federated communities
|
||
|
|
pub async fn broadcast_decision(
|
||
|
|
pool: &PgPool,
|
||
|
|
proposal_id: Uuid,
|
||
|
|
) -> Result<(), PluginError> {
|
||
|
|
// Find federated proposal
|
||
|
|
let federated = sqlx::query!(
|
||
|
|
r#"SELECT fp.id, fp.federation_id
|
||
|
|
FROM federated_proposals fp
|
||
|
|
WHERE fp.local_proposal_id = $1"#,
|
||
|
|
proposal_id
|
||
|
|
)
|
||
|
|
.fetch_all(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
for fp in federated {
|
||
|
|
// Get local vote results
|
||
|
|
let results = sqlx::query!(
|
||
|
|
r#"SELECT COUNT(*)::int AS total_votes
|
||
|
|
FROM votes WHERE proposal_id = $1"#,
|
||
|
|
proposal_id
|
||
|
|
)
|
||
|
|
.fetch_one(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
// Create or update federated decision
|
||
|
|
sqlx::query!(
|
||
|
|
r#"INSERT INTO federated_decisions
|
||
|
|
(federated_proposal_id, decision_type, outcome, total_votes, is_final)
|
||
|
|
VALUES ($1, 'vote', 'pending', $2, false)
|
||
|
|
ON CONFLICT DO NOTHING"#,
|
||
|
|
fp.id,
|
||
|
|
results.total_votes.unwrap_or(0)
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
}
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Get federation statistics for a community
|
||
|
|
pub async fn get_stats(pool: &PgPool, community_id: Uuid) -> Result<Value, PluginError> {
|
||
|
|
let stats = sqlx::query!(
|
||
|
|
"SELECT * FROM get_federation_stats($1)",
|
||
|
|
community_id
|
||
|
|
)
|
||
|
|
.fetch_one(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(json!({
|
||
|
|
"total_federations": stats.total_federations,
|
||
|
|
"active_federations": stats.active_federations,
|
||
|
|
"federated_proposals": stats.federated_proposals,
|
||
|
|
"total_syncs": stats.total_syncs,
|
||
|
|
"last_sync": stats.last_sync
|
||
|
|
}))
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Get all instances
|
||
|
|
pub async fn get_instances(pool: &PgPool) -> Result<Vec<FederatedInstance>, PluginError> {
|
||
|
|
let instances = sqlx::query_as!(
|
||
|
|
FederatedInstance,
|
||
|
|
r#"SELECT id, instance_url, instance_name, status::text AS "status!", trust_level
|
||
|
|
FROM federated_instances ORDER BY instance_name"#
|
||
|
|
)
|
||
|
|
.fetch_all(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(instances)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Get federations for a community
|
||
|
|
pub async fn get_community_federations(
|
||
|
|
pool: &PgPool,
|
||
|
|
community_id: Uuid,
|
||
|
|
) -> Result<Vec<CommunityFederation>, PluginError> {
|
||
|
|
let federations = sqlx::query_as!(
|
||
|
|
CommunityFederation,
|
||
|
|
r#"SELECT id, local_community_id, remote_instance_id, remote_community_id,
|
||
|
|
remote_community_name, status::text AS "status!"
|
||
|
|
FROM community_federations
|
||
|
|
WHERE local_community_id = $1"#,
|
||
|
|
community_id
|
||
|
|
)
|
||
|
|
.fetch_all(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(federations)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Submit a federation request (handles incoming requests from other instances)
|
||
|
|
pub async fn request_federation(
|
||
|
|
pool: &PgPool,
|
||
|
|
from_instance_url: &str,
|
||
|
|
from_community_name: &str,
|
||
|
|
to_community_id: Uuid,
|
||
|
|
message: Option<&str>,
|
||
|
|
) -> Result<Uuid, PluginError> {
|
||
|
|
let request_id = sqlx::query_scalar!(
|
||
|
|
r#"INSERT INTO federation_requests
|
||
|
|
(from_instance_url, from_community_name, to_community_id, request_message)
|
||
|
|
VALUES ($1, $2, $3, $4)
|
||
|
|
RETURNING id"#,
|
||
|
|
from_instance_url,
|
||
|
|
from_community_name,
|
||
|
|
to_community_id,
|
||
|
|
message
|
||
|
|
)
|
||
|
|
.fetch_one(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(request_id)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Update instance trust level
|
||
|
|
pub async fn set_trust_level(
|
||
|
|
pool: &PgPool,
|
||
|
|
instance_id: Uuid,
|
||
|
|
trust_level: i32,
|
||
|
|
) -> Result<(), PluginError> {
|
||
|
|
sqlx::query!(
|
||
|
|
"UPDATE federated_instances SET trust_level = $2, updated_at = NOW() WHERE id = $1",
|
||
|
|
instance_id,
|
||
|
|
trust_level.clamp(1, 5)
|
||
|
|
)
|
||
|
|
.execute(pool)
|
||
|
|
.await?;
|
||
|
|
|
||
|
|
Ok(())
|
||
|
|
}
|
||
|
|
}
|