Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set positional-arguments := true
set positional-arguments
set shell := ["bash", "-c"]

# Import modules
Expand Down
11 changes: 8 additions & 3 deletions client/parallel_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ type RangeReader interface {
//
// The first chunk is fetched with a ranged Open, whose response yields both the
// total size (from Content-Range) and the object's ETag; every remaining chunk
// is then requested with IfMatch pinned to that ETag. If the object changes
// mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed
// is then requested with IfMatch pinned to that ETag. As an unpinned ranged
// read, the discovery request is a tiered backend's signal to resolve it from
// the shared tier, so the pin avoids cross-replica precondition churn. If the
// object changes mid-download, the chunk is rejected with a bodiless ErrPreconditionFailed
// (412) and ParallelGet returns an error rather than splicing bytes from two
// revisions; a server that ignores If-Match is caught by verifying each chunk's
// response ETag. A missing or truncated chunk is likewise reported as an error,
Expand All @@ -54,7 +56,10 @@ func ParallelGet(ctx context.Context, c RangeReader, key Key, dst io.WriterAt, c
}

// Discovery: the first ranged Open delivers chunk zero and reveals the total
// size and ETag used to pin the rest.
// size and ETag used to pin the rest. It carries no validator, which a tiered
// backend takes as the signal to resolve the pin from its shared tier,
// keeping every chunk on the revision the shared tier holds rather than an
// arbitrary local replica's.
rc, headers, err := c.Open(ctx, key, Range(0, chunkSize))
if errors.Is(err, ErrRangeNotSatisfiable) {
return nil // Empty object: nothing to write.
Expand Down
35 changes: 35 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,19 @@ func (t Tiered) Stat(ctx context.Context, key Key, opts ...Option) (http.Header,
// that errored while being probed takes precedence over both, so outages are
// not misreported as missing versions.
//
// An unpinned ranged read (a Range with no If-Match or If-Range) is treated as
// a discovery read and resolved from the deepest (shared) tier so its validator
// is consistent across replicas, falling back to normal tiering only if that
// tier misses or is unavailable.
//
// If all caches fail, all errors are returned.
func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadCloser, http.Header, error) {
ro := NewRequestOptions(opts...)

if r, headers, handled, err := t.openDiscovery(ctx, ro, key, opts...); handled {
return r, headers, err
}

// A Range request yields a partial body, which must never be backfilled
// into a lower tier as if it were the whole object.
partial := ro.Range != ""
Expand Down Expand Up @@ -222,6 +232,31 @@ func (t Tiered) Open(ctx context.Context, key Key, opts ...Option) (io.ReadClose
return nil, nil, errors.Join(errs...)
}

// openDiscovery resolves an unpinned ranged read from the deepest (shared) tier
// so its validator is consistent across replicas, regardless of what local
// tiers hold. Such a read is a discovery read: the caller will pin subsequent
// reads to the ETag it returns, and the only pin consistent across replicas is
// the shared tier's. A hit, or a definitive range outcome, is handled here
// (handled=true); a request that is not an unpinned range, a miss, or a
// transient failure at the shared tier returns handled=false so Open falls back
// to normal tiering and a healthy local tier can still serve.
func (t Tiered) openDiscovery(ctx context.Context, ro RequestOptions, key Key, opts ...Option) (io.ReadCloser, http.Header, bool, error) {
if ro.Range == "" || ro.IfMatch != "" || ro.IfRange != "" {
return nil, nil, false, nil
}
deepest := t.caches[len(t.caches)-1]
r, headers, err := deepest.Open(ctx, key, opts...)
switch {
case err == nil:
return r, headers, true, nil
case errors.Is(err, ErrRangeNotSatisfiable):
return nil, headers, true, errors.WithStack(err)
default:
logging.FromContext(ctx).WarnContext(ctx, "Tiered: discovery open falling back to local tiers", "key", key, "error", err)
return nil, nil, false, nil
}
}

// backfillReader wraps src so that every byte read is also written to dst.
// On successful close the dst entry becomes available for future reads.
// On error or partial read the dst entry is discarded per the Cache contract
Expand Down
83 changes: 83 additions & 0 deletions internal/cache/tiered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,89 @@ func TestTieredDivergentValidatorPermutations(t *testing.T) {
}
}

func TestTieredDiscoveryResolvesFromDeepestTier(t *testing.T) {
stale := []byte("0123456789")
pinned := []byte("abcdefghij")
staleETag := contentETag(stale)
pinnedETag := contentETag(pinned)

newTiered := func(t *testing.T) (context.Context, cache.Cache, cache.Cache, cache.Cache) {
t.Helper()
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
return ctx, cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}), lower, upper
}

t.Run("UnpinnedRangePinsDeepestETag", func(t *testing.T) {
ctx, tiered, lower, upper := newTiered(t)
defer tiered.Close()
key := cache.NewKey("discovery-divergent")
seedTier(ctx, t, lower, key, stale)
seedTier(ctx, t, upper, key, pinned)

r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
assert.NoError(t, err)
assert.Equal(t, []byte("abcd"), readAllAndClose(t, r))
assert.Equal(t, pinnedETag, headers.Get(cache.ETagKey))
})

t.Run("FullReadStaysOnLocalTier", func(t *testing.T) {
ctx, tiered, lower, upper := newTiered(t)
defer tiered.Close()
key := cache.NewKey("discovery-full")
seedTier(ctx, t, lower, key, stale)
seedTier(ctx, t, upper, key, pinned)

r, headers, err := tiered.Open(ctx, key)
assert.NoError(t, err)
assert.Equal(t, stale, readAllAndClose(t, r))
assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
})

t.Run("PinnedRangeStaysOnLocalTier", func(t *testing.T) {
ctx, tiered, lower, upper := newTiered(t)
defer tiered.Close()
key := cache.NewKey("discovery-pinned")
seedTier(ctx, t, lower, key, stale)
seedTier(ctx, t, upper, key, pinned)

r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4), cache.IfMatch(staleETag))
assert.NoError(t, err)
assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
})

t.Run("FallsBackToLocalOnDeepestMiss", func(t *testing.T) {
ctx, tiered, lower, _ := newTiered(t)
defer tiered.Close()
key := cache.NewKey("discovery-lower-only")
seedTier(ctx, t, lower, key, stale)

r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
assert.NoError(t, err)
assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
})

t.Run("FallsBackToLocalOnDeepestOutage", func(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, newFailingCache(errors.New("backend unavailable"))})
defer tiered.Close()
key := cache.NewKey("discovery-outage")
seedTier(ctx, t, lower, key, stale)

r, headers, err := tiered.Open(ctx, key, cache.Range(0, 4))
assert.NoError(t, err)
assert.Equal(t, []byte("0123"), readAllAndClose(t, r))
assert.Equal(t, staleETag, headers.Get(cache.ETagKey))
})
}

func TestTieredDivergentValidatorProbeErrors(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
Expand Down
Loading