fix: stabilize backend startup + demo reset

This commit is contained in:
Marco Allegretti 2026-02-05 12:09:24 +01:00
parent 5e9893b588
commit b501c9da75
5 changed files with 202 additions and 53 deletions

14
backend/Cargo.lock generated
View file

@@ -1875,6 +1875,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.8.4" version = "0.8.4"
@@ -3346,6 +3355,7 @@ dependencies = [
"tower-layer", "tower-layer",
"tower-service", "tower-service",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]
@@ -3410,10 +3420,14 @@ version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [ dependencies = [
"matchers",
"nu-ansi-term", "nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
] ]

View file

@@ -1,32 +1,32 @@
[package] [package]
name = "likwid" name = "likwid"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
license = "EUPL-1.2" license = "EUPL-1.2"
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
axum = "0.8" axum = "0.8"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json", "migrate"] } sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json", "migrate"] }
dotenvy = "0.15" dotenvy = "0.15"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1", features = ["serde", "v4"] } uuid = { version = "1", features = ["serde", "v4"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
envy = "0.4" envy = "0.4"
async-trait = "0.1" async-trait = "0.1"
tower-http = { version = "0.6", features = ["cors", "trace"] } tower-http = { version = "0.6", features = ["cors", "trace", "request-id"] }
argon2 = "0.5" argon2 = "0.5"
jsonwebtoken = "9" jsonwebtoken = "9"
axum-extra = { version = "0.10", features = ["typed-header"] } axum-extra = { version = "0.10", features = ["typed-header"] }
thiserror = "2" thiserror = "2"
jsonschema = "0.17" jsonschema = "0.17"
base64 = "0.21" base64 = "0.21"
sha2 = "0.10" sha2 = "0.10"
ed25519-dalek = "2" ed25519-dalek = "2"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
wasmtime = "19" wasmtime = "19"
wasmtime-wasi = "19" wasmtime-wasi = "19"
slug = "0.1" slug = "0.1"

View file

