feat: karapace-store — content-addressable object store, layers, metadata, WAL

- ObjectStore: blake3-addressed objects, atomic writes (NamedTempFile + persist)
- Integrity verification on every read (hash comparison without String allocation)
- LayerStore: layer manifests with Base/Dependency/Policy/Snapshot kinds
- MetadataStore: environment state machine, naming, ref-counting, blake3 checksum
- GarbageCollector: signal-cancellable orphan cleanup, protects live references
- WriteAheadLog: crash recovery with typed rollback steps (RemoveDir/RemoveFile/ResetState)
- StoreLayout: #[inline] path accessors, store format v2 versioning
- Store migration: v1→v2 with atomic version file rewrite
- Deterministic tar packing/unpacking (sorted entries, zero timestamps, uid/gid 0)
- fsync_dir() for POSIX-portable rename durability
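A minimal usage sketch of the new API surface (illustrative; `root` is caller-supplied):

    use karapace_store::{GarbageCollector, ObjectStore, StoreError, StoreLayout};

    fn demo(root: &std::path::Path) -> Result<(), StoreError> {
        let layout = StoreLayout::new(root);
        layout.initialize()?;                        // create dirs, write format v2 marker
        let objects = ObjectStore::new(layout.clone());
        let hash = objects.put(b"payload")?;         // blake3 hex name, atomic write
        assert_eq!(objects.get(&hash)?, b"payload"); // integrity-checked read
        let report = GarbageCollector::new(layout).collect(true)?; // dry run
        assert_eq!(report.removed_objects, 0);
        Ok(())
    }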
Marco Allegretti 2026-02-22 18:36:31 +01:00
parent cdd13755a0
commit 4de311ebc7
12 changed files with 4839 additions and 0 deletions


Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "karapace-store"
description = "Content-addressable store, metadata, layers, GC, and integrity for Karapace"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
[lints]
workspace = true
[dependencies]
blake3.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tempfile.workspace = true
fs2.workspace = true
chrono.workspace = true
tar.workspace = true
tracing.workspace = true
karapace-schema = { path = "../karapace-schema" }

File diff suppressed because it is too large.


src/gc.rs
@@ -0,0 +1,293 @@
use crate::layers::LayerStore;
use crate::layout::StoreLayout;
use crate::metadata::{EnvState, MetadataStore};
use crate::objects::ObjectStore;
use crate::StoreError;
use std::collections::HashSet;
use std::fs;
pub struct GarbageCollector {
layout: StoreLayout,
}
#[derive(Debug, Default)]
pub struct GcReport {
pub orphaned_envs: Vec<String>,
pub orphaned_layers: Vec<String>,
pub orphaned_objects: Vec<String>,
pub removed_envs: usize,
pub removed_layers: usize,
pub removed_objects: usize,
}
impl GarbageCollector {
pub fn new(layout: StoreLayout) -> Self {
Self { layout }
}
pub fn collect(&self, dry_run: bool) -> Result<GcReport, StoreError> {
self.collect_with_cancel(dry_run, || false)
}
pub fn collect_with_cancel(
&self,
dry_run: bool,
should_stop: impl Fn() -> bool,
) -> Result<GcReport, StoreError> {
let meta_store = MetadataStore::new(self.layout.clone());
let layer_store = LayerStore::new(self.layout.clone());
let object_store = ObjectStore::new(self.layout.clone());
let mut report = GcReport::default();
let all_meta = meta_store.list()?;
let mut live_layers: HashSet<String> = HashSet::new();
// Objects directly referenced by live environments (manifest hashes)
let mut live_objects: HashSet<String> = HashSet::new();
for meta in &all_meta {
if meta.ref_count == 0
&& meta.state != EnvState::Running
&& meta.state != EnvState::Archived
{
report.orphaned_envs.push(meta.env_id.to_string());
} else {
live_layers.insert(meta.base_layer.to_string());
for dep in &meta.dependency_layers {
live_layers.insert(dep.to_string());
}
if let Some(ref policy) = meta.policy_layer {
live_layers.insert(policy.to_string());
}
// Manifest object is directly referenced by metadata
if !meta.manifest_hash.is_empty() {
live_objects.insert(meta.manifest_hash.to_string());
}
}
}
let all_layers = layer_store.list()?;
// Preserve snapshot layers whose parent is a live layer.
// Without this, snapshots created by commit() would be GC'd as orphans.
for layer_hash in &all_layers {
if !live_layers.contains(layer_hash) {
if let Ok(layer) = layer_store.get(layer_hash) {
if layer.kind == crate::layers::LayerKind::Snapshot {
if let Some(ref parent) = layer.parent {
if live_layers.contains(parent) {
live_layers.insert(layer_hash.clone());
}
}
}
}
}
}
for layer_hash in &all_layers {
if live_layers.contains(layer_hash) {
if let Ok(layer) = layer_store.get(layer_hash) {
for obj_ref in &layer.object_refs {
live_objects.insert(obj_ref.clone());
}
}
} else {
report.orphaned_layers.push(layer_hash.clone());
}
}
let all_objects = object_store.list()?;
for obj_hash in &all_objects {
if !live_objects.contains(obj_hash) {
report.orphaned_objects.push(obj_hash.clone());
}
}
if !dry_run {
for env_id in &report.orphaned_envs {
if should_stop() {
break;
}
let env_path = self.layout.env_path(env_id);
if env_path.exists() {
fs::remove_dir_all(&env_path)?;
}
meta_store.remove(env_id)?;
report.removed_envs += 1;
}
for layer_hash in &report.orphaned_layers {
if should_stop() {
break;
}
layer_store.remove(layer_hash)?;
report.removed_layers += 1;
}
for obj_hash in &report.orphaned_objects {
if should_stop() {
break;
}
object_store.remove(obj_hash)?;
report.removed_objects += 1;
}
}
Ok(report)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::EnvMetadata;
fn setup() -> (tempfile::TempDir, StoreLayout) {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
(dir, layout)
}
#[test]
fn gc_removes_zero_refcount_envs() {
let (_dir, layout) = setup();
let meta_store = MetadataStore::new(layout.clone());
let meta = EnvMetadata {
env_id: "orphan1".into(),
short_id: "orphan1".into(),
name: None,
state: EnvState::Built,
manifest_hash: "mhash".into(),
base_layer: "base1".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 0,
checksum: None,
};
meta_store.put(&meta).unwrap();
let gc = GarbageCollector::new(layout);
let report = gc.collect(false).unwrap();
assert_eq!(report.removed_envs, 1);
}
#[test]
fn gc_dry_run_does_not_remove() {
let (_dir, layout) = setup();
let meta_store = MetadataStore::new(layout.clone());
let meta = EnvMetadata {
env_id: "orphan2".into(),
short_id: "orphan2".into(),
name: None,
state: EnvState::Defined,
manifest_hash: "mhash".into(),
base_layer: "base1".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 0,
checksum: None,
};
meta_store.put(&meta).unwrap();
let gc = GarbageCollector::new(layout.clone());
let report = gc.collect(true).unwrap();
assert_eq!(report.orphaned_envs.len(), 1);
assert_eq!(report.removed_envs, 0);
assert!(meta_store.exists("orphan2"));
}
#[test]
fn gc_preserves_manifest_objects() {
let (_dir, layout) = setup();
let meta_store = MetadataStore::new(layout.clone());
let object_store = ObjectStore::new(layout.clone());
// Create a manifest object
let manifest_hash = object_store.put(b"manifest-content").unwrap();
// Create a live environment referencing the manifest
let meta = EnvMetadata {
env_id: "live1".into(),
short_id: "live1".into(),
name: None,
state: EnvState::Built,
manifest_hash: manifest_hash.clone().into(),
base_layer: "".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 1,
checksum: None,
};
meta_store.put(&meta).unwrap();
let gc = GarbageCollector::new(layout.clone());
let report = gc.collect(false).unwrap();
// Manifest object must NOT be collected
assert!(object_store.exists(&manifest_hash));
assert!(!report.orphaned_objects.contains(&manifest_hash));
}
#[test]
fn gc_preserves_archived_envs() {
let (_dir, layout) = setup();
let meta_store = MetadataStore::new(layout.clone());
let meta = EnvMetadata {
env_id: "archived1".into(),
short_id: "archived1".into(),
name: None,
state: EnvState::Archived,
manifest_hash: "mhash".into(),
base_layer: "base1".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 0,
checksum: None,
};
meta_store.put(&meta).unwrap();
let gc = GarbageCollector::new(layout);
let report = gc.collect(false).unwrap();
assert_eq!(report.removed_envs, 0, "archived envs must be preserved");
assert!(report.orphaned_envs.is_empty());
}
#[test]
fn gc_preserves_running_envs() {
let (_dir, layout) = setup();
let meta_store = MetadataStore::new(layout.clone());
let meta = EnvMetadata {
env_id: "running1".into(),
short_id: "running1".into(),
name: None,
state: EnvState::Running,
manifest_hash: "mhash".into(),
base_layer: "base1".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 0,
checksum: None,
};
meta_store.put(&meta).unwrap();
let gc = GarbageCollector::new(layout);
let report = gc.collect(false).unwrap();
assert_eq!(report.removed_envs, 0);
}
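// A cancellation sketch: a `should_stop` closure that trips immediately
// (as a signal handler would after Ctrl-C) must leave even orphaned
// environments untouched. The env id here is illustrative.
#[test]
fn gc_cancellation_stops_removal() {
    let (_dir, layout) = setup();
    let meta_store = MetadataStore::new(layout.clone());
    let meta = EnvMetadata {
        env_id: "orphan3".into(),
        short_id: "orphan3".into(),
        name: None,
        state: EnvState::Built,
        manifest_hash: "mhash".into(),
        base_layer: "base1".into(),
        dependency_layers: vec![],
        policy_layer: None,
        created_at: "2025-01-01T00:00:00Z".to_owned(),
        updated_at: "2025-01-01T00:00:00Z".to_owned(),
        ref_count: 0,
        checksum: None,
    };
    meta_store.put(&meta).unwrap();
    let gc = GarbageCollector::new(layout);
    let report = gc.collect_with_cancel(false, || true).unwrap();
    assert_eq!(report.orphaned_envs.len(), 1, "orphan is still reported");
    assert_eq!(report.removed_envs, 0, "cancelled GC must not remove anything");
    assert!(meta_store.exists("orphan3"));
}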
}


src/integrity.rs
@@ -0,0 +1,224 @@
use crate::layers::LayerStore;
use crate::layout::StoreLayout;
use crate::metadata::MetadataStore;
use crate::objects::ObjectStore;
use crate::StoreError;
#[derive(Debug, Default)]
pub struct IntegrityReport {
pub checked: usize,
pub passed: usize,
pub failed: Vec<IntegrityFailure>,
pub layers_checked: usize,
pub layers_passed: usize,
pub metadata_checked: usize,
pub metadata_passed: usize,
}
#[derive(Debug)]
pub struct IntegrityFailure {
pub hash: String,
pub reason: String,
}
pub fn verify_store_integrity(layout: &StoreLayout) -> Result<IntegrityReport, StoreError> {
let object_store = ObjectStore::new(layout.clone());
let layer_store = LayerStore::new(layout.clone());
let meta_store = MetadataStore::new(layout.clone());
let all_objects = object_store.list()?;
let all_layers = layer_store.list()?;
let all_meta = meta_store.list()?;
let mut report = IntegrityReport {
checked: all_objects.len(),
layers_checked: all_layers.len(),
metadata_checked: all_meta.len(),
..Default::default()
};
// Verify objects (blake3 content-addressed)
for hash in &all_objects {
match object_store.get(hash) {
Ok(_) => report.passed += 1,
Err(StoreError::IntegrityFailure { actual, .. }) => {
report.failed.push(IntegrityFailure {
hash: hash.clone(),
reason: format!("object hash mismatch: got {actual}"),
});
}
Err(e) => {
report.failed.push(IntegrityFailure {
hash: hash.clone(),
reason: format!("object read error: {e}"),
});
}
}
}
// Verify layers (blake3 content-addressed)
for hash in &all_layers {
match layer_store.get(hash) {
Ok(_) => report.layers_passed += 1,
Err(StoreError::IntegrityFailure { actual, .. }) => {
report.failed.push(IntegrityFailure {
hash: hash.clone(),
reason: format!("layer hash mismatch: got {actual}"),
});
}
Err(e) => {
report.failed.push(IntegrityFailure {
hash: hash.clone(),
reason: format!("layer read error: {e}"),
});
}
}
}
// Verify metadata (embedded checksum)
for meta in &all_meta {
match meta_store.get(&meta.env_id) {
Ok(_) => report.metadata_passed += 1,
Err(StoreError::IntegrityFailure { actual, .. }) => {
report.failed.push(IntegrityFailure {
hash: meta.env_id.to_string(),
reason: format!("metadata checksum mismatch: got {actual}"),
});
}
Err(e) => {
report.failed.push(IntegrityFailure {
hash: meta.env_id.to_string(),
reason: format!("metadata read error: {e}"),
});
}
}
}
Ok(report)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn clean_store_passes_integrity() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let obj_store = ObjectStore::new(layout.clone());
obj_store.put(b"data1").unwrap();
obj_store.put(b"data2").unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.checked, 2);
assert_eq!(report.passed, 2);
assert!(report.failed.is_empty());
}
#[test]
fn corrupted_object_detected() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let obj_store = ObjectStore::new(layout.clone());
let hash = obj_store.put(b"original").unwrap();
std::fs::write(layout.objects_dir().join(&hash), b"corrupted").unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.failed.len(), 1);
assert_eq!(report.failed[0].hash, hash);
}
#[test]
fn verify_store_checks_layers() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let layer_store = LayerStore::new(layout.clone());
let layer = crate::LayerManifest {
hash: "test".to_owned(),
kind: crate::LayerKind::Base,
parent: None,
object_refs: vec![],
read_only: true,
tar_hash: String::new(),
};
layer_store.put(&layer).unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.layers_checked, 1);
assert_eq!(report.layers_passed, 1);
}
#[test]
fn verify_store_detects_corrupt_layer() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let layer_store = LayerStore::new(layout.clone());
let layer = crate::LayerManifest {
hash: "test".to_owned(),
kind: crate::LayerKind::Base,
parent: None,
object_refs: vec![],
read_only: true,
tar_hash: String::new(),
};
let hash = layer_store.put(&layer).unwrap();
// Corrupt the layer file
std::fs::write(layout.layers_dir().join(&hash), b"corrupted").unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.layers_checked, 1);
assert_eq!(report.layers_passed, 0);
assert!(!report.failed.is_empty());
}
#[test]
fn verify_store_checks_metadata() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let meta_store = MetadataStore::new(layout.clone());
let meta = crate::EnvMetadata {
env_id: "test_env".into(),
short_id: "test_env".into(),
name: None,
state: crate::EnvState::Built,
manifest_hash: "mhash".into(),
base_layer: "base".into(),
dependency_layers: vec![],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 1,
checksum: None,
};
meta_store.put(&meta).unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.metadata_checked, 1);
assert_eq!(report.metadata_passed, 1);
}
#[test]
fn empty_store_passes() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let report = verify_store_integrity(&layout).unwrap();
assert_eq!(report.checked, 0);
assert_eq!(report.layers_checked, 0);
assert_eq!(report.metadata_checked, 0);
assert!(report.failed.is_empty());
}
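// A mixed-result sketch: one intact object and one tampered object should
// split the report into a pass and a recorded failure (the object contents
// here are illustrative).
#[test]
fn report_counts_mixed_results() {
    let dir = tempfile::tempdir().unwrap();
    let layout = StoreLayout::new(dir.path());
    layout.initialize().unwrap();
    let obj_store = ObjectStore::new(layout.clone());
    obj_store.put(b"keep me").unwrap();
    let bad = obj_store.put(b"corrupt me").unwrap();
    std::fs::write(layout.objects_dir().join(&bad), b"tampered").unwrap();
    let report = verify_store_integrity(&layout).unwrap();
    assert_eq!(report.checked, 2);
    assert_eq!(report.passed, 1);
    assert_eq!(report.failed.len(), 1);
    assert_eq!(report.failed[0].hash, bad);
}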
}


src/layers.rs
@@ -0,0 +1,576 @@
use crate::layout::StoreLayout;
use crate::{fsync_dir, StoreError};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use tempfile::NamedTempFile;
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum LayerKind {
Base,
Dependency,
Policy,
Snapshot,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LayerManifest {
pub hash: String,
pub kind: LayerKind,
pub parent: Option<String>,
pub object_refs: Vec<String>,
pub read_only: bool,
/// blake3 hash of the tar archive containing this layer's filesystem content.
/// Empty for legacy (v1) synthetic layers.
#[serde(default)]
pub tar_hash: String,
}
pub struct LayerStore {
layout: StoreLayout,
}
impl LayerStore {
pub fn new(layout: StoreLayout) -> Self {
Self { layout }
}
/// Compute the content hash that `put()` would use for this manifest,
/// without writing anything to disk.
pub fn compute_hash(manifest: &LayerManifest) -> Result<String, StoreError> {
let content = serde_json::to_string_pretty(manifest)?;
Ok(blake3::hash(content.as_bytes()).to_hex().to_string())
}
/// Store a layer manifest. Returns the content hash (blake3 of serialized JSON),
/// which is used as the filename. Idempotent — existing layers are skipped.
pub fn put(&self, manifest: &LayerManifest) -> Result<String, StoreError> {
let content = serde_json::to_string_pretty(manifest)?;
let hash = blake3::hash(content.as_bytes()).to_hex().to_string();
let dest = self.layout.layers_dir().join(&hash);
if dest.exists() {
return Ok(hash);
}
let dir = self.layout.layers_dir();
let mut tmp = NamedTempFile::new_in(&dir)?;
tmp.write_all(content.as_bytes())?;
tmp.as_file().sync_all()?;
tmp.persist(&dest).map_err(|e| StoreError::Io(e.error))?;
fsync_dir(&dir)?;
Ok(hash)
}
pub fn get(&self, hash: &str) -> Result<LayerManifest, StoreError> {
let path = self.layout.layers_dir().join(hash);
if !path.exists() {
return Err(StoreError::LayerNotFound(hash.to_owned()));
}
let content = fs::read_to_string(&path)?;
// Verify integrity: content hash must match filename
let actual = blake3::hash(content.as_bytes());
let actual_hex = actual.to_hex();
if actual_hex.as_str() != hash {
return Err(StoreError::IntegrityFailure {
hash: hash.to_owned(),
expected: hash.to_owned(),
actual: actual_hex.to_string(),
});
}
let manifest: LayerManifest = serde_json::from_str(&content)?;
Ok(manifest)
}
pub fn exists(&self, hash: &str) -> bool {
self.layout.layers_dir().join(hash).exists()
}
pub fn remove(&self, hash: &str) -> Result<(), StoreError> {
let path = self.layout.layers_dir().join(hash);
if path.exists() {
fs::remove_file(path)?;
}
Ok(())
}
pub fn list(&self) -> Result<Vec<String>, StoreError> {
let dir = self.layout.layers_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut hashes = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
if let Some(name) = entry.file_name().to_str() {
if !name.starts_with('.') {
hashes.push(name.to_owned());
}
}
}
hashes.sort();
Ok(hashes)
}
}
/// Create a deterministic tar archive from a directory.
///
/// Phase 1 supports regular files, directories, and symlinks.
/// Device nodes, sockets, FIFOs, and extended attributes are skipped with warnings.
///
/// Determinism guarantees:
/// - Entries sorted lexicographically by relative path
/// - All timestamps set to 0 (Unix epoch)
/// - All ownership set to 0:0 (root:root)
/// - Permissions preserved as-is from source
pub fn pack_layer(source_dir: &Path) -> Result<Vec<u8>, StoreError> {
let mut entries = collect_entries(source_dir, source_dir)?;
entries.sort_by(|a, b| a.0.cmp(&b.0));
let mut ar = tar::Builder::new(Vec::new());
ar.follow_symlinks(false);
for (rel_path, full_path) in &entries {
let ft = match full_path.symlink_metadata() {
Ok(m) => m.file_type(),
Err(e) => {
warn!("skipping {}: metadata error: {e}", rel_path);
continue;
}
};
if ft.is_file() {
append_file(&mut ar, rel_path, full_path)?;
} else if ft.is_dir() {
append_dir(&mut ar, rel_path, full_path)?;
} else if ft.is_symlink() {
append_symlink(&mut ar, rel_path, full_path)?;
} else {
warn!("skipping unsupported file type: {rel_path}");
}
}
let data = ar.into_inner()?;
Ok(data)
}
/// Extract a tar archive to a target directory.
pub fn unpack_layer(tar_data: &[u8], target_dir: &Path) -> Result<(), StoreError> {
fs::create_dir_all(target_dir)?;
let mut ar = tar::Archive::new(tar_data);
ar.set_preserve_permissions(true);
ar.set_preserve_mtime(false);
ar.set_unpack_xattrs(false);
ar.unpack(target_dir)?;
Ok(())
}
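// How these primitives compose into a stored layer (a sketch; `staging` and
// `layout` are assumed to be provided by the caller, mirroring the tests below):
//
//     let tar = pack_layer(&staging)?;
//     let tar_hash = blake3::hash(&tar).to_hex().to_string();
//     let obj = crate::ObjectStore::new(layout.clone()).put(&tar)?; // equals tar_hash
//     let layer_hash = LayerStore::new(layout).put(&LayerManifest {
//         hash: tar_hash.clone(),
//         kind: LayerKind::Base,
//         parent: None,
//         object_refs: vec![obj],
//         read_only: true,
//         tar_hash,
//     })?;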
/// Recursively collect (relative_path, full_path) pairs from a directory tree.
fn collect_entries(
root: &Path,
current: &Path,
) -> Result<Vec<(String, std::path::PathBuf)>, StoreError> {
let mut result = Vec::new();
if !current.exists() {
return Ok(result);
}
for entry in fs::read_dir(current)? {
let entry = entry?;
let full = entry.path();
let rel = full
.strip_prefix(root)
.map_err(|e| StoreError::Io(std::io::Error::other(format!("path strip: {e}"))))?
.to_string_lossy()
.to_string();
let meta = full.symlink_metadata()?;
if meta.is_dir() {
result.push((rel.clone(), full.clone()));
result.extend(collect_entries(root, &full)?);
} else {
result.push((rel, full));
}
}
Ok(result)
}
fn make_header(full_path: &Path, entry_type: tar::EntryType) -> Result<tar::Header, StoreError> {
let meta = full_path.symlink_metadata()?;
let mut header = tar::Header::new_gnu();
header.set_entry_type(entry_type);
header.set_mtime(0);
header.set_uid(0);
header.set_gid(0);
header.set_mode(meta.permissions().mode());
Ok(header)
}
fn append_file(
ar: &mut tar::Builder<Vec<u8>>,
rel_path: &str,
full_path: &Path,
) -> Result<(), StoreError> {
let data = fs::read(full_path)?;
let mut header = make_header(full_path, tar::EntryType::Regular)?;
header.set_size(data.len() as u64);
header.set_cksum();
ar.append_data(&mut header, rel_path, data.as_slice())?;
Ok(())
}
fn append_dir(
ar: &mut tar::Builder<Vec<u8>>,
rel_path: &str,
full_path: &Path,
) -> Result<(), StoreError> {
let mut header = make_header(full_path, tar::EntryType::Directory)?;
header.set_size(0);
header.set_cksum();
let path = if rel_path.ends_with('/') {
rel_path.to_owned()
} else {
format!("{rel_path}/")
};
ar.append_data(&mut header, &path, &[] as &[u8])?;
Ok(())
}
fn append_symlink(
ar: &mut tar::Builder<Vec<u8>>,
rel_path: &str,
full_path: &Path,
) -> Result<(), StoreError> {
let target = fs::read_link(full_path)?;
let mut header = make_header(full_path, tar::EntryType::Symlink)?;
header.set_size(0);
header.set_cksum();
ar.append_link(&mut header, rel_path, &target)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn test_layer_store() -> (tempfile::TempDir, LayerStore) {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
(dir, LayerStore::new(layout))
}
fn sample_layer() -> LayerManifest {
LayerManifest {
hash: "abc123def456".to_owned(),
kind: LayerKind::Base,
parent: None,
object_refs: vec!["obj1".to_owned(), "obj2".to_owned()],
read_only: true,
tar_hash: String::new(),
}
}
#[test]
fn put_and_get_roundtrip() {
let (_dir, store) = test_layer_store();
let layer = sample_layer();
let content_hash = store.put(&layer).unwrap();
let retrieved = store.get(&content_hash).unwrap();
assert_eq!(layer, retrieved);
}
#[test]
fn put_is_idempotent() {
let (_dir, store) = test_layer_store();
let layer = sample_layer();
store.put(&layer).unwrap();
store.put(&layer).unwrap();
}
#[test]
fn get_nonexistent_fails() {
let (_dir, store) = test_layer_store();
assert!(store.get("nonexistent").is_err());
}
#[test]
fn list_layers() {
let (_dir, store) = test_layer_store();
let content_hash = store.put(&sample_layer()).unwrap();
let list = store.list().unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list[0], content_hash);
}
#[test]
fn deserialize_without_tar_hash_defaults_empty() {
let json = r#"{
"hash": "h1",
"kind": "Base",
"parent": null,
"object_refs": [],
"read_only": true
}"#;
let m: LayerManifest = serde_json::from_str(json).unwrap();
assert!(m.tar_hash.is_empty());
}
// --- pack/unpack tests ---
fn create_fixture_dir(dir: &Path) {
// Regular files
fs::write(dir.join("hello.txt"), "hello world").unwrap();
fs::write(dir.join("binary.bin"), [0u8, 1, 2, 255]).unwrap();
// Subdirectory with files
fs::create_dir_all(dir.join("subdir")).unwrap();
fs::write(dir.join("subdir").join("nested.txt"), "nested content").unwrap();
// Empty directory
fs::create_dir_all(dir.join("empty_dir")).unwrap();
// Symlink
std::os::unix::fs::symlink("hello.txt", dir.join("link_to_hello")).unwrap();
}
#[test]
fn pack_unpack_roundtrip() {
let src = tempfile::tempdir().unwrap();
create_fixture_dir(src.path());
let tar_data = pack_layer(src.path()).unwrap();
assert!(!tar_data.is_empty());
let dst = tempfile::tempdir().unwrap();
unpack_layer(&tar_data, dst.path()).unwrap();
// Verify regular files
assert_eq!(
fs::read_to_string(dst.path().join("hello.txt")).unwrap(),
"hello world"
);
assert_eq!(
fs::read(dst.path().join("binary.bin")).unwrap(),
&[0u8, 1, 2, 255]
);
// Verify nested file
assert_eq!(
fs::read_to_string(dst.path().join("subdir").join("nested.txt")).unwrap(),
"nested content"
);
// Verify empty directory
assert!(dst.path().join("empty_dir").is_dir());
// Verify symlink
let link = dst.path().join("link_to_hello");
assert!(link.symlink_metadata().unwrap().file_type().is_symlink());
assert_eq!(fs::read_link(&link).unwrap().to_string_lossy(), "hello.txt");
}
#[test]
fn pack_is_deterministic() {
let src = tempfile::tempdir().unwrap();
create_fixture_dir(src.path());
let tar1 = pack_layer(src.path()).unwrap();
let tar2 = pack_layer(src.path()).unwrap();
assert_eq!(tar1, tar2, "pack_layer must be deterministic");
}
#[test]
fn pack_deterministic_hash() {
let src = tempfile::tempdir().unwrap();
create_fixture_dir(src.path());
let tar1 = pack_layer(src.path()).unwrap();
let tar2 = pack_layer(src.path()).unwrap();
let h1 = blake3::hash(&tar1).to_hex().to_string();
let h2 = blake3::hash(&tar2).to_hex().to_string();
assert_eq!(h1, h2);
}
#[test]
fn pack_empty_dir() {
let src = tempfile::tempdir().unwrap();
let tar_data = pack_layer(src.path()).unwrap();
// Empty directory produces a valid (possibly empty) tar
let dst = tempfile::tempdir().unwrap();
unpack_layer(&tar_data, dst.path()).unwrap();
}
#[test]
fn pack_different_content_different_hash() {
let src1 = tempfile::tempdir().unwrap();
fs::write(src1.path().join("a.txt"), "aaa").unwrap();
let tar1 = pack_layer(src1.path()).unwrap();
let src2 = tempfile::tempdir().unwrap();
fs::write(src2.path().join("a.txt"), "bbb").unwrap();
let tar2 = pack_layer(src2.path()).unwrap();
let h1 = blake3::hash(&tar1).to_hex().to_string();
let h2 = blake3::hash(&tar2).to_hex().to_string();
assert_ne!(h1, h2);
}
#[test]
fn unpack_nonexistent_target_created() {
let src = tempfile::tempdir().unwrap();
fs::write(src.path().join("f.txt"), "data").unwrap();
let tar_data = pack_layer(src.path()).unwrap();
let base = tempfile::tempdir().unwrap();
let target = base.path().join("new_subdir");
assert!(!target.exists());
unpack_layer(&tar_data, &target).unwrap();
assert!(target.join("f.txt").exists());
}
// --- A2: Layer Integrity Hardening ---
#[test]
fn layer_tar_hash_verified_on_restore() {
let src = tempfile::tempdir().unwrap();
fs::write(src.path().join("data.txt"), "layer content").unwrap();
let tar_data = pack_layer(src.path()).unwrap();
let tar_hash = blake3::hash(&tar_data).to_hex().to_string();
// Store the tar in object store
let store_dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(store_dir.path());
layout.initialize().unwrap();
let obj_store = crate::ObjectStore::new(layout.clone());
let stored_hash = obj_store.put(&tar_data).unwrap();
// Verify stored hash matches computed hash
assert_eq!(stored_hash, tar_hash);
// Retrieve and verify integrity
let retrieved = obj_store.get(&stored_hash).unwrap();
let retrieved_hash = blake3::hash(&retrieved).to_hex().to_string();
assert_eq!(retrieved_hash, tar_hash);
// Unpack and verify content
let dst = tempfile::tempdir().unwrap();
unpack_layer(&retrieved, dst.path()).unwrap();
assert_eq!(
fs::read_to_string(dst.path().join("data.txt")).unwrap(),
"layer content"
);
}
#[test]
fn corrupt_layer_file_detected_on_read() {
let (dir, store) = test_layer_store();
let layer = sample_layer();
let content_hash = store.put(&layer).unwrap();
// Corrupt the layer file on disk
let layer_path = StoreLayout::new(dir.path())
.layers_dir()
.join(&content_hash);
fs::write(&layer_path, b"this is not valid JSON").unwrap();
// get() must fail with an integrity error (hash mismatch)
let result = store.get(&content_hash);
assert!(
result.is_err(),
"corrupted layer manifest must fail on read"
);
}
#[test]
fn layer_manifest_hash_matches_content() {
let src = tempfile::tempdir().unwrap();
fs::write(src.path().join("file.txt"), "content").unwrap();
let tar_data = pack_layer(src.path()).unwrap();
let tar_hash = blake3::hash(&tar_data).to_hex().to_string();
let layer = LayerManifest {
hash: tar_hash.clone(),
kind: LayerKind::Base,
parent: None,
object_refs: vec![tar_hash.clone()],
read_only: true,
tar_hash: tar_hash.clone(),
};
// Verify tar_hash in manifest matches actual content hash
assert_eq!(layer.tar_hash, blake3::hash(&tar_data).to_hex().to_string());
// Verify object_refs include the tar
assert!(layer.object_refs.contains(&tar_hash));
}
#[test]
fn partial_tar_write_detected() {
let src = tempfile::tempdir().unwrap();
fs::write(src.path().join("data.txt"), "real data").unwrap();
let tar_data = pack_layer(src.path()).unwrap();
// Simulate partial write: truncate the tar data
let truncated = &tar_data[..tar_data.len() / 2];
// Store the truncated data under the hash of the full data
let store_dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(store_dir.path());
layout.initialize().unwrap();
let obj_store = crate::ObjectStore::new(layout);
// Write the full data first to get the correct hash
let correct_hash = obj_store.put(&tar_data).unwrap();
// Now corrupt it with truncated data
let obj_path = store_dir
.path()
.join("store")
.join("objects")
.join(&correct_hash);
fs::write(&obj_path, truncated).unwrap();
// Reading must detect integrity failure
let result = obj_store.get(&correct_hash);
assert!(
result.is_err(),
"truncated object must be detected as corrupt"
);
}
#[test]
fn compute_hash_matches_put() {
let (_dir, store) = test_layer_store();
let layer = sample_layer();
let predicted = LayerStore::compute_hash(&layer).unwrap();
let stored = store.put(&layer).unwrap();
assert_eq!(predicted, stored, "compute_hash() must match put() hash");
}
#[test]
fn corrupt_tar_data_fails_unpack() {
// Garbage data should fail to unpack
let garbage = b"this is not a tar archive at all";
let dst = tempfile::tempdir().unwrap();
let result = unpack_layer(garbage, dst.path());
// tar::Archive may produce an empty archive or an error — both are acceptable
// as long as no valid files are produced from garbage input
if result.is_ok() {
// If it "succeeded", verify no files were created
let entries: Vec<_> = fs::read_dir(dst.path())
.unwrap()
.filter_map(Result::ok)
.collect();
assert!(
entries.is_empty(),
"garbage tar data must not produce files"
);
}
}
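// A determinism sketch from another angle: the lexicographic sort in
// pack_layer() means directory-entry creation order must not change the
// archive bytes (file names here are illustrative).
#[test]
fn pack_order_independent() {
    let a = tempfile::tempdir().unwrap();
    fs::write(a.path().join("1.txt"), "one").unwrap();
    fs::write(a.path().join("2.txt"), "two").unwrap();
    let b = tempfile::tempdir().unwrap();
    fs::write(b.path().join("2.txt"), "two").unwrap();
    fs::write(b.path().join("1.txt"), "one").unwrap();
    assert_eq!(
        pack_layer(a.path()).unwrap(),
        pack_layer(b.path()).unwrap(),
        "creation order must not affect packed bytes"
    );
}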
}


src/layout.rs
@@ -0,0 +1,185 @@
use crate::StoreError;
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use tempfile::NamedTempFile;
/// Current store format version. Incremented on incompatible layout changes.
pub const STORE_FORMAT_VERSION: u32 = 2;
const VERSION_FILE: &str = "version";
/// Directory layout for the Karapace content-addressable store.
///
/// Manages paths for objects, layers, metadata, environments, and the store
/// version marker. All subdirectories are created lazily on [`initialize`](Self::initialize).
#[derive(Debug, Clone)]
pub struct StoreLayout {
root: PathBuf,
}
#[derive(Debug, Serialize, Deserialize)]
struct StoreVersion {
format_version: u32,
}
impl StoreLayout {
pub fn new(root: impl Into<PathBuf>) -> Self {
Self { root: root.into() }
}
#[inline]
pub fn root(&self) -> &Path {
&self.root
}
#[inline]
pub fn objects_dir(&self) -> PathBuf {
self.root.join("store").join("objects")
}
#[inline]
pub fn layers_dir(&self) -> PathBuf {
self.root.join("store").join("layers")
}
#[inline]
pub fn metadata_dir(&self) -> PathBuf {
self.root.join("store").join("metadata")
}
#[inline]
pub fn env_dir(&self) -> PathBuf {
self.root.join("env")
}
#[inline]
pub fn env_path(&self, env_id: &str) -> PathBuf {
self.root.join("env").join(env_id)
}
#[inline]
pub fn overlay_dir(&self, env_id: &str) -> PathBuf {
self.env_path(env_id).join("overlay")
}
/// The writable upper layer of the overlay filesystem.
/// This is where fuse-overlayfs stores all mutations during container use.
/// Drift detection, export, and commit must scan this directory.
#[inline]
pub fn upper_dir(&self, env_id: &str) -> PathBuf {
self.env_path(env_id).join("upper")
}
/// Temporary staging area for layer packing/unpacking operations.
#[inline]
pub fn staging_dir(&self) -> PathBuf {
self.root.join("store").join("staging")
}
#[inline]
pub fn lock_file(&self) -> PathBuf {
self.root.join("store").join(".lock")
}
pub fn initialize(&self) -> Result<(), StoreError> {
fs::create_dir_all(self.objects_dir())?;
fs::create_dir_all(self.layers_dir())?;
fs::create_dir_all(self.metadata_dir())?;
fs::create_dir_all(self.env_dir())?;
fs::create_dir_all(self.staging_dir())?;
let version_path = self.root.join("store").join(VERSION_FILE);
if version_path.exists() {
self.verify_version()?;
} else {
let ver = StoreVersion {
format_version: STORE_FORMAT_VERSION,
};
let content = serde_json::to_string_pretty(&ver)?;
let store_dir = self.root.join("store");
let mut tmp = NamedTempFile::new_in(&store_dir)?;
tmp.write_all(content.as_bytes())?;
tmp.as_file().sync_all()?;
tmp.persist(&version_path)
.map_err(|e| StoreError::Io(e.error))?;
crate::fsync_dir(&store_dir)?;
}
Ok(())
}
pub fn verify_version(&self) -> Result<(), StoreError> {
let version_path = self.root.join("store").join(VERSION_FILE);
let content = fs::read_to_string(&version_path)?;
let ver: StoreVersion = serde_json::from_str(&content)?;
if ver.format_version != STORE_FORMAT_VERSION {
return Err(StoreError::VersionMismatch {
expected: STORE_FORMAT_VERSION,
found: ver.format_version,
});
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn layout_paths_are_correct() {
let layout = StoreLayout::new("/tmp/karapace-test");
assert_eq!(
layout.objects_dir(),
PathBuf::from("/tmp/karapace-test/store/objects")
);
assert_eq!(
layout.layers_dir(),
PathBuf::from("/tmp/karapace-test/store/layers")
);
assert_eq!(
layout.metadata_dir(),
PathBuf::from("/tmp/karapace-test/store/metadata")
);
assert_eq!(layout.env_dir(), PathBuf::from("/tmp/karapace-test/env"));
assert_eq!(
layout.env_path("abc123"),
PathBuf::from("/tmp/karapace-test/env/abc123")
);
assert_eq!(
layout.overlay_dir("abc123"),
PathBuf::from("/tmp/karapace-test/env/abc123/overlay")
);
}
#[test]
fn initialize_creates_directories() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
assert!(layout.objects_dir().is_dir());
assert!(layout.layers_dir().is_dir());
assert!(layout.metadata_dir().is_dir());
assert!(layout.env_dir().is_dir());
}
#[test]
fn initialize_writes_version() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
layout.verify_version().unwrap();
}
#[test]
fn initialize_is_idempotent() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
layout.initialize().unwrap();
layout.verify_version().unwrap();
}
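// A version-check sketch: hand-writing a v1 marker (contents illustrative)
// must make verify_version() fail with VersionMismatch.
#[test]
fn version_mismatch_detected() {
    let dir = tempfile::tempdir().unwrap();
    let layout = StoreLayout::new(dir.path());
    layout.initialize().unwrap();
    fs::write(
        dir.path().join("store").join("version"),
        r#"{ "format_version": 1 }"#,
    )
    .unwrap();
    assert!(matches!(
        layout.verify_version(),
        Err(StoreError::VersionMismatch { found: 1, .. })
    ));
}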
}


src/lib.rs
@@ -0,0 +1,138 @@
//! Content-addressable object store, layer management, and environment metadata for Karapace.
//!
//! This crate provides the storage layer: a content-addressable `ObjectStore` backed
//! by blake3 hashing with atomic writes, `LayerStore` for overlay filesystem layer
//! manifests, `MetadataStore` for environment state tracking, `StoreLayout` for
//! directory structure management, and `GarbageCollector` for orphan cleanup.
pub mod gc;
pub mod integrity;
pub mod layers;
pub mod layout;
pub mod metadata;
pub mod migration;
pub mod objects;
pub mod wal;
pub use gc::{GarbageCollector, GcReport};
pub use integrity::{verify_store_integrity, IntegrityFailure, IntegrityReport};
pub use layers::{pack_layer, unpack_layer, LayerKind, LayerManifest, LayerStore};
pub use layout::{StoreLayout, STORE_FORMAT_VERSION};
pub use metadata::{validate_env_name, EnvMetadata, EnvState, MetadataStore};
pub use migration::{migrate_store, MigrationResult};
pub use objects::ObjectStore;
pub use wal::{RollbackStep, WalOpKind, WriteAheadLog};
use std::path::Path;
use thiserror::Error;
/// Fsync a directory to ensure that a preceding `rename()` is durable.
///
/// On Linux with ext4 `data=ordered` (the default), renames are usually
/// durable without an explicit dir fsync, but POSIX does not guarantee this.
/// Calling `fsync()` on the parent directory makes the rename durable on
/// all filesystems and mount configurations.
pub(crate) fn fsync_dir(dir: &Path) -> Result<(), std::io::Error> {
let f = std::fs::File::open(dir)?;
f.sync_all()
}
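// The durable-write sequence used by every writer in this crate (a sketch of
// the shared pattern; `dir`, `dest`, and `bytes` are caller-provided):
//
//     let mut tmp = tempfile::NamedTempFile::new_in(&dir)?; // same fs as dest
//     tmp.write_all(bytes)?;                                // stage content
//     tmp.as_file().sync_all()?;                            // make data durable
//     tmp.persist(&dest).map_err(|e| StoreError::Io(e.error))?; // atomic rename
//     fsync_dir(&dir)?;                                     // make rename durable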
#[derive(Debug, Error)]
pub enum StoreError {
#[error("store I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("integrity check failed for object '{hash}': expected {expected}, got {actual}")]
IntegrityFailure {
hash: String,
expected: String,
actual: String,
},
#[error("object not found: {0}")]
ObjectNotFound(String),
#[error("layer not found: {0}")]
LayerNotFound(String),
#[error("environment not found: {0}")]
EnvNotFound(String),
#[error("lock acquisition failed: {0}")]
LockFailed(String),
#[error("store format version mismatch: expected {expected}, found {found}")]
VersionMismatch { expected: u32, found: u32 },
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("invalid environment name: {0}")]
InvalidName(String),
#[error("name '{name}' is already used by environment {existing_env_id}")]
NameConflict {
name: String,
existing_env_id: String,
},
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn store_error_display_invalid_name() {
let e = StoreError::InvalidName("bad".to_owned());
assert!(e.to_string().contains("invalid environment name"));
}
#[test]
fn store_error_display_name_conflict() {
let e = StoreError::NameConflict {
name: "dup".to_owned(),
existing_env_id: "abc123".to_owned(),
};
let msg = e.to_string();
assert!(msg.contains("dup"));
assert!(msg.contains("abc123"));
}
#[test]
fn store_error_display_object_not_found() {
let e = StoreError::ObjectNotFound("hash123".to_owned());
assert!(e.to_string().contains("hash123"));
}
#[test]
fn store_error_display_layer_not_found() {
let e = StoreError::LayerNotFound("lhash".to_owned());
assert!(e.to_string().contains("lhash"));
}
#[test]
fn store_error_display_env_not_found() {
let e = StoreError::EnvNotFound("envid".to_owned());
assert!(e.to_string().contains("envid"));
}
#[test]
fn store_error_display_lock_failed() {
let e = StoreError::LockFailed("reason".to_owned());
assert!(e.to_string().contains("reason"));
}
#[test]
fn store_error_display_version_mismatch() {
let e = StoreError::VersionMismatch {
expected: 2,
found: 1,
};
let msg = e.to_string();
assert!(msg.contains('2'));
assert!(msg.contains('1'));
}
#[test]
fn store_error_display_integrity_failure() {
let e = StoreError::IntegrityFailure {
hash: "h".to_owned(),
expected: "exp".to_owned(),
actual: "act".to_owned(),
};
let msg = e.to_string();
assert!(msg.contains("exp"));
assert!(msg.contains("act"));
}
}


src/metadata.rs
@@ -0,0 +1,534 @@
use crate::layout::StoreLayout;
use crate::{fsync_dir, StoreError};
use karapace_schema::types::{EnvId, LayerHash, ObjectHash, ShortId};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use tempfile::NamedTempFile;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum EnvState {
Defined,
Built,
Running,
Frozen,
Archived,
}
impl std::fmt::Display for EnvState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EnvState::Defined => write!(f, "defined"),
EnvState::Built => write!(f, "built"),
EnvState::Running => write!(f, "running"),
EnvState::Frozen => write!(f, "frozen"),
EnvState::Archived => write!(f, "archived"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EnvMetadata {
pub env_id: EnvId,
pub short_id: ShortId,
#[serde(default)]
pub name: Option<String>,
pub state: EnvState,
pub manifest_hash: ObjectHash,
pub base_layer: LayerHash,
pub dependency_layers: Vec<LayerHash>,
pub policy_layer: Option<LayerHash>,
pub created_at: String,
pub updated_at: String,
pub ref_count: u32,
/// blake3 checksum for integrity verification. `None` for legacy metadata.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checksum: Option<String>,
}
impl EnvMetadata {
/// Compute the checksum over the metadata content (excluding the checksum field itself).
fn compute_checksum(&self) -> String {
let mut copy = self.clone();
copy.checksum = None;
// checksum is None here, so skip_serializing_if omits the field entirely
let json =
serde_json::to_string_pretty(&copy).expect("infallible: EnvMetadata always serializes");
blake3::hash(json.as_bytes()).to_hex().to_string()
}
}
pub fn validate_env_name(name: &str) -> Result<(), StoreError> {
if name.is_empty() || name.len() > 64 {
return Err(StoreError::InvalidName(
"environment name must be 1-64 characters".to_owned(),
));
}
if !name
.bytes()
.all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-')
{
return Err(StoreError::InvalidName(
"environment name must match [a-zA-Z0-9_-]".to_owned(),
));
}
Ok(())
}
pub struct MetadataStore {
layout: StoreLayout,
}
impl MetadataStore {
pub fn new(layout: StoreLayout) -> Self {
Self { layout }
}
pub fn put(&self, meta: &EnvMetadata) -> Result<(), StoreError> {
let dest = self.layout.metadata_dir().join(&meta.env_id);
// Compute and embed checksum before writing
let mut meta_with_checksum = meta.clone();
meta_with_checksum.checksum = Some(meta_with_checksum.compute_checksum());
let content = serde_json::to_string_pretty(&meta_with_checksum)?;
let dir = self.layout.metadata_dir();
let mut tmp = NamedTempFile::new_in(&dir)?;
tmp.write_all(content.as_bytes())?;
tmp.as_file().sync_all()?;
tmp.persist(&dest).map_err(|e| StoreError::Io(e.error))?;
fsync_dir(&dir)?;
Ok(())
}
pub fn get(&self, env_id: &str) -> Result<EnvMetadata, StoreError> {
let path = self.layout.metadata_dir().join(env_id);
if !path.exists() {
return Err(StoreError::EnvNotFound(env_id.to_owned()));
}
let content = fs::read_to_string(&path)?;
let meta: EnvMetadata = serde_json::from_str(&content)?;
// Verify checksum if present (backward-compatible: legacy files have None)
if let Some(ref expected) = meta.checksum {
let actual = meta.compute_checksum();
if actual != *expected {
return Err(StoreError::IntegrityFailure {
hash: env_id.to_owned(),
expected: expected.clone(),
actual,
});
}
}
Ok(meta)
}
pub fn update_state(&self, env_id: &str, new_state: EnvState) -> Result<(), StoreError> {
let mut meta = self.get(env_id)?;
meta.state = new_state;
meta.updated_at = chrono::Utc::now().to_rfc3339();
self.put(&meta)
}
pub fn exists(&self, env_id: &str) -> bool {
self.layout.metadata_dir().join(env_id).exists()
}
pub fn remove(&self, env_id: &str) -> Result<(), StoreError> {
let path = self.layout.metadata_dir().join(env_id);
if path.exists() {
fs::remove_file(path)?;
}
Ok(())
}
pub fn list(&self) -> Result<Vec<EnvMetadata>, StoreError> {
let dir = self.layout.metadata_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut results = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
if entry.file_type()?.is_file() {
let name = entry.file_name();
let name_str = name.to_str().unwrap_or("");
if !name_str.starts_with('.') {
match self.get(name_str) {
Ok(meta) => results.push(meta),
Err(e) => {
tracing::warn!("skipping corrupted metadata entry '{name_str}': {e}");
}
}
}
}
}
results.sort_by(|a, b| a.env_id.cmp(&b.env_id));
Ok(results)
}
/// Like `list()`, but returns per-entry `Result`s so callers (e.g.
/// `verify-store`) can surface individual corruption errors.
#[allow(clippy::type_complexity)]
pub fn list_with_errors(
&self,
) -> Result<Vec<Result<EnvMetadata, (String, StoreError)>>, StoreError> {
let dir = self.layout.metadata_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut results = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
if entry.file_type()?.is_file() {
let name = entry.file_name();
let name_str = name.to_str().unwrap_or("").to_owned();
if !name_str.starts_with('.') {
match self.get(&name_str) {
Ok(meta) => results.push(Ok(meta)),
Err(e) => results.push(Err((name_str, e))),
}
}
}
}
Ok(results)
}
pub fn increment_ref(&self, env_id: &str) -> Result<u32, StoreError> {
let mut meta = self.get(env_id)?;
meta.ref_count += 1;
meta.updated_at = chrono::Utc::now().to_rfc3339();
self.put(&meta)?;
Ok(meta.ref_count)
}
pub fn decrement_ref(&self, env_id: &str) -> Result<u32, StoreError> {
let mut meta = self.get(env_id)?;
meta.ref_count = meta.ref_count.saturating_sub(1);
meta.updated_at = chrono::Utc::now().to_rfc3339();
self.put(&meta)?;
Ok(meta.ref_count)
}
pub fn get_by_name(&self, name: &str) -> Result<EnvMetadata, StoreError> {
let all = self.list()?;
all.into_iter()
.find(|m| m.name.as_deref() == Some(name))
.ok_or_else(|| StoreError::EnvNotFound(format!("name '{name}'")))
}
pub fn update_name(&self, env_id: &str, name: Option<String>) -> Result<(), StoreError> {
if let Some(ref n) = name {
validate_env_name(n)?;
// Check uniqueness
if let Ok(existing) = self.get_by_name(n) {
if *existing.env_id != *env_id {
return Err(StoreError::NameConflict {
name: n.clone(),
existing_env_id: existing.env_id[..12.min(existing.env_id.len())]
.to_owned(),
});
}
}
}
let mut meta = self.get(env_id)?;
meta.name = name;
meta.updated_at = chrono::Utc::now().to_rfc3339();
self.put(&meta)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_metadata_store() -> (tempfile::TempDir, MetadataStore) {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
(dir, MetadataStore::new(layout))
}
fn sample_meta() -> EnvMetadata {
EnvMetadata {
env_id: "abc123def456".into(),
short_id: "abc123def456".into(),
name: None,
state: EnvState::Defined,
manifest_hash: "mhash".into(),
base_layer: "base1".into(),
dependency_layers: vec!["dep1".into()],
policy_layer: None,
created_at: "2025-01-01T00:00:00Z".to_owned(),
updated_at: "2025-01-01T00:00:00Z".to_owned(),
ref_count: 1,
checksum: None,
}
}
#[test]
fn metadata_roundtrip() {
let (_dir, store) = test_metadata_store();
let meta = sample_meta();
store.put(&meta).unwrap();
let retrieved = store.get(&meta.env_id).unwrap();
// put() computes and embeds the checksum, so compare core fields
assert_eq!(meta.env_id, retrieved.env_id);
assert_eq!(meta.state, retrieved.state);
assert_eq!(meta.ref_count, retrieved.ref_count);
// Verify checksum was written
assert!(retrieved.checksum.is_some(), "put() must embed a checksum");
}
#[test]
fn state_transition() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
store.update_state("abc123def456", EnvState::Built).unwrap();
let meta = store.get("abc123def456").unwrap();
assert_eq!(meta.state, EnvState::Built);
}
#[test]
fn ref_counting() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
let count = store.increment_ref("abc123def456").unwrap();
assert_eq!(count, 2);
let count = store.decrement_ref("abc123def456").unwrap();
assert_eq!(count, 1);
let count = store.decrement_ref("abc123def456").unwrap();
assert_eq!(count, 0);
let count = store.decrement_ref("abc123def456").unwrap();
assert_eq!(count, 0);
}
#[test]
fn list_metadata() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
let list = store.list().unwrap();
assert_eq!(list.len(), 1);
}
#[test]
fn name_roundtrip() {
let (_dir, store) = test_metadata_store();
let mut meta = sample_meta();
meta.name = Some("my-env".to_owned());
store.put(&meta).unwrap();
let retrieved = store.get(&meta.env_id).unwrap();
assert_eq!(retrieved.name, Some("my-env".to_owned()));
}
#[test]
fn get_by_name_works() {
let (_dir, store) = test_metadata_store();
let mut meta = sample_meta();
meta.name = Some("dev-env".to_owned());
store.put(&meta).unwrap();
let found = store.get_by_name("dev-env").unwrap();
assert_eq!(found.env_id, meta.env_id);
}
#[test]
fn get_by_name_not_found() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
assert!(store.get_by_name("nonexistent").is_err());
}
#[test]
fn update_name_validates() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
assert!(store
.update_name("abc123def456", Some("valid-name".to_owned()))
.is_ok());
assert!(store
.update_name("abc123def456", Some(String::new()))
.is_err());
assert!(store
.update_name("abc123def456", Some("has spaces".to_owned()))
.is_err());
assert!(store
.update_name("abc123def456", Some("a".repeat(65)).clone())
.is_err());
}
#[test]
fn name_uniqueness_enforced() {
let (_dir, store) = test_metadata_store();
let mut m1 = sample_meta();
m1.name = Some("shared-name".to_owned());
store.put(&m1).unwrap();
let mut m2 = sample_meta();
m2.env_id = "xyz789".into();
m2.short_id = "xyz789".into();
store.put(&m2).unwrap();
assert!(store
.update_name("xyz789", Some("shared-name".to_owned()))
.is_err());
}
#[test]
fn backward_compat_no_name_field() {
let (_dir, store) = test_metadata_store();
// Simulate old metadata without name field
let json = r#"{
"env_id": "old123",
"short_id": "old123",
"state": "Built",
"manifest_hash": "mh",
"base_layer": "bl",
"dependency_layers": [],
"policy_layer": null,
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
"ref_count": 1
}"#;
let dir = store.layout.metadata_dir();
fs::write(dir.join("old123"), json).unwrap();
let meta = store.get("old123").unwrap();
assert_eq!(meta.name, None);
}
#[test]
fn exists_returns_true_for_known() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
assert!(store.exists("abc123def456"));
}
#[test]
fn exists_returns_false_for_unknown() {
let (_dir, store) = test_metadata_store();
assert!(!store.exists("unknown_id"));
}
#[test]
fn remove_deletes_metadata() {
let (_dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
store.remove("abc123def456").unwrap();
assert!(!store.exists("abc123def456"));
}
#[test]
fn get_nonexistent_fails() {
let (_dir, store) = test_metadata_store();
assert!(store.get("nonexistent").is_err());
}
#[test]
fn validate_env_name_valid_chars() {
assert!(validate_env_name("my-env_123").is_ok());
assert!(validate_env_name("a").is_ok());
assert!(validate_env_name(&"x".repeat(64)).is_ok());
}
#[test]
fn validate_env_name_rejects_empty() {
assert!(validate_env_name("").is_err());
}
#[test]
fn validate_env_name_rejects_too_long() {
assert!(validate_env_name(&"x".repeat(65)).is_err());
}
#[test]
fn validate_env_name_rejects_special_chars() {
assert!(validate_env_name("has space").is_err());
assert!(validate_env_name("has/slash").is_err());
assert!(validate_env_name("has.dot").is_err());
}
#[test]
fn update_name_to_none_clears_name() {
let (_dir, store) = test_metadata_store();
let mut meta = sample_meta();
meta.name = Some("named".to_owned());
store.put(&meta).unwrap();
store.update_name("abc123def456", None).unwrap();
let retrieved = store.get("abc123def456").unwrap();
assert_eq!(retrieved.name, None);
}
#[test]
fn list_empty_store() {
let (_dir, store) = test_metadata_store();
let list = store.list().unwrap();
assert!(list.is_empty());
}
#[test]
fn list_multiple_envs() {
let (_dir, store) = test_metadata_store();
let mut m1 = sample_meta();
m1.env_id = "env1".into();
m1.short_id = "env1".into();
store.put(&m1).unwrap();
let mut m2 = sample_meta();
m2.env_id = "env2".into();
m2.short_id = "env2".into();
store.put(&m2).unwrap();
let list = store.list().unwrap();
assert_eq!(list.len(), 2);
}
#[test]
fn list_warns_on_corruption() {
let (dir, store) = test_metadata_store();
// Store a valid entry
store.put(&sample_meta()).unwrap();
// Write a corrupt metadata file
let corrupt_path = StoreLayout::new(dir.path())
.metadata_dir()
.join("corrupt_env");
fs::write(&corrupt_path, "NOT VALID JSON").unwrap();
// list() should return only the valid entry, skipping the corrupt one
let list = store.list().unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list[0].env_id.to_string(), "abc123def456");
}
#[test]
fn list_with_errors_surfaces_corruption() {
let (dir, store) = test_metadata_store();
store.put(&sample_meta()).unwrap();
// Write a corrupt metadata file
let corrupt_path = StoreLayout::new(dir.path())
.metadata_dir()
.join("corrupt_env");
fs::write(&corrupt_path, "GARBAGE").unwrap();
let results = store.list_with_errors().unwrap();
assert_eq!(results.len(), 2);
let ok_count = results.iter().filter(|r| r.is_ok()).count();
let err_count = results.iter().filter(|r| r.is_err()).count();
assert_eq!(ok_count, 1);
assert_eq!(err_count, 1);
}
#[test]
fn same_name_same_env_allowed() {
let (_dir, store) = test_metadata_store();
let mut meta = sample_meta();
meta.name = Some("my-name".to_owned());
store.put(&meta).unwrap();
// Renaming to the same name on the same env should succeed
assert!(store
.update_name("abc123def456", Some("my-name".to_owned()))
.is_ok());
}
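// A tamper-detection sketch: mutate one field in the stored JSON while
// keeping it parseable; get() must fail because the embedded checksum no
// longer matches the content (the substituted value is illustrative).
#[test]
fn checksum_detects_tampering() {
    let (_dir, store) = test_metadata_store();
    store.put(&sample_meta()).unwrap();
    let path = store.layout.metadata_dir().join("abc123def456");
    let tampered = fs::read_to_string(&path)
        .unwrap()
        .replace("\"ref_count\": 1", "\"ref_count\": 42");
    fs::write(&path, tampered).unwrap();
    assert!(matches!(
        store.get("abc123def456"),
        Err(StoreError::IntegrityFailure { .. })
    ));
}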
}


src/migration.rs
@@ -0,0 +1,155 @@
//! Store format migration engine.
//!
//! Provides automatic migration from older store format versions to the current
//! [`STORE_FORMAT_VERSION`]. Creates a backup of the version file before any
//! modification and writes all changes atomically.
use crate::layout::STORE_FORMAT_VERSION;
use crate::{fsync_dir, StoreError};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use tempfile::NamedTempFile;
use tracing::{info, warn};
/// Result of a successful migration.
#[derive(Debug)]
pub struct MigrationResult {
pub from_version: u32,
pub to_version: u32,
pub environments_migrated: usize,
pub backup_path: PathBuf,
}
/// Migrate a store from its current format version to [`STORE_FORMAT_VERSION`].
///
/// - Returns `Ok(None)` if the store is already at the current version.
/// - Returns `Err(VersionMismatch)` if the store is from a *newer* version.
/// - Creates a backup of the version file at `store/version.backup.{timestamp}`.
/// - Rewrites metadata files atomically to add any missing v2 fields.
/// - Writes the new version file atomically as the final step.
pub fn migrate_store(root: &Path) -> Result<Option<MigrationResult>, StoreError> {
let store_dir = root.join("store");
let version_path = store_dir.join("version");
if !version_path.exists() {
return Err(StoreError::Io(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("no version file at {}", version_path.display()),
)));
}
let content = fs::read_to_string(&version_path)?;
let ver: serde_json::Value =
serde_json::from_str(&content).map_err(StoreError::Serialization)?;
let found = ver
.get("format_version")
.and_then(serde_json::Value::as_u64)
.unwrap_or(0) as u32;
if found == STORE_FORMAT_VERSION {
return Ok(None);
}
if found > STORE_FORMAT_VERSION {
return Err(StoreError::VersionMismatch {
expected: STORE_FORMAT_VERSION,
found,
});
}
// --- Backup ---
let timestamp = chrono::Utc::now().format("%Y%m%dT%H%M%SZ");
let backup_path = store_dir.join(format!("version.backup.{timestamp}"));
fs::copy(&version_path, &backup_path)?;
info!("backed up store version file to {}", backup_path.display());
// --- Migrate metadata files ---
let metadata_dir = store_dir.join("metadata");
let mut envs_migrated = 0;
if metadata_dir.is_dir() {
for entry in fs::read_dir(&metadata_dir)? {
let entry = entry?;
let path = entry.path();
if !path.is_file() {
continue;
}
match migrate_metadata_file(&path) {
Ok(true) => envs_migrated += 1,
Ok(false) => {}
Err(e) => {
warn!("skipping metadata file {}: {e}", path.display());
}
}
}
}
// --- Write new version file atomically (LAST step) ---
let new_ver = serde_json::json!({ "format_version": STORE_FORMAT_VERSION });
let new_content = serde_json::to_string_pretty(&new_ver).map_err(StoreError::Serialization)?;
let mut tmp = NamedTempFile::new_in(&store_dir)?;
tmp.write_all(new_content.as_bytes())?;
tmp.as_file().sync_all()?;
tmp.persist(&version_path)
.map_err(|e| StoreError::Io(e.error))?;
fsync_dir(&store_dir)?;
info!("migrated store from v{found} to v{STORE_FORMAT_VERSION} ({envs_migrated} environments)");
Ok(Some(MigrationResult {
from_version: found,
to_version: STORE_FORMAT_VERSION,
environments_migrated: envs_migrated,
backup_path,
}))
}
/// Migrate a single metadata JSON file to v2 format.
///
/// v2 added: `name` (Option<String>), `checksum` (Option<String>), `policy_layer` (Option).
/// If any of these are missing, they are added with default values.
///
/// Returns `Ok(true)` if the file was rewritten, `Ok(false)` if no changes needed.
fn migrate_metadata_file(path: &Path) -> Result<bool, StoreError> {
let content = fs::read_to_string(path)?;
let mut val: serde_json::Value =
serde_json::from_str(&content).map_err(StoreError::Serialization)?;
let obj = val.as_object_mut().ok_or_else(|| {
StoreError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"metadata is not a JSON object",
))
})?;
let mut changed = false;
// v2 fields with defaults
if !obj.contains_key("name") {
obj.insert("name".to_owned(), serde_json::Value::Null);
changed = true;
}
if !obj.contains_key("checksum") {
obj.insert("checksum".to_owned(), serde_json::Value::Null);
changed = true;
}
if !obj.contains_key("policy_layer") {
obj.insert("policy_layer".to_owned(), serde_json::Value::Null);
changed = true;
}
if !changed {
return Ok(false);
}
// Rewrite atomically
let new_content = serde_json::to_string_pretty(&val).map_err(StoreError::Serialization)?;
let dir = path.parent().unwrap_or(Path::new("."));
let mut tmp = NamedTempFile::new_in(dir)?;
tmp.write_all(new_content.as_bytes())?;
tmp.as_file().sync_all()?;
tmp.persist(path).map_err(|e| StoreError::Io(e.error))?;
fsync_dir(dir)?;
Ok(true)
}
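#[cfg(test)]
mod tests {
    use super::*;
    // A v1→v2 sketch: hand-build a minimal v1 store (contents illustrative),
    // migrate it, and check the version bump plus the added metadata fields.
    #[test]
    fn migrate_v1_store_adds_fields_and_bumps_version() {
        let dir = tempfile::tempdir().unwrap();
        let store_dir = dir.path().join("store");
        fs::create_dir_all(store_dir.join("metadata")).unwrap();
        fs::write(store_dir.join("version"), r#"{ "format_version": 1 }"#).unwrap();
        fs::write(
            store_dir.join("metadata").join("env1"),
            r#"{ "env_id": "env1", "ref_count": 1 }"#,
        )
        .unwrap();
        let result = migrate_store(dir.path()).unwrap().expect("migration must run");
        assert_eq!(result.from_version, 1);
        assert_eq!(result.to_version, STORE_FORMAT_VERSION);
        assert_eq!(result.environments_migrated, 1);
        let migrated: serde_json::Value = serde_json::from_str(
            &fs::read_to_string(store_dir.join("metadata").join("env1")).unwrap(),
        )
        .unwrap();
        assert!(migrated.get("name").is_some());
        assert!(migrated.get("checksum").is_some());
        assert!(migrated.get("policy_layer").is_some());
        // A second run is a no-op: the store is already at the current version.
        assert!(migrate_store(dir.path()).unwrap().is_none());
    }
}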


src/objects.rs
@@ -0,0 +1,203 @@
use crate::layout::StoreLayout;
use crate::{fsync_dir, StoreError};
use std::fs;
use std::io::Write;
use tempfile::NamedTempFile;
/// Content-addressable object store backed by blake3 hashing.
///
/// Objects are stored as files named by their blake3 hash. Writes are atomic
/// via `NamedTempFile`, and reads verify integrity by recomputing the hash.
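///
/// A minimal usage sketch (error handling elided; the path is illustrative):
///
/// ```ignore
/// let layout = StoreLayout::new(Path::new("/var/lib/karapace"));
/// layout.initialize()?;
/// let store = ObjectStore::new(layout);
/// let hash = store.put(b"hello")?;          // blake3 hex digest of the content
/// assert_eq!(store.get(&hash)?, b"hello");  // integrity-verified read
/// store.remove(&hash)?;                     // removing a missing object is a no-op
/// ```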
pub struct ObjectStore {
layout: StoreLayout,
}
impl ObjectStore {
pub fn new(layout: StoreLayout) -> Self {
Self { layout }
}
/// Store data and return its blake3 hash. Idempotent — existing objects are skipped.
pub fn put(&self, data: &[u8]) -> Result<String, StoreError> {
let hash = blake3::hash(data).to_hex().to_string();
let dest = self.layout.objects_dir().join(&hash);
if dest.exists() {
return Ok(hash);
}
let dir = self.layout.objects_dir();
let mut tmp = NamedTempFile::new_in(&dir)?;
tmp.write_all(data)?;
tmp.as_file().sync_all()?;
tmp.persist(&dest).map_err(|e| StoreError::Io(e.error))?;
fsync_dir(&dir)?;
Ok(hash)
}
/// Retrieve data by hash, verifying integrity on read.
pub fn get(&self, hash: &str) -> Result<Vec<u8>, StoreError> {
let path = self.layout.objects_dir().join(hash);
if !path.exists() {
return Err(StoreError::ObjectNotFound(hash.to_owned()));
}
let data = fs::read(&path)?;
let actual = blake3::hash(&data);
let actual_hex = actual.to_hex();
if actual_hex.as_str() != hash {
return Err(StoreError::IntegrityFailure {
hash: hash.to_owned(),
expected: hash.to_owned(),
actual: actual_hex.to_string(),
});
}
Ok(data)
}
pub fn exists(&self, hash: &str) -> bool {
self.layout.objects_dir().join(hash).exists()
}
pub fn remove(&self, hash: &str) -> Result<(), StoreError> {
let path = self.layout.objects_dir().join(hash);
if path.exists() {
fs::remove_file(path)?;
}
Ok(())
}
pub fn list(&self) -> Result<Vec<String>, StoreError> {
let dir = self.layout.objects_dir();
if !dir.exists() {
return Ok(Vec::new());
}
let mut hashes = Vec::new();
for entry in fs::read_dir(dir)? {
let entry = entry?;
if let Some(name) = entry.file_name().to_str() {
if !name.starts_with('.') {
hashes.push(name.to_owned());
}
}
}
hashes.sort();
Ok(hashes)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_store() -> (tempfile::TempDir, ObjectStore) {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let store = ObjectStore::new(layout);
(dir, store)
}
#[test]
fn put_and_get_roundtrip() {
let (_dir, store) = test_store();
let data = b"hello karapace";
let hash = store.put(data).unwrap();
let retrieved = store.get(&hash).unwrap();
assert_eq!(retrieved, data);
}
#[test]
fn put_is_idempotent() {
let (_dir, store) = test_store();
let data = b"hello";
let h1 = store.put(data).unwrap();
let h2 = store.put(data).unwrap();
assert_eq!(h1, h2);
}
#[test]
fn get_nonexistent_fails() {
let (_dir, store) = test_store();
assert!(store.get("nonexistent").is_err());
}
#[test]
fn integrity_check_on_read() {
let (dir, store) = test_store();
let data = b"test data";
let hash = store.put(data).unwrap();
let obj_path = StoreLayout::new(dir.path()).objects_dir().join(&hash);
fs::write(&obj_path, b"corrupted").unwrap();
assert!(store.get(&hash).is_err());
}
#[test]
fn list_objects() {
let (_dir, store) = test_store();
store.put(b"aaa").unwrap();
store.put(b"bbb").unwrap();
let list = store.list().unwrap();
assert_eq!(list.len(), 2);
}
#[test]
fn remove_object() {
let (_dir, store) = test_store();
let hash = store.put(b"data").unwrap();
assert!(store.exists(&hash));
store.remove(&hash).unwrap();
assert!(!store.exists(&hash));
}
#[test]
fn put_empty_data() {
let (_dir, store) = test_store();
let hash = store.put(b"").unwrap();
let retrieved = store.get(&hash).unwrap();
assert!(retrieved.is_empty());
}
#[test]
fn put_large_data() {
let (_dir, store) = test_store();
let data = vec![0xABu8; 1024 * 64]; // 64KB
let hash = store.put(&data).unwrap();
let retrieved = store.get(&hash).unwrap();
assert_eq!(retrieved.len(), 1024 * 64);
}
#[test]
fn list_empty_store() {
let (_dir, store) = test_store();
let list = store.list().unwrap();
assert!(list.is_empty());
}
#[test]
fn remove_nonexistent_is_ok() {
let (_dir, store) = test_store();
assert!(store.remove("nonexistent").is_ok());
}
#[test]
fn exists_nonexistent_is_false() {
let (_dir, store) = test_store();
assert!(!store.exists("nonexistent"));
}
#[test]
fn hash_is_deterministic() {
let (_dir, store) = test_store();
let h1 = store.put(b"deterministic").unwrap();
let h2 = store.put(b"deterministic").unwrap();
assert_eq!(h1, h2);
// Different data should produce different hash
let h3 = store.put(b"different").unwrap();
assert_ne!(h1, h3);
}
}

View file

@ -0,0 +1,508 @@
use crate::layout::StoreLayout;
use crate::StoreError;
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use tempfile::NamedTempFile;
use tracing::{debug, info, warn};
/// A single rollback step that can undo part of an operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RollbackStep {
/// Remove a directory tree (e.g. orphaned env_dir).
RemoveDir(PathBuf),
/// Remove a single file (e.g. metadata, layer manifest).
RemoveFile(PathBuf),
/// Reset an environment's metadata state (e.g. Running → Built after crash).
ResetState {
env_id: String,
target_state: String,
},
}
/// The type of mutating operation being tracked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalOpKind {
Build,
Rebuild,
Commit,
Restore,
Destroy,
Gc,
Enter,
Exec,
}
impl std::fmt::Display for WalOpKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WalOpKind::Build => write!(f, "build"),
WalOpKind::Rebuild => write!(f, "rebuild"),
WalOpKind::Commit => write!(f, "commit"),
WalOpKind::Restore => write!(f, "restore"),
WalOpKind::Destroy => write!(f, "destroy"),
WalOpKind::Gc => write!(f, "gc"),
WalOpKind::Enter => write!(f, "enter"),
WalOpKind::Exec => write!(f, "exec"),
}
}
}
/// A WAL entry representing an in-flight operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalEntry {
pub op_id: String,
pub kind: WalOpKind,
pub env_id: String,
pub timestamp: String,
pub rollback_steps: Vec<RollbackStep>,
}
/// Write-ahead log for crash recovery.
///
/// Mutating engine methods create a WAL entry before starting work,
/// append rollback steps as side effects occur, and remove the entry
/// on successful completion. On startup, incomplete entries are rolled back.
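///
/// A sketch of the intended call pattern (names illustrative, error
/// handling elided):
///
/// ```ignore
/// let wal = WriteAheadLog::new(&layout);
/// wal.initialize()?;
/// let op_id = wal.begin(WalOpKind::Build, "env-123")?;
/// // Record how to undo each side effect as it happens:
/// wal.add_rollback_step(&op_id, RollbackStep::RemoveDir(env_dir.clone()))?;
/// wal.commit(&op_id)?;   // success: the entry is removed
/// // On the next startup, anything left uncommitted is undone:
/// let rolled_back = wal.recover()?;
/// ```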
pub struct WriteAheadLog {
wal_dir: PathBuf,
}
impl WriteAheadLog {
pub fn new(layout: &StoreLayout) -> Self {
let wal_dir = layout.root().join("store").join("wal");
Self { wal_dir }
}
/// Ensure the WAL directory exists.
pub fn initialize(&self) -> Result<(), StoreError> {
fs::create_dir_all(&self.wal_dir)?;
Ok(())
}
/// Begin a new WAL entry for an operation. Returns the op_id.
pub fn begin(&self, kind: WalOpKind, env_id: &str) -> Result<String, StoreError> {
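        // Sortable op_id: millisecond-resolution UTC timestamp plus the
        // first 8 hex chars of the env_id's blake3 hash.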
let op_id = format!(
"{}-{}",
chrono::Utc::now().format("%Y%m%d%H%M%S%3f"),
&blake3::hash(env_id.as_bytes()).to_hex()[..8]
);
let entry = WalEntry {
op_id: op_id.clone(),
kind,
env_id: env_id.to_owned(),
timestamp: chrono::Utc::now().to_rfc3339(),
rollback_steps: Vec::new(),
};
self.write_entry(&entry)?;
debug!("WAL begin: {} for {env_id} (op_id={op_id})", entry.kind);
Ok(op_id)
}
/// Append a rollback step to an existing WAL entry.
pub fn add_rollback_step(&self, op_id: &str, step: RollbackStep) -> Result<(), StoreError> {
let mut entry = self.read_entry(op_id)?;
entry.rollback_steps.push(step);
self.write_entry(&entry)?;
Ok(())
}
/// Commit (remove) a WAL entry after successful completion.
pub fn commit(&self, op_id: &str) -> Result<(), StoreError> {
let path = self.entry_path(op_id);
if path.exists() {
fs::remove_file(&path)?;
debug!("WAL commit: {op_id}");
}
Ok(())
}
/// List all incomplete WAL entries.
pub fn list_incomplete(&self) -> Result<Vec<WalEntry>, StoreError> {
if !self.wal_dir.exists() {
return Ok(Vec::new());
}
let mut entries = Vec::new();
for dir_entry in fs::read_dir(&self.wal_dir)? {
let dir_entry = dir_entry?;
let path = dir_entry.path();
if path.extension().is_some_and(|e| e == "json") {
match fs::read_to_string(&path) {
Ok(content) => match serde_json::from_str::<WalEntry>(&content) {
Ok(entry) => entries.push(entry),
Err(e) => {
warn!("corrupt WAL entry {}: {e}", path.display());
// Remove corrupt entries
let _ = fs::remove_file(&path);
}
},
Err(e) => {
warn!("unreadable WAL entry {}: {e}", path.display());
let _ = fs::remove_file(&path);
}
}
}
}
entries.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
Ok(entries)
}
/// Roll back all incomplete WAL entries.
/// Returns the number of entries rolled back.
pub fn recover(&self) -> Result<usize, StoreError> {
let entries = self.list_incomplete()?;
let count = entries.len();
for entry in &entries {
info!(
"WAL recovery: rolling back {} on {} (op_id={})",
entry.kind, entry.env_id, entry.op_id
);
self.rollback_entry(entry);
// Remove the WAL entry after rollback
let _ = fs::remove_file(self.entry_path(&entry.op_id));
}
if count > 0 {
info!("WAL recovery complete: {count} entries rolled back");
}
Ok(count)
}
fn rollback_entry(&self, entry: &WalEntry) {
// Execute rollback steps in reverse order
for step in entry.rollback_steps.iter().rev() {
match step {
RollbackStep::RemoveDir(path) => {
if path.exists() {
if let Err(e) = fs::remove_dir_all(path) {
warn!("WAL rollback: failed to remove dir {}: {e}", path.display());
} else {
debug!("WAL rollback: removed dir {}", path.display());
}
}
}
RollbackStep::RemoveFile(path) => {
if path.exists() {
if let Err(e) = fs::remove_file(path) {
warn!(
"WAL rollback: failed to remove file {}: {e}",
path.display()
);
} else {
debug!("WAL rollback: removed file {}", path.display());
}
}
}
RollbackStep::ResetState {
env_id,
target_state,
} => {
// Resolve metadata dir from wal_dir (wal_dir = root/store/wal)
if let Some(store_dir) = self.wal_dir.parent() {
let metadata_dir = store_dir.join("metadata");
let meta_path = metadata_dir.join(env_id);
if meta_path.exists() {
match fs::read_to_string(&meta_path) {
Ok(content) => {
if let Ok(mut meta) =
serde_json::from_str::<serde_json::Value>(&content)
{
meta["state"] =
serde_json::Value::String(target_state.clone());
if let Ok(updated) = serde_json::to_string_pretty(&meta) {
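                                            // Best-effort plain write: rollback
                                            // failures are logged as warnings,
                                            // not treated as fatal.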
if let Err(e) = fs::write(&meta_path, updated) {
warn!("WAL rollback: failed to reset state for {env_id}: {e}");
} else {
debug!("WAL rollback: reset {env_id} state to {target_state}");
}
}
}
}
Err(e) => {
warn!(
"WAL rollback: failed to read metadata for {env_id}: {e}"
);
}
}
}
}
}
}
}
}
fn entry_path(&self, op_id: &str) -> PathBuf {
self.wal_dir.join(format!("{op_id}.json"))
}
fn write_entry(&self, entry: &WalEntry) -> Result<(), StoreError> {
fs::create_dir_all(&self.wal_dir)?;
let content = serde_json::to_string_pretty(entry)?;
let mut tmp = NamedTempFile::new_in(&self.wal_dir)?;
tmp.write_all(content.as_bytes())?;
tmp.as_file().sync_all()?;
let dest = self.entry_path(&entry.op_id);
tmp.persist(&dest).map_err(|e| StoreError::Io(e.error))?;
crate::fsync_dir(&self.wal_dir)?;
Ok(())
}
fn read_entry(&self, op_id: &str) -> Result<WalEntry, StoreError> {
let path = self.entry_path(op_id);
let content = fs::read_to_string(&path)?;
let entry: WalEntry = serde_json::from_str(&content)?;
Ok(entry)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn setup() -> (tempfile::TempDir, WriteAheadLog) {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let wal = WriteAheadLog::new(&layout);
wal.initialize().unwrap();
(dir, wal)
}
#[test]
fn begin_creates_entry() {
let (_dir, wal) = setup();
let op_id = wal.begin(WalOpKind::Build, "test-env-123").unwrap();
assert!(!op_id.is_empty());
let entries = wal.list_incomplete().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].env_id, "test-env-123");
}
#[test]
fn commit_removes_entry() {
let (_dir, wal) = setup();
let op_id = wal.begin(WalOpKind::Build, "test-env").unwrap();
assert_eq!(wal.list_incomplete().unwrap().len(), 1);
wal.commit(&op_id).unwrap();
assert!(wal.list_incomplete().unwrap().is_empty());
}
#[test]
fn successful_ops_leave_zero_entries() {
let (_dir, wal) = setup();
let op1 = wal.begin(WalOpKind::Build, "env1").unwrap();
let op2 = wal.begin(WalOpKind::Commit, "env2").unwrap();
wal.commit(&op1).unwrap();
wal.commit(&op2).unwrap();
assert!(wal.list_incomplete().unwrap().is_empty());
}
#[test]
fn add_rollback_step_persists() {
let (_dir, wal) = setup();
let op_id = wal.begin(WalOpKind::Build, "env1").unwrap();
wal.add_rollback_step(&op_id, RollbackStep::RemoveDir(PathBuf::from("/tmp/fake")))
.unwrap();
let entries = wal.list_incomplete().unwrap();
assert_eq!(entries[0].rollback_steps.len(), 1);
}
#[test]
fn recover_rolls_back_incomplete() {
let (dir, wal) = setup();
let op_id = wal.begin(WalOpKind::Build, "env1").unwrap();
// Create a directory that should be rolled back
let orphan_dir = dir.path().join("orphan_env");
fs::create_dir_all(&orphan_dir).unwrap();
fs::write(orphan_dir.join("file.txt"), "data").unwrap();
assert!(orphan_dir.exists());
wal.add_rollback_step(&op_id, RollbackStep::RemoveDir(orphan_dir.clone()))
.unwrap();
// Simulate crash: don't call commit. Recovery should clean up.
let count = wal.recover().unwrap();
assert_eq!(count, 1);
assert!(
!orphan_dir.exists(),
"orphan dir must be removed by recovery"
);
assert!(wal.list_incomplete().unwrap().is_empty());
}
#[test]
fn recover_removes_file_rollback_step() {
let (dir, wal) = setup();
let op_id = wal.begin(WalOpKind::Commit, "env1").unwrap();
let orphan_file = dir.path().join("orphan.json");
fs::write(&orphan_file, "{}").unwrap();
wal.add_rollback_step(&op_id, RollbackStep::RemoveFile(orphan_file.clone()))
.unwrap();
let count = wal.recover().unwrap();
assert_eq!(count, 1);
assert!(!orphan_file.exists());
}
#[test]
fn recover_with_no_entries_is_noop() {
let (_dir, wal) = setup();
let count = wal.recover().unwrap();
assert_eq!(count, 0);
}
#[test]
fn op_kind_display() {
assert_eq!(WalOpKind::Build.to_string(), "build");
assert_eq!(WalOpKind::Rebuild.to_string(), "rebuild");
assert_eq!(WalOpKind::Commit.to_string(), "commit");
assert_eq!(WalOpKind::Restore.to_string(), "restore");
        assert_eq!(WalOpKind::Destroy.to_string(), "destroy");
        assert_eq!(WalOpKind::Gc.to_string(), "gc");
assert_eq!(WalOpKind::Enter.to_string(), "enter");
assert_eq!(WalOpKind::Exec.to_string(), "exec");
}
#[test]
fn recover_reset_state_rollback() {
let (dir, wal) = setup();
// Write a fake metadata file in the expected location (store/metadata/env1)
let metadata_dir = dir.path().join("store").join("metadata");
let meta_json = r#"{
"env_id": "env1",
"short_id": "env1",
"state": "Running",
"manifest_hash": "mh",
"base_layer": "bl",
"dependency_layers": [],
"policy_layer": null,
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
"ref_count": 1
}"#;
fs::write(metadata_dir.join("env1"), meta_json).unwrap();
// Create a WAL entry with a ResetState rollback step
let op_id = wal.begin(WalOpKind::Enter, "env1").unwrap();
wal.add_rollback_step(
&op_id,
RollbackStep::ResetState {
env_id: "env1".to_owned(),
target_state: "Built".to_owned(),
},
)
.unwrap();
// Simulate crash: don't commit. Recovery should reset state.
let count = wal.recover().unwrap();
assert_eq!(count, 1);
// Verify state was reset to Built
let content = fs::read_to_string(metadata_dir.join("env1")).unwrap();
let meta: serde_json::Value = serde_json::from_str(&content).unwrap();
assert_eq!(meta["state"], "Built");
}
#[test]
fn recover_corrupt_wal_entry_is_removed() {
let (dir, wal) = setup();
// Write a corrupt WAL entry directly
let wal_dir = dir.path().join("store").join("wal");
fs::write(wal_dir.join("corrupt-op.json"), "THIS IS NOT JSON{{{").unwrap();
// Also write a valid entry
let op_id = wal.begin(WalOpKind::Build, "env1").unwrap();
let orphan = dir.path().join("orphan_from_valid");
fs::create_dir_all(&orphan).unwrap();
wal.add_rollback_step(&op_id, RollbackStep::RemoveDir(orphan.clone()))
.unwrap();
// Recovery must handle corrupt entry (remove it) and still roll back valid one
let count = wal.recover().unwrap();
assert_eq!(
count, 1,
"only the valid entry should be counted as rolled back"
);
assert!(!orphan.exists(), "valid rollback must still execute");
// Corrupt entry file must be gone
assert!(
!wal_dir.join("corrupt-op.json").exists(),
"corrupt WAL entry must be removed during recovery"
);
// No WAL entries remain
assert!(wal.list_incomplete().unwrap().is_empty());
}
#[test]
fn recover_no_duplicate_objects_after_partial_build() {
let (dir, wal) = setup();
let layout = StoreLayout::new(dir.path());
// Simulate a partial build: object was written, then crash
let obj_store = crate::ObjectStore::new(layout.clone());
let hash1 = obj_store.put(b"real object data").unwrap();
// WAL entry says to remove the object (rollback of partial build)
let obj_path = layout.objects_dir().join(&hash1);
let op_id = wal.begin(WalOpKind::Build, "env1").unwrap();
wal.add_rollback_step(&op_id, RollbackStep::RemoveFile(obj_path.clone()))
.unwrap();
// Simulate crash: don't commit
let count = wal.recover().unwrap();
assert_eq!(count, 1);
// Object must be gone (rolled back)
assert!(
!obj_path.exists(),
"partial object must be removed by recovery"
);
// No duplicate: writing the same data again must succeed cleanly
let hash2 = obj_store.put(b"real object data").unwrap();
assert_eq!(hash1, hash2, "same data must produce same hash");
assert!(
layout.objects_dir().join(&hash2).exists(),
"re-written object must exist"
);
}
#[test]
fn recover_version_file_unchanged() {
let (dir, wal) = setup();
let layout = StoreLayout::new(dir.path());
// Read version before
let version_before = fs::read_to_string(dir.path().join("store").join("version")).unwrap();
// Create WAL entries and recover
let op1 = wal.begin(WalOpKind::Build, "env1").unwrap();
let orphan = dir.path().join("test_orphan");
fs::create_dir_all(&orphan).unwrap();
wal.add_rollback_step(&op1, RollbackStep::RemoveDir(orphan.clone()))
.unwrap();
let count = wal.recover().unwrap();
assert_eq!(count, 1);
// Version file must be identical
let version_after = fs::read_to_string(dir.path().join("store").join("version")).unwrap();
assert_eq!(
version_before, version_after,
"version file must not change during WAL recovery"
);
// Store integrity must pass
let report = crate::verify_store_integrity(&layout).unwrap();
assert!(
report.failed.is_empty(),
"store integrity must pass after WAL recovery: {:?}",
report.failed
);
}
}

View file

@ -0,0 +1,366 @@
//! IG-M6: Store migration tests.
use karapace_store::{
migrate_store, EnvState, LayerKind, LayerManifest, LayerStore, MetadataStore, ObjectStore,
StoreLayout, STORE_FORMAT_VERSION,
};
use std::fs;
use std::path::Path;
/// Create a v1-format store with the given number of metadata files.
fn create_v1_store(root: &Path, num_envs: usize) {
let store_dir = root.join("store");
fs::create_dir_all(store_dir.join("objects")).unwrap();
fs::create_dir_all(store_dir.join("layers")).unwrap();
fs::create_dir_all(store_dir.join("metadata")).unwrap();
fs::create_dir_all(store_dir.join("staging")).unwrap();
fs::create_dir_all(root.join("env")).unwrap();
// Write v1 version file
fs::write(store_dir.join("version"), r#"{"format_version": 1}"#).unwrap();
// Write v1-format metadata (missing v2 fields: name, checksum, policy_layer)
for i in 0..num_envs {
let env_id = format!("env_{i:04}");
let meta_json = serde_json::json!({
"env_id": env_id,
"short_id": &env_id[..8],
"state": "Built",
"manifest_hash": format!("mhash_{i}"),
"base_layer": format!("blayer_{i}"),
"dependency_layers": [],
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
"ref_count": 1
});
fs::write(
store_dir.join("metadata").join(&env_id),
serde_json::to_string_pretty(&meta_json).unwrap(),
)
.unwrap();
}
}
#[test]
fn migrate_v1_store_to_v2() {
let dir = tempfile::tempdir().unwrap();
create_v1_store(dir.path(), 2);
let result = migrate_store(dir.path()).unwrap();
assert!(result.is_some(), "migration must return Some for v1→v2");
let result = result.unwrap();
assert_eq!(result.from_version, 1);
assert_eq!(result.to_version, STORE_FORMAT_VERSION);
assert_eq!(result.environments_migrated, 2);
// Verify version file now says v2
let layout = StoreLayout::new(dir.path());
layout.verify_version().unwrap();
// Both metadata files must be readable by current MetadataStore
let meta_store = MetadataStore::new(layout);
let m0 = meta_store.get("env_0000").unwrap();
assert_eq!(m0.env_id.as_str(), "env_0000");
assert_eq!(m0.state, EnvState::Built);
let m1 = meta_store.get("env_0001").unwrap();
assert_eq!(m1.env_id.as_str(), "env_0001");
}
#[test]
fn migrate_preserves_all_metadata_fields() {
let dir = tempfile::tempdir().unwrap();
create_v1_store(dir.path(), 1);
migrate_store(dir.path()).unwrap();
let layout = StoreLayout::new(dir.path());
let meta_store = MetadataStore::new(layout);
let meta = meta_store.get("env_0000").unwrap();
// Original fields preserved
assert_eq!(meta.env_id.as_str(), "env_0000");
assert_eq!(meta.short_id.as_str(), "env_0000");
assert_eq!(meta.state, EnvState::Built);
assert_eq!(meta.manifest_hash.as_str(), "mhash_0");
assert_eq!(meta.base_layer.as_str(), "blayer_0");
assert!(meta.dependency_layers.is_empty());
assert_eq!(meta.created_at, "2025-01-01T00:00:00Z");
assert_eq!(meta.ref_count, 1);
// v2 defaults added
assert_eq!(meta.name, None);
assert_eq!(meta.policy_layer, None);
}
#[test]
fn migrate_preserves_objects_and_layers() {
let dir = tempfile::tempdir().unwrap();
// Start with a normal v2 store to create real objects and layers
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let obj_store = ObjectStore::new(layout.clone());
let layer_store = LayerStore::new(layout.clone());
let h1 = obj_store.put(b"object data 1").unwrap();
let h2 = obj_store.put(b"object data 2").unwrap();
let h3 = obj_store.put(b"object data 3").unwrap();
let layer = LayerManifest {
hash: "test_layer".to_owned(),
kind: LayerKind::Base,
parent: None,
object_refs: vec![h1.clone(), h2.clone()],
read_only: true,
tar_hash: String::new(),
};
let lh1 = layer_store.put(&layer).unwrap();
let layer2 = LayerManifest {
hash: "test_layer2".to_owned(),
kind: LayerKind::Snapshot,
parent: Some(lh1.clone()),
object_refs: vec![h3.clone()],
read_only: false,
tar_hash: String::new(),
};
let lh2 = layer_store.put(&layer2).unwrap();
// Downgrade version file to v1
fs::write(
dir.path().join("store").join("version"),
r#"{"format_version": 1}"#,
)
.unwrap();
// Run migration
migrate_store(dir.path()).unwrap();
// Verify all objects intact
let obj_store2 = ObjectStore::new(StoreLayout::new(dir.path()));
assert_eq!(obj_store2.get(&h1).unwrap(), b"object data 1");
assert_eq!(obj_store2.get(&h2).unwrap(), b"object data 2");
assert_eq!(obj_store2.get(&h3).unwrap(), b"object data 3");
// Verify all layers intact
let layer_store2 = LayerStore::new(StoreLayout::new(dir.path()));
let loaded1 = layer_store2.get(&lh1).unwrap();
assert_eq!(loaded1.object_refs.len(), 2);
let loaded2 = layer_store2.get(&lh2).unwrap();
assert_eq!(loaded2.kind, LayerKind::Snapshot);
// Verify store integrity
let report = karapace_store::verify_store_integrity(&StoreLayout::new(dir.path())).unwrap();
assert!(
report.failed.is_empty(),
"store integrity must pass after migration, failures: {:?}",
report.failed
);
}
#[test]
fn migrate_creates_backup() {
let dir = tempfile::tempdir().unwrap();
create_v1_store(dir.path(), 0);
let result = migrate_store(dir.path()).unwrap().unwrap();
assert!(result.backup_path.exists(), "backup file must exist");
// Backup must contain v1
let backup_content = fs::read_to_string(&result.backup_path).unwrap();
assert!(
backup_content.contains("\"format_version\": 1")
|| backup_content.contains("\"format_version\":1"),
"backup must contain format_version 1, got: {backup_content}"
);
// Current version must be v2
let current = fs::read_to_string(dir.path().join("store").join("version")).unwrap();
assert!(
current.contains(&format!("{STORE_FORMAT_VERSION}")),
"version file must now be v{STORE_FORMAT_VERSION}"
);
}
#[test]
fn migrate_idempotent_on_current_version() {
let dir = tempfile::tempdir().unwrap();
let layout = StoreLayout::new(dir.path());
layout.initialize().unwrap();
let result = migrate_store(dir.path()).unwrap();
assert!(
result.is_none(),
"migrate on current-version store must return None"
);
// Store unmodified
layout.verify_version().unwrap();
}
#[test]
fn migrate_rejects_future_version() {
let dir = tempfile::tempdir().unwrap();
let store_dir = dir.path().join("store");
fs::create_dir_all(&store_dir).unwrap();
fs::write(store_dir.join("version"), r#"{"format_version": 99}"#).unwrap();
let result = migrate_store(dir.path());
assert!(result.is_err(), "future version must be rejected");
let err_msg = format!("{}", result.unwrap_err());
assert!(
err_msg.contains("mismatch") || err_msg.contains("Mismatch"),
"error must mention version mismatch, got: {err_msg}"
);
}
#[test]
#[cfg(unix)]
fn migrate_atomic_version_unchanged_on_write_failure() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().unwrap();
create_v1_store(dir.path(), 1);
let store_dir = dir.path().join("store");
// Make the store directory non-writable so the final NamedTempFile::new_in(&store_dir)
// fails. Metadata migration writes into metadata/ (still writable) but the version
// file write into store/ will fail.
let original_mode = fs::metadata(&store_dir).unwrap().permissions().mode();
// Remove write permission from store/ dir (keep read+exec for traversal)
fs::set_permissions(&store_dir, fs::Permissions::from_mode(0o555)).unwrap();
let result = migrate_store(dir.path());
// Restore permissions for cleanup
fs::set_permissions(&store_dir, fs::Permissions::from_mode(original_mode)).unwrap();
// Migration MUST have failed — the version file write requires creating a temp file in store/
assert!(
result.is_err(),
"migration must fail when store dir is read-only — test is invalid if it succeeds"
);
// Version file MUST still say v1
let ver_content = fs::read_to_string(dir.path().join("store").join("version")).unwrap();
assert!(
ver_content.contains("\"format_version\": 1")
|| ver_content.contains("\"format_version\":1"),
"version must still be v1 after failed migration, got: {ver_content}"
);
    // A backup may or may not exist depending on where exactly the failure
    // occurred (the backup write also targets store/), but the version file
    // must be unchanged regardless.
    let backup_files: Vec<_> = fs::read_dir(&store_dir)
        .unwrap()
        .filter_map(Result::ok)
        .filter(|e| {
            e.file_name()
                .to_string_lossy()
                .starts_with("version.backup")
        })
        .collect();
    let _ = backup_files;
}
#[test]
fn migrate_corrupted_metadata_fails_and_store_untouched() {
let dir = tempfile::tempdir().unwrap();
let store_dir = dir.path().join("store");
// Create a minimal v1 store with a corrupted metadata file
fs::create_dir_all(store_dir.join("objects")).unwrap();
fs::create_dir_all(store_dir.join("layers")).unwrap();
fs::create_dir_all(store_dir.join("metadata")).unwrap();
fs::create_dir_all(store_dir.join("staging")).unwrap();
fs::create_dir_all(dir.path().join("env")).unwrap();
fs::write(store_dir.join("version"), r#"{"format_version": 1}"#).unwrap();
// Write corrupted metadata: not a JSON object (it's an array)
fs::write(store_dir.join("metadata").join("corrupt_env"), "[1, 2, 3]").unwrap();
    // Migration should succeed (corrupt files are warned about and skipped) but report 0 migrated
let result = migrate_store(dir.path()).unwrap();
assert!(result.is_some());
let result = result.unwrap();
assert_eq!(
result.environments_migrated, 0,
"corrupted metadata must not count as migrated"
);
// The corrupted file must still exist and be unchanged
let corrupt_content =
fs::read_to_string(store_dir.join("metadata").join("corrupt_env")).unwrap();
assert_eq!(
corrupt_content, "[1, 2, 3]",
"corrupted file must be untouched"
);
// Version file must now be v2 (migration itself succeeded, only metadata was skipped)
let layout = StoreLayout::new(dir.path());
layout.verify_version().unwrap();
}
#[test]
fn migrate_invalid_json_metadata_skipped() {
let dir = tempfile::tempdir().unwrap();
let store_dir = dir.path().join("store");
fs::create_dir_all(store_dir.join("objects")).unwrap();
fs::create_dir_all(store_dir.join("layers")).unwrap();
fs::create_dir_all(store_dir.join("metadata")).unwrap();
fs::create_dir_all(store_dir.join("staging")).unwrap();
fs::create_dir_all(dir.path().join("env")).unwrap();
fs::write(store_dir.join("version"), r#"{"format_version": 1}"#).unwrap();
// Write totally invalid JSON
fs::write(
store_dir.join("metadata").join("broken_env"),
"THIS IS NOT JSON AT ALL {{{",
)
.unwrap();
// Also write a valid v1 metadata file
let valid_meta = serde_json::json!({
"env_id": "valid_env",
"short_id": "valid_en",
"state": "Built",
"manifest_hash": "mh",
"base_layer": "bl",
"dependency_layers": [],
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z",
"ref_count": 1
});
fs::write(
store_dir.join("metadata").join("valid_env"),
serde_json::to_string_pretty(&valid_meta).unwrap(),
)
.unwrap();
let result = migrate_store(dir.path()).unwrap().unwrap();
// Only the valid one should be migrated
assert_eq!(result.environments_migrated, 1);
// Invalid file must still exist, unchanged
let broken = fs::read_to_string(store_dir.join("metadata").join("broken_env")).unwrap();
assert_eq!(broken, "THIS IS NOT JSON AT ALL {{{");
// Valid file must now have v2 fields
let valid = fs::read_to_string(store_dir.join("metadata").join("valid_env")).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&valid).unwrap();
assert!(
parsed.get("name").is_some(),
"v2 'name' field must be present"
);
assert!(
parsed.get("checksum").is_some(),
"v2 'checksum' field must be present"
);
assert!(
parsed.get("policy_layer").is_some(),
"v2 'policy_layer' field must be present"
);
}