feat: karapace-server — reference remote server implementing protocol v1

- tiny_http-based HTTP server for blob storage and registry
- Dual URL routing: /blobs/Kind/key and /kind_plural/key
- Blob CRUD: PUT, GET, HEAD, list by kind
- Registry: GET/PUT for name@tag references
- TestServer helper for integration testing
- 7 HTTP E2E tests: roundtrip, push/pull, concurrent clients, restart persistence
This commit is contained in:
Marco Allegretti 2026-02-22 18:37:27 +01:00
parent 11034ee27a
commit 23ac53ba4d
5 changed files with 2156 additions and 0 deletions

View file

@ -0,0 +1,24 @@
[package]
name = "karapace-server"
description = "Reference HTTP server for the Karapace remote protocol v1"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
tiny_http.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
clap.workspace = true
[dev-dependencies]
tempfile.workspace = true
ureq.workspace = true
karapace-remote = { path = "../karapace-remote" }
karapace-store = { path = "../karapace-store" }

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,389 @@
//! Reference HTTP server library for the Karapace remote protocol v1.
//!
//! Implements the blob store and registry routes defined in `docs/protocol-v1.md`.
//! Storage is file-backed: blobs go into `{data_dir}/blobs/{kind}/{key}`,
//! the registry lives at `{data_dir}/registry.json`.
//!
//! The [`TestServer`] helper starts a server on a random port for integration testing.
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tiny_http::{Header, Method, Response, Server, StatusCode};
use tracing::{debug, error, info};
/// File-backed blob store with an in-memory copy of the registry.
///
/// Blobs are stored at `{data_dir}/blobs/{kind}/{key}`. The registry is persisted to
/// `{data_dir}/registry.json` and mirrored in memory so reads never touch the disk.
pub struct Store {
    data_dir: PathBuf,
    /// Last registry payload loaded from or written to disk (`None` until one exists).
    registry: RwLock<Option<Vec<u8>>>,
}

impl Store {
    /// Create a store rooted at `data_dir`.
    ///
    /// If `{data_dir}/registry.json` can be read, its contents seed the in-memory registry;
    /// a missing or unreadable file leaves the registry empty.
    pub fn new(data_dir: PathBuf) -> Self {
        let registry = fs::read(data_dir.join("registry.json")).ok();
        Self {
            data_dir,
            registry: RwLock::new(registry),
        }
    }