@@ -14,7 +14,7 @@ use std::sync::Arc;
use super::permissions::{perms, require_permission}; use super::permissions::{perms, require_permission};
use crate::auth::AuthUser; use crate::auth::AuthUser;
use crate::config::Config; use crate::config::Config;
use crate::demo::{self, DEMO_ACCOUNTS}; use crate::demo::{self, DemoResetError, DEMO_ACCOUNTS};
/// Combined state for demo endpoints /// Combined state for demo endpoints
#[derive(Clone)] #[derive(Clone)]
@@ -71,7 +71,17 @@ async fn reset_demo(State(state): State<DemoState>, auth: AuthUser) -> impl Into
Json(json!({"success": true, "message": "Demo data has been reset to initial state"})), Json(json!({"success": true, "message": "Demo data has been reset to initial state"})),
) )
.into_response(), .into_response(),
Err(e) => { Err(DemoResetError::Busy) => (
StatusCode::CONFLICT,
Json(json!({"error": "Demo reset already in progress"})),
)
.into_response(),
Err(DemoResetError::LockTimeout) => (
StatusCode::GATEWAY_TIMEOUT,
Json(json!({"error": "Timed out while acquiring demo reset lock"})),
)
.into_response(),
Err(DemoResetError::Sql(e)) => {
tracing::error!("Failed to reset demo data: {}", e); tracing::error!("Failed to reset demo data: {}", e);
( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,

View file

@@ -6,6 +6,19 @@
//! - Provides seeded data for demonstration purposes //! - Provides seeded data for demonstration purposes
use sqlx::PgPool; use sqlx::PgPool;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::time::timeout;
#[derive(Debug, Error)]
pub enum DemoResetError {
#[error("demo reset already in progress")]
Busy,
#[error("timed out while acquiring demo reset lock")]
LockTimeout,
#[error(transparent)]
Sql(#[from] sqlx::Error),
}
/// Demo account credentials: (username, password, display_name) /// Demo account credentials: (username, password, display_name)
pub const DEMO_ACCOUNTS: &[(&str, &str, &str)] = &[ pub const DEMO_ACCOUNTS: &[(&str, &str, &str)] = &[
@@ -28,11 +41,27 @@ pub fn verify_demo_password(username: &str, password: &str) -> bool {
/// Reset demo data to initial state /// Reset demo data to initial state
/// This clears user-created data and reloads the seed data /// This clears user-created data and reloads the seed data
pub async fn reset_demo_data(pool: &PgPool) -> Result<(), sqlx::Error> { pub async fn reset_demo_data(pool: &PgPool) -> Result<(), DemoResetError> {
tracing::info!("Resetting demo data to initial state..."); tracing::info!("Resetting demo data to initial state...");
let start = Instant::now();
let mut tx = pool.begin().await?; let mut tx = pool.begin().await?;
let lock_result = timeout(
Duration::from_secs(2),
sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock($1)")
.bind(1_337_001_i64)
.fetch_one(&mut *tx),
)
.await;
match lock_result {
Ok(Ok(true)) => {}
Ok(Ok(false)) => return Err(DemoResetError::Busy),
Ok(Err(e)) => return Err(DemoResetError::Sql(e)),
Err(_) => return Err(DemoResetError::LockTimeout),
}
// Remove volatile/user-generated data // Remove volatile/user-generated data
sqlx::query("DELETE FROM public_events") sqlx::query("DELETE FROM public_events")
.execute(&mut *tx) .execute(&mut *tx)
@@ -402,7 +431,10 @@ pub async fn reset_demo_data(pool: &PgPool) -> Result<(), sqlx::Error> {
tx.commit().await?; tx.commit().await?;
tracing::info!("Demo data reset complete"); tracing::info!(
elapsed_ms = start.elapsed().as_millis(),
"Demo data reset complete"
);
Ok(()) Ok(())
} }

View file

@@ -8,16 +8,19 @@ mod plugins;
mod rate_limit; mod rate_limit;
mod voting; mod voting;
use axum::http::{HeaderName, HeaderValue}; use axum::http::{HeaderName, HeaderValue, Request};
use axum::response::Response; use axum::response::Response;
use axum::{middleware, Extension}; use axum::{middleware, Extension};
use chrono::{Datelike, Timelike, Utc, Weekday}; use chrono::{Datelike, Timelike, Utc, Weekday};
use serde_json::json; use serde_json::json;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant};
use thiserror::Error; use thiserror::Error;
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing_subscriber::EnvFilter;
use uuid::Uuid; use uuid::Uuid;
use crate::config::Config; use crate::config::Config;
@@ -43,7 +46,8 @@ enum StartupError {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt::init(); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
if let Err(e) = run().await { if let Err(e) = run().await {
tracing::error!("{e}"); tracing::error!("{e}");
@@ -68,20 +72,84 @@ async fn run() -> Result<(), StartupError> {
let database_url = let database_url =
std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url.clone()); std::env::var("DATABASE_URL").unwrap_or_else(|_| config.database_url.clone());
let pool = db::create_pool(&database_url).await?; let pool = {
let start = Instant::now();
tracing::info!("Connected to database"); let mut attempt: u32 = 1;
loop {
match db::create_pool(&database_url).await {
Ok(pool) => {
tracing::info!(
elapsed_ms = start.elapsed().as_millis(),
"Connected to database"
);
break pool;
}
Err(e) => {
if attempt >= 30 {
return Err(StartupError::Db(e));
}
tracing::warn!(attempt, error = %e, "Failed to connect to database; retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
attempt += 1;
}
}
}
};
let mut migrator = sqlx::migrate!("./migrations"); let mut migrator = sqlx::migrate!("./migrations");
if config.is_demo() { if config.is_demo() {
migrator.set_ignore_missing(true); migrator.set_ignore_missing(true);
} }
migrator.run(&pool).await?;
{
let start = Instant::now();
let mut attempt: u32 = 1;
loop {
match migrator.run(&pool).await {
Ok(()) => {
tracing::info!(
elapsed_ms = start.elapsed().as_millis(),
"Database migrations applied"
);
break;
}
Err(e) => {
if attempt >= 30 {
return Err(StartupError::Migrations(e));
}
tracing::warn!(attempt, error = %e, "Database migrations failed; retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
attempt += 1;
}
}
}
}
if config.is_demo() { if config.is_demo() {
let mut demo_migrator = sqlx::migrate!("./migrations_demo"); let mut demo_migrator = sqlx::migrate!("./migrations_demo");
demo_migrator.set_ignore_missing(true); demo_migrator.set_ignore_missing(true);
demo_migrator.run(&pool).await?;
let start = Instant::now();
let mut attempt: u32 = 1;
loop {
match demo_migrator.run(&pool).await {
Ok(()) => {
tracing::info!(
elapsed_ms = start.elapsed().as_millis(),
"Demo database migrations applied"
);
break;
}
Err(e) => {
if attempt >= 30 {
return Err(StartupError::Migrations(e));
}
tracing::warn!(attempt, error = %e, "Demo migrations failed; retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
attempt += 1;
}
}
}
} }
let cors = CorsLayer::new() let cors = CorsLayer::new()
@@ -104,7 +172,7 @@ async fn run() -> Result<(), StartupError> {
let mut last_week_key: i64 = -1; let mut last_week_key: i64 = -1;
let mut last_15min_key: i64 = -1; let mut last_15min_key: i64 = -1;
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); let mut interval = tokio::time::interval(Duration::from_secs(5));
loop { loop {
interval.tick().await; interval.tick().await;
@@ -130,7 +198,8 @@ async fn run() -> Result<(), StartupError> {
.await; .await;
let min15_key = now.timestamp() / 900; let min15_key = now.timestamp() / 900;
if min15_key != last_15min_key { let is_15min = min15_key != last_15min_key;
if is_15min {
last_15min_key = min15_key; last_15min_key = min15_key;
cron_plugins cron_plugins
.do_action("cron.every_15_minutes", ctx.clone(), payload.clone()) .do_action("cron.every_15_minutes", ctx.clone(), payload.clone())
@@ -138,7 +207,8 @@ async fn run() -> Result<(), StartupError> {
} }
let hour_key = now.timestamp() / 3600; let hour_key = now.timestamp() / 3600;
if hour_key != last_hour_key { let is_hour = hour_key != last_hour_key;
if is_hour {
last_hour_key = hour_key; last_hour_key = hour_key;
if now.minute() == 0 { if now.minute() == 0 {
cron_plugins cron_plugins
@@ -148,7 +218,8 @@ async fn run() -> Result<(), StartupError> {
} }
let day_key = now.timestamp() / 86_400; let day_key = now.timestamp() / 86_400;
if day_key != last_day_key { let is_day = day_key != last_day_key;
if is_day {
last_day_key = day_key; last_day_key = day_key;
if now.hour() == 0 && now.minute() == 0 { if now.hour() == 0 && now.minute() == 0 {
cron_plugins cron_plugins
@@ -159,7 +230,8 @@ async fn run() -> Result<(), StartupError> {
let iso_week = now.iso_week(); let iso_week = now.iso_week();
let week_key = (iso_week.year() as i64) * 100 + (iso_week.week() as i64); let week_key = (iso_week.year() as i64) * 100 + (iso_week.week() as i64);
if week_key != last_week_key { let is_week = week_key != last_week_key;
if is_week {
last_week_key = week_key; last_week_key = week_key;
if now.weekday() == Weekday::Mon && now.hour() == 0 && now.minute() == 0 { if now.weekday() == Weekday::Mon && now.hour() == 0 && now.minute() == 0 {
cron_plugins cron_plugins
@@ -182,15 +254,15 @@ async fn run() -> Result<(), StartupError> {
}; };
let mut wasm_hooks: Vec<&'static str> = vec!["cron.minute", "cron.minutely"]; let mut wasm_hooks: Vec<&'static str> = vec!["cron.minute", "cron.minutely"];
if min15_key == last_15min_key { if is_15min {
wasm_hooks.push("cron.every_15_minutes"); wasm_hooks.push("cron.every_15_minutes");
} }
if now.minute() == 0 { if is_hour && now.minute() == 0 {
wasm_hooks.push("cron.hourly"); wasm_hooks.push("cron.hourly");
} }
if now.hour() == 0 && now.minute() == 0 { if is_day && now.hour() == 0 && now.minute() == 0 {
wasm_hooks.push("cron.daily"); wasm_hooks.push("cron.daily");
if now.weekday() == Weekday::Mon { if is_week && now.weekday() == Weekday::Mon {
wasm_hooks.push("cron.weekly"); wasm_hooks.push("cron.weekly");
} }
} }
@@ -206,6 +278,25 @@ async fn run() -> Result<(), StartupError> {
}); });
} }
let request_id_header = HeaderName::from_static("x-request-id");
let trace_layer = {
let request_id_header = request_id_header.clone();
TraceLayer::new_for_http().make_span_with(move |request: &Request<_>| {
let request_id = request
.headers()
.get(request_id_header.as_str())
.and_then(|v| v.to_str().ok())
.unwrap_or("-");
tracing::info_span!(
"http.request",
method = %request.method(),
uri = %request.uri(),
request_id = %request_id,
)
})
};
let app = api::create_router(pool.clone(), config.clone()) let app = api::create_router(pool.clone(), config.clone())
.layer(Extension(plugins)) .layer(Extension(plugins))
.layer(Extension(config.clone())) .layer(Extension(config.clone()))
@@ -214,7 +305,9 @@ async fn run() -> Result<(), StartupError> {
rate_limit::RateLimitState::new(config.clone()), rate_limit::RateLimitState::new(config.clone()),
rate_limit::rate_limit_middleware, rate_limit::rate_limit_middleware,
)) ))
.layer(TraceLayer::new_for_http()) .layer(trace_layer)
.layer(PropagateRequestIdLayer::new(request_id_header.clone()))
.layer(SetRequestIdLayer::new(request_id_header, MakeRequestUuid))
.layer(middleware::map_response(add_security_headers)); .layer(middleware::map_response(add_security_headers));
let host: std::net::IpAddr = config let host: std::net::IpAddr = config