diff --git a/Justfile b/Justfile
index 2f3f6cb..86d4f51 100644
--- a/Justfile
+++ b/Justfile
@@ -1,4 +1,4 @@
-set positional-arguments := true
+set positional-arguments
 set shell := ["bash", "-c"]
 
 # Import modules
diff --git a/client/parallel_get.go b/client/parallel_get.go
index b9d0176..27d7238 100644
--- a/client/parallel_get.go
+++ b/client/parallel_get.go
@@ -26,8 +26,10 @@ type RangeReader interface {
 //
 // The first chunk is fetched with a ranged Open, whose response yields both the
 // total size (from Content-Range) and the object's ETag; every remaining chunk
-// is then requested with IfMatch pinned to that ETag. If the object changes
-// mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed
+// is then requested with IfMatch pinned to that ETag. As an unpinned ranged
+// read, the discovery request is a tiered backend's signal to resolve it from
+// the shared tier, so the pin avoids cross-replica precondition churn. If the
+// object changes mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed
 // (412) and ParallelGet returns an error rather than splicing bytes from two
 // revisions; a server that ignores If-Match is caught by verifying each chunk's
 // response ETag. A missing or truncated chunk is likewise reported as an error,
@@ -54,7 +56,10 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
 	}
 
 	// Discovery: the first ranged Open delivers chunk zero and reveals the total
-	// size and ETag used to pin the rest.
+	// size and ETag used to pin the rest. It carries no validator, which a tiered
+	// backend takes as the signal to resolve the pin from its shared tier,
+	// keeping every chunk on the revision the shared tier holds rather than an
+	// arbitrary local replica's.
 	rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
 	if errors.Is(err, ErrRangeNotSatisfiable) {
 		return nil // Empty object: nothing to write.
diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go
index 4396532..c0c3431 100644
--- a/internal/cache/tiered.go
+++ b/internal/cache/tiered.go
@@ -142,9 +142,19 @@ func (t Tiered) Stat(ctx context.Context, key Key, opts ...Option) (http.Header,
 // that errored while being probed takes precedence over both, so outages are
 // not misreported as missing versions.
 //
+// An unpinned ranged read (a Range with no If-Match or If-Range) is treated as
+// a discovery read and resolved from the deepest (shared) tier so its validator
+// is consistent across replicas, falling back to normal tiering only if that
+// tier misses or is unavailable.
+//
 // If all caches fail, all errors are returned.
 func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadCloser, http.Header, error) {
 	ro := NewRequestOptions(opts...)
+
+	if r, headers, handled, err := t.openDiscovery(ctx, ro, key, opts...); handled {
+		return r, headers, err
+	}
+
 	// A Range request yields a partial body, which must never be backfilled
 	// into a lower tier as if it were the whole object.
 	partial := ro.Range != ""
@@ -222,6 +232,37 @@ func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadClose
 	return nil, nil, errors.Join(errs...)
 }
 
+// openDiscovery resolves an unpinned ranged read from the deepest (shared) tier
+// so its validator is consistent across replicas, regardless of what local
+// tiers hold. Such a read is a discovery read: the caller will pin subsequent
+// reads to the ETag it returns, and the only pin consistent across replicas is
+// the shared tier's. A hit, or a definitive range outcome, is handled here
+// (handled=true); a request that is not an unpinned range, a miss, or a
+// transient failure at the shared tier returns handled=false so Open falls back
+// to normal tiering and a healthy local tier can still serve. A Tiered with no
+// caches has no shared tier to consult and also returns handled=false.
+func (t Tiered) openDiscovery(ctx context.Context, ro RequestOptions, key Key, opts ...Option) (io.ReadCloser, http.Header, bool, error) {
+	// Only an unpinned ranged read is a discovery read; with no caches there is
+	// no shared tier to consult.
+	if len(t.caches) == 0 || ro.Range == "" || ro.IfMatch != "" || ro.IfRange != "" {
+		return nil, nil, false, nil
+	}
+	// The last cache in the list is the deepest (shared) tier.
+	deepest := t.caches[len(t.caches)-1]
+	r, headers, err := deepest.Open(ctx, key, opts...)
+	switch {
+	case err == nil:
+		return r, headers, true, nil
+	case errors.Is(err, ErrRangeNotSatisfiable):
+		// Definitive range outcome: report it instead of retrying local tiers.
+		return nil, headers, true, errors.WithStack(err)
+	default:
+		// Miss or transient failure: let Open run its normal tiering.
+		logging.FromContext(ctx).WarnContext(ctx, "Tiered: discovery open falling back to local tiers", "key", key, "error", err)
+		return nil, nil, false, nil
+	}
+}
+
 // backfillReader wraps src so that every byte read is also written to dst.
 // On successful close the dst entry becomes available for future reads.
 // On error or partial read the dst entry is discarded per the Cache contract
diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go
index b620986..6cfc5f0 100644
--- a/internal/cache/tiered_test.go
+++ b/internal/cache/tiered_test.go
@@ -293,6 +293,89 @@ func TestTieredDivergentValidatorPermutations(t *testing.T) {
 	}
 }
 