    /// Root directory under which blobs and the registry file are stored.
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }

    /// Directory holding all blobs of `kind`.
    fn blob_dir(&self, kind: &str) -> PathBuf {
        self.data_dir.join("blobs").join(kind)
    }

    /// Unchecked path of a blob; callers must validate `kind` and `key` first.
    fn blob_path(&self, kind: &str, key: &str) -> PathBuf {
        self.blob_dir(kind).join(key)
    }

    /// Report whether `name` is exactly one ordinary path component.
    ///
    /// Rejects the empty string, `.`/`..`, anything containing a path separator, and
    /// absolute or prefixed paths. `Path::join` would otherwise let such names escape the
    /// directory they are joined onto (an absolute path even replaces the base entirely).
    fn is_single_component(name: &str) -> bool {
        let mut parts = Path::new(name).components();
        match (parts.next(), parts.next()) {
            // Comparing against the full input also rejects forms that `components()`
            // normalises away, such as a trailing separator.
            (Some(std::path::Component::Normal(only)), None) => {
                only == std::ffi::OsStr::new(name)
            }
            _ => false,
        }
    }

    /// Path of a blob, or `None` when `kind` or `key` is not a single path component.
    fn checked_blob_path(&self, kind: &str, key: &str) -> Option<PathBuf> {
        (Self::is_single_component(kind) && Self::is_single_component(key))
            .then(|| self.blob_path(kind, key))
    }

    /// Write `data` as the blob `key` of `kind`, creating the kind directory if needed.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput` when `kind` or `key` is not a single path
    /// component, and any I/O error raised while creating the directory or writing the file.
    pub fn put_blob(&self, kind: &str, key: &str, data: &[u8]) -> std::io::Result<()> {
        let path = self.checked_blob_path(kind, key).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "blob kind and key must each be a single path component",
            )
        })?;
        fs::create_dir_all(self.blob_dir(kind))?;
        fs::write(path, data)
    }

    /// Read the blob `key` of `kind`; `None` if it is missing, unreadable, or the
    /// kind/key is not a single path component.
    pub fn get_blob(&self, kind: &str, key: &str) -> Option<Vec<u8>> {
        let path = self.checked_blob_path(kind, key)?;
        fs::read(path).ok()
    }

    /// Report whether the blob `key` of `kind` exists on disk.
    pub fn has_blob(&self, kind: &str, key: &str) -> bool {
        self.checked_blob_path(kind, key)
            .is_some_and(|path| path.exists())
    }

    /// List the names of all entries stored under `kind`.
    ///
    /// Returns an empty list when the kind directory does not exist or cannot be read.
    /// Entries whose names are not valid UTF-8 are skipped.
    pub fn list_blobs(&self, kind: &str) -> Vec<String> {
        if !Self::is_single_component(kind) {
            return Vec::new();
        }
        fs::read_dir(self.blob_dir(kind))
            .map(|entries| {
                entries
                    .filter_map(Result::ok)
                    .filter_map(|entry| entry.file_name().to_str().map(String::from))
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Replace the registry with `data`, on disk and in memory.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating the data directory or writing the file; the
    /// in-memory copy is left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn put_registry(&self, data: &[u8]) -> std::io::Result<()> {
        // Hold the write lock across the file write so concurrent writers cannot leave
        // the on-disk registry and the cached copy pointing at different payloads.
        let mut cached = self.registry.write().expect("registry lock poisoned");
        fs::create_dir_all(&self.data_dir)?;
        fs::write(self.data_dir.join("registry.json"), data)?;
        *cached = Some(data.to_vec());
        Ok(())
    }

    /// Return a copy of the current registry payload, if one has been stored.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn get_registry(&self) -> Option<Vec<u8>> {
        self.registry
            .read()
            .expect("registry lock poisoned")
            .clone()
    }
}
/// Report whether `kind` names a blob kind accepted by this server.
///
/// The accepted names are `Object`, `Layer` and `Metadata`; the comparison is
/// case-sensitive.
pub fn is_valid_kind(kind: &str) -> bool {
    const ACCEPTED_KINDS: [&str; 3] = ["Object", "Layer", "Metadata"];
    ACCEPTED_KINDS.contains(&kind)
}
/// Translate the plural, lowercase path prefix used by the client into the server's
/// internal kind name.
///
/// `objects` → `Object`, `layers` → `Layer`, `metadata` → `Metadata`; any other prefix
/// yields `None`.
fn map_client_kind(prefix: &str) -> Option<&'static str> {
    const PREFIX_TO_KIND: [(&str, &str); 3] = [
        ("objects", "Object"),
        ("layers", "Layer"),
        ("metadata", "Metadata"),
    ];
    PREFIX_TO_KIND
        .iter()
        .find(|(plural, _)| *plural == prefix)
        .map(|&(_, kind)| kind)
}
/// Parse a server-canonical blob URL path into `(kind, key)`.
///
/// Recognised shapes:
/// - `/blobs/{Kind}` → `Some((kind, None))` (the list route)
/// - `/blobs/{Kind}/{key}` → `Some((kind, Some(key)))`
///
/// `{Kind}` must satisfy [`is_valid_kind`]. `{key}` must be a single, non-empty path
/// segment: keys containing `/` or `\`, and the dot segments `.` and `..`, are rejected so
/// that a key can never address anything outside its kind directory. Every other path
/// yields `None`.
///
/// The client (`HttpBackend`) scheme `/objects/{key}`, `/layers/{key}`, `/metadata/{key}`
/// is not handled here; `parse_client_route` covers it.
pub fn parse_blob_route(path: &str) -> Option<(&str, Option<&str>)> {
    let rest = path.strip_prefix("/blobs/")?;
    match rest.split_once('/') {
        // No further separator: the remainder is the kind on its own.
        None => is_valid_kind(rest).then_some((rest, None)),
        Some((kind, key)) => {
            let key_is_single_segment = !key.is_empty()
                && key != "."
                && key != ".."
                && !key.contains(|c: char| matches!(c, '/' | '\\'));
            (is_valid_kind(kind) && key_is_single_segment).then_some((kind, Some(key)))
        }
    }
}
/// Parse the client URL scheme: `/{plural_kind}/{key}`, `/{plural_kind}/` or
/// `/{plural_kind}`.
///
/// The plural prefix is translated with `map_client_kind`; an unknown prefix yields `None`.
/// A missing or empty key selects the list route (`Some((kind, None))`).
///
/// A non-empty key must be a single path segment: keys containing `/` or `\`, and the dot
/// segments `.` and `..`, yield `None` so that a key can never address anything outside
/// its kind directory.
fn parse_client_route(path: &str) -> Option<(&'static str, Option<&str>)> {
    let path = path.strip_prefix('/')?;
    let Some((prefix, key)) = path.split_once('/') else {
        // No separator after the prefix: bare `/{plural_kind}`.
        return map_client_kind(path).map(|kind| (kind, None));
    };
    let kind = map_client_kind(prefix)?;
    if key.is_empty() {
        return Some((kind, None));
    }
    let key_is_single_segment =
        key != "." && key != ".." && !key.contains(|c: char| matches!(c, '/' | '\\'));
    key_is_single_segment.then_some((kind, Some(key)))
}
/// Send `msg` as the response body with HTTP status `code`.
///
/// A failure to deliver the response is ignored.
fn respond_err(req: tiny_http::Request, code: u16, msg: &str) {
    let response = Response::from_string(msg).with_status_code(StatusCode(code));
    let _ = req.respond(response);
}
/// Send `data` as the response body with `Content-Type: application/octet-stream`.
///
/// A failure to deliver the response is ignored.
fn respond_octet(req: tiny_http::Request, data: Vec<u8>) {
    let content_type =
        Header::from_bytes("Content-Type", "application/octet-stream").expect("valid header");
    let response = Response::from_data(data).with_header(content_type);
    let _ = req.respond(response);
}
/// Send `json` as the response body with `Content-Type: application/json`.
///
/// The payload is sent as given; it is not checked for being well-formed JSON.
/// A failure to deliver the response is ignored.
fn respond_json(req: tiny_http::Request, json: impl Into<Vec<u8>>) {
    let content_type =
        Header::from_bytes("Content-Type", "application/json").expect("valid header");
    let payload: Vec<u8> = json.into();
    let response = Response::from_data(payload).with_header(content_type);
    let _ = req.respond(response);
}
/// Read the whole request body into memory.
///
/// Returns `None` if reading from the request fails. No size limit is applied.
fn read_body(req: &mut tiny_http::Request) -> Option<Vec<u8>> {
    let mut buf = Vec::new();
    let outcome = req.as_reader().read_to_end(&mut buf);
    outcome.ok().map(|_| buf)
}
/// Serve a request that addresses one specific blob (`kind` + `key`).
///
/// - `GET` returns the blob bytes, or 404 if the store has no such blob.
/// - `HEAD` returns 200 or 404 with an empty body.
/// - `PUT` stores the request body and answers `ok`; a body read failure or a store
///   write failure answers 500.
/// - Any other method answers 405.
fn handle_blob_keyed(
    store: &Store,
    mut req: tiny_http::Request,
    method: &Method,
    kind: &str,
    key: &str,
) {
    match method {
        Method::Get => {
            if let Some(bytes) = store.get_blob(kind, key) {
                respond_octet(req, bytes);
            } else {
                respond_err(req, 404, "not found");
            }
        }
        Method::Head => {
            let status = if store.has_blob(kind, key) { 200 } else { 404 };
            let _ = req.respond(Response::empty(status));
        }
        Method::Put => {
            let Some(payload) = read_body(&mut req) else {
                respond_err(req, 500, "read error");
                return;
            };
            if let Err(e) = store.put_blob(kind, key, &payload) {
                error!("PUT {kind}/{key}: {e}");
                respond_err(req, 500, &format!("write error: {e}"));
            } else {
                info!("PUT {kind}/{key}: {} bytes", payload.len());
                let _ = req.respond(Response::from_string("ok"));
            }
        }
        _ => respond_err(req, 405, "method not allowed"),
    }
}
/// Serve a request for the `/registry` resource.
///
/// - `GET` returns the stored registry payload as JSON, or 404 if none has been stored.
/// - `PUT` replaces the registry with the request body and answers `ok`; a body read
///   failure or a store write failure answers 500.
/// - Any other method answers 405.
fn handle_registry(store: &Store, mut req: tiny_http::Request, method: &Method) {
    if *method == Method::Get {
        if let Some(payload) = store.get_registry() {
            respond_json(req, payload);
        } else {
            respond_err(req, 404, "not found");
        }
        return;
    }
    if *method != Method::Put {
        respond_err(req, 405, "method not allowed");
        return;
    }

    let Some(payload) = read_body(&mut req) else {
        respond_err(req, 500, "read error");
        return;
    };
    if let Err(e) = store.put_registry(&payload) {
        error!("PUT /registry: {e}");
        respond_err(req, 500, &format!("write error: {e}"));
    } else {
        info!("PUT /registry: {} bytes", payload.len());
        let _ = req.respond(Response::from_string("ok"));
    }
}
/// Handle a single HTTP request, dispatching to the appropriate route handler.
///
/// Routing uses only the path part of the request target; a query string, if present, is
/// ignored. Routes, in order of precedence:
///
/// 1. Blob routes in either URL scheme (`/blobs/{Kind}/{key}` or `/{kind_plural}/{key}`):
///    keyed requests go to the blob handler, `GET` without a key lists the keys of that
///    kind as a JSON array, anything else answers 405.
/// 2. `/registry` goes to the registry handler.
/// 3. `GET /health` answers `{"status":"ok"}` as JSON.
/// 4. Everything else answers 404.
pub fn handle_request(store: &Store, req: tiny_http::Request) {
    let method = req.method().clone();
    let url = req.url().to_owned();
    debug!("{method} {url}");

    // The request target may carry a query string; it must not take part in routing,
    // otherwise e.g. `/registry?x=1` would miss the exact-match routes below.
    let path = url.split_once('?').map_or(url.as_str(), |(path, _query)| path);

    // Try both URL schemes: /blobs/Kind/key (server canonical) and /kind_plural/key (client)
    let route = parse_blob_route(path).or_else(|| parse_client_route(path));
    match route {
        Some((kind, Some(key))) => handle_blob_keyed(store, req, &method, kind, key),
        Some((kind, None)) if method == Method::Get => {
            let keys = store.list_blobs(kind);
            let json = serde_json::to_string(&keys).unwrap_or_else(|_| "[]".to_owned());
            respond_json(req, json.into_bytes());
        }
        Some(_) => respond_err(req, 405, "method not allowed"),
        None if path == "/registry" => handle_registry(store, req, &method),
        None if path == "/health" && method == Method::Get => {
            // The body is JSON, so label it as such rather than as plain text.
            respond_json(req, r#"{"status":"ok"}"#);
        }
        None => respond_err(req, 404, "not found"),
    }
}
/// Bind an HTTP server to `addr` and serve requests from `store` on the calling thread.
///
/// Requests are handled one at a time, in arrival order. The call returns only when the
/// incoming-request iterator ends.
///
/// # Panics
///
/// Panics if the server cannot bind to `addr`.
pub fn run_server(store: &Arc<Store>, addr: &str) {
    let listener = Server::http(addr).expect("failed to bind HTTP server");
    listener
        .incoming_requests()
        .for_each(|request| handle_request(store, request));
}
/// A test helper that starts a karapace-server on a random port in a background thread.
///
/// The server listens on `127.0.0.1:{port}` and stores data in the provided `data_dir`.
/// Dropping the `TestServer` stops the request loop (via `Server::unblock`) and waits for
/// the background thread to finish, so the listener is released before `drop` returns.
pub struct TestServer {
    /// Base URL of the running server, `http://127.0.0.1:{port}`.
    pub url: String,
    /// Port the server is listening on.
    pub port: u16,
    /// Directory the server stores its data in.
    pub data_dir: PathBuf,
    /// Shared with the background thread; used on drop to unblock its request loop.
    server: Arc<Server>,
    /// Background request loop; `Some` until the server is dropped.
    handle: Option<std::thread::JoinHandle<()>>,
}

impl TestServer {
    /// Start a test server that stores its data in `data_dir` (created if missing).
    /// Binds to `127.0.0.1:0` (random port).
    ///
    /// # Panics
    ///
    /// Panics if the data directory cannot be created or the server cannot bind.
    pub fn start(data_dir: PathBuf) -> Self {
        fs::create_dir_all(&data_dir).expect("failed to create test data dir");
        let server =
            Arc::new(Server::http("127.0.0.1:0").expect("failed to bind test HTTP server"));
        let port = server.server_addr().to_ip().expect("not an IP addr").port();
        let url = format!("http://127.0.0.1:{port}");
        let store = Store::new(data_dir.clone());
        let srv = Arc::clone(&server);
        let handle = std::thread::spawn(move || {
            // The iterator ends once `Server::unblock` is called from `Drop`.
            for request in srv.incoming_requests() {
                handle_request(&store, request);
            }
        });
        Self {
            url,
            port,
            data_dir,
            server,
            handle: Some(handle),
        }
    }
}

impl Drop for TestServer {
    /// Stop the background request loop and wait for its thread to exit.
    fn drop(&mut self) {
        self.server.unblock();
        if let Some(handle) = self.handle.take() {
            // A panic inside the request loop must not turn into a panic during drop.
            let _ = handle.join();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a store rooted in a fresh temporary directory.
    ///
    /// The directory guard is returned so it outlives the store.
    fn fresh_store() -> (tempfile::TempDir, Store) {
        let tmp = tempfile::tempdir().unwrap();
        let store = Store::new(tmp.path().to_path_buf());
        (tmp, store)
    }

    #[test]
    fn parse_blob_route_object_with_key() {
        let parsed = parse_blob_route("/blobs/Object/abc123");
        assert_eq!(parsed, Some(("Object", Some("abc123"))));
    }

    #[test]
    fn parse_blob_route_layer_list() {
        let parsed = parse_blob_route("/blobs/Layer");
        assert_eq!(parsed, Some(("Layer", None)));
    }

    #[test]
    fn parse_blob_route_metadata_with_key() {
        let parsed = parse_blob_route("/blobs/Metadata/env_abc");
        assert_eq!(parsed, Some(("Metadata", Some("env_abc"))));
    }

    #[test]
    fn parse_blob_route_invalid_kind() {
        assert_eq!(parse_blob_route("/blobs/Invalid/key"), None);
    }

    #[test]
    fn parse_blob_route_missing_prefix() {
        assert_eq!(parse_blob_route("/other/Object/key"), None);
    }

    #[test]
    fn store_blob_roundtrip() {
        let (_tmp, store) = fresh_store();
        store.put_blob("Object", "hash1", b"content").unwrap();

        assert!(store.has_blob("Object", "hash1"));
        assert!(!store.has_blob("Object", "missing"));
        assert_eq!(store.get_blob("Object", "hash1"), Some(b"content".to_vec()));
    }

    #[test]
    fn store_list_blobs() {
        let (_tmp, store) = fresh_store();
        for (key, payload) in [("l1", b"a"), ("l2", b"b")] {
            store.put_blob("Layer", key, payload).unwrap();
        }

        // Directory iteration order is unspecified, so compare sorted.
        let mut listed = store.list_blobs("Layer");
        listed.sort();
        assert_eq!(listed, vec!["l1", "l2"]);
    }

    #[test]
    fn store_registry_roundtrip() {
        let (_tmp, store) = fresh_store();
        assert!(store.get_registry().is_none());

        let payload = b"{\"entries\":{}}";
        store.put_registry(payload).unwrap();
        assert_eq!(store.get_registry(), Some(payload.to_vec()));
    }

    #[test]
    fn store_registry_persists_to_disk() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path().to_path_buf();

        // The first store is dropped before the second one is created.
        Store::new(root.clone()).put_registry(b"reg_data").unwrap();

        let reopened = Store::new(root);
        assert_eq!(reopened.get_registry(), Some(b"reg_data".to_vec()));
    }
}

View file

@ -0,0 +1,38 @@
use clap::Parser;
use karapace_server::Store;
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::info;
// Command-line options of the standalone server binary, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields are picked up by the derive and shown as the
// options' help text, so they are user-facing strings rather than plain documentation.
#[derive(Parser)]
#[command(name = "karapace-server", about = "Karapace remote protocol v1 server")]
struct Cli {
    // Long option only (no short flag); falls back to 8321 when not given.
    /// Port to listen on.
    #[arg(long, default_value_t = 8321)]
    port: u16,
    // Long option only (no short flag); falls back to `./karapace-remote-data`,
    // which is relative to the process's working directory.
    /// Directory to store blobs and registry data.
    #[arg(long, default_value = "./karapace-remote-data")]
    data_dir: PathBuf,
}
fn main() {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
fs::create_dir_all(&cli.data_dir).expect("failed to create data directory");
let addr = format!("0.0.0.0:{}", cli.port);
info!("starting karapace-server on {addr}");
info!("data directory: {}", cli.data_dir.display());
let store = Arc::new(Store::new(cli.data_dir));
karapace_server::run_server(&store, &addr);
}

View file

@ -0,0 +1,294 @@
//! IG-M3: HTTP client ↔ server E2E integration tests.
//!
//! These tests start a real `karapace-server` in-process on a random port
//! and exercise the real `HttpBackend` client against it. No mocks.
use karapace_remote::http::HttpBackend;
use karapace_remote::{BlobKind, RemoteBackend, RemoteConfig};
use karapace_server::TestServer;
use karapace_store::{
EnvMetadata, EnvState, LayerKind, LayerManifest, LayerStore, MetadataStore, ObjectStore,
StoreLayout,
};
/// Start a `TestServer` backed by a fresh temporary directory.
///
/// The directory guard is returned alongside the server so the data directory stays
/// alive for as long as the caller holds it.
fn start_server() -> (TestServer, tempfile::TempDir) {
    let data_dir = tempfile::tempdir().unwrap();
    let running = TestServer::start(data_dir.path().to_path_buf());
    (running, data_dir)
}
/// Build an unauthenticated `HttpBackend` pointing at `url`.
fn make_client(url: &str) -> HttpBackend {
    let config = RemoteConfig {
        url: String::from(url),
        auth_token: None,
    };
    HttpBackend::new(config)
}
/// Populate a local store under `dir` with one hand-built environment for push/pull tests.
///
/// The environment consists of two objects (a data blob and a manifest blob), one base
/// layer referencing the data blob, and a metadata record in the `Built` state.
/// Returns the store layout together with the environment id.
fn setup_local_env(dir: &std::path::Path) -> (StoreLayout, String) {
    const ENV_ID: &str = "env_abc123";

    let layout = StoreLayout::new(dir);
    layout.initialize().unwrap();

    // Two objects: the layer's payload and the environment manifest.
    let objects = ObjectStore::new(layout.clone());
    let obj_hash = objects.put(b"test data content").unwrap();
    let manifest_hash = objects.put(b"{\"manifest\": \"test\"}").unwrap();

    // One read-only base layer that references the payload object.
    let layers = LayerStore::new(layout.clone());
    let layer_content_hash = layers
        .put(&LayerManifest {
            hash: "layer_hash_001".to_owned(),
            kind: LayerKind::Base,
            parent: None,
            object_refs: vec![obj_hash],
            read_only: true,
            tar_hash: String::new(),
        })
        .unwrap();

    // Metadata record tying the layer and the manifest together.
    let metadata = MetadataStore::new(layout.clone());
    metadata
        .put(&EnvMetadata {
            env_id: ENV_ID.into(),
            short_id: ENV_ID.into(),
            name: Some("test-env".to_owned()),
            state: EnvState::Built,
            base_layer: layer_content_hash.into(),
            dependency_layers: vec![],
            policy_layer: None,
            manifest_hash: manifest_hash.into(),
            ref_count: 1,
            created_at: "2025-01-01T00:00:00Z".to_owned(),
            updated_at: "2025-01-01T00:00:00Z".to_owned(),
            checksum: None,
        })
        .unwrap();

    (layout, ENV_ID.to_owned())
}
// --- Tests ---
/// PUT, GET and HEAD for a single blob, then PUT/GET for the remaining blob kinds.
#[test]
fn http_e2e_blob_roundtrip() {
    let (server, _dir) = start_server();
    let client = make_client(&server.url);

    // Upload, then read the same bytes back.
    client
        .put_blob(BlobKind::Object, "hash1", b"hello world")
        .unwrap();
    let fetched = client.get_blob(BlobKind::Object, "hash1").unwrap();
    assert_eq!(fetched, b"hello world");

    // Existence checks: one stored key, one unknown key.
    let present = client.has_blob(BlobKind::Object, "hash1").unwrap();
    let absent = client.has_blob(BlobKind::Object, "missing").unwrap();
    assert!(present);
    assert!(!absent);

    // The other kinds round-trip as well.
    client
        .put_blob(BlobKind::Layer, "l1", b"layer-data")
        .unwrap();
    client
        .put_blob(BlobKind::Metadata, "m1", b"meta-data")
        .unwrap();
    let layer_bytes = client.get_blob(BlobKind::Layer, "l1").unwrap();
    assert_eq!(layer_bytes, b"layer-data");
    let meta_bytes = client.get_blob(BlobKind::Metadata, "m1").unwrap();
    assert_eq!(meta_bytes, b"meta-data");
}
/// Push a complete environment to the server, pull it into an empty store, and compare
/// metadata, the manifest object and the base layer between source and destination.
#[test]
fn http_e2e_push_pull_full_env() {
    let (server, _srv_dir) = start_server();
    let client = make_client(&server.url);

    // Source store holding the hand-built environment.
    let src_dir = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src_dir.path());

    // Push: both objects and the single layer must be uploaded.
    let pushed =
        karapace_remote::push_env(&src_layout, &env_id, &client, Some("test@latest")).unwrap();
    assert_eq!(pushed.objects_pushed, 2);
    assert_eq!(pushed.layers_pushed, 1);

    // Pull into an empty destination store.
    let dst_dir = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst_dir.path());
    dst_layout.initialize().unwrap();
    let pulled = karapace_remote::pull_env(&dst_layout, &env_id, &client).unwrap();
    assert_eq!(pulled.objects_pulled, 2);
    assert_eq!(pulled.layers_pulled, 1);

    // Metadata must agree on every field that identifies the environment.
    let src_meta = MetadataStore::new(src_layout).get(&env_id).unwrap();
    let dst_meta = MetadataStore::new(dst_layout.clone()).get(&env_id).unwrap();
    assert_eq!(src_meta.env_id, dst_meta.env_id);
    assert_eq!(src_meta.name, dst_meta.name);
    assert_eq!(src_meta.base_layer, dst_meta.base_layer);
    assert_eq!(src_meta.manifest_hash, dst_meta.manifest_hash);

    // The manifest object must be byte-for-byte identical on both sides.
    let src_objects = ObjectStore::new(StoreLayout::new(src_dir.path()));
    let dst_objects = ObjectStore::new(dst_layout.clone());
    let src_manifest = src_objects.get(&src_meta.manifest_hash).unwrap();
    let dst_manifest = dst_objects.get(&dst_meta.manifest_hash).unwrap();
    assert_eq!(src_manifest, dst_manifest);

    // The base layer must reference the same objects and have the same kind.
    let src_layers = LayerStore::new(StoreLayout::new(src_dir.path()));
    let src_layer = src_layers.get(&src_meta.base_layer).unwrap();
    let dst_layers = LayerStore::new(dst_layout);
    let dst_layer = dst_layers.get(&dst_meta.base_layer).unwrap();
    assert_eq!(src_layer.object_refs, dst_layer.object_refs);
    assert_eq!(src_layer.kind, dst_layer.kind);
}
/// Push an environment under a tag and resolve that tag back to the environment id.
#[test]
fn http_e2e_registry_roundtrip() {
    let (server, _dir) = start_server();
    let client = make_client(&server.url);

    // Push the environment under the tag `myapp@latest`.
    let src_dir = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src_dir.path());
    let tag = "myapp@latest";
    karapace_remote::push_env(&src_layout, &env_id, &client, Some(tag)).unwrap();

    // The tag must resolve to the pushed environment.
    let resolved = karapace_remote::resolve_ref(&client, tag).unwrap();
    assert_eq!(resolved, env_id);
}
/// Four client threads upload ten blobs each; afterwards all forty must be readable
/// with the content that was written.
#[test]
fn http_e2e_concurrent_4_clients() {
    let (server, _dir) = start_server();

    // Each worker gets its own client and its own copy of the URL.
    let mut workers = Vec::with_capacity(4);
    for thread_idx in 0..4 {
        let base_url = server.url.clone();
        workers.push(std::thread::spawn(move || {
            let client = make_client(&base_url);
            for i in 0..10 {
                let key = format!("t{thread_idx}_blob_{i}");
                let data = format!("data-{thread_idx}-{i}");
                client
                    .put_blob(BlobKind::Object, &key, data.as_bytes())
                    .unwrap();
            }
        }));
    }
    for worker in workers {
        worker.join().unwrap();
    }

    // Every blob written by every worker must be present and intact.
    let client = make_client(&server.url);
    for thread_idx in 0..4 {
        for i in 0..10 {
            let key = format!("t{thread_idx}_blob_{i}");
            let expected = format!("data-{thread_idx}-{i}");
            let data = client.get_blob(BlobKind::Object, &key).unwrap();
            assert_eq!(data, expected.as_bytes(), "blob {key} data mismatch");
        }
    }
}
/// Data written through one server instance must be served by a second instance that is
/// started later on the same data directory.
#[test]
fn http_e2e_server_restart_persistence() {
    let data_dir = tempfile::tempdir().unwrap();
    let registry_payload = b"{\"entries\":{}}";

    // First instance: write two blobs and the registry.
    {
        let first = TestServer::start(data_dir.path().to_path_buf());
        let writer = make_client(&first.url);
        writer
            .put_blob(BlobKind::Object, "persist1", b"data1")
            .unwrap();
        writer
            .put_blob(BlobKind::Layer, "persist2", b"data2")
            .unwrap();
        writer.put_registry(registry_payload).unwrap();
        // The first `TestServer` handle goes out of scope at the end of this block.
    }

    // Second instance on the same directory: everything must still be there.
    {
        let second = TestServer::start(data_dir.path().to_path_buf());
        let reader = make_client(&second.url);

        let object = reader.get_blob(BlobKind::Object, "persist1").unwrap();
        assert_eq!(object, b"data1");
        let layer = reader.get_blob(BlobKind::Layer, "persist2").unwrap();
        assert_eq!(layer, b"data2");
        assert_eq!(reader.get_registry().unwrap(), registry_payload);
    }
}
/// A blob overwritten directly in the server's data directory must make a later pull
/// fail with an integrity error.
#[test]
fn http_e2e_integrity_on_pull() {
    let (server, server_data) = start_server();
    let client = make_client(&server.url);

    // Push a real environment (no tag).
    let src_dir = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src_dir.path());
    karapace_remote::push_env(&src_layout, &env_id, &client, None).unwrap();

    // Overwrite the manifest object on the server's filesystem, bypassing HTTP.
    let manifest_hash = MetadataStore::new(src_layout)
        .get(&env_id)
        .unwrap()
        .manifest_hash
        .to_string();
    let mut tampered_path = server_data.path().to_path_buf();
    tampered_path.push("blobs");
    tampered_path.push("Object");
    tampered_path.push(&manifest_hash);
    std::fs::write(&tampered_path, b"CORRUPTED DATA").unwrap();

    // Pulling into a fresh store must now be rejected.
    let dst_dir = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst_dir.path());
    dst_layout.initialize().unwrap();
    let result = karapace_remote::pull_env(&dst_layout, &env_id, &client);
    assert!(
        result.is_err(),
        "pull must fail when a blob has been tampered with"
    );

    let err_msg = result.unwrap_err().to_string();
    let mentions_integrity = ["integrity", "Integrity"]
        .iter()
        .any(|needle| err_msg.contains(*needle));
    assert!(
        mentions_integrity,
        "error must mention integrity failure, got: {err_msg}"
    );
}
/// Fetching a blob that was never stored must surface as a "not found" error.
#[test]
fn http_e2e_404_on_missing() {
    let (server, _dir) = start_server();
    let client = make_client(&server.url);

    let result = client.get_blob(BlobKind::Object, "nonexistent");
    assert!(result.is_err(), "GET missing blob must return error");

    let err_msg = result.unwrap_err().to_string();
    let indicates_404 = ["404", "not found", "Not Found"]
        .iter()
        .any(|needle| err_msg.contains(*needle));
    assert!(indicates_404, "error must indicate 404, got: {err_msg}");
}