diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go index 3d0779f..939ccda 100644 --- a/internal/cache/tiered.go +++ b/internal/cache/tiered.go @@ -23,6 +23,7 @@ import ( type Tiered struct { caches []Cache metadata *metadatadb.Store + etags *metadatadb.Map[Key, string] namespace Namespace } @@ -40,7 +41,7 @@ func MaybeNewTiered(ctx context.Context, caches []Cache, metadata *metadatadb.St if metadata == nil { panic("Tiered cache requires a metadata store") } - return Tiered{caches: caches, metadata: metadata} + return Tiered{caches: caches, metadata: metadata, etags: tieredETags(metadata, "")} } type authoritativeCache struct { @@ -70,21 +71,27 @@ func (t Tiered) Close() error { // Create a new object. All underlying caches will be written to in sequence. func (t Tiered) Create(ctx context.Context, key Key, headers http.Header, ttl time.Duration, opts ...Option) (Writer, error) { - rawETag, _, err := createETag(opts...) + rawETag, quotedETag, err := createETag(opts...) if err != nil { return nil, errors.WithStack(err) } createOpts := []Option{WithETag(rawETag)} + replaceETag, err := t.replacementETag(ctx, key, quotedETag) + if err != nil { + return nil, err + } + // The first error will cancel all outstanding writes. ctx, cancel := context.WithCancelCause(ctx) tw := &tieredWriter{ - writers: make([]Writer, len(t.caches)), - cancel: cancel, - etags: t.etags(), - key: key, - rawETag: rawETag, + writers: make([]Writer, len(t.caches)), + cancel: cancel, + etags: t.etags, + key: key, + etag: quotedETag, + replaceETag: replaceETag, } // Note: we can't use errgroup here because we do not want to cancel the context on Wait(). wg := sync.WaitGroup{} @@ -108,6 +115,18 @@ func (t Tiered) Create(ctx context.Context, key Key, headers http.Header, ttl ti } } +func (t Tiered) replacementETag(ctx context.Context, key Key, newETag string) (bool, error) { + headers, err := t.caches[len(t.caches)-1].Stat(ctx, key) + switch { + case errors.Is(err, os.ErrNotExist): + return false, nil + case err != nil: + return false, errors.WithStack(err) + default: + return headers.Get(ETagKey) != newETag, nil + } +} + // Delete from all underlying caches. All errors are returned. func (t Tiered) Delete(ctx context.Context, key Key) error { wg := sync.WaitGroup{} @@ -118,7 +137,7 @@ func (t Tiered) Delete(ctx context.Context, key Key) error { wg.Wait() err := errors.Join(errs...) if err == nil { - t.deleteETag(key) + t.etags.Delete(key) } return err } @@ -304,16 +323,12 @@ func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser, const tieredETagsMap = "cache-etags" -func (t Tiered) etags() *metadatadb.Map[Key, string] { - return metadatadb.NewMap[Key, string](t.metadata.Namespace(string(t.namespace)), tieredETagsMap) -} - -func (t Tiered) deleteETag(key Key) { - t.etags().Delete(key) +func tieredETags(metadata *metadatadb.Store, namespace Namespace) *metadatadb.Map[Key, string] { + return metadatadb.NewMap[Key, string](metadata.Namespace(string(namespace)), tieredETagsMap) } func (t Tiered) invalidateStale(ctx context.Context, c Cache, key Key, headers http.Header) bool { - want, ok := t.etags().Get(key) + want, ok := t.etags.Get(key) if !ok || want == headers.Get(ETagKey) { return false } @@ -470,13 +485,14 @@ func (t Tiered) Stats(ctx context.Context) (Stats, error) { } type tieredWriter struct { - writers []Writer - cancel context.CancelCauseFunc - etags *metadatadb.Map[Key, string] - key Key - rawETag string - closed bool - aborted bool + writers []Writer + cancel context.CancelCauseFunc + etags *metadatadb.Map[Key, string] + key Key + etag string + replaceETag bool + closed bool + aborted bool } var _ Writer = (*tieredWriter)(nil) @@ -501,12 +517,8 @@ func (t *tieredWriter) Close() error { } wg.Wait() err := errors.Join(errs...) - if err == nil && !t.aborted && t.etags != nil { - quoted, qerr := FormatETag(t.rawETag) - if qerr != nil { - return errors.WithStack(qerr) - } - t.etags.Set(t.key, quoted) + if err == nil && !t.aborted && t.replaceETag { + t.etags.Set(t.key, t.etag) } return err } @@ -531,7 +543,7 @@ func (t Tiered) Namespace(namespace Namespace) Cache { for i, c := range t.caches { namespaced[i] = c.Namespace(namespace) } - return Tiered{caches: namespaced, metadata: t.metadata, namespace: namespace} + return Tiered{caches: namespaced, metadata: t.metadata, etags: tieredETags(t.metadata, namespace), namespace: namespace} } // ListNamespaces returns unique namespaces from all underlying caches. diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go index 8ab0048..ae45717 100644 --- a/internal/cache/tiered_test.go +++ b/internal/cache/tiered_test.go @@ -275,7 +275,7 @@ func TestTieredRequiresMetadataStore(t *testing.T) { }) } -func TestTieredCreatePublishesMetadataETag(t *testing.T) { +func TestTieredCreateDoesNotPublishMetadataETagForNewKey(t *testing.T) { _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) store := newMetadataStore(ctx) lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) @@ -292,9 +292,55 @@ func TestTieredCreatePublishesMetadataETag(t *testing.T) { assert.NoError(t, err) assert.NoError(t, w.Close()) + _, ok := tieredETags(store, "").Get(key) + assert.False(t, ok) +} + +func TestTieredCreatePublishesMetadataETagForReplacement(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + store := newMetadataStore(ctx) + lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}, store) + defer tiered.Close() + + key := cache.NewKey("tiered-replace-metadata-etag") + seedTier(ctx, t, tiered, key, []byte("old"), "old-etag") + + w, err := tiered.Create(ctx, key, nil, time.Minute, cache.WithETag("new-etag")) + assert.NoError(t, err) + _, err = w.Write([]byte("new")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + etag, ok := tieredETags(store, "").Get(key) assert.True(t, ok) - assert.Equal(t, `"metadata-etag"`, etag) + assert.Equal(t, `"new-etag"`, etag) +} + +func TestTieredCreateDoesNotPublishMetadataETagForSameETagReplacement(t *testing.T) { + _, ctx := logging.Configure(t.Context(), logging.Config{Level: slog.LevelDebug}) + store := newMetadataStore(ctx) + lower, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + upper, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour}) + assert.NoError(t, err) + tiered := cache.MaybeNewTiered(ctx, []cache.Cache{lower, upper}, store) + defer tiered.Close() + + key := cache.NewKey("tiered-same-metadata-etag") + seedTier(ctx, t, tiered, key, []byte("old"), "same-etag") + + w, err := tiered.Create(ctx, key, nil, time.Minute, cache.WithETag("same-etag")) + assert.NoError(t, err) + _, err = w.Write([]byte("new")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + _, ok := tieredETags(store, "").Get(key) + assert.False(t, ok) } func TestTieredAbortDoesNotPublishMetadataETag(t *testing.T) { @@ -461,9 +507,15 @@ func TestTieredMetadataIsNamespaced(t *testing.T) { key := cache.NewKey("tiered-namespaced-metadata") namespaced := tiered.Namespace("alpha") - w, err := namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("alpha-etag")) + w, err := namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("old-alpha-etag")) assert.NoError(t, err) - _, err = w.Write([]byte("content")) + _, err = w.Write([]byte("old")) + assert.NoError(t, err) + assert.NoError(t, w.Close()) + + w, err = namespaced.Create(ctx, key, nil, time.Minute, cache.WithETag("alpha-etag")) + assert.NoError(t, err) + _, err = w.Write([]byte("new")) assert.NoError(t, err) assert.NoError(t, w.Close())