ingest v1: streaming dedup sink (--ingest / --ingest-restore)

Reads stdin, splits via CDC, deduplicates chunks against a sidecar
block store at <archive>.blocks/, writes a chunk-hash manifest at
<archive>.  The reverse operation reads the manifest and reassembles
the byte stream from the block store.

Manifest format (magic UC2INGST) is a standalone container, not yet
unified with the master-block archive layout.  Tar boundaries are not
preserved; the input is treated as an opaque byte stream.  Follow-ups
filed for both.

Builds entirely on existing CDC + blockstore + merkle infrastructure.
No new compression or hashing primitives.

Tests cover small + 200 KB multichunk round-trip, idempotent dedup
(repeat ingest of the same data reports zero new chunks and exact
bytes_saved), empty stream, bad-magic rejection.  Lint gate stays
green.

Closes fa0c7d4.
This commit is contained in:
Eremey Valetov
2026-05-04 18:37:18 -04:00
parent 4a51918b83
commit 446158e855
7 changed files with 594 additions and 2 deletions

View File

@@ -207,7 +207,18 @@ tar cf - /project | uc2 --ingest backup.uc2 # dedup tar stream
cp -a /snapshot/ | uc2 --ingest backup.uc2 # incremental dedup
```
- [ ] `uc2 --ingest` mode: streaming input with master-block dedup
- [x] `uc2 --ingest` mode v1: stdin -> CDC -> sidecar blockstore at
`<archive>.blocks/` -> chunk-hash manifest. `uc2 --ingest-restore`
reverses the round-trip. Tested: small/multichunk round-trip,
idempotent dedup on repeat ingest, empty stream, bad-magic
rejection. Not yet integrated with the master-block archive
layout (sidecar blockstore is a separate format with magic
`UC2INGST`); tar entry boundaries are not preserved. Follow-up
tracked separately.
- [ ] `uc2 --ingest` v2: integrate with master-block archive layout
so output is a real UC2 v3 archive, not a sidecar manifest
- [ ] Tar-entry preservation: parse tar boundaries inside --ingest
so individual files are recoverable as archive entries
- [ ] Incremental snapshots: `uc2 snapshot /path repo.uc2`
(borg/restic-style deduplicating backups without filesystem support)

View File

@@ -36,6 +36,7 @@ void setprogname(const char *argv0);
#include <uc2/libuc2.h>
#include <uc2/uc2_cdc.h>
#include <uc2/uc2_ingest.h>
#include <uc2/uc2_lz4.h>
#include <uc2/uc2_ots.h>
#include <uc2/uc2_sha256.h>
@@ -53,6 +54,12 @@ enum ots_mode {
OTS_MODE_INFO
};
enum ingest_mode {
INGEST_MODE_NONE = 0,
INGEST_MODE_WRITE,
INGEST_MODE_RESTORE
};
struct options {
bool list:1;
bool all:1;
@@ -66,6 +73,7 @@ struct options {
bool quiet:1;
bool benchmark:1;
int ots_mode;
int ingest_mode;
char sep;
int level;
char *archive;
@@ -1592,6 +1600,103 @@ static int run_benchmark(int nfiles, char **files)
return EXIT_SUCCESS;
}
/* Pre-parse for ingest long options: --ingest, --ingest-restore.
* Both take a single path argument, accepted as either
* --ingest <path> (separate argv entry; rejected if path starts with '-')
* --ingest=<path> (inline). */
static int extract_ingest_long_opts(int *argcp, char **argv, char **out_path)
{
int argc = *argcp;
int mode = INGEST_MODE_NONE;
*out_path = NULL;
for (int i = 1; i < argc; ) {
const char *a = argv[i];
int matched_args = 0;
int new_mode = INGEST_MODE_NONE;
const char *inline_value = NULL;
if (strcmp(a, "--ingest") == 0) {
new_mode = INGEST_MODE_WRITE;
matched_args = 1;
} else if (strncmp(a, "--ingest=", 9) == 0) {
new_mode = INGEST_MODE_WRITE;
matched_args = 1;
inline_value = a + 9;
} else if (strcmp(a, "--ingest-restore") == 0) {
new_mode = INGEST_MODE_RESTORE;
matched_args = 1;
} else if (strncmp(a, "--ingest-restore=", 17) == 0) {
new_mode = INGEST_MODE_RESTORE;
matched_args = 1;
inline_value = a + 17;
}
if (!matched_args) { i++; continue; }
mode = new_mode;
if (inline_value) {
*out_path = (char *)inline_value;
} else {
if (i + 1 >= argc || argv[i + 1][0] == '-')
errx(EXIT_FAILURE,
"%s requires a path argument", a);
*out_path = argv[i + 1];
matched_args = 2;
}
for (int j = i; j + matched_args < argc; j++)
argv[j] = argv[j + matched_args];
argc -= matched_args;
}
*argcp = argc;
return mode;
}
static int cmd_ingest_write(const char *archive_path)
{
uint8_t *buf = NULL;
size_t cap = 0, len = 0;
const size_t chunk = 64 * 1024;
for (;;) {
if (len + chunk > cap) {
size_t ncap = cap ? cap * 2 : chunk;
while (ncap < len + chunk) ncap *= 2;
uint8_t *p = realloc(buf, ncap);
if (!p) { free(buf); err(EXIT_FAILURE, "realloc"); }
buf = p;
cap = ncap;
}
size_t n = fread(buf + len, 1, chunk, stdin);
len += n;
if (n < chunk) {
if (ferror(stdin)) {
free(buf);
err(EXIT_FAILURE, "read stdin");
}
break;
}
}
struct uc2_ingest_stats st;
int rc = uc2_ingest_write(archive_path, buf, len, 0, &st);
free(buf);
if (rc != 0)
errx(EXIT_FAILURE, "ingest write failed: %s", archive_path);
uc2_say(stderr,
"ingested %llu bytes -> %d chunks (%d new, %d deduped, %llu bytes saved)\n",
(unsigned long long)st.bytes_in,
st.chunks_total, st.chunks_new, st.chunks_dedup,
(unsigned long long)st.bytes_saved);
return EXIT_SUCCESS;
}
static int cmd_ingest_restore(const char *archive_path)
{
if (uc2_ingest_restore(archive_path, stdout) != 0)
errx(EXIT_FAILURE, "ingest restore failed: %s", archive_path);
return EXIT_SUCCESS;
}
/* Pre-parse for OTS long options: --ots-attach, --ots-extract, --ots-info.
* Removes matched arguments from argv in place so the existing getopt loop
* doesn't see them. --ots-attach takes a value, accepted as either
@@ -1654,6 +1759,15 @@ int main(int argc, char *argv[])
if (argc == 1)
goto usage;
{
char *ingest_path = NULL;
opt.ingest_mode = extract_ingest_long_opts(&argc, argv, &ingest_path);
if (opt.ingest_mode == INGEST_MODE_WRITE)
return cmd_ingest_write(ingest_path);
if (opt.ingest_mode == INGEST_MODE_RESTORE)
return cmd_ingest_restore(ingest_path);
}
opt.ots_mode = extract_ots_long_opts(&argc, argv, &opt.ots_path);
for (;;) {
@@ -1725,6 +1839,8 @@ usage:
"uc2 --ots-attach <proof.ots> [-f] archive.uc2\n"
"uc2 --ots-extract archive.uc2 <out.ots>\n"
"uc2 --ots-info archive.uc2\n"
"uc2 --ingest <archive> # stdin -> dedup blockstore\n"
"uc2 --ingest-restore <archive> # blockstore -> stdout\n"
);
if (!opt.help)
printf("uc2 -h\n");

View File

@@ -1,6 +1,6 @@
# libuc2 — UC2 decompression library
set(LIBUC2_SOURCES src/decompress.c src/compress.c src/uc2_tables.c src/uc2_cdc.c src/uc2_merkle.c src/uc2_blockstore.c src/uc2_simhash.c src/uc2_delta.c src/uc2_rans.c src/uc2_dict.c src/uc2_preprocess.c src/uc2_lz4.c src/uc2_blake3.c src/uc2_sha256.c src/uc2_ots.c)
set(LIBUC2_SOURCES src/decompress.c src/compress.c src/uc2_tables.c src/uc2_cdc.c src/uc2_merkle.c src/uc2_blockstore.c src/uc2_simhash.c src/uc2_delta.c src/uc2_rans.c src/uc2_dict.c src/uc2_preprocess.c src/uc2_lz4.c src/uc2_blake3.c src/uc2_sha256.c src/uc2_ots.c src/uc2_ingest.c)
# Embed super.bin: use .S with .incbin on GCC/Clang, generated C array on MSVC
if(MSVC)

View File

@@ -0,0 +1,58 @@
/* SPDX-License-Identifier: GPL-3.0-or-later */
/* Streaming dedup ingest for UC2.
*
* uc2 --ingest <archive> reads a byte stream (typically stdin from
* tar / rsync / cp -a), splits it via CDC, deduplicates chunks against
* a sidecar block store, and writes a manifest file that lists chunk
* hashes in order. uc2 --ingest-restore <archive> reverses this.
*
* Manifest layout (little-endian):
* +0 8B magic "UC2INGST"
* +8 1B version (1)
* +9 1B cdc_bits
* +10 2B reserved
* +12 4B chunk_count
* +16 ... chunk_count * 12B: 8B hash, 4B length
*
* Block store layout (per uc2_blockstore.h):
* <archive>.blocks/<aa>/<hash16>
*
* Limitations of v1:
* - The whole stream is buffered in memory before chunking. Suits
* CDC's locality-of-boundary requirement and is fine for streams
* up to a few GB. True streaming is a future revision.
* - The manifest file is not a UC2 v3 archive; it is a separate
* format with its own magic. Unifying with the master-block
* archive layout is a follow-up.
*/
#ifndef UC2_INGEST_H
#define UC2_INGEST_H
#include <stdint.h>
#include <stddef.h>
#include <stdio.h>
struct uc2_ingest_stats {
uint64_t bytes_in; /* input stream length */
int chunks_total; /* total chunks in input */
int chunks_new; /* chunks newly stored */
int chunks_dedup; /* chunks already in the block store */
uint64_t bytes_stored; /* bytes physically written this call */
uint64_t bytes_saved; /* bytes saved by dedup */
};
/* Ingest len bytes of data into archive_path. The block store lives
* at <archive_path>.blocks/. cdc_bits selects the average chunk
* size (13 = 8 KiB; 0 picks a sensible default). */
int uc2_ingest_write(const char *archive_path,
const uint8_t *data, size_t len,
int cdc_bits,
struct uc2_ingest_stats *stats);
/* Restore the byte stream described by an ingest manifest. Reads
* chunks from <archive_path>.blocks/ and writes them in order to out. */
int uc2_ingest_restore(const char *archive_path, FILE *out);
#endif

199
lib/src/uc2_ingest.c Normal file
View File

@@ -0,0 +1,199 @@
/* SPDX-License-Identifier: GPL-3.0-or-later */
#include "uc2/uc2_ingest.h"
#include "uc2/uc2_blockstore.h"
#include "uc2/uc2_merkle.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static const char INGEST_MAGIC[8] = { 'U','C','2','I','N','G','S','T' };
#define INGEST_VERSION 1
#define DEFAULT_CDC_BITS 13
static char *make_blocks_path(const char *archive_path)
{
size_t n = strlen(archive_path);
char *p = malloc(n + 8);
if (!p) return NULL;
memcpy(p, archive_path, n);
memcpy(p + n, ".blocks", 8); /* includes trailing NUL */
return p;
}
static void put_le32(uint8_t *p, uint32_t v)
{
p[0] = (uint8_t)v;
p[1] = (uint8_t)(v >> 8);
p[2] = (uint8_t)(v >> 16);
p[3] = (uint8_t)(v >> 24);
}
static void put_le64(uint8_t *p, uint64_t v)
{
for (int i = 0; i < 8; i++)
p[i] = (uint8_t)(v >> (i * 8));
}
static uint32_t get_le32(const uint8_t *p)
{
return (uint32_t)p[0]
| ((uint32_t)p[1] << 8)
| ((uint32_t)p[2] << 16)
| ((uint32_t)p[3] << 24);
}
static uint64_t get_le64(const uint8_t *p)
{
uint64_t v = 0;
for (int i = 0; i < 8; i++)
v |= (uint64_t)p[i] << (i * 8);
return v;
}
int uc2_ingest_write(const char *archive_path,
const uint8_t *data, size_t len,
int cdc_bits,
struct uc2_ingest_stats *stats)
{
if (!archive_path)
return -1;
if (cdc_bits <= 0)
cdc_bits = DEFAULT_CDC_BITS;
char *blocks_path = make_blocks_path(archive_path);
if (!blocks_path)
return -1;
struct uc2_blockstore bs;
if (uc2_blockstore_open(&bs, blocks_path) != 0) {
free(blocks_path);
return -1;
}
free(blocks_path);
struct uc2_merkle tree;
uc2_merkle_build(&tree, data, len, cdc_bits);
int new_chunks = 0;
if (tree.nchunks > 0)
new_chunks = uc2_blockstore_ingest(&bs, &tree, data, len);
FILE *f = fopen(archive_path, "wb");
if (!f) {
uc2_merkle_free(&tree);
uc2_blockstore_close(&bs);
return -1;
}
uint8_t hdr[16];
memcpy(hdr, INGEST_MAGIC, 8);
hdr[8] = INGEST_VERSION;
hdr[9] = (uint8_t)cdc_bits;
hdr[10] = 0;
hdr[11] = 0;
put_le32(hdr + 12, (uint32_t)tree.nchunks);
if (fwrite(hdr, 1, sizeof hdr, f) != sizeof hdr) {
fclose(f);
uc2_merkle_free(&tree);
uc2_blockstore_close(&bs);
return -1;
}
for (int i = 0; i < tree.nchunks; i++) {
uint8_t rec[12];
put_le64(rec, tree.chunks[i].hash);
put_le32(rec + 8, tree.chunks[i].length);
if (fwrite(rec, 1, sizeof rec, f) != sizeof rec) {
fclose(f);
uc2_merkle_free(&tree);
uc2_blockstore_close(&bs);
return -1;
}
}
if (fclose(f) != 0) {
uc2_merkle_free(&tree);
uc2_blockstore_close(&bs);
return -1;
}
if (stats) {
stats->bytes_in = (uint64_t)len;
stats->chunks_total = tree.nchunks;
stats->chunks_new = new_chunks;
stats->chunks_dedup = tree.nchunks - new_chunks;
stats->bytes_stored = (uint64_t)bs.total_bytes;
stats->bytes_saved = (uint64_t)bs.saved_bytes;
}
uc2_merkle_free(&tree);
uc2_blockstore_close(&bs);
return 0;
}
int uc2_ingest_restore(const char *archive_path, FILE *out)
{
if (!archive_path || !out)
return -1;
FILE *f = fopen(archive_path, "rb");
if (!f)
return -1;
uint8_t hdr[16];
if (fread(hdr, 1, sizeof hdr, f) != sizeof hdr) {
fclose(f);
return -1;
}
if (memcmp(hdr, INGEST_MAGIC, 8) != 0 || hdr[8] != INGEST_VERSION) {
fclose(f);
return -1;
}
uint32_t nchunks = get_le32(hdr + 12);
char *blocks_path = make_blocks_path(archive_path);
if (!blocks_path) {
fclose(f);
return -1;
}
struct uc2_blockstore bs;
if (uc2_blockstore_open(&bs, blocks_path) != 0) {
free(blocks_path);
fclose(f);
return -1;
}
free(blocks_path);
uint8_t *buf = NULL;
size_t buf_cap = 0;
int rc = 0;
for (uint32_t i = 0; i < nchunks; i++) {
uint8_t rec[12];
if (fread(rec, 1, sizeof rec, f) != sizeof rec) {
rc = -1;
break;
}
uint64_t hash = get_le64(rec);
uint32_t clen = get_le32(rec + 8);
if (clen > buf_cap) {
uint8_t *p = realloc(buf, clen);
if (!p) { rc = -1; break; }
buf = p;
buf_cap = clen;
}
int n = uc2_blockstore_read(&bs, hash, buf, clen);
if (n != (int)clen) { rc = -1; break; }
if (fwrite(buf, 1, clen, out) != clen) { rc = -1; break; }
}
free(buf);
uc2_blockstore_close(&bs);
fclose(f);
return rc;
}

View File

@@ -125,6 +125,12 @@ target_include_directories(test_ots PRIVATE "${PROJECT_BINARY_DIR}/lib")
target_compile_features(test_ots PRIVATE c_std_99)
add_test(NAME ots COMMAND test_ots)
add_executable(test_ingest src/test_ingest.c)
target_link_libraries(test_ingest PRIVATE uc2)
target_include_directories(test_ingest PRIVATE "${PROJECT_BINARY_DIR}/lib")
target_compile_features(test_ingest PRIVATE c_std_99)
add_test(NAME ingest COMMAND test_ingest)
# Optional cross-check: validates uc2 .ots output against the python-opentimestamps
# reference parser. Skipped (return code 77) when opentimestamps is not installed.
find_package(Python3 COMPONENTS Interpreter)

202
tests/src/test_ingest.c Normal file
View File

@@ -0,0 +1,202 @@
/* Tests for streaming dedup ingest (uc2 --ingest). */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifdef _MSC_VER
#include <process.h>
#define getpid _getpid
#else
#include <unistd.h>
#endif
#include <uc2/uc2_ingest.h>
static int tests_run = 0, tests_passed = 0;
#define TEST(name) do { tests_run++; printf(" %s: ", #name); name(); tests_passed++; printf("OK\n"); } while (0)
static char tmp_archive[256];
static void rmrf(const char *path)
{
char cmd[768];
snprintf(cmd, sizeof cmd, "rm -rf '%s' '%s.blocks'", path, path);
system(cmd);
}
static void fill_random(uint8_t *buf, size_t len, uint32_t seed)
{
for (size_t i = 0; i < len; i++) {
seed = seed * 1103515245 + 12345;
buf[i] = (uint8_t)(seed >> 16);
}
}
/* Read whole file into a freshly-malloc'd buffer. Caller frees. */
static uint8_t *slurp(const char *path, size_t *out_len)
{
FILE *f = fopen(path, "rb");
if (!f) return NULL;
fseek(f, 0, SEEK_END);
long n = ftell(f);
fseek(f, 0, SEEK_SET);
uint8_t *buf = malloc(n > 0 ? (size_t)n : 1);
size_t got = fread(buf, 1, (size_t)n, f);
fclose(f);
*out_len = got;
return buf;
}
static void test_roundtrip_small(void)
{
rmrf(tmp_archive);
const char *msg = "hello world";
struct uc2_ingest_stats st;
int rc = uc2_ingest_write(tmp_archive,
(const uint8_t *)msg, strlen(msg), 0, &st);
assert(rc == 0);
(void)rc;
assert(st.bytes_in == strlen(msg));
assert(st.chunks_total >= 1);
assert(st.chunks_new == st.chunks_total);
assert(st.chunks_dedup == 0);
char restored[320];
snprintf(restored, sizeof restored, "%s.out", tmp_archive);
FILE *out = fopen(restored, "wb");
assert(out);
rc = uc2_ingest_restore(tmp_archive, out);
fclose(out);
assert(rc == 0);
size_t got_len;
uint8_t *got = slurp(restored, &got_len);
assert(got_len == strlen(msg));
assert(memcmp(got, msg, got_len) == 0);
free(got);
unlink(restored);
rmrf(tmp_archive);
}
static void test_roundtrip_multichunk(void)
{
rmrf(tmp_archive);
const size_t N = 200000;
uint8_t *data = malloc(N);
fill_random(data, N, 0x12345678);
struct uc2_ingest_stats st;
int rc = uc2_ingest_write(tmp_archive, data, N, 0, &st);
assert(rc == 0);
(void)rc;
assert(st.bytes_in == N);
assert(st.chunks_total > 1); /* CDC should find boundaries in 200 KB */
char restored[320];
snprintf(restored, sizeof restored, "%s.out", tmp_archive);
FILE *out = fopen(restored, "wb");
assert(out);
rc = uc2_ingest_restore(tmp_archive, out);
fclose(out);
assert(rc == 0);
size_t got_len;
uint8_t *got = slurp(restored, &got_len);
assert(got_len == N);
assert(memcmp(got, data, N) == 0);
free(got);
free(data);
unlink(restored);
rmrf(tmp_archive);
}
static void test_dedup_idempotent(void)
{
rmrf(tmp_archive);
/* Repeated short pattern -> stable CDC boundaries -> high dedup. */
const size_t N = 200000;
uint8_t *data = malloc(N);
const char *pattern = "the quick brown fox jumps over the lazy dog\n";
size_t plen = strlen(pattern);
for (size_t i = 0; i < N; i++) data[i] = (uint8_t)pattern[i % plen];
struct uc2_ingest_stats st1, st2;
int rc = uc2_ingest_write(tmp_archive, data, N, 0, &st1);
assert(rc == 0);
(void)rc;
assert(st1.chunks_new == st1.chunks_total);
assert(st1.chunks_dedup == 0);
rc = uc2_ingest_write(tmp_archive, data, N, 0, &st2);
assert(rc == 0);
assert(st2.chunks_total == st1.chunks_total);
assert(st2.chunks_new == 0);
assert(st2.chunks_dedup == st2.chunks_total);
assert(st2.bytes_saved == (uint64_t)N);
free(data);
rmrf(tmp_archive);
}
static void test_empty_stream(void)
{
rmrf(tmp_archive);
struct uc2_ingest_stats st;
int rc = uc2_ingest_write(tmp_archive, NULL, 0, 0, &st);
assert(rc == 0);
(void)rc;
assert(st.bytes_in == 0);
assert(st.chunks_total == 0);
char restored[320];
snprintf(restored, sizeof restored, "%s.out", tmp_archive);
FILE *out = fopen(restored, "wb");
assert(out);
rc = uc2_ingest_restore(tmp_archive, out);
fclose(out);
assert(rc == 0);
size_t got_len;
uint8_t *got = slurp(restored, &got_len);
assert(got_len == 0);
free(got);
unlink(restored);
rmrf(tmp_archive);
}
static void test_bad_magic_rejected(void)
{
rmrf(tmp_archive);
FILE *f = fopen(tmp_archive, "wb");
assert(f);
const char garbage[16] = "not-a-uc2-ingest";
fwrite(garbage, 1, sizeof garbage, f);
fclose(f);
FILE *out = fopen("/dev/null", "wb");
#ifdef _MSC_VER
if (!out) out = fopen("NUL", "wb");
#endif
assert(out);
int rc = uc2_ingest_restore(tmp_archive, out);
fclose(out);
assert(rc != 0);
(void)rc;
rmrf(tmp_archive);
}
int main(void)
{
snprintf(tmp_archive, sizeof tmp_archive,
"/tmp/uc2_ingest_test_%d.uc2", (int)getpid());
printf("Running uc2_ingest tests...\n");
TEST(test_roundtrip_small);
TEST(test_roundtrip_multichunk);
TEST(test_dedup_idempotent);
TEST(test_empty_stream);
TEST(test_bad_magic_rejected);
printf("Passed: %d/%d\n", tests_passed, tests_run);
return tests_passed == tests_run ? 0 : 1;
}