diff --git a/backend/Cargo.lock b/backend/Cargo.lock index c8ee70f..d6b4347 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1875,6 +1875,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.8.4" @@ -3346,6 +3355,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "uuid", ] [[package]] @@ -3410,10 +3420,14 @@ version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/backend/Cargo.toml b/backend/Cargo.toml index b19fef9..e75afc9 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,32 +1,32 @@ -[package] -name = "likwid" -version = "0.1.0" -edition = "2021" -license = "EUPL-1.2" - -[dependencies] -tokio = { version = "1", features = ["full"] } -axum = "0.8" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json", "migrate"] } -dotenvy = "0.15" -tracing = "0.1" -tracing-subscriber = "0.3" -uuid = { version = "1", features = ["serde", "v4"] } -chrono = { version = "0.4", features = ["serde"] } -envy = "0.4" -async-trait = "0.1" -tower-http = { version = "0.6", features = ["cors", "trace"] } -argon2 = "0.5" -jsonwebtoken = "9" -axum-extra = { version = "0.10", features = ["typed-header"] } -thiserror = "2" -jsonschema = "0.17" -base64 = "0.21" -sha2 = "0.10" -ed25519-dalek = "2" -reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } -wasmtime = "19" -wasmtime-wasi = "19" -slug = "0.1" +[package] +name = "likwid" +version = "0.1.0" +edition = "2021" +license = "EUPL-1.2" + +[dependencies] +tokio = { version = "1", features = ["full"] } +axum = "0.8" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json", "migrate"] } +dotenvy = "0.15" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1", features = ["serde", "v4"] } +chrono = { version = "0.4", features = ["serde"] } +envy = "0.4" +async-trait = "0.1" +tower-http = { version = "0.6", features = ["cors", "trace", "request-id"] } +argon2 = "0.5" +jsonwebtoken = "9" +axum-extra = { version = "0.10", features = ["typed-header"] } +thiserror = "2" +jsonschema = "0.17" +base64 = "0.21" +sha2 = "0.10" +ed25519-dalek = "2" +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +wasmtime = "19" +wasmtime-wasi = "19" +slug = "0.1" diff --git a/backend/src/api/demo.rs b/backend/src/api/demo.rs index 2c3c4eb..6da7b0d 100644 --- a/backend/src/api/demo.rs +++ b/backend/src/api/demo.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use super::permissions::{perms, require_permission}; use crate::auth::AuthUser; use crate::config::Config; -use crate::demo::{self, DEMO_ACCOUNTS}; +use crate::demo::{self, DemoResetError, DEMO_ACCOUNTS}; /// Combined state for demo endpoints #[derive(Clone)] @@ -71,7 +71,17 @@ async fn reset_demo(State(state): State, auth: AuthUser) -> impl Into Json(json!({"success": true, "message": "Demo data has been reset to initial state"})), ) .into_response(), - Err(e) => { + Err(DemoResetError::Busy) => ( + StatusCode::CONFLICT, + Json(json!({"error": "Demo reset already in progress"})), + ) + .into_response(), + Err(DemoResetError::LockTimeout) => ( + StatusCode::GATEWAY_TIMEOUT, + Json(json!({"error": "Timed out while acquiring demo reset lock"})), + ) + .into_response(), + Err(DemoResetError::Sql(e)) => { tracing::error!("Failed to reset demo data: {}", e); ( StatusCode::INTERNAL_SERVER_ERROR, diff --git a/backend/src/demo/mod.rs b/backend/src/demo/mod.rs index a8a662e..aab72ba 100644 --- a/backend/src/demo/mod.rs +++ b/backend/src/demo/mod.rs @@ -6,6 +6,19 @@ //! - Provides seeded data for demonstration purposes use sqlx::PgPool; +use std::time::{Duration, Instant}; +use thiserror::Error; +use tokio::time::timeout; + +#[derive(Debug, Error)] +pub enum DemoResetError { + #[error("demo reset already in progress")] + Busy, + #[error("timed out while acquiring demo reset lock")] + LockTimeout, + #[error(transparent)] + Sql(#[from] sqlx::Error), +} /// Demo account credentials: (username, password, display_name) pub const DEMO_ACCOUNTS: &[(&str, &str, &str)] = &[ @@ -28,11 +41,27 @@ pub fn verify_demo_password(username: &str, password: &str) -> bool { /// Reset demo data to initial state /// This clears user-created data and reloads the seed data -pub async fn reset_demo_data(pool: &PgPool) -> Result<(), sqlx::Error> { +pub async fn reset_demo_data(pool: &PgPool) -> Result<(), DemoResetError> { tracing::info!("Resetting demo data to initial state..."); + let start = Instant::now(); let mut tx = pool.begin().await?; + let lock_result = timeout( + Duration::from_secs(2), + sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock($1)") + .bind(1_337_001_i64) + .fetch_one(&mut *tx), + ) + .await; + + match lock_result { + Ok(Ok(true)) => {} + Ok(Ok(false)) => return Err(DemoResetError::Busy), + Ok(Err(e)) => return Err(DemoResetError::Sql(e)), + Err(_) => return Err(DemoResetError::LockTimeout), + } + // Remove volatile/user-generated data sqlx::query("DELETE FROM public_events") .execute(&mut *tx) @@ -402,7 +431,10 @@ pub async fn reset_demo_data(pool: &PgPool) -> Result<(), sqlx::Error> { tx.commit().await?; - tracing::info!("Demo data reset complete"); + tracing::info!( + elapsed_ms = start.elapsed().as_millis(), + "Demo data reset complete" + ); Ok(()) } diff --git a/backend/src/main.rs b/backend/src/main.rs index fe93fff..c30cfb6 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -8,16 +8,19 @@ mod plugins; mod rate_limit; mod voting; -use axum::http::{HeaderName, HeaderValue}; +use axum::http::{HeaderName, HeaderValue, Request}; use axum::response::Response; use axum::{middleware, Extension}; use chrono::{Datelike, Timelike, Utc, Weekday}; use serde_json::json; use std::net::SocketAddr; use std::sync::Arc; +use std::time::{Duration, Instant}; use thiserror::Error; use tower_http::cors::{Any, CorsLayer}; +use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}; use tower_http::trace::TraceLayer; +use tracing_subscriber::EnvFilter; use uuid::Uuid; use crate::config::Config; @@ -43,7 +46,8 @@ enum StartupError { #[tokio::main] async fn main() { - tracing_subscriber::fmt::init(); + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt().with_env_filter(filter).init(); if let Err(e) = run().await { tracing::error!("{e}"); @@ -68,20 +72,84 @@ async fn run() -> Result<(), StartupError> { let database_url = std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url.clone()); - let pool = db::create_pool(&database_url).await?; - - tracing::info!("Connected to database"); + let pool = { + let start = Instant::now(); + let mut attempt: u32 = 1; + loop { + match db::create_pool(&database_url).await { + Ok(pool) => { + tracing::info!( + elapsed_ms = start.elapsed().as_millis(), + "Connected to database" + ); + break pool; + } + Err(e) => { + if attempt >= 30 { + return Err(StartupError::Db(e)); + } + tracing::warn!(attempt, error = %e, "Failed to connect to database; retrying"); + tokio::time::sleep(Duration::from_secs(1)).await; + attempt += 1; + } + } + } + }; let mut migrator = sqlx::migrate!("./migrations"); if config.is_demo() { migrator.set_ignore_missing(true); } - migrator.run(&pool).await?; + + { + let start = Instant::now(); + let mut attempt: u32 = 1; + loop { + match migrator.run(&pool).await { + Ok(()) => { + tracing::info!( + elapsed_ms = start.elapsed().as_millis(), + "Database migrations applied" + ); + break; + } + Err(e) => { + if attempt >= 30 { + return Err(StartupError::Migrations(e)); + } + tracing::warn!(attempt, error = %e, "Database migrations failed; retrying"); + tokio::time::sleep(Duration::from_secs(1)).await; + attempt += 1; + } + } + } + } if config.is_demo() { let mut demo_migrator = sqlx::migrate!("./migrations_demo"); demo_migrator.set_ignore_missing(true); - demo_migrator.run(&pool).await?; + + let start = Instant::now(); + let mut attempt: u32 = 1; + loop { + match demo_migrator.run(&pool).await { + Ok(()) => { + tracing::info!( + elapsed_ms = start.elapsed().as_millis(), + "Demo database migrations applied" + ); + break; + } + Err(e) => { + if attempt >= 30 { + return Err(StartupError::Migrations(e)); + } + tracing::warn!(attempt, error = %e, "Demo migrations failed; retrying"); + tokio::time::sleep(Duration::from_secs(1)).await; + attempt += 1; + } + } + } } let cors = CorsLayer::new() @@ -104,7 +172,7 @@ async fn run() -> Result<(), StartupError> { let mut last_week_key: i64 = -1; let mut last_15min_key: i64 = -1; - let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + let mut interval = tokio::time::interval(Duration::from_secs(5)); loop { interval.tick().await; @@ -130,7 +198,8 @@ async fn run() -> Result<(), StartupError> { .await; let min15_key = now.timestamp() / 900; - if min15_key != last_15min_key { + let is_15min = min15_key != last_15min_key; + if is_15min { last_15min_key = min15_key; cron_plugins .do_action("cron.every_15_minutes", ctx.clone(), payload.clone()) @@ -138,7 +207,8 @@ async fn run() -> Result<(), StartupError> { } let hour_key = now.timestamp() / 3600; - if hour_key != last_hour_key { + let is_hour = hour_key != last_hour_key; + if is_hour { last_hour_key = hour_key; if now.minute() == 0 { cron_plugins @@ -148,7 +218,8 @@ async fn run() -> Result<(), StartupError> { } let day_key = now.timestamp() / 86_400; - if day_key != last_day_key { + let is_day = day_key != last_day_key; + if is_day { last_day_key = day_key; if now.hour() == 0 && now.minute() == 0 { cron_plugins @@ -159,7 +230,8 @@ async fn run() -> Result<(), StartupError> { let iso_week = now.iso_week(); let week_key = (iso_week.year() as i64) * 100 + (iso_week.week() as i64); - if week_key != last_week_key { + let is_week = week_key != last_week_key; + if is_week { last_week_key = week_key; if now.weekday() == Weekday::Mon && now.hour() == 0 && now.minute() == 0 { cron_plugins @@ -182,15 +254,15 @@ async fn run() -> Result<(), StartupError> { }; let mut wasm_hooks: Vec<&'static str> = vec!["cron.minute", "cron.minutely"]; - if min15_key == last_15min_key { + if is_15min { wasm_hooks.push("cron.every_15_minutes"); } - if now.minute() == 0 { + if is_hour && now.minute() == 0 { wasm_hooks.push("cron.hourly"); } - if now.hour() == 0 && now.minute() == 0 { + if is_day && now.hour() == 0 && now.minute() == 0 { wasm_hooks.push("cron.daily"); - if now.weekday() == Weekday::Mon { + if is_week && now.weekday() == Weekday::Mon { wasm_hooks.push("cron.weekly"); } } @@ -206,6 +278,25 @@ async fn run() -> Result<(), StartupError> { }); } + let request_id_header = HeaderName::from_static("x-request-id"); + let trace_layer = { + let request_id_header = request_id_header.clone(); + TraceLayer::new_for_http().make_span_with(move |request: &Request<_>| { + let request_id = request + .headers() + .get(request_id_header.as_str()) + .and_then(|v| v.to_str().ok()) + .unwrap_or("-"); + + tracing::info_span!( + "http.request", + method = %request.method(), + uri = %request.uri(), + request_id = %request_id, + ) + }) + }; + let app = api::create_router(pool.clone(), config.clone()) .layer(Extension(plugins)) .layer(Extension(config.clone())) @@ -214,7 +305,9 @@ async fn run() -> Result<(), StartupError> { rate_limit::RateLimitState::new(config.clone()), rate_limit::rate_limit_middleware, )) - .layer(TraceLayer::new_for_http()) + .layer(trace_layer) + .layer(PropagateRequestIdLayer::new(request_id_header.clone())) + .layer(SetRequestIdLayer::new(request_id_header, MakeRequestUuid)) .layer(middleware::map_response(add_security_headers)); let host: std::net::IpAddr = config