Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type Tiered struct {
caches []Cache
metadata *metadatadb.Store
etags *metadatadb.Map[Key, string]
namespace Namespace
}

Expand All @@ -40,7 +41,7 @@ func MaybeNewTiered(ctx context.Context, caches []Cache, metadata *metadatadb.St
if metadata == nil {
panic("Tiered cache requires a metadata store")
}
return Tiered{caches: caches, metadata: metadata}
return Tiered{caches: caches, metadata: metadata, etags: tieredETags(metadata, "")}
}

type authoritativeCache struct {
Expand Down Expand Up @@ -70,21 +71,27 @@ func (t Tiered) Close() error {

// Create a new object. All underlying caches will be written to in sequence.
func (t Tiered) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration, opts ...Option) (Writer, error) {
rawETag, _, err := createETag(opts...)
rawETag, quotedETag, err := createETag(opts...)
if err != nil {
return nil, errors.WithStack(err)
}
createOpts := []Option{WithETag(rawETag)}

replaceETag, err := t.replacementETag(ctx, key, quotedETag)
if err != nil {
return nil, err
}

// The first error will cancel all outstanding writes.
ctx, cancel := context.WithCancelCause(ctx)

tw := &tieredWriter{
writers: make([]Writer, len(t.caches)),
cancel: cancel,
etags: t.etags(),
key: key,
rawETag: rawETag,
writers: make([]Writer, len(t.caches)),
cancel: cancel,
etags: t.etags,
key: key,
etag: quotedETag,
replaceETag: replaceETag,
}
// Note: we can't use errgroup here because we do not want to cancel the context on Wait().
wg := sync.WaitGroup{}
Expand All @@ -108,6 +115,18 @@ func (t Tiered) Create(ctx context.Context, key Key, headers http.Header, ttl ti
}
}

func (t Tiered) replacementETag(ctx context.Context, key Key, newETag string) (bool, error) {
headers, err := t.caches[len(t.caches)-1].Stat(ctx, key)
switch {
case errors.Is(err, os.ErrNotExist):
return false, nil
Comment on lines +121 to +122

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Publish replacements even after authoritative eviction

When the authoritative tier returns os.ErrNotExist here, the write is treated as a new key and tieredWriter.Close() never records the new ETag. Cache implementations are allowed to evict objects independently, so with a shorter remote/authoritative TTL (or eviction) and longer local tiers, other replicas can still hold the old ETag; because invalidateStale() only checks metadata, those local hits will keep serving stale content after this replacement. Previously every successful create published the ETag, so this regresses mutable keys whenever the authoritative copy disappeared before the replacement write.

Useful? React with 👍 / 👎.

case err != nil:
return false, errors.WithStack(err)
default:
return headers.Get(ETagKey) != newETag, nil
}
}

// Delete from all underlying caches. All errors are returned.
func (t Tiered) Delete(ctx context.Context, key Key) error {
wg := sync.WaitGroup{}
Expand All @@ -118,7 +137,7 @@ func (t Tiered) Delete(ctx context.Context, key Key) error {
wg.Wait()
err := errors.Join(errs...)
if err == nil {
t.deleteETag(key)
t.etags.Delete(key)
}
return err
}
Expand Down Expand Up @@ -304,16 +323,12 @@ func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser,

const tieredETagsMap = "cache-etags"

func (t Tiered) etags() *metadatadb.Map[Key, string] {
return metadatadb.NewMap[Key, string](t.metadata.Namespace(string(t.namespace)), tieredETagsMap)
}

func (t Tiered) deleteETag(key Key) {
t.etags().Delete(key)
func tieredETags(metadata *metadatadb.Store, namespace Namespace) *metadatadb.Map[Key, string] {
return metadatadb.NewMap[Key, string](metadata.Namespace(string(namespace)), tieredETagsMap)
}

func (t Tiered) invalidateStale(ctx context.Context, c Cache, key Key, headers http.Header) bool {
want, ok := t.etags().Get(key)
want, ok := t.etags.Get(key)
if !ok || want == headers.Get(ETagKey) {
return false
}
Expand Down Expand Up @@ -470,13 +485,14 @@ func (t Tiered) Stats(ctx context.Context) (Stats, error) {
}

type tieredWriter struct {
writers []Writer
cancel context.CancelCauseFunc
etags *metadatadb.Map[Key, string]
key Key
rawETag string
closed bool
aborted bool
writers []Writer
cancel context.CancelCauseFunc
etags *metadatadb.Map[Key, string]
key Key
etag string
replaceETag bool
closed bool
aborted bool
}

var _ Writer = (*tieredWriter)(nil)
Expand All @@ -501,12 +517,8 @@ func (t *tieredWriter) Close() error {
}
wg.Wait()
err := errors.Join(errs...)
if err == nil && !t.aborted && t.etags != nil {
quoted, qerr := FormatETag(t.rawETag)
if qerr != nil {
return errors.WithStack(qerr)
}
t.etags.Set(t.key, quoted)
if err == nil && !t.aborted && t.replaceETag {
t.etags.Set(t.key, t.etag)
}
return err
}
Expand All @@ -531,7 +543,7 @@ func (t Tiered) Namespace(namespace Namespace) Cache {
for i, c := range t.caches {
namespaced[i] = c.Namespace(namespace)
}
return Tiered{caches: namespaced, metadata: t.metadata, namespace: namespace}
return Tiered{caches: namespaced, metadata: t.metadata, etags: tieredETags(t.metadata, namespace), namespace: namespace}
}

// ListNamespaces returns unique namespaces from all underlying caches.
Expand Down
60 changes: 56 additions & 4 deletions internal/cache/tiered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func TestTieredRequiresMetadataStore(t *testing.T) {
})
}

func TestTieredCreatePublishesMetadataETag(t *testing.T) {
func TestTieredCreateDoesNotPublishMetadataETagForNewKey(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
store := newMetadataStore(ctx)
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
Expand All @@ -292,9 +292,55 @@ func TestTieredCreatePublishesMetadataETag(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, w.Close())

_, ok := tieredETags(store, "").Get(key)
assert.False(t, ok)
}

func TestTieredCreatePublishesMetadataETagForReplacement(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
store := newMetadataStore(ctx)
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}, store)
defer tiered.Close()

key := cache.NewKey("tiered-replace-metadata-etag")
seedTier(ctx, t, tiered, key, []byte("old"), "old-etag")

w, err := tiered.Create(ctx, key, nil, time.Minute, cache.WithETag("new-etag"))
assert.NoError(t, err)
_, err = w.Write([]byte("new"))
assert.NoError(t, err)
assert.NoError(t, w.Close())

etag, ok := tieredETags(store, "").Get(key)
assert.True(t, ok)
assert.Equal(t, `"metadata-etag"`, etag)
assert.Equal(t, `"new-etag"`, etag)
}

func TestTieredCreateDoesNotPublishMetadataETagForSameETagReplacement(t *testing.T) {
_, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug})
store := newMetadataStore(ctx)
lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
assert.NoError(t, err)
tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}, store)
defer tiered.Close()

key := cache.NewKey("tiered-same-metadata-etag")
seedTier(ctx, t, tiered, key, []byte("old"), "same-etag")

w, err := tiered.Create(ctx, key, nil, time.Minute, cache.WithETag("same-etag"))
assert.NoError(t, err)
_, err = w.Write([]byte("new"))
assert.NoError(t, err)
assert.NoError(t, w.Close())

_, ok := tieredETags(store, "").Get(key)
assert.False(t, ok)
}

func TestTieredAbortDoesNotPublishMetadataETag(t *testing.T) {
Expand Down Expand Up @@ -461,9 +507,15 @@ func TestTieredMetadataIsNamespaced(t *testing.T) {

key := cache.NewKey("tiered-namespaced-metadata")
namespaced := tiered.Namespace("alpha")
w, err := namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("alpha-etag"))
w, err := namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("old-alpha-etag"))
assert.NoError(t, err)
_, err = w.Write([]byte("content"))
_, err = w.Write([]byte("old"))
assert.NoError(t, err)
assert.NoError(t, w.Close())

w, err = namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("alpha-etag"))
assert.NoError(t, err)
_, err = w.Write([]byte("new"))
assert.NoError(t, err)
assert.NoError(t, w.Close())

Expand Down