From 58d38ac0e9c9b255a5832eb49c4a727a1c03dd15 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Thu, 2 Jul 2026 15:14:38 -0700 Subject: [PATCH] fix(cache): resolve parallel-restore discovery from the shared tier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ParallelGet's discovery read (the first ranged Open, which has no validator yet) is what establishes the ETag every subsequent chunk pins to with If-Match. It previously resolved against whichever tier the landing replica hit first — usually its local disk. Because replicas regenerate snapshots independently, local disks hold divergent revisions, so the pin was often a version other replicas no longer held, and the resulting 412 retry churn erased the gains of parallel download. Tiered.Open now treats an unpinned ranged read (a Range with no If-Match or If-Range) as a discovery read and resolves it from the deepest (shared) tier, so the pin is always the shared tier's current revision. It falls back to normal tiering only when that tier misses or is unavailable, so a healthy local tier can still serve. Full reads and pinned chunks are unchanged: they still serve from local disk where it matches. This needs no client-side flag — the discovery read is already an unpinned range — so it also covers any future parallel-download caller automatically. --- Justfile | 2 +- client/parallel_get.go | 11 +++-- internal/cache/tiered.go | 35 +++++++++++++++ internal/cache/tiered_test.go | 83 +++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 4 deletions(-) diff --git a/Justfile b/Justfile index 2f3f6cb..86d4f51 100644 --- a/Justfile +++ b/Justfile @@ -1,4 +1,4 @@ -set positional-arguments := true +set positional-arguments set shell := ["bash", "-c"] # Import modules diff --git a/client/parallel_get.go b/client/parallel_get.go index b9d0176..27d7238 100644 --- a/client/parallel_get.go +++ b/client/parallel_get.go @@ -26,8 +26,10 @@ type RangeReader interface { // // The first chunk is fetched with a ranged Open, whose response yields both the // total size (from Content-Range) and the object's ETag; every remaining chunk -// is then requested with IfMatch pinned to that ETag. If the object changes -// mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed +// is then requested with IfMatch pinned to that ETag. As an unpinned ranged +// read, the discovery request is a tiered backend's signal to resolve it from +// the shared tier, so the pin avoids cross-replica precondition churn. If the +// object changes mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed // (412) and ParallelGet returns an error rather than splicing bytes from two // revisions; a server that ignores If-Match is caught by verifying each chunk's // response ETag. A missing or truncated chunk is likewise reported as an error, @@ -54,7 +56,10 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c } // Discovery: the first ranged Open delivers chunk zero and reveals the total - // size and ETag used to pin the rest. + // size and ETag used to pin the rest. It carries no validator, which a tiered + // backend takes as the signal to resolve the pin from its shared tier, + // keeping every chunk on the revision the shared tier holds rather than an + // arbitrary local replica's. rc, headers, err := c.Open(ctx, key, Range(0, chunkSize)) if errors.Is(err, ErrRangeNotSatisfiable) { return nil // Empty object: nothing to write. diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 4396532..c0c3431 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -142,9 +142,19 @@ func (t Tiered) Stat(ctx context.Context, key Key, opts ...Option) (http.Header, // that errored while being probed takes precedence over both, so outages are // not misreported as missing versions. // +// An unpinned ranged read (a Range with no If-Match or If-Range) is treated as +// a discovery read and resolved from the deepest (shared) tier so its validator +// is consistent across replicas, falling back to normal tiering only if that +// tier misses or is unavailable. +// // If all caches fail, all errors are returned. func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadCloser, http.Header, error) { ro := NewRequestOptions(opts...) + + if r, headers, handled, err := t.openDiscovery(ctx, ro, key, opts...); handled { + return r, headers, err + } + // A Range request yields a partial body, which must never be backfilled // into a lower tier as if it were the whole object. partial := ro.Range != "" @@ -222,6 +232,31 @@ func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadClose return nil, nil, errors.Join(errs...) } +// openDiscovery resolves an unpinned ranged read from the deepest (shared) tier +// so its validator is consistent across replicas, regardless of what local +// tiers hold. Such a read is a discovery read: the caller will pin subsequent +// reads to the ETag it returns, and the only pin consistent across replicas is +// the shared tier's. A hit, or a definitive range outcome, is handled here +// (handled=true); a request that is not an unpinned range, a miss, or a +// transient failure at the shared tier returns handled=false so Open falls back +// to normal tiering and a healthy local tier can still serve. +func (t Tiered) openDiscovery(ctx context.Context, ro RequestOptions, key Key, opts ...Option) (io.ReadCloser, http.Header, bool, error) { + if ro.Range == "" || ro.IfMatch != "" || ro.IfRange != "" { + return nil, nil, false, nil + } + deepest := t.caches[len(t.caches)-1] + r, headers, err := deepest.Open(ctx, key, opts...) + switch { + case err == nil: + return r, headers, true, nil + case errors.Is(err, ErrRangeNotSatisfiable): + return nil, headers, true, errors.WithStack(err) + default: + logging.FromContext(ctx).WarnContext(ctx, "Tiered: discovery open falling back to local tiers", "key", key, "error", err) + return nil, nil, false, nil + } +} + // backfillReader wraps src so that every byte read is also written to dst. // On successful close the dst entry becomes available for future reads. // On error or partial read the dst entry is discarded per the Cache contract diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index b620986..6cfc5f0 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -293,6 +293,89 @@ func TestTieredDivergentValidatorPermutations(t *testing.T) { } } +func TestTieredDiscoveryResolvesFromDeepestTier(t *testing.T) { + stale := []byte("0123456789") + pinned := []byte("abcdefghij") + staleETag := contentETag(stale) + pinnedETag := contentETag(pinned) + + newTiered := func(t *testing.T) (context.Context, cache.Cache, cache.Cache, cache.Cache) { + t.Helper() + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + return ctx, cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}), lower, upper + } + + t.Run("UnpinnedRangePinsDeepestETag", func(t *testing.T) { + ctx, tiered, lower, upper := newTiered(t) + defer tiered.Close() + key := cache.NewKey("discovery-divergent") + seedTier(ctx, t, lower, key, stale) + seedTier(ctx, t, upper, key, pinned) + + r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4)) + assert.NoError(t, err) + assert.Equal(t, []byte("abcd"), readAllAndClose(t, r)) + assert.Equal(t, pinnedETag, headers.Get(cache.ETagKey)) + }) + + t.Run("FullReadStaysOnLocalTier", func(t *testing.T) { + ctx, tiered, lower, upper := newTiered(t) + defer tiered.Close() + key := cache.NewKey("discovery-full") + seedTier(ctx, t, lower, key, stale) + seedTier(ctx, t, upper, key, pinned) + + r, headers, err := tiered.Open(ctx, key) + assert.NoError(t, err) + assert.Equal(t, stale, readAllAndClose(t, r)) + assert.Equal(t, staleETag, headers.Get(cache.ETagKey)) + }) + + t.Run("PinnedRangeStaysOnLocalTier", func(t *testing.T) { + ctx, tiered, lower, upper := newTiered(t) + defer tiered.Close() + key := cache.NewKey("discovery-pinned") + seedTier(ctx, t, lower, key, stale) + seedTier(ctx, t, upper, key, pinned) + + r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4), cache.IfMatch(staleETag)) + assert.NoError(t, err) + assert.Equal(t, []byte("0123"), readAllAndClose(t, r)) + assert.Equal(t, staleETag, headers.Get(cache.ETagKey)) + }) + + t.Run("FallsBackToLocalOnDeepestMiss", func(t *testing.T) { + ctx, tiered, lower, _ := newTiered(t) + defer tiered.Close() + key := cache.NewKey("discovery-lower-only") + seedTier(ctx, t, lower, key, stale) + + r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4)) + assert.NoError(t, err) + assert.Equal(t, []byte("0123"), readAllAndClose(t, r)) + assert.Equal(t, staleETag, headers.Get(cache.ETagKey)) + }) + + t.Run("FallsBackToLocalOnDeepestOutage", func(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, newFailingCache(errors.New("backend unavailable"))}) + defer tiered.Close() + key := cache.NewKey("discovery-outage") + seedTier(ctx, t, lower, key, stale) + + r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4)) + assert.NoError(t, err) + assert.Equal(t, []byte("0123"), readAllAndClose(t, r)) + assert.Equal(t, staleETag, headers.Get(cache.ETagKey)) + }) +} + func TestTieredDivergentValidatorProbeErrors(t *testing.T) { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})