From 4d47b4187d2e7095afd70bc054395310eb41fca3 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 12 May 2026 03:49:13 +0000 Subject: [PATCH] perf(ingester): Convert postings cache from FIFO to LRU eviction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The expanded postings cache previously used FIFO eviction, which evicts the oldest entry regardless of access frequency. Under memory pressure with high series churn, actively-queried entries get evicted simply because they were inserted earliest, causing repeated expensive recomputations. LRU eviction moves entries to the back of the list on every access, ensuring frequently-queried entries (e.g., ruler queries running every 30s) stay cached even when the cache is full. Only entries that haven't been accessed recently get evicted. Implementation: add a map[string]*list.Element for O(1) MoveToBack on cache hit. The eviction logic (shouldEvictHead/evictHead) remains unchanged — it still evicts from the front, but now the front contains the least-recently-used entry instead of the oldest-inserted entry. Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + pkg/storage/tsdb/expanded_postings_cache.go | 55 ++++++--- .../tsdb/expanded_postings_cache_test.go | 110 +++++++++++++++++- 3 files changed, 145 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7815bb58495..6a9f1369fea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391 * [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392 * [ENHANCEMENT] Upgrade gRPC from v1.71.2 to v1.79.3 to address CVE-2026-33186. #7460 +* [ENHANCEMENT] Ingester: Convert expanded postings cache from FIFO to LRU eviction to retain frequently-queried entries under memory pressure. #7510 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index b4fcd23f76c..a241607c8a6 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -134,8 +134,8 @@ type ExpandedPostingsCache interface { type blocksPostingsForMatchersCache struct { userId string - headCache *fifoCache[[]storage.SeriesRef] - blocksCache *fifoCache[[]storage.SeriesRef] + headCache *lruCache[[]storage.SeriesRef] + blocksCache *lruCache[[]storage.SeriesRef] postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) timeNow func() time.Time @@ -158,8 +158,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi } return &blocksPostingsForMatchersCache{ - headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), - blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), + headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), + blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, @@ -352,7 +352,7 @@ func (s *seedByHash) incrementSeed(userId string, v string) { s.seedByHash[i]++ } -type fifoCache[V any] struct { +type lruCache[V any] struct { cfg PostingsCacheConfig cachedValues *sync.Map timeNow func() time.Time @@ -365,8 +365,8 @@ type fifoCache[V any] struct { cachedBytes int64 } -func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { - return &fifoCache[V]{ +func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] { + return &lruCache[V]{ cachedValues: new(sync.Map), cached: list.New(), cfg: cfg, @@ -376,7 +376,7 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded } } -func (c *fifoCache[V]) clear() { +func (c *lruCache[V]) clear() { c.cachedMtx.Lock() defer c.cachedMtx.Unlock() c.cached = list.New() @@ -384,7 +384,7 @@ func (c *fifoCache[V]) clear() { c.cachedValues = new(sync.Map) } -func (c *fifoCache[V]) expire() { +func (c *lruCache[V]) expire() { if c.cfg.Ttl <= 0 { return } @@ -402,13 +402,13 @@ func (c *fifoCache[V]) expire() { } } -func (c *fifoCache[V]) size() int { +func (c *lruCache[V]) size() int { c.cachedMtx.RLock() defer c.cachedMtx.RUnlock() return c.cached.Len() } -func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { +func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { r := &cacheEntryPromise[V]{ done: make(chan struct{}), } @@ -434,14 +434,28 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) // If the promise is already in the cache, lets wait it to fetch the data. <-loaded.(*cacheEntryPromise[V]).done + // LRU: move to back on access + c.cachedMtx.Lock() + if elem := loaded.(*cacheEntryPromise[V]).elem; elem != nil { + c.cached.MoveToBack(elem) + } + c.cachedMtx.Unlock() + // If is cached but is expired, lets try to replace the cache value. if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) { c.metrics.CacheMiss.WithLabelValues(c.name, "expired").Inc() r.v, r.sizeBytes, r.err = fetch() r.sizeBytes += int64(len(k)) c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) - loaded = r r.ts = c.timeNow() + // Replace the list element: remove old, push new to back + c.cachedMtx.Lock() + if oldElem := loaded.(*cacheEntryPromise[V]).elem; oldElem != nil { + c.cached.Remove(oldElem) + } + r.elem = c.cached.PushBack(k) + c.cachedMtx.Unlock() + loaded = r ok = false } } @@ -449,12 +463,12 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) return loaded.(*cacheEntryPromise[V]), ok } -func (c *fifoCache[V]) contains(k string) bool { +func (c *lruCache[V]) contains(k string) bool { _, ok := c.cachedValues.Load(k) return ok } -func (c *fifoCache[V]) shouldEvictHead() (string, bool) { +func (c *lruCache[V]) shouldEvictHead() (string, bool) { h := c.cached.Front() if h == nil { return "", false @@ -475,7 +489,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) { return "", false } -func (c *fifoCache[V]) evictHead() { +func (c *lruCache[V]) evictHead() { front := c.cached.Front() c.cached.Remove(front) oldestKey := front.Value.(string) @@ -484,18 +498,22 @@ func (c *fifoCache[V]) evictHead() { } } -func (c *fifoCache[V]) created(key string, sizeBytes int64) { +func (c *lruCache[V]) created(key string, sizeBytes int64) { if c.cfg.Ttl <= 0 { c.cachedValues.Delete(key) return } c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - c.cached.PushBack(key) + elem := c.cached.PushBack(key) + // Store the element reference in the promise for O(1) LRU access + if p, ok := c.cachedValues.Load(key); ok { + p.(*cacheEntryPromise[V]).elem = elem + } c.cachedBytes += sizeBytes } -func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { +func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) { if oldSize == newSizeBytes { return } @@ -508,6 +526,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { type cacheEntryPromise[V any] struct { ts time.Time sizeBytes int64 + elem *list.Element done chan struct{} v V diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index f446803318c..4f4f0741a0a 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -55,7 +55,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) { MaxBytes: 10 << 20, } m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) - cache := newFifoCache[int](cfg, "test", m, time.Now) + cache := newLruCache[int](cfg, "test", m, time.Now) calls := atomic.Int64{} concurrency := 100 wg := sync.WaitGroup{} @@ -84,7 +84,7 @@ func TestFifoCacheDisabled(t *testing.T) { cfg.Enabled = false m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) timeNow := time.Now - cache := newFifoCache[int](cfg, "test", m, timeNow) + cache := newLruCache[int](cfg, "test", m, timeNow) old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { return 1, 0, nil }) @@ -127,7 +127,7 @@ func TestFifoCacheExpire(t *testing.T) { r := prometheus.NewPedanticRegistry() m := NewPostingCacheMetrics(r) timeNow := time.Now - cache := newFifoCache[int](c.cfg, "test", m, timeNow) + cache := newLruCache[int](c.cfg, "test", m, timeNow) for i := range numberOfKeys { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) @@ -292,3 +292,107 @@ func TestPostingsCacheFetchTimeout(t *testing.T) { close(fetchShouldBlock) } + +func TestLruCacheEvictsLeastRecentlyUsed(t *testing.T) { + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + + // Cache fits exactly 3 entries (each entry = 8 bytes value + 4 bytes key = 12 bytes) + cfg := PostingsCacheConfig{ + Enabled: true, + Ttl: time.Hour, + MaxBytes: int64(3 * (8 + 4)), + } + cache := newLruCache[int](cfg, "test", m, time.Now) + + // Insert 3 entries: A, B, C + cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil }) + cache.getPromiseForKey("bbbb", func() (int, int64, error) { return 2, 8, nil }) + cache.getPromiseForKey("cccc", func() (int, int64, error) { return 3, 8, nil }) + + require.True(t, cache.contains("aaaa")) + require.True(t, cache.contains("bbbb")) + require.True(t, cache.contains("cccc")) + + // Access A to make it recently used (B is now least recently used) + cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil }) + + // Insert D — should evict B (least recently used), not A + cache.getPromiseForKey("dddd", func() (int, int64, error) { return 4, 8, nil }) + + require.True(t, cache.contains("aaaa"), "A should still be cached (recently accessed)") + require.False(t, cache.contains("bbbb"), "B should be evicted (least recently used)") + require.True(t, cache.contains("cccc"), "C should still be cached") + require.True(t, cache.contains("dddd"), "D should be cached (just inserted)") +} + +func BenchmarkLruCacheHitUnderPressure(b *testing.B) { + // Simulates: 50 "hot" queries (rulers, every 30s) + many "cold" queries (ad-hoc) + // Cache fits 100 entries. With FIFO, cold queries push out hot ones. + // With LRU, hot queries stay cached because they're accessed frequently. + + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + + keySize := 20 + cfg := PostingsCacheConfig{ + Enabled: true, + Ttl: time.Hour, + MaxBytes: int64(100 * (8 + keySize)), // fits 100 entries + } + cache := newLruCache[int](cfg, "bench", m, time.Now) + + // Pre-populate with 50 hot keys + hotKeys := make([]string, 50) + for i := range hotKeys { + hotKeys[i] = RepeatStringIfNeeded(fmt.Sprintf("hot-%d", i), keySize) + cache.getPromiseForKey(hotKeys[i], func() (int, int64, error) { return i, 8, nil }) + } + + coldIdx := 0 + b.ReportAllocs() + + for i := 0; b.Loop(); i++ { + if i%3 == 0 { + // 1/3 of accesses are hot queries (simulating ruler every 30s) + key := hotKeys[i%len(hotKeys)] + cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil }) + } else { + // 2/3 are cold unique queries (ad-hoc from Grafana) + key := RepeatStringIfNeeded(fmt.Sprintf("cold-%d", coldIdx), keySize) + coldIdx++ + cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil }) + } + } + + // Report hit rate for hot keys + hits := 0 + for _, k := range hotKeys { + if cache.contains(k) { + hits++ + } + } + b.ReportMetric(float64(hits)/float64(len(hotKeys))*100, "%hot-retained") +} + +func BenchmarkCacheGetPromise(b *testing.B) { + r := prometheus.NewPedanticRegistry() + m := NewPostingCacheMetrics(r) + cfg := PostingsCacheConfig{Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20} + cache := newLruCache[int](cfg, "bench", m, time.Now) + + // Pre-populate 1000 keys + for i := range 1000 { + key := fmt.Sprintf("key-%04d", i) + cache.getPromiseForKey(key, func() (int, int64, error) { return i, 100, nil }) + } + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("key-%04d", i%1000) + cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 100, nil }) + i++ + } + }) +}