+func TestTieredDiscoveryResolvesFromDeepestTier(t *testing.T) {
+	stale := []byte("0123456789")
+	pinned := []byte("abcdefghij")
+	staleETag := contentETag(stale)
+	pinnedETag := contentETag(pinned)
+
+	newTiered := func(t *testing.T) (context.Context, cache.Cache, cache.Cache, cache.Cache) {
+		t.Helper()
+		_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
+		lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
+		assert.NoError(t, err)
+		upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
+		assert.NoError(t, err)
+		return ctx, cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}), lower, upper
+	}
+
+	t.Run("UnpinnedRangePinsDeepestETag", func(t *testing.T) {
+		ctx, tiered, lower, upper := newTiered(t)
+		defer tiered.Close()
+		key := cache.NewKey("discovery-divergent")
+		seedTier(ctx, t, lower, key, stale)
+		seedTier(ctx, t, upper, key, pinned)
+
+		r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
+		assert.NoError(t, err)
+		assert.Equal(t, []byte("abcd"), readAllAndClose(t, r))
+		assert.Equal(t, pinnedETag, headers.Get(cache.ETagKey))
+	})
+
+	t.Run("FullReadStaysOnLocalTier", func(t *testing.T) {
+		ctx, tiered, lower, upper := newTiered(t)
+		defer tiered.Close()
+		key := cache.NewKey("discovery-full")
+		seedTier(ctx, t, lower, key, stale)
+		seedTier(ctx, t, upper, key, pinned)
+
+		r, headers, err := tiered.Open(ctx, key)
+		assert.NoError(t, err)
+		assert.Equal(t, stale, readAllAndClose(t, r))
+		assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
+	})
+
+	t.Run("PinnedRangeStaysOnLocalTier", func(t *testing.T) {
+		ctx, tiered, lower, upper := newTiered(t)
+		defer tiered.Close()
+		key := cache.NewKey("discovery-pinned")
+		seedTier(ctx, t, lower, key, stale)
+		seedTier(ctx, t, upper, key, pinned)
+
+		r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4), cache.IfMatch(staleETag))
+		assert.NoError(t, err)
+		assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
+		assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
+	})
+
+	t.Run("FallsBackToLocalOnDeepestMiss", func(t *testing.T) {
+		ctx, tiered, lower, _ := newTiered(t)
+		defer tiered.Close()
+		key := cache.NewKey("discovery-lower-only")
+		seedTier(ctx, t, lower, key, stale)
+
+		r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
+		assert.NoError(t, err)
+		assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
+		assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
+	})
+
+	t.Run("FallsBackToLocalOnDeepestOutage", func(t *testing.T) {
+		_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
+		lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
+		assert.NoError(t, err)
+		tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, newFailingCache(errors.New("backend unavailable"))})
+		defer tiered.Close()
+		key := cache.NewKey("discovery-outage")
+		seedTier(ctx, t, lower, key, stale)
+
+		r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
+		assert.NoError(t, err)
+		assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
+		assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
+	})
+}
+
 func TestTieredDivergentValidatorProbeErrors(t *testing.T) {
 	_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
 	lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})