feat: karapace-remote — remote content-addressable store, push/pull, registry

- RemoteBackend trait: put/get/has blob, registry operations
- HTTP backend (ureq): blob transfer with X-Karapace-Protocol header
- Push/pull transfer with blake3 integrity verification on pull
- JSON registry for name@tag references
- RemoteConfig: persistent server URL configuration
- Auth token support via Bearer header
- Header-capturing mock server for protocol verification tests
This commit is contained in:
Marco Allegretti 2026-02-22 18:37:14 +01:00
parent f535020600
commit 11034ee27a
7 changed files with 3892 additions and 0 deletions

View file

@ -0,0 +1,23 @@
# Crate manifest for karapace-remote.
[package]
name = "karapace-remote"
description = "Remote content-addressable store for Karapace environment sharing"
# Version, edition, license, and repository are inherited from the workspace.
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[lints]
workspace = true

[dependencies]
serde.workspace = true         # derive Serialize/Deserialize for config, registry, manifests
serde_json.workspace = true    # JSON encoding of registry/config/metadata
thiserror.workspace = true     # RemoteError derive
tracing.workspace = true       # debug logging of HTTP requests
ureq.workspace = true          # blocking HTTP client for the HTTP backend
chrono.workspace = true        # RFC 3339 pushed_at timestamps
blake3.workspace = true        # integrity verification on pull
karapace-store = { path = "../karapace-store" }

[dev-dependencies]
tempfile.workspace = true      # temporary dirs for store/config tests

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,76 @@
use crate::RemoteError;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
/// Connection settings for a remote Karapace store.
///
/// Persisted as JSON (see `load`/`save`); the default location is
/// `~/.config/karapace/remote.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteConfig {
    /// Base URL of the remote store, stored without a trailing slash.
    pub url: String,
    /// Optional bearer token; when set, requests carry
    /// `Authorization: Bearer <token>`. Absent keys deserialize to `None`.
    #[serde(default)]
    pub auth_token: Option<String>,
}
impl RemoteConfig {
    /// Create a config for `url`, normalizing away any trailing `/` so that
    /// endpoint paths can be appended with a single separator.
    pub fn new(url: &str) -> Self {
        let normalized = url.trim_end_matches('/').to_owned();
        Self {
            url: normalized,
            auth_token: None,
        }
    }

    /// Builder-style setter attaching an auth token.
    #[must_use]
    pub fn with_token(mut self, token: &str) -> Self {
        self.auth_token = Some(token.to_owned());
        self
    }

    /// Load config from `~/.config/karapace/remote.json`.
    pub fn load_default() -> Result<Self, RemoteError> {
        Self::load(&default_config_path()?)
    }

    /// Load config from an explicit path.
    pub fn load(path: &Path) -> Result<Self, RemoteError> {
        let raw = std::fs::read_to_string(path)?;
        serde_json::from_str(&raw)
            .map_err(|e| RemoteError::Config(format!("invalid remote config: {e}")))
    }

    /// Persist config as pretty-printed JSON, creating parent directories
    /// as needed.
    pub fn save(&self, path: &Path) -> Result<(), RemoteError> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let json = serde_json::to_string_pretty(self)
            .map_err(|e| RemoteError::Serialization(e.to_string()))?;
        std::fs::write(path, json)?;
        Ok(())
    }
}
/// Default config location: `$XDG_CONFIG_HOME/karapace/remote.json` when the
/// XDG override is set and non-empty, otherwise the original
/// `~/.config/karapace/remote.json`.
///
/// # Errors
/// Returns `RemoteError::Config` when no usable base directory can be found
/// (i.e. `XDG_CONFIG_HOME` is unset/empty and `HOME` is not set).
fn default_config_path() -> Result<PathBuf, RemoteError> {
    // Honor the XDG Base Directory convention; an empty value counts as unset.
    if let Ok(xdg) = std::env::var("XDG_CONFIG_HOME") {
        if !xdg.is_empty() {
            return Ok(PathBuf::from(xdg).join("karapace/remote.json"));
        }
    }
    // Fallback: the historical hard-coded ~/.config location.
    let home = std::env::var("HOME").map_err(|_| RemoteError::Config("HOME not set".to_owned()))?;
    Ok(PathBuf::from(home).join(".config/karapace/remote.json"))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Saving then loading must preserve both the URL and the token.
    #[test]
    fn config_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let file = tmp.path().join("remote.json");
        RemoteConfig::new("https://store.example.com/v1")
            .with_token("secret123")
            .save(&file)
            .unwrap();
        let reloaded = RemoteConfig::load(&file).unwrap();
        assert_eq!(reloaded.url, "https://store.example.com/v1");
        assert_eq!(reloaded.auth_token.as_deref(), Some("secret123"));
    }

    /// `new` normalizes trailing slashes off the URL.
    #[test]
    fn config_strips_trailing_slash() {
        let cfg = RemoteConfig::new("https://example.com/");
        assert_eq!(cfg.url, "https://example.com");
    }
}

View file

@ -0,0 +1,463 @@
use crate::{BlobKind, RemoteBackend, RemoteConfig, RemoteError};
/// HTTP-based remote store backend.
///
/// Expects a simple REST API:
/// - `PUT /objects/<key>` — upload object blob
/// - `GET /objects/<key>` — download object blob
/// - `HEAD /objects/<key>` — check existence
/// - `GET /objects/` — list objects (JSON array of strings)
/// - Same pattern for `/layers/` and `/metadata/`
/// - `PUT /registry` — upload registry index
/// - `GET /registry` — download registry index
pub struct HttpBackend {
    /// Remote base URL and optional bearer token.
    config: RemoteConfig,
    /// Blocking HTTP client reused across all requests.
    agent: ureq::Agent,
}
impl HttpBackend {
    /// Build a backend over `config` with a default-configured agent.
    pub fn new(config: RemoteConfig) -> Self {
        Self {
            agent: ureq::Agent::new_with_defaults(),
            config,
        }
    }

