diff --git a/ROADMAP.md b/ROADMAP.md index afaf5b9..5e90d03 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -207,7 +207,18 @@ tar cf - /project | uc2 --ingest backup.uc2 # dedup tar stream cp -a /snapshot/ | uc2 --ingest backup.uc2 # incremental dedup ``` -- [ ] `uc2 --ingest` mode: streaming input with master-block dedup +- [x] `uc2 --ingest` mode v1: stdin -> CDC -> sidecar blockstore at + `.blocks/` -> chunk-hash manifest. `uc2 --ingest-restore` + reverses the round-trip. Tested: small/multichunk round-trip, + idempotent dedup on repeat ingest, empty stream, bad-magic + rejection. Not yet integrated with the master-block archive + layout (sidecar blockstore is a separate format with magic + `UC2INGST`); tar entry boundaries are not preserved. Follow-up + tracked separately. +- [ ] `uc2 --ingest` v2: integrate with master-block archive layout + so output is a real UC2 v3 archive, not a sidecar manifest +- [ ] Tar-entry preservation: parse tar boundaries inside --ingest + so individual files are recoverable as archive entries - [ ] Incremental snapshots: `uc2 snapshot /path repo.uc2` (borg/restic-style deduplicating backups without filesystem support) diff --git a/cli/src/main.c b/cli/src/main.c index 8806cde..5bb3407 100644 --- a/cli/src/main.c +++ b/cli/src/main.c @@ -36,6 +36,7 @@ void setprogname(const char *argv0); #include #include +#include #include #include #include @@ -53,6 +54,12 @@ enum ots_mode { OTS_MODE_INFO }; +enum ingest_mode { + INGEST_MODE_NONE = 0, + INGEST_MODE_WRITE, + INGEST_MODE_RESTORE +}; + struct options { bool list:1; bool all:1; @@ -66,6 +73,7 @@ struct options { bool quiet:1; bool benchmark:1; int ots_mode; + int ingest_mode; char sep; int level; char *archive; @@ -1592,6 +1600,103 @@ static int run_benchmark(int nfiles, char **files) return EXIT_SUCCESS; } +/* Pre-parse for ingest long options: --ingest, --ingest-restore. + * Both take a single path argument, accepted as either + * --ingest (separate argv entry; rejected if path starts with '-') + * --ingest= (inline). */ +static int extract_ingest_long_opts(int *argcp, char **argv, char **out_path) +{ + int argc = *argcp; + int mode = INGEST_MODE_NONE; + *out_path = NULL; + for (int i = 1; i < argc; ) { + const char *a = argv[i]; + int matched_args = 0; + int new_mode = INGEST_MODE_NONE; + const char *inline_value = NULL; + + if (strcmp(a, "--ingest") == 0) { + new_mode = INGEST_MODE_WRITE; + matched_args = 1; + } else if (strncmp(a, "--ingest=", 9) == 0) { + new_mode = INGEST_MODE_WRITE; + matched_args = 1; + inline_value = a + 9; + } else if (strcmp(a, "--ingest-restore") == 0) { + new_mode = INGEST_MODE_RESTORE; + matched_args = 1; + } else if (strncmp(a, "--ingest-restore=", 17) == 0) { + new_mode = INGEST_MODE_RESTORE; + matched_args = 1; + inline_value = a + 17; + } + + if (!matched_args) { i++; continue; } + + mode = new_mode; + if (inline_value) { + *out_path = (char *)inline_value; + } else { + if (i + 1 >= argc || argv[i + 1][0] == '-') + errx(EXIT_FAILURE, + "%s requires a path argument", a); + *out_path = argv[i + 1]; + matched_args = 2; + } + for (int j = i; j + matched_args < argc; j++) + argv[j] = argv[j + matched_args]; + argc -= matched_args; + } + *argcp = argc; + return mode; +} + +static int cmd_ingest_write(const char *archive_path) +{ + uint8_t *buf = NULL; + size_t cap = 0, len = 0; + const size_t chunk = 64 * 1024; + for (;;) { + if (len + chunk > cap) { + size_t ncap = cap ? cap * 2 : chunk; + while (ncap < len + chunk) ncap *= 2; + uint8_t *p = realloc(buf, ncap); + if (!p) { free(buf); err(EXIT_FAILURE, "realloc"); } + buf = p; + cap = ncap; + } + size_t n = fread(buf + len, 1, chunk, stdin); + len += n; + if (n < chunk) { + if (ferror(stdin)) { + free(buf); + err(EXIT_FAILURE, "read stdin"); + } + break; + } + } + + struct uc2_ingest_stats st; + int rc = uc2_ingest_write(archive_path, buf, len, 0, &st); + free(buf); + if (rc != 0) + errx(EXIT_FAILURE, "ingest write failed: %s", archive_path); + + uc2_say(stderr, + "ingested %llu bytes -> %d chunks (%d new, %d deduped, %llu bytes saved)\n", + (unsigned long long)st.bytes_in, + st.chunks_total, st.chunks_new, st.chunks_dedup, + (unsigned long long)st.bytes_saved); + return EXIT_SUCCESS; +} + +static int cmd_ingest_restore(const char *archive_path) +{ + if (uc2_ingest_restore(archive_path, stdout) != 0) + errx(EXIT_FAILURE, "ingest restore failed: %s", archive_path); + return EXIT_SUCCESS; +} + /* Pre-parse for OTS long options: --ots-attach, --ots-extract, --ots-info. * Removes matched arguments from argv in place so the existing getopt loop * doesn't see them. --ots-attach takes a value, accepted as either @@ -1654,6 +1759,15 @@ int main(int argc, char *argv[]) if (argc == 1) goto usage; + { + char *ingest_path = NULL; + opt.ingest_mode = extract_ingest_long_opts(&argc, argv, &ingest_path); + if (opt.ingest_mode == INGEST_MODE_WRITE) + return cmd_ingest_write(ingest_path); + if (opt.ingest_mode == INGEST_MODE_RESTORE) + return cmd_ingest_restore(ingest_path); + } + opt.ots_mode = extract_ots_long_opts(&argc, argv, &opt.ots_path); for (;;) { @@ -1725,6 +1839,8 @@ usage: "uc2 --ots-attach [-f] archive.uc2\n" "uc2 --ots-extract archive.uc2 \n" "uc2 --ots-info archive.uc2\n" + "uc2 --ingest # stdin -> dedup blockstore\n" + "uc2 --ingest-restore # blockstore -> stdout\n" ); if (!opt.help) printf("uc2 -h\n"); diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 39b4ff8..635560d 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -1,6 +1,6 @@ # libuc2 — UC2 decompression library -set(LIBUC2_SOURCES src/decompress.c src/compress.c src/uc2_tables.c src/uc2_cdc.c src/uc2_merkle.c src/uc2_blockstore.c src/uc2_simhash.c src/uc2_delta.c src/uc2_rans.c src/uc2_dict.c src/uc2_preprocess.c src/uc2_lz4.c src/uc2_blake3.c src/uc2_sha256.c src/uc2_ots.c) +set(LIBUC2_SOURCES src/decompress.c src/compress.c src/uc2_tables.c src/uc2_cdc.c src/uc2_merkle.c src/uc2_blockstore.c src/uc2_simhash.c src/uc2_delta.c src/uc2_rans.c src/uc2_dict.c src/uc2_preprocess.c src/uc2_lz4.c src/uc2_blake3.c src/uc2_sha256.c src/uc2_ots.c src/uc2_ingest.c) # Embed super.bin: use .S with .incbin on GCC/Clang, generated C array on MSVC if(MSVC) diff --git a/lib/include/uc2/uc2_ingest.h b/lib/include/uc2/uc2_ingest.h new file mode 100644 index 0000000..ed44a12 --- /dev/null +++ b/lib/include/uc2/uc2_ingest.h @@ -0,0 +1,58 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ + +/* Streaming dedup ingest for UC2. + * + * uc2 --ingest reads a byte stream (typically stdin from + * tar / rsync / cp -a), splits it via CDC, deduplicates chunks against + * a sidecar block store, and writes a manifest file that lists chunk + * hashes in order. uc2 --ingest-restore reverses this. + * + * Manifest layout (little-endian): + * +0 8B magic "UC2INGST" + * +8 1B version (1) + * +9 1B cdc_bits + * +10 2B reserved + * +12 4B chunk_count + * +16 ... chunk_count * 12B: 8B hash, 4B length + * + * Block store layout (per uc2_blockstore.h): + * .blocks// + * + * Limitations of v1: + * - The whole stream is buffered in memory before chunking. Suits + * CDC's locality-of-boundary requirement and is fine for streams + * up to a few GB. True streaming is a future revision. + * - The manifest file is not a UC2 v3 archive; it is a separate + * format with its own magic. Unifying with the master-block + * archive layout is a follow-up. + */ + +#ifndef UC2_INGEST_H +#define UC2_INGEST_H + +#include +#include +#include + +struct uc2_ingest_stats { + uint64_t bytes_in; /* input stream length */ + int chunks_total; /* total chunks in input */ + int chunks_new; /* chunks newly stored */ + int chunks_dedup; /* chunks already in the block store */ + uint64_t bytes_stored; /* bytes physically written this call */ + uint64_t bytes_saved; /* bytes saved by dedup */ +}; + +/* Ingest len bytes of data into archive_path. The block store lives + * at .blocks/. cdc_bits selects the average chunk + * size (13 = 8 KiB; 0 picks a sensible default). */ +int uc2_ingest_write(const char *archive_path, + const uint8_t *data, size_t len, + int cdc_bits, + struct uc2_ingest_stats *stats); + +/* Restore the byte stream described by an ingest manifest. Reads + * chunks from .blocks/ and writes them in order to out. */ +int uc2_ingest_restore(const char *archive_path, FILE *out); + +#endif diff --git a/lib/src/uc2_ingest.c b/lib/src/uc2_ingest.c new file mode 100644 index 0000000..2fe1524 --- /dev/null +++ b/lib/src/uc2_ingest.c @@ -0,0 +1,199 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ + +#include "uc2/uc2_ingest.h" +#include "uc2/uc2_blockstore.h" +#include "uc2/uc2_merkle.h" + +#include +#include +#include + +static const char INGEST_MAGIC[8] = { 'U','C','2','I','N','G','S','T' }; +#define INGEST_VERSION 1 +#define DEFAULT_CDC_BITS 13 + +static char *make_blocks_path(const char *archive_path) +{ + size_t n = strlen(archive_path); + char *p = malloc(n + 8); + if (!p) return NULL; + memcpy(p, archive_path, n); + memcpy(p + n, ".blocks", 8); /* includes trailing NUL */ + return p; +} + +static void put_le32(uint8_t *p, uint32_t v) +{ + p[0] = (uint8_t)v; + p[1] = (uint8_t)(v >> 8); + p[2] = (uint8_t)(v >> 16); + p[3] = (uint8_t)(v >> 24); +} + +static void put_le64(uint8_t *p, uint64_t v) +{ + for (int i = 0; i < 8; i++) + p[i] = (uint8_t)(v >> (i * 8)); +} + +static uint32_t get_le32(const uint8_t *p) +{ + return (uint32_t)p[0] + | ((uint32_t)p[1] << 8) + | ((uint32_t)p[2] << 16) + | ((uint32_t)p[3] << 24); +} + +static uint64_t get_le64(const uint8_t *p) +{ + uint64_t v = 0; + for (int i = 0; i < 8; i++) + v |= (uint64_t)p[i] << (i * 8); + return v; +} + +int uc2_ingest_write(const char *archive_path, + const uint8_t *data, size_t len, + int cdc_bits, + struct uc2_ingest_stats *stats) +{ + if (!archive_path) + return -1; + if (cdc_bits <= 0) + cdc_bits = DEFAULT_CDC_BITS; + + char *blocks_path = make_blocks_path(archive_path); + if (!blocks_path) + return -1; + + struct uc2_blockstore bs; + if (uc2_blockstore_open(&bs, blocks_path) != 0) { + free(blocks_path); + return -1; + } + free(blocks_path); + + struct uc2_merkle tree; + uc2_merkle_build(&tree, data, len, cdc_bits); + + int new_chunks = 0; + if (tree.nchunks > 0) + new_chunks = uc2_blockstore_ingest(&bs, &tree, data, len); + + FILE *f = fopen(archive_path, "wb"); + if (!f) { + uc2_merkle_free(&tree); + uc2_blockstore_close(&bs); + return -1; + } + + uint8_t hdr[16]; + memcpy(hdr, INGEST_MAGIC, 8); + hdr[8] = INGEST_VERSION; + hdr[9] = (uint8_t)cdc_bits; + hdr[10] = 0; + hdr[11] = 0; + put_le32(hdr + 12, (uint32_t)tree.nchunks); + if (fwrite(hdr, 1, sizeof hdr, f) != sizeof hdr) { + fclose(f); + uc2_merkle_free(&tree); + uc2_blockstore_close(&bs); + return -1; + } + + for (int i = 0; i < tree.nchunks; i++) { + uint8_t rec[12]; + put_le64(rec, tree.chunks[i].hash); + put_le32(rec + 8, tree.chunks[i].length); + if (fwrite(rec, 1, sizeof rec, f) != sizeof rec) { + fclose(f); + uc2_merkle_free(&tree); + uc2_blockstore_close(&bs); + return -1; + } + } + + if (fclose(f) != 0) { + uc2_merkle_free(&tree); + uc2_blockstore_close(&bs); + return -1; + } + + if (stats) { + stats->bytes_in = (uint64_t)len; + stats->chunks_total = tree.nchunks; + stats->chunks_new = new_chunks; + stats->chunks_dedup = tree.nchunks - new_chunks; + stats->bytes_stored = (uint64_t)bs.total_bytes; + stats->bytes_saved = (uint64_t)bs.saved_bytes; + } + + uc2_merkle_free(&tree); + uc2_blockstore_close(&bs); + return 0; +} + +int uc2_ingest_restore(const char *archive_path, FILE *out) +{ + if (!archive_path || !out) + return -1; + + FILE *f = fopen(archive_path, "rb"); + if (!f) + return -1; + + uint8_t hdr[16]; + if (fread(hdr, 1, sizeof hdr, f) != sizeof hdr) { + fclose(f); + return -1; + } + if (memcmp(hdr, INGEST_MAGIC, 8) != 0 || hdr[8] != INGEST_VERSION) { + fclose(f); + return -1; + } + uint32_t nchunks = get_le32(hdr + 12); + + char *blocks_path = make_blocks_path(archive_path); + if (!blocks_path) { + fclose(f); + return -1; + } + + struct uc2_blockstore bs; + if (uc2_blockstore_open(&bs, blocks_path) != 0) { + free(blocks_path); + fclose(f); + return -1; + } + free(blocks_path); + + uint8_t *buf = NULL; + size_t buf_cap = 0; + int rc = 0; + + for (uint32_t i = 0; i < nchunks; i++) { + uint8_t rec[12]; + if (fread(rec, 1, sizeof rec, f) != sizeof rec) { + rc = -1; + break; + } + uint64_t hash = get_le64(rec); + uint32_t clen = get_le32(rec + 8); + + if (clen > buf_cap) { + uint8_t *p = realloc(buf, clen); + if (!p) { rc = -1; break; } + buf = p; + buf_cap = clen; + } + + int n = uc2_blockstore_read(&bs, hash, buf, clen); + if (n != (int)clen) { rc = -1; break; } + if (fwrite(buf, 1, clen, out) != clen) { rc = -1; break; } + } + + free(buf); + uc2_blockstore_close(&bs); + fclose(f); + return rc; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e6bdaaf..9a625d1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -125,6 +125,12 @@ target_include_directories(test_ots PRIVATE "${PROJECT_BINARY_DIR}/lib") target_compile_features(test_ots PRIVATE c_std_99) add_test(NAME ots COMMAND test_ots) +add_executable(test_ingest src/test_ingest.c) +target_link_libraries(test_ingest PRIVATE uc2) +target_include_directories(test_ingest PRIVATE "${PROJECT_BINARY_DIR}/lib") +target_compile_features(test_ingest PRIVATE c_std_99) +add_test(NAME ingest COMMAND test_ingest) + # Optional cross-check: validates uc2 .ots output against the python-opentimestamps # reference parser. Skipped (return code 77) when opentimestamps is not installed. find_package(Python3 COMPONENTS Interpreter) diff --git a/tests/src/test_ingest.c b/tests/src/test_ingest.c new file mode 100644 index 0000000..3f4f5e5 --- /dev/null +++ b/tests/src/test_ingest.c @@ -0,0 +1,202 @@ +/* Tests for streaming dedup ingest (uc2 --ingest). */ + +#include +#include +#include +#include +#ifdef _MSC_VER +#include +#define getpid _getpid +#else +#include +#endif +#include + +static int tests_run = 0, tests_passed = 0; +#define TEST(name) do { tests_run++; printf(" %s: ", #name); name(); tests_passed++; printf("OK\n"); } while (0) + +static char tmp_archive[256]; + +static void rmrf(const char *path) +{ + char cmd[768]; + snprintf(cmd, sizeof cmd, "rm -rf '%s' '%s.blocks'", path, path); + system(cmd); +} + +static void fill_random(uint8_t *buf, size_t len, uint32_t seed) +{ + for (size_t i = 0; i < len; i++) { + seed = seed * 1103515245 + 12345; + buf[i] = (uint8_t)(seed >> 16); + } +} + +/* Read whole file into a freshly-malloc'd buffer. Caller frees. */ +static uint8_t *slurp(const char *path, size_t *out_len) +{ + FILE *f = fopen(path, "rb"); + if (!f) return NULL; + fseek(f, 0, SEEK_END); + long n = ftell(f); + fseek(f, 0, SEEK_SET); + uint8_t *buf = malloc(n > 0 ? (size_t)n : 1); + size_t got = fread(buf, 1, (size_t)n, f); + fclose(f); + *out_len = got; + return buf; +} + +static void test_roundtrip_small(void) +{ + rmrf(tmp_archive); + const char *msg = "hello world"; + struct uc2_ingest_stats st; + int rc = uc2_ingest_write(tmp_archive, + (const uint8_t *)msg, strlen(msg), 0, &st); + assert(rc == 0); + (void)rc; + assert(st.bytes_in == strlen(msg)); + assert(st.chunks_total >= 1); + assert(st.chunks_new == st.chunks_total); + assert(st.chunks_dedup == 0); + + char restored[320]; + snprintf(restored, sizeof restored, "%s.out", tmp_archive); + FILE *out = fopen(restored, "wb"); + assert(out); + rc = uc2_ingest_restore(tmp_archive, out); + fclose(out); + assert(rc == 0); + + size_t got_len; + uint8_t *got = slurp(restored, &got_len); + assert(got_len == strlen(msg)); + assert(memcmp(got, msg, got_len) == 0); + free(got); + unlink(restored); + rmrf(tmp_archive); +} + +static void test_roundtrip_multichunk(void) +{ + rmrf(tmp_archive); + const size_t N = 200000; + uint8_t *data = malloc(N); + fill_random(data, N, 0x12345678); + + struct uc2_ingest_stats st; + int rc = uc2_ingest_write(tmp_archive, data, N, 0, &st); + assert(rc == 0); + (void)rc; + assert(st.bytes_in == N); + assert(st.chunks_total > 1); /* CDC should find boundaries in 200 KB */ + + char restored[320]; + snprintf(restored, sizeof restored, "%s.out", tmp_archive); + FILE *out = fopen(restored, "wb"); + assert(out); + rc = uc2_ingest_restore(tmp_archive, out); + fclose(out); + assert(rc == 0); + + size_t got_len; + uint8_t *got = slurp(restored, &got_len); + assert(got_len == N); + assert(memcmp(got, data, N) == 0); + + free(got); + free(data); + unlink(restored); + rmrf(tmp_archive); +} + +static void test_dedup_idempotent(void) +{ + rmrf(tmp_archive); + /* Repeated short pattern -> stable CDC boundaries -> high dedup. */ + const size_t N = 200000; + uint8_t *data = malloc(N); + const char *pattern = "the quick brown fox jumps over the lazy dog\n"; + size_t plen = strlen(pattern); + for (size_t i = 0; i < N; i++) data[i] = (uint8_t)pattern[i % plen]; + + struct uc2_ingest_stats st1, st2; + int rc = uc2_ingest_write(tmp_archive, data, N, 0, &st1); + assert(rc == 0); + (void)rc; + assert(st1.chunks_new == st1.chunks_total); + assert(st1.chunks_dedup == 0); + + rc = uc2_ingest_write(tmp_archive, data, N, 0, &st2); + assert(rc == 0); + assert(st2.chunks_total == st1.chunks_total); + assert(st2.chunks_new == 0); + assert(st2.chunks_dedup == st2.chunks_total); + assert(st2.bytes_saved == (uint64_t)N); + + free(data); + rmrf(tmp_archive); +} + +static void test_empty_stream(void) +{ + rmrf(tmp_archive); + struct uc2_ingest_stats st; + int rc = uc2_ingest_write(tmp_archive, NULL, 0, 0, &st); + assert(rc == 0); + (void)rc; + assert(st.bytes_in == 0); + assert(st.chunks_total == 0); + + char restored[320]; + snprintf(restored, sizeof restored, "%s.out", tmp_archive); + FILE *out = fopen(restored, "wb"); + assert(out); + rc = uc2_ingest_restore(tmp_archive, out); + fclose(out); + assert(rc == 0); + + size_t got_len; + uint8_t *got = slurp(restored, &got_len); + assert(got_len == 0); + free(got); + unlink(restored); + rmrf(tmp_archive); +} + +static void test_bad_magic_rejected(void) +{ + rmrf(tmp_archive); + FILE *f = fopen(tmp_archive, "wb"); + assert(f); + const char garbage[16] = "not-a-uc2-ingest"; + fwrite(garbage, 1, sizeof garbage, f); + fclose(f); + + FILE *out = fopen("/dev/null", "wb"); +#ifdef _MSC_VER + if (!out) out = fopen("NUL", "wb"); +#endif + assert(out); + int rc = uc2_ingest_restore(tmp_archive, out); + fclose(out); + assert(rc != 0); + (void)rc; + rmrf(tmp_archive); +} + +int main(void) +{ + snprintf(tmp_archive, sizeof tmp_archive, + "/tmp/uc2_ingest_test_%d.uc2", (int)getpid()); + + printf("Running uc2_ingest tests...\n"); + TEST(test_roundtrip_small); + TEST(test_roundtrip_multichunk); + TEST(test_dedup_idempotent); + TEST(test_empty_stream); + TEST(test_bad_magic_rejected); + printf("Passed: %d/%d\n", tests_passed, tests_run); + return tests_passed == tests_run ? 0 : 1; +}