    /// URL path segment for a blob kind.
    fn kind_path(kind: BlobKind) -> &'static str {
        match kind {
            BlobKind::Object => "objects",
            BlobKind::Layer => "layers",
            BlobKind::Metadata => "metadata",
        }
    }

    /// Full URL for one blob: `<base>/<kind>/<key>`.
    fn url(&self, kind: BlobKind, key: &str) -> String {
        let base = &self.config.url;
        let segment = Self::kind_path(kind);
        format!("{base}/{segment}/{key}")
    }

    /// PUT `data` to `url` with protocol and (optional) auth headers attached.
    fn do_put(&self, url: &str, content_type: &str, data: &[u8]) -> Result<(), RemoteError> {
        let protocol = crate::PROTOCOL_VERSION.to_string();
        let mut request = self
            .agent
            .put(url)
            .header("Content-Type", content_type)
            .header("X-Karapace-Protocol", &protocol);
        if let Some(token) = self.config.auth_token.as_deref() {
            request = request.header("Authorization", &format!("Bearer {token}"));
        }
        request
            .send(data)
            .map_err(|e| RemoteError::Http(e.to_string()))?;
        Ok(())
    }

    /// GET `url` and return the full response body.
    fn do_get(&self, url: &str) -> Result<Vec<u8>, RemoteError> {
        let protocol = crate::PROTOCOL_VERSION.to_string();
        let mut request = self
            .agent
            .get(url)
            .header("X-Karapace-Protocol", &protocol);
        if let Some(token) = self.config.auth_token.as_deref() {
            request = request.header("Authorization", &format!("Bearer {token}"));
        }
        let response = request.call().map_err(|e| RemoteError::Http(e.to_string()))?;
        response
            .into_body()
            .read_to_vec()
            .map_err(|e| RemoteError::Http(e.to_string()))
    }

    /// HEAD `url` and return the numeric status code.
    fn do_head(&self, url: &str) -> Result<u16, RemoteError> {
        let protocol = crate::PROTOCOL_VERSION.to_string();
        let mut request = self
            .agent
            .head(url)
            .header("X-Karapace-Protocol", &protocol);
        if let Some(token) = self.config.auth_token.as_deref() {
            request = request.header("Authorization", &format!("Bearer {token}"));
        }
        let response = request.call().map_err(|e| RemoteError::Http(e.to_string()))?;
        Ok(response.status().into())
    }
}
impl RemoteBackend for HttpBackend {
    fn put_blob(&self, kind: BlobKind, key: &str, data: &[u8]) -> Result<(), RemoteError> {
        let url = self.url(kind, key);
        tracing::debug!("PUT {url} ({} bytes)", data.len());
        self.do_put(&url, "application/octet-stream", data)
    }

    fn get_blob(&self, kind: BlobKind, key: &str) -> Result<Vec<u8>, RemoteError> {
        let url = self.url(kind, key);
        tracing::debug!("GET {url}");
        self.do_get(&url)
    }

    fn has_blob(&self, kind: BlobKind, key: &str) -> Result<bool, RemoteError> {
        let url = self.url(kind, key);
        tracing::debug!("HEAD {url}");
        // Only an explicit 200 counts as "present"; every HEAD failure (404,
        // network error, …) is reported as "absent".
        // NOTE(review): a transient network error therefore looks like a
        // missing blob to callers — confirm this is acceptable for push dedup.
        Ok(matches!(self.do_head(&url), Ok(200)))
    }

    fn list_blobs(&self, kind: BlobKind) -> Result<Vec<String>, RemoteError> {
        let url = format!("{}/{}/", self.config.url, Self::kind_path(kind));
        tracing::debug!("GET {url}");
        let raw = self.do_get(&url)?;
        // The server returns a JSON array of key strings.
        let text = String::from_utf8(raw).map_err(|e| RemoteError::Http(e.to_string()))?;
        serde_json::from_str(&text).map_err(|e| RemoteError::Serialization(e.to_string()))
    }

    fn put_registry(&self, data: &[u8]) -> Result<(), RemoteError> {
        let url = format!("{}/registry", self.config.url);
        tracing::debug!("PUT {url} ({} bytes)", data.len());
        self.do_put(&url, "application/json", data)
    }

    fn get_registry(&self) -> Result<Vec<u8>, RemoteError> {
        let url = format!("{}/registry", self.config.url);
        tracing::debug!("GET {url}");
        self.do_get(&url)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpListener;
    use std::sync::{Arc, Mutex};

    /// A captured HTTP request for header inspection.
    #[derive(Debug, Clone)]
    struct CapturedRequest {
        method: String,
        path: String,
        // Header names are lowercased on capture for case-insensitive lookup.
        headers: HashMap<String, String>,
    }

    /// Minimal hand-rolled HTTP/1.1 server backed by an in-memory blob map.
    /// Records every incoming request so tests can assert on headers.
    struct MockServer {
        // Base URL ("http://127.0.0.1:<port>") for building a backend.
        addr: String,
        // Accept-loop thread handle; never joined (lives for the test run).
        _handle: std::thread::JoinHandle<()>,
        requests: Arc<Mutex<Vec<CapturedRequest>>>,
    }

    impl MockServer {
        fn start() -> Self {
            // Port 0 → OS picks a free port, avoiding collisions between tests.
            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
            let addr = format!("http://{}", listener.local_addr().unwrap());
            let store: Arc<Mutex<HashMap<String, Vec<u8>>>> = Arc::new(Mutex::new(HashMap::new()));
            let requests: Arc<Mutex<Vec<CapturedRequest>>> = Arc::new(Mutex::new(Vec::new()));
            let store_clone = Arc::clone(&store);
            let requests_clone = Arc::clone(&requests);
            let handle = std::thread::spawn(move || {
                for stream in listener.incoming() {
                    let Ok(mut stream) = stream else { break };
                    let store = Arc::clone(&store_clone);
                    let reqs = Arc::clone(&requests_clone);
                    // One thread per connection; each handles a single request
                    // and then closes (responses send "Connection: close").
                    std::thread::spawn(move || {
                        let mut reader = BufReader::new(stream.try_clone().unwrap());
                        let mut request_line = String::new();
                        if reader.read_line(&mut request_line).is_err() {
                            return;
                        }
                        // Request line: "<METHOD> <PATH> <VERSION>".
                        let parts: Vec<&str> = request_line.trim().splitn(3, ' ').collect();
                        if parts.len() < 2 {
                            return;
                        }
                        let method = parts[0].to_owned();
                        let path = parts[1].to_owned();
                        let mut content_length: usize = 0;
                        let mut headers = HashMap::new();
                        // Read header lines until the blank line that ends them.
                        loop {
                            let mut line = String::new();
                            if reader.read_line(&mut line).is_err() || line.trim().is_empty() {
                                break;
                            }
                            if let Some((k, v)) = line.trim().split_once(": ") {
                                headers.insert(k.to_lowercase(), v.to_owned());
                            }
                            // Content-Length is parsed separately (lowercased
                            // copy) so the body can be drained below.
                            let lower = line.to_lowercase();
                            if let Some(val) = lower.strip_prefix("content-length: ") {
                                content_length = val.trim().parse().unwrap_or(0);
                            }
                        }
                        reqs.lock().unwrap().push(CapturedRequest {
                            method: method.clone(),
                            path: path.clone(),
                            headers,
                        });
                        // Read the request body (PUT payload).
                        let mut body = vec![0u8; content_length];
                        if content_length > 0 {
                            let _ = std::io::Read::read_exact(&mut reader, &mut body);
                        }
                        // Lock held through response writing; fine for tests.
                        let mut data = store.lock().unwrap();
                        let response = match method.as_str() {
                            "PUT" => {
                                data.insert(path.clone(), body);
                                "HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
                                    .to_owned()
                            }
                            "GET" => {
                                if let Some(val) = data.get(&path) {
                                    format!(
                                        "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
                                        val.len()
                                    )
                                } else {
                                    "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
                                        .to_owned()
                                }
                            }
                            "HEAD" => {
                                if data.contains_key(&path) {
                                    "HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
                                        .to_owned()
                                } else {
                                    "HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
                                        .to_owned()
                                }
                            }
                            _ => "HTTP/1.1 405 Method Not Allowed\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
                                .to_owned(),
                        };
                        let _ = stream.write_all(response.as_bytes());
                        // Only GET responses carry the stored blob as a body.
                        if method == "GET" {
                            if let Some(val) = data.get(&path) {
                                let _ = stream.write_all(val);
                            }
                        }
                        let _ = stream.flush();
                    });
                }
            });
            MockServer {
                addr,
                _handle: handle,
                requests,
            }
        }

        /// Snapshot of all requests captured so far.
        fn captured_requests(&self) -> Vec<CapturedRequest> {
            self.requests.lock().unwrap().clone()
        }
    }

    /// Backend with no auth token.
    fn test_backend(url: &str) -> HttpBackend {
        HttpBackend::new(RemoteConfig {
            url: url.to_owned(),
            auth_token: None,
        })
    }

    /// Backend with a bearer token configured.
    fn test_backend_with_auth(url: &str, token: &str) -> HttpBackend {
        HttpBackend::new(RemoteConfig {
            url: url.to_owned(),
            auth_token: Some(token.to_owned()),
        })
    }

    #[test]
    fn http_put_and_get_blob() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        backend
            .put_blob(BlobKind::Object, "hash123", b"test data")
            .unwrap();
        let data = backend.get_blob(BlobKind::Object, "hash123").unwrap();
        assert_eq!(data, b"test data");
    }

    #[test]
    fn http_has_blob_true_and_false() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        assert!(!backend.has_blob(BlobKind::Object, "missing").unwrap());
        backend
            .put_blob(BlobKind::Object, "exists", b"data")
            .unwrap();
        assert!(backend.has_blob(BlobKind::Object, "exists").unwrap());
    }

    #[test]
    fn http_get_nonexistent_fails() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        let result = backend.get_blob(BlobKind::Object, "nonexistent");
        assert!(result.is_err());
    }

    #[test]
    fn http_put_and_get_registry() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        let registry_data = b"{\"entries\":{}}";
        backend.put_registry(registry_data).unwrap();
        let data = backend.get_registry().unwrap();
        assert_eq!(data, registry_data);
    }

    #[test]
    fn http_connection_refused_returns_error() {
        // Port 1 is essentially never listening; connect must fail fast.
        let backend = test_backend("http://127.0.0.1:1");
        let result = backend.put_blob(BlobKind::Object, "key", b"data");
        assert!(result.is_err());
    }

    #[test]
    fn http_multiple_blob_kinds() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        backend
            .put_blob(BlobKind::Object, "obj1", b"object-data")
            .unwrap();
        backend
            .put_blob(BlobKind::Layer, "layer1", b"layer-data")
            .unwrap();
        backend
            .put_blob(BlobKind::Metadata, "meta1", b"meta-data")
            .unwrap();
        assert_eq!(
            backend.get_blob(BlobKind::Object, "obj1").unwrap(),
            b"object-data"
        );
        assert_eq!(
            backend.get_blob(BlobKind::Layer, "layer1").unwrap(),
            b"layer-data"
        );
        assert_eq!(
            backend.get_blob(BlobKind::Metadata, "meta1").unwrap(),
            b"meta-data"
        );
    }

    // --- M4: Protocol version header tests ---

    #[test]
    fn http_requests_include_protocol_header() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        // PUT sends the header
        backend.put_blob(BlobKind::Object, "h1", b"data").unwrap();
        // GET sends the header
        let _ = backend.get_blob(BlobKind::Object, "h1");
        // HEAD sends the header
        let _ = backend.has_blob(BlobKind::Object, "h1");
        // Allow the mock server threads to finish
        std::thread::sleep(std::time::Duration::from_millis(50));
        let reqs = server.captured_requests();
        assert!(
            reqs.len() >= 3,
            "expected at least 3 requests, got {}",
            reqs.len()
        );
        for req in &reqs {
            let proto = req.headers.get("x-karapace-protocol");
            assert_eq!(
                proto,
                Some(&"1".to_owned()),
                "{} {} missing X-Karapace-Protocol header",
                req.method,
                req.path
            );
        }
    }

    #[test]
    fn http_protocol_version_constant_is_1() {
        assert_eq!(crate::PROTOCOL_VERSION, 1);
    }

    #[test]
    fn http_auth_token_sent_as_bearer_header() {
        let server = MockServer::start();
        let backend = test_backend_with_auth(&server.addr, "secret-token-42");
        backend
            .put_blob(BlobKind::Object, "auth1", b"data")
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(50));
        let reqs = server.captured_requests();
        assert!(!reqs.is_empty());
        let auth = reqs[0].headers.get("authorization");
        assert_eq!(
            auth,
            Some(&"Bearer secret-token-42".to_owned()),
            "PUT must include Authorization: Bearer header"
        );
    }

    #[test]
    fn http_no_auth_header_without_token() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        backend
            .put_blob(BlobKind::Object, "noauth", b"data")
            .unwrap();
        std::thread::sleep(std::time::Duration::from_millis(50));
        let reqs = server.captured_requests();
        assert!(!reqs.is_empty());
        assert!(
            !reqs[0].headers.contains_key("authorization"),
            "no auth token configured — Authorization header must not be sent"
        );
    }

    // --- M7.2: Remote HTTP coverage ---

    #[test]
    fn http_list_blobs_returns_keys() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        // Populate the mock store with a list response
        backend.put_blob(BlobKind::Object, "a", b"data-a").unwrap();
        backend.put_blob(BlobKind::Object, "b", b"data-b").unwrap();
        backend.put_blob(BlobKind::Object, "c", b"data-c").unwrap();
        // Store the list response at the list endpoint
        let list_url = format!("{}/objects/", server.addr);
        let list_body = serde_json::to_vec(&["a", "b", "c"]).unwrap();
        // Manually insert the list response via a PUT to the list path
        // (the mock server is a plain key→value store, so the list endpoint
        // has to be seeded explicitly).
        backend
            .do_put(&list_url, "application/json", &list_body)
            .unwrap();
        let keys = backend.list_blobs(BlobKind::Object).unwrap();
        assert_eq!(keys, vec!["a", "b", "c"]);
    }

    #[test]
    fn http_large_blob_roundtrip() {
        let server = MockServer::start();
        let backend = test_backend(&server.addr);
        // Create a 1MB blob
        let large_data: Vec<u8> = (0..1_000_000).map(|i| (i % 256) as u8).collect();
        backend
            .put_blob(BlobKind::Object, "large", &large_data)
            .unwrap();
        let retrieved = backend.get_blob(BlobKind::Object, "large").unwrap();
        assert_eq!(retrieved.len(), large_data.len());
        assert_eq!(retrieved, large_data);
    }
}

View file

@ -0,0 +1,83 @@
//! Remote store synchronization for sharing Karapace environments.
//!
//! This crate provides push/pull transfer of content-addressable objects and layer
//! manifests to/from a remote HTTP backend, a registry for named environment
//! references, and configuration for remote endpoints with optional authentication.
pub mod config;
pub mod http;
pub mod registry;
pub mod transfer;
pub use config::RemoteConfig;
pub use registry::{parse_ref, Registry, RegistryEntry};
pub use transfer::{pull_env, push_env, resolve_ref, PullResult, PushResult};
/// Protocol version sent as `X-Karapace-Protocol` header on all HTTP requests.
/// Servers can reject clients with incompatible protocol versions.
/// Bump this when the blob or registry wire format changes incompatibly.
pub const PROTOCOL_VERSION: u32 = 1;
use thiserror::Error;
/// Errors produced by remote store operations.
#[derive(Debug, Error)]
pub enum RemoteError {
    /// Local filesystem / stream I/O failure.
    #[error("remote I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// HTTP transport failure (connection, status, or body read).
    #[error("HTTP error: {0}")]
    Http(String),
    /// Failure inside the local karapace store.
    #[error("store error: {0}")]
    Store(#[from] karapace_store::StoreError),
    /// JSON (de)serialization failure.
    #[error("serialization error: {0}")]
    Serialization(String),
    /// Requested blob or registry key does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Invalid or missing remote configuration.
    #[error("remote config error: {0}")]
    Config(String),
    /// Downloaded content did not match its expected hash/checksum.
    #[error("integrity failure for '{key}': expected {expected}, got {actual}")]
    IntegrityFailure {
        key: String,
        expected: String,
        actual: String,
    },
}
/// A content-addressable blob in the remote store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobKind {
    /// Raw object content, keyed by blake3 hash (verified on pull).
    Object,
    /// JSON-serialized layer manifest, keyed by layer hash.
    Layer,
    /// JSON-serialized environment metadata, keyed by env_id.
    Metadata,
}
/// Trait for remote storage backends.
///
/// `Send + Sync` so a backend can be shared across threads.
pub trait RemoteBackend: Send + Sync {
    /// Upload a blob of the given kind under `key`.
    /// (The caller chooses the key — typically the content hash.)
    fn put_blob(&self, kind: BlobKind, key: &str, data: &[u8]) -> Result<(), RemoteError>;
    /// Download a blob from the remote store.
    fn get_blob(&self, kind: BlobKind, key: &str) -> Result<Vec<u8>, RemoteError>;
    /// Check if a blob exists in the remote store.
    fn has_blob(&self, kind: BlobKind, key: &str) -> Result<bool, RemoteError>;
    /// List the keys of all blobs of a given kind.
    fn list_blobs(&self, kind: BlobKind) -> Result<Vec<String>, RemoteError>;
    /// Upload the registry index (opaque bytes; JSON in practice).
    fn put_registry(&self, data: &[u8]) -> Result<(), RemoteError>;
    /// Download the registry index.
    fn get_registry(&self) -> Result<Vec<u8>, RemoteError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Debug formatting of each variant matches its identifier.
    #[test]
    fn blob_kind_debug() {
        let expectations = [
            (BlobKind::Object, "Object"),
            (BlobKind::Layer, "Layer"),
            (BlobKind::Metadata, "Metadata"),
        ];
        for (kind, expected) in expectations {
            assert_eq!(format!("{kind:?}"), expected);
        }
    }
}

View file

@ -0,0 +1,159 @@
use crate::RemoteError;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A single entry in the remote registry, mapping a tag to an env_id.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RegistryEntry {
    /// Full environment id.
    pub env_id: String,
    /// Short form of the environment id (taken from env metadata on push).
    pub short_id: String,
    /// Optional human-readable name; omitted from JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// RFC 3339 timestamp of the push that published this entry.
    pub pushed_at: String,
}
/// The registry index: maps `name@tag` keys to environment entries.
/// Example: `"my-env@latest"` → `RegistryEntry { env_id: "abc...", ... }`
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct Registry {
    /// `BTreeMap` keeps keys sorted, giving stable, diff-friendly JSON output.
    pub entries: BTreeMap<String, RegistryEntry>,
}
impl Registry {
    /// An empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Deserialize a registry from JSON bytes.
    pub fn from_bytes(data: &[u8]) -> Result<Self, RemoteError> {
        match serde_json::from_slice(data) {
            Ok(registry) => Ok(registry),
            Err(e) => Err(RemoteError::Serialization(format!("invalid registry: {e}"))),
        }
    }

    /// Serialize to pretty-printed JSON bytes.
    pub fn to_bytes(&self) -> Result<Vec<u8>, RemoteError> {
        serde_json::to_vec_pretty(self).map_err(|e| RemoteError::Serialization(e.to_string()))
    }

    /// Insert or update an entry. Key format: `name@tag` or just `env_id`.
    pub fn publish(&mut self, key: &str, entry: RegistryEntry) {
        self.entries.insert(key.to_owned(), entry);
    }

    /// Look up an entry by key.
    pub fn lookup(&self, key: &str) -> Option<&RegistryEntry> {
        self.entries.get(key)
    }

    /// List all keys in the registry (sorted, per `BTreeMap` order).
    pub fn list_keys(&self) -> Vec<&str> {
        self.entries.keys().map(|key| key.as_str()).collect()
    }

    /// Find every `(key, entry)` pair whose entry carries the given env_id.
    pub fn find_by_env_id(&self, env_id: &str) -> Vec<(&str, &RegistryEntry)> {
        let mut matches = Vec::new();
        for (key, entry) in &self.entries {
            if entry.env_id == env_id {
                matches.push((key.as_str(), entry));
            }
        }
        matches
    }
}
/// Parse a reference like `name@tag` into (name, tag).
/// If no `@` is present, the whole string is treated as the name with tag "latest".
/// Splits at the FIRST `@`, so `"a@b@c"` parses as `("a", "b@c")`.
pub fn parse_ref(reference: &str) -> (&str, &str) {
    if let Some(at) = reference.find('@') {
        (&reference[..at], &reference[at + 1..])
    } else {
        (reference, "latest")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Helper: an entry whose short id mirrors the env id.
    fn entry(id: &str, name: Option<&str>, pushed_at: &str) -> RegistryEntry {
        RegistryEntry {
            env_id: id.to_owned(),
            short_id: id.to_owned(),
            name: name.map(str::to_owned),
            pushed_at: pushed_at.to_owned(),
        }
    }

    /// Serialize → deserialize must produce an equal registry.
    #[test]
    fn registry_roundtrip() {
        let mut reg = Registry::new();
        reg.publish(
            "my-env@latest",
            entry("abc123", Some("my-env"), "2025-01-01T00:00:00Z"),
        );
        let bytes = reg.to_bytes().unwrap();
        let loaded = Registry::from_bytes(&bytes).unwrap();
        assert_eq!(loaded, reg);
    }

    /// Lookup hits published keys and misses everything else.
    #[test]
    fn registry_lookup() {
        let mut reg = Registry::new();
        reg.publish("dev@v1", entry("hash1", None, "2025-01-01T00:00:00Z"));
        assert!(reg.lookup("dev@v1").is_some());
        assert!(reg.lookup("nonexistent").is_none());
    }

    #[test]
    fn parse_ref_with_tag() {
        assert_eq!(parse_ref("my-env@v2"), ("my-env", "v2"));
    }

    #[test]
    fn parse_ref_without_tag() {
        assert_eq!(parse_ref("my-env"), ("my-env", "latest"));
    }

    /// Two keys share "hash1"; only those two should be found.
    #[test]
    fn find_by_env_id_works() {
        let mut reg = Registry::new();
        for key in ["a@latest", "b@latest"] {
            reg.publish(key, entry("hash1", None, "t"));
        }
        reg.publish("c@latest", entry("hash2", None, "t"));
        let found = reg.find_by_env_id("hash1");
        assert_eq!(found.len(), 2);
    }

    /// An empty registry survives a serialization round trip.
    #[test]
    fn empty_registry_roundtrip() {
        let bytes = Registry::new().to_bytes().unwrap();
        let loaded = Registry::from_bytes(&bytes).unwrap();
        assert!(loaded.entries.is_empty());
    }
}

View file

@ -0,0 +1,742 @@
use crate::{BlobKind, Registry, RegistryEntry, RemoteBackend, RemoteError};
use karapace_store::{LayerStore, MetadataStore, ObjectStore, StoreLayout};
/// Result of a push operation.
#[derive(Debug)]
pub struct PushResult {
    /// Objects uploaded because the remote did not yet have them.
    pub objects_pushed: usize,
    /// Layer manifests uploaded.
    pub layers_pushed: usize,
    /// Objects already present remotely (deduplicated, not re-sent).
    pub objects_skipped: usize,
    /// Layer manifests already present remotely.
    pub layers_skipped: usize,
}
/// Result of a pull operation.
#[derive(Debug)]
pub struct PullResult {
    /// Objects downloaded because the local store lacked them.
    pub objects_pulled: usize,
    /// Layer manifests downloaded.
    pub layers_pulled: usize,
    /// Objects already present locally (not re-fetched).
    pub objects_skipped: usize,
    /// Layer manifests already present locally.
    pub layers_skipped: usize,
}
/// Push an environment (metadata + layers + objects) to a remote store.
/// Optionally publish it under a registry key (e.g. `"my-env@latest"`).
///
/// Objects and layers already present remotely (per `has_blob`) are skipped,
/// so repeated pushes only transfer new content.
///
/// # Errors
/// Propagates store read errors, serialization errors, and backend transfer
/// errors. A failure mid-transfer leaves the remote with a partial (but
/// content-addressed) set of blobs; metadata is pushed last.
pub fn push_env(
    layout: &StoreLayout,
    env_id: &str,
    backend: &dyn RemoteBackend,
    registry_key: Option<&str>,
) -> Result<PushResult, RemoteError> {
    let meta_store = MetadataStore::new(layout.clone());
    let layer_store = LayerStore::new(layout.clone());
    let object_store = ObjectStore::new(layout.clone());
    // 1. Read metadata
    let meta = meta_store.get(env_id)?;
    let meta_json =
        serde_json::to_vec_pretty(&meta).map_err(|e| RemoteError::Serialization(e.to_string()))?;
    // 2. Collect all layer hashes (base + deps)
    let mut layer_hashes = vec![meta.base_layer.clone()];
    layer_hashes.extend(meta.dependency_layers.iter().cloned());
    // 3. Collect all object hashes from layers + manifest
    let mut object_hashes = Vec::new();
    if !meta.manifest_hash.is_empty() {
        object_hashes.push(meta.manifest_hash.to_string());
    }
    for lh in &layer_hashes {
        let layer = layer_store.get(lh)?;
        object_hashes.extend(layer.object_refs.iter().cloned());
    }
    // Sort + dedup so objects shared between layers are pushed only once.
    object_hashes.sort();
    object_hashes.dedup();
    // 4. Push objects (skip existing)
    let mut objects_pushed = 0;
    let mut objects_skipped = 0;
    for hash in &object_hashes {
        if backend.has_blob(BlobKind::Object, hash)? {
            objects_skipped += 1;
            continue;
        }
        let data = object_store.get(hash)?;
        backend.put_blob(BlobKind::Object, hash, &data)?;
        objects_pushed += 1;
    }
    // 5. Push layers (skip existing)
    let mut layers_pushed = 0;
    let mut layers_skipped = 0;
    for lh in &layer_hashes {
        if backend.has_blob(BlobKind::Layer, lh)? {
            layers_skipped += 1;
            continue;
        }
        let layer = layer_store.get(lh)?;
        let data = serde_json::to_vec_pretty(&layer)
            .map_err(|e| RemoteError::Serialization(e.to_string()))?;
        backend.put_blob(BlobKind::Layer, lh, &data)?;
        layers_pushed += 1;
    }
    // 6. Push metadata (last, so a visible env implies its blobs were sent)
    backend.put_blob(BlobKind::Metadata, env_id, &meta_json)?;
    // 7. Update registry if key provided
    if let Some(key) = registry_key {
        // NOTE(review): any registry download/parse failure falls back to an
        // EMPTY registry, so a transient fetch error here could overwrite
        // existing remote entries on the subsequent put — confirm intended.
        let mut registry = match backend.get_registry() {
            Ok(data) => Registry::from_bytes(&data).unwrap_or_default(),
            Err(_) => Registry::new(),
        };
        registry.publish(
            key,
            RegistryEntry {
                env_id: meta.env_id.to_string(),
                short_id: meta.short_id.to_string(),
                name: meta.name.clone(),
                pushed_at: chrono::Utc::now().to_rfc3339(),
            },
        );
        let reg_bytes = registry.to_bytes()?;
        backend.put_registry(&reg_bytes)?;
    }
    Ok(PushResult {
        objects_pushed,
        layers_pushed,
        objects_skipped,
        layers_skipped,
    })
}
/// Pull an environment from a remote store into the local store.
///
/// Downloads metadata, layer manifests, and objects, skipping anything the
/// local store already holds. Integrity is enforced at three points:
/// - metadata: optional checksum embedded in the metadata (blake3 of the
///   metadata JSON with `checksum` cleared)
/// - layers: the hash computed by `layer_store.put` must equal the remote key
/// - objects: blake3 of the downloaded bytes must equal the remote key
///
/// # Errors
/// Returns `RemoteError::IntegrityFailure` on any mismatch, plus transfer,
/// serialization, and store errors. Layers/objects are stored as they arrive,
/// so a failed pull can leave partial (content-addressed, hence still valid)
/// data behind; metadata is stored only after everything else succeeded.
pub fn pull_env(
    layout: &StoreLayout,
    env_id: &str,
    backend: &dyn RemoteBackend,
) -> Result<PullResult, RemoteError> {
    let meta_store = MetadataStore::new(layout.clone());
    let layer_store = LayerStore::new(layout.clone());
    let object_store = ObjectStore::new(layout.clone());
    // 1. Download metadata and verify checksum if present
    let meta_bytes = backend.get_blob(BlobKind::Metadata, env_id)?;
    let meta: karapace_store::EnvMetadata = serde_json::from_slice(&meta_bytes)
        .map_err(|e| RemoteError::Serialization(format!("invalid metadata: {e}")))?;
    if let Some(ref expected) = meta.checksum {
        // The checksum covers the metadata serialized WITHOUT its own
        // checksum field, so clear it on a copy before hashing.
        let mut copy = meta.clone();
        copy.checksum = None;
        let json = serde_json::to_string_pretty(&copy)
            .map_err(|e| RemoteError::Serialization(e.to_string()))?;
        let actual = blake3::hash(json.as_bytes()).to_hex().to_string();
        if actual != *expected {
            return Err(RemoteError::IntegrityFailure {
                key: format!("metadata:{env_id}"),
                expected: expected.clone(),
                actual,
            });
        }
    }
    // 2. Collect layer hashes
    let mut layer_hashes = vec![meta.base_layer.clone()];
    layer_hashes.extend(meta.dependency_layers.iter().cloned());
    // 3. Download layers (skip existing)
    let mut layers_pulled = 0;
    let mut layers_skipped = 0;
    let mut object_hashes = Vec::new();
    if !meta.manifest_hash.is_empty() {
        object_hashes.push(meta.manifest_hash.to_string());
    }
    for lh in &layer_hashes {
        if layer_store.exists(lh) {
            // Even for a skipped layer we still need its object refs so the
            // object download phase below sees the complete set.
            let layer = layer_store.get(lh)?;
            object_hashes.extend(layer.object_refs.iter().cloned());
            layers_skipped += 1;
            continue;
        }
        let data = backend.get_blob(BlobKind::Layer, lh)?;
        let layer: karapace_store::LayerManifest = serde_json::from_slice(&data)
            .map_err(|e| RemoteError::Serialization(format!("invalid layer: {e}")))?;
        object_hashes.extend(layer.object_refs.iter().cloned());
        // The store computes the content hash on put; it must match the key
        // the layer was fetched under, or the remote lied about the content.
        let stored_hash = layer_store.put(&layer)?;
        if stored_hash != **lh {
            return Err(RemoteError::IntegrityFailure {
                key: lh.to_string(),
                expected: lh.to_string(),
                actual: stored_hash,
            });
        }
        layers_pulled += 1;
    }
    // Dedup so objects shared between layers are fetched only once.
    object_hashes.sort();
    object_hashes.dedup();
    // 4. Download objects (skip existing, verify blake3 integrity)
    let mut objects_pulled = 0;
    let mut objects_skipped = 0;
    for hash in &object_hashes {
        if object_store.exists(hash) {
            objects_skipped += 1;
            continue;
        }
        let data = backend.get_blob(BlobKind::Object, hash)?;
        let actual = blake3::hash(&data).to_hex().to_string();
        if actual != *hash {
            return Err(RemoteError::IntegrityFailure {
                key: hash.clone(),
                expected: hash.clone(),
                actual,
            });
        }
        object_store.put(&data)?;
        objects_pulled += 1;
    }
    // 5. Store metadata locally (last — marks the env as fully present)
    meta_store.put(&meta)?;
    Ok(PullResult {
        objects_pulled,
        layers_pulled,
        objects_skipped,
        layers_skipped,
    })
}
/// Resolve a registry reference (e.g. "my-env@latest") to an env_id using the remote registry.
///
/// A bare name (no `@tag`) resolves as `name@latest`.
pub fn resolve_ref(backend: &dyn RemoteBackend, reference: &str) -> Result<String, RemoteError> {
    let registry = Registry::from_bytes(&backend.get_registry()?)?;
    let (name, tag) = crate::registry::parse_ref(reference);
    let key = format!("{name}@{tag}");
    match registry.lookup(&key) {
        Some(entry) => Ok(entry.env_id.clone()),
        None => Err(RemoteError::NotFound(format!(
            "registry key '{key}' not found"
        ))),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::Mutex;
/// In-memory mock remote backend for testing.
struct MockRemote {
blobs: Mutex<HashMap<String, Vec<u8>>>,
registry: Mutex<Option<Vec<u8>>>,
}
impl MockRemote {
fn new() -> Self {
Self {
blobs: Mutex::new(HashMap::new()),
registry: Mutex::new(None),
}
}
fn blob_key(kind: BlobKind, key: &str) -> String {
format!("{kind:?}/{key}")
}
}
impl RemoteBackend for MockRemote {
fn put_blob(&self, kind: BlobKind, key: &str, data: &[u8]) -> Result<(), RemoteError> {
self.blobs
.lock()
.unwrap()
.insert(Self::blob_key(kind, key), data.to_vec());
Ok(())
}
fn get_blob(&self, kind: BlobKind, key: &str) -> Result<Vec<u8>, RemoteError> {
self.blobs
.lock()
.unwrap()
.get(&Self::blob_key(kind, key))
.cloned()
.ok_or_else(|| RemoteError::NotFound(key.to_owned()))
}
fn has_blob(&self, kind: BlobKind, key: &str) -> Result<bool, RemoteError> {
Ok(self
.blobs
.lock()
.unwrap()
.contains_key(&Self::blob_key(kind, key)))
}
fn list_blobs(&self, kind: BlobKind) -> Result<Vec<String>, RemoteError> {
let prefix = format!("{kind:?}/");
let blobs = self.blobs.lock().unwrap();
Ok(blobs
.keys()
.filter(|k| k.starts_with(&prefix))
.map(|k| k[prefix.len()..].to_owned())
.collect())
}
fn put_registry(&self, data: &[u8]) -> Result<(), RemoteError> {
*self.registry.lock().unwrap() = Some(data.to_vec());
Ok(())
}
fn get_registry(&self) -> Result<Vec<u8>, RemoteError> {
self.registry
.lock()
.unwrap()
.clone()
.ok_or_else(|| RemoteError::NotFound("registry".to_owned()))
}
}
fn setup_local_env(dir: &std::path::Path) -> (StoreLayout, String) {
let layout = StoreLayout::new(dir);
layout.initialize().unwrap();
let obj_store = ObjectStore::new(layout.clone());
let layer_store = LayerStore::new(layout.clone());
let meta_store = MetadataStore::new(layout.clone());
// Create a test object (layer content)
let obj_hash = obj_store.put(b"test data content").unwrap();
// Create a manifest object (environment manifest)
let manifest_hash = obj_store.put(b"{\"manifest\": \"test\"}").unwrap();
// Create a base layer referencing the object
let layer = karapace_store::LayerManifest {
hash: "layer_hash_001".to_owned(),
kind: karapace_store::LayerKind::Base,
parent: None,
object_refs: vec![obj_hash],
read_only: true,
tar_hash: String::new(),
};
let layer_content_hash = layer_store.put(&layer).unwrap();
// Create environment metadata
let meta = karapace_store::EnvMetadata {
env_id: "env_abc123".into(),
short_id: "env_abc123".into(),
name: Some("test-env".to_owned()),
state: karapace_store::EnvState::Built,
base_layer: layer_content_hash.into(),
dependency_layers: vec![],
policy_layer: None,
manifest_hash: manifest_hash.into(),
ref_count: 1,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
checksum: None,
};
meta_store.put(&meta).unwrap();
(layout, "env_abc123".to_owned())
}
#[test]
fn push_and_pull_roundtrip() {
let src_dir = tempfile::tempdir().unwrap();
let (src_layout, env_id) = setup_local_env(src_dir.path());
let remote = MockRemote::new();
// Push
let push_result = push_env(&src_layout, &env_id, &remote, Some("test-env@latest")).unwrap();
assert_eq!(push_result.objects_pushed, 2); // layer content + manifest
assert_eq!(push_result.layers_pushed, 1);
assert_eq!(push_result.objects_skipped, 0);
// Pull into a fresh store
let dst_dir = tempfile::tempdir().unwrap();
let dst_layout = StoreLayout::new(dst_dir.path());
dst_layout.initialize().unwrap();
let pull_result = pull_env(&dst_layout, &env_id, &remote).unwrap();
assert_eq!(pull_result.objects_pulled, 2); // layer content + manifest
assert_eq!(pull_result.layers_pulled, 1);
// Verify metadata exists in destination
let dst_meta = MetadataStore::new(dst_layout);
let meta = dst_meta.get(&env_id).unwrap();
assert_eq!(meta.name, Some("test-env".to_owned()));
}
#[test]
fn push_skips_existing_blobs() {
    let dir = tempfile::tempdir().unwrap();
    let (layout, env_id) = setup_local_env(dir.path());
    let remote = MockRemote::new();
    // First push uploads everything; an identical second push must be a no-op
    // that reports every blob as skipped.
    push_env(&layout, &env_id, &remote, None).unwrap();
    let second = push_env(&layout, &env_id, &remote, None).unwrap();
    assert_eq!(second.objects_skipped, 2); // layer content + manifest
    assert_eq!(second.layers_skipped, 1);
    assert_eq!(second.objects_pushed, 0);
    assert_eq!(second.layers_pushed, 0);
}
#[test]
fn resolve_ref_from_registry() {
    let remote = MockRemote::new();
    // Seed the remote registry by hand with a single tagged entry.
    let entry = RegistryEntry {
        env_id: "hash_xyz".to_owned(),
        short_id: "hash_xyz".to_owned(),
        name: None,
        pushed_at: "t".to_owned(),
    };
    let mut registry = Registry::new();
    registry.publish("my-env@latest", entry);
    remote.put_registry(&registry.to_bytes().unwrap()).unwrap();
    // A fully-qualified name@tag resolves directly.
    assert_eq!(resolve_ref(&remote, "my-env@latest").unwrap(), "hash_xyz");
    // A bare name defaults to the `latest` tag.
    assert_eq!(resolve_ref(&remote, "my-env").unwrap(), "hash_xyz");
}
#[test]
fn pull_nonexistent_env_fails() {
    // Pulling an id the remote has never seen must surface an error.
    let remote = MockRemote::new();
    let dir = tempfile::tempdir().unwrap();
    let layout = StoreLayout::new(dir.path());
    layout.initialize().unwrap();
    assert!(pull_env(&layout, "nonexistent_env", &remote).is_err());
}
#[test]
fn resolve_ref_not_found_fails() {
    // A registry that exists but lacks the requested tag must yield an error
    // (rather than, say, falling back to some other entry).
    let remote = MockRemote::new();
    let entry = RegistryEntry {
        env_id: "xyz".to_owned(),
        short_id: "xyz".to_owned(),
        name: None,
        pushed_at: "t".to_owned(),
    };
    let mut registry = Registry::new();
    registry.publish("other@latest", entry);
    remote.put_registry(&registry.to_bytes().unwrap()).unwrap();
    assert!(resolve_ref(&remote, "missing-env@latest").is_err());
}
#[test]
fn pull_skips_existing_objects() {
    let src = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src.path());
    let remote = MockRemote::new();
    push_env(&src_layout, &env_id, &remote, None).unwrap();
    let dst = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst.path());
    dst_layout.initialize().unwrap();
    // The first pull materializes everything locally ...
    pull_env(&dst_layout, &env_id, &remote).unwrap();
    // ... so a repeated pull should transfer nothing and skip it all.
    let repeat = pull_env(&dst_layout, &env_id, &remote).unwrap();
    assert_eq!(repeat.objects_skipped, 2); // layer content + manifest
    assert_eq!(repeat.layers_skipped, 1);
    assert_eq!(repeat.objects_pulled, 0);
    assert_eq!(repeat.layers_pulled, 0);
}
#[test]
fn push_result_fields_correct() {
    let src_dir = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src_dir.path());
    let remote = MockRemote::new();
    let result = push_env(&src_layout, &env_id, &remote, None).unwrap();
    // The remote starts empty, so nothing can be skipped: the environment's
    // two objects (layer content + manifest) and its single layer must all
    // be counted as pushed. The previous `pushed > 0 || skipped > 0` checks
    // were tautological for any non-empty environment and could not catch
    // a miscounted PushResult.
    assert_eq!(result.objects_pushed, 2);
    assert_eq!(result.objects_skipped, 0);
    assert_eq!(result.layers_pushed, 1);
    assert_eq!(result.layers_skipped, 0);
}
#[test]
fn pull_transfers_manifest_object() {
    let src = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src.path());
    let remote = MockRemote::new();
    push_env(&src_layout, &env_id, &remote, None).unwrap();
    let dst = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst.path());
    dst_layout.initialize().unwrap();
    pull_env(&dst_layout, &env_id, &remote).unwrap();
    // After the pull, the manifest object referenced by the pulled metadata
    // must be retrievable from the destination object store.
    let meta = MetadataStore::new(dst_layout.clone()).get(&env_id).unwrap();
    let fetched = ObjectStore::new(dst_layout).get(&meta.manifest_hash);
    assert!(
        fetched.is_ok(),
        "manifest object must be available after pull: {:?}",
        fetched.err()
    );
}
#[test]
fn pull_detects_tampered_metadata_checksum() {
    let src = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src.path());
    let remote = MockRemote::new();
    // Populate the remote first.
    push_env(&src_layout, &env_id, &remote, None).unwrap();
    // Rewrite the metadata blob on the remote with an altered name while
    // leaving the stored checksum as-is, so the two no longer agree.
    let original = remote.get_blob(BlobKind::Metadata, &env_id).unwrap();
    let mut doc: serde_json::Value = serde_json::from_slice(&original).unwrap();
    doc["name"] = serde_json::Value::String("tampered".into());
    let forged = serde_json::to_string_pretty(&doc).unwrap();
    remote
        .put_blob(BlobKind::Metadata, &env_id, forged.as_bytes())
        .unwrap();
    // Pulling into a fresh store must fail with an integrity error.
    let dst = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst.path());
    dst_layout.initialize().unwrap();
    assert!(
        pull_env(&dst_layout, &env_id, &remote).is_err(),
        "pull must fail when metadata checksum is tampered"
    );
}
#[test]
fn push_with_tag_publishes_registry() {
    let dir = tempfile::tempdir().unwrap();
    let (layout, env_id) = setup_local_env(dir.path());
    let remote = MockRemote::new();
    push_env(&layout, &env_id, &remote, Some("my-app@v1")).unwrap();
    // The tag must now be resolvable through the published registry blob.
    let registry = Registry::from_bytes(&remote.get_registry().unwrap()).unwrap();
    let entry = registry.lookup("my-app@v1").unwrap();
    assert_eq!(entry.env_id, env_id);
}
// --- §7: Network failure simulation ---
/// Mock remote that fails on the Nth put_blob call.
///
/// Delegates all operations to an inner [`MockRemote`], but counts
/// `put_blob` invocations and returns a simulated network error from the
/// `fail_on`-th call onward. Used to verify that `push_env` surfaces
/// upload failures instead of swallowing them.
struct FailOnPutRemote {
    // Fully functional backing store; non-failing calls are forwarded here.
    inner: MockRemote,
    // Number of `put_blob` calls seen so far (Mutex because the
    // `RemoteBackend` methods take `&self`).
    call_count: Mutex<usize>,
    // 1-based index of the first `put_blob` call that should fail.
    fail_on: usize,
}
impl FailOnPutRemote {
    /// Create a remote whose `fail_on`-th (and every subsequent)
    /// `put_blob` call returns an error.
    fn new(fail_on: usize) -> Self {
        Self {
            inner: MockRemote::new(),
            call_count: Mutex::new(0),
            fail_on,
        }
    }
}
impl RemoteBackend for FailOnPutRemote {
    /// Count the call; simulate a network failure from call `fail_on` onward.
    fn put_blob(&self, kind: BlobKind, key: &str, data: &[u8]) -> Result<(), RemoteError> {
        // Scope the lock guard so it is released before delegating.
        let should_fail = {
            let mut seen = self.call_count.lock().unwrap();
            *seen += 1;
            *seen >= self.fail_on
        };
        if should_fail {
            Err(RemoteError::Http("simulated network failure".to_owned()))
        } else {
            self.inner.put_blob(kind, key, data)
        }
    }
    // All remaining operations are transparent pass-throughs to the inner mock.
    fn get_blob(&self, kind: BlobKind, key: &str) -> Result<Vec<u8>, RemoteError> {
        self.inner.get_blob(kind, key)
    }
    fn has_blob(&self, kind: BlobKind, key: &str) -> Result<bool, RemoteError> {
        self.inner.has_blob(kind, key)
    }
    fn list_blobs(&self, kind: BlobKind) -> Result<Vec<String>, RemoteError> {
        self.inner.list_blobs(kind)
    }
    fn put_registry(&self, data: &[u8]) -> Result<(), RemoteError> {
        self.inner.put_registry(data)
    }
    fn get_registry(&self) -> Result<Vec<u8>, RemoteError> {
        self.inner.get_registry()
    }
}
/// Mock remote that returns garbage on get_blob.
///
/// Wraps a real [`MockRemote`] and corrupts the `Object` blobs it serves
/// (see its `RemoteBackend` impl), so pulls through this wrapper must fail
/// integrity verification. Tests push through `inner` directly to populate
/// the remote with uncorrupted data first.
struct CorruptGetRemote {
    // Honest backing store holding the real, uncorrupted blobs.
    inner: MockRemote,
}
impl CorruptGetRemote {
    fn new() -> Self {
        Self {
            inner: MockRemote::new(),
        }
    }
}
impl RemoteBackend for CorruptGetRemote {
    fn put_blob(&self, kind: BlobKind, key: &str, data: &[u8]) -> Result<(), RemoteError> {
        self.inner.put_blob(kind, key, data)
    }
    /// Serve `Object` blobs with their first byte flipped; metadata/layer
    /// blobs (which are JSON) pass through untouched.
    fn get_blob(&self, kind: BlobKind, key: &str) -> Result<Vec<u8>, RemoteError> {
        // Decide before `kind` is handed to the inner call.
        let corrupt = matches!(kind, BlobKind::Object);
        let mut payload = self.inner.get_blob(kind, key)?;
        if corrupt {
            if let Some(first) = payload.first_mut() {
                *first ^= 0xFF;
            }
        }
        Ok(payload)
    }
    fn has_blob(&self, kind: BlobKind, key: &str) -> Result<bool, RemoteError> {
        self.inner.has_blob(kind, key)
    }
    fn list_blobs(&self, kind: BlobKind) -> Result<Vec<String>, RemoteError> {
        self.inner.list_blobs(kind)
    }
    fn put_registry(&self, data: &[u8]) -> Result<(), RemoteError> {
        self.inner.put_registry(data)
    }
    fn get_registry(&self) -> Result<Vec<u8>, RemoteError> {
        self.inner.get_registry()
    }
}
#[test]
fn push_fails_on_network_error() {
    let dir = tempfile::tempdir().unwrap();
    let (layout, env_id) = setup_local_env(dir.path());
    // The very first upload attempt is configured to fail.
    let remote = FailOnPutRemote::new(1);
    assert!(
        push_env(&layout, &env_id, &remote, None).is_err(),
        "push must fail when network error occurs during upload"
    );
}
#[test]
fn pull_detects_corrupted_remote_object() {
    let src = tempfile::tempdir().unwrap();
    let (src_layout, env_id) = setup_local_env(src.path());
    let corrupt_remote = CorruptGetRemote::new();
    // Populate the remote through its honest inner backend, then pull
    // through the corrupting wrapper, which flips bytes in object blobs.
    push_env(&src_layout, &env_id, &corrupt_remote.inner, None).unwrap();
    let dst = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst.path());
    dst_layout.initialize().unwrap();
    assert!(
        pull_env(&dst_layout, &env_id, &corrupt_remote).is_err(),
        "pull must fail when remote returns corrupted object data"
    );
}
#[test]
fn large_object_push_pull_roundtrip() {
    let src_dir = tempfile::tempdir().unwrap();
    let layout = StoreLayout::new(src_dir.path());
    layout.initialize().unwrap();
    let obj_store = ObjectStore::new(layout.clone());
    let layer_store = LayerStore::new(layout.clone());
    let meta_store = MetadataStore::new(layout.clone());
    // Create a 1MB object (simulating a large layer tar).
    let large_data: Vec<u8> = (0..1_048_576u32).map(|i| (i % 256) as u8).collect();
    let obj_hash = obj_store.put(&large_data).unwrap();
    // Keep a copy of the hash so the blob can be re-fetched after the pull
    // (the original is moved into `object_refs` below).
    let large_hash = obj_hash.clone();
    let manifest_hash = obj_store.put(b"{\"manifest\": \"large\"}").unwrap();
    let layer = karapace_store::LayerManifest {
        hash: "large_layer".to_owned(),
        kind: karapace_store::LayerKind::Base,
        parent: None,
        object_refs: vec![obj_hash],
        read_only: true,
        tar_hash: String::new(),
    };
    let layer_hash = layer_store.put(&layer).unwrap();
    let meta = karapace_store::EnvMetadata {
        env_id: "large_env".into(),
        short_id: "large_env".into(),
        name: None,
        state: karapace_store::EnvState::Built,
        base_layer: layer_hash.into(),
        dependency_layers: vec![],
        policy_layer: None,
        manifest_hash: manifest_hash.into(),
        ref_count: 1,
        created_at: "2025-01-01T00:00:00Z".to_owned(),
        updated_at: "2025-01-01T00:00:00Z".to_owned(),
        checksum: None,
    };
    meta_store.put(&meta).unwrap();
    let remote = MockRemote::new();
    push_env(&layout, "large_env", &remote, None).unwrap();
    // Pull into fresh store.
    let dst_dir = tempfile::tempdir().unwrap();
    let dst_layout = StoreLayout::new(dst_dir.path());
    dst_layout.initialize().unwrap();
    let result = pull_env(&dst_layout, "large_env", &remote).unwrap();
    assert_eq!(result.objects_pulled, 2);
    let dst_obj = ObjectStore::new(dst_layout);
    // Fix: the old assertion only fetched `meta.manifest_hash`, so the 1MB
    // payload itself was never verified. Check both objects byte-for-byte.
    let pulled_large = dst_obj.get(&large_hash).unwrap();
    assert_eq!(
        pulled_large, large_data,
        "large object must survive the roundtrip intact"
    );
    let pulled_manifest = dst_obj.get(&meta.manifest_hash).unwrap();
    assert_eq!(pulled_manifest, b"{\"manifest\": \"large\"}");
}
}