Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [ENHANCEMENT] Query Frontend: Add `query_too_expensive` reason to QFE and `reason` field to query stats. #7479
* [ENHANCEMENT] Distributor: Add HMAC-SHA256 stream authentication for `PushStream` via `-distributor.sign-write-requests-keys`. #7475
* [ENHANCEMENT] Instrument Ingester CPU profile with source for read APIs. #7494
* [ENHANCEMENT] Ingester: Convert expanded postings cache from FIFO to LRU eviction to retain frequently-queried entries under memory pressure. #7510
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
55 changes: 37 additions & 18 deletions pkg/storage/tsdb/expanded_postings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ type ExpandedPostingsCache interface {
type blocksPostingsForMatchersCache struct {
userId string

headCache *fifoCache[[]storage.SeriesRef]
blocksCache *fifoCache[[]storage.SeriesRef]
headCache *lruCache[[]storage.SeriesRef]
blocksCache *lruCache[[]storage.SeriesRef]
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
timeNow func() time.Time

Expand All @@ -158,8 +158,8 @@ func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfi
}

return &blocksPostingsForMatchersCache{
headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
headCache: newLruCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow),
blocksCache: newLruCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow),
postingsForMatchersFunc: cfg.PostingsForMatchers,
timeNow: cfg.timeNow,
metrics: metrics,
Expand Down Expand Up @@ -352,7 +352,7 @@ func (s *seedByHash) incrementSeed(userId string, v string) {
s.seedByHash[i]++
}

type fifoCache[V any] struct {
type lruCache[V any] struct {
cfg PostingsCacheConfig
cachedValues *sync.Map
timeNow func() time.Time
Expand All @@ -365,8 +365,8 @@ type fifoCache[V any] struct {
cachedBytes int64
}

func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] {
return &fifoCache[V]{
func newLruCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *lruCache[V] {
return &lruCache[V]{
cachedValues: new(sync.Map),
cached: list.New(),
cfg: cfg,
Expand All @@ -376,15 +376,15 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
}
}

func (c *fifoCache[V]) clear() {
func (c *lruCache[V]) clear() {
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
c.cached = list.New()
c.cachedBytes = 0
c.cachedValues = new(sync.Map)
}

func (c *fifoCache[V]) expire() {
func (c *lruCache[V]) expire() {
if c.cfg.Ttl <= 0 {
return
}
Expand All @@ -402,13 +402,13 @@ func (c *fifoCache[V]) expire() {
}
}

func (c *fifoCache[V]) size() int {
func (c *lruCache[V]) size() int {
c.cachedMtx.RLock()
defer c.cachedMtx.RUnlock()
return c.cached.Len()
}

func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
func (c *lruCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
}
Expand All @@ -434,27 +434,41 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
// If the promise is already in the cache, lets wait it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done

// LRU: move to back on access
c.cachedMtx.Lock()
if elem := loaded.(*cacheEntryPromise[V]).elem; elem != nil {
c.cached.MoveToBack(elem)
}
c.cachedMtx.Unlock()

// If is cached but is expired, lets try to replace the cache value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
c.metrics.CacheMiss.WithLabelValues(c.name, "expired").Inc()
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
loaded = r
r.ts = c.timeNow()
// Replace the list element: remove old, push new to back
c.cachedMtx.Lock()
if oldElem := loaded.(*cacheEntryPromise[V]).elem; oldElem != nil {
c.cached.Remove(oldElem)
}
r.elem = c.cached.PushBack(k)
c.cachedMtx.Unlock()
loaded = r
ok = false
}
}

return loaded.(*cacheEntryPromise[V]), ok
}

func (c *fifoCache[V]) contains(k string) bool {
func (c *lruCache[V]) contains(k string) bool {
_, ok := c.cachedValues.Load(k)
return ok
}

func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
func (c *lruCache[V]) shouldEvictHead() (string, bool) {
h := c.cached.Front()
if h == nil {
return "", false
Expand All @@ -475,7 +489,7 @@ func (c *fifoCache[V]) shouldEvictHead() (string, bool) {
return "", false
}

func (c *fifoCache[V]) evictHead() {
func (c *lruCache[V]) evictHead() {
front := c.cached.Front()
c.cached.Remove(front)
oldestKey := front.Value.(string)
Expand All @@ -484,18 +498,22 @@ func (c *fifoCache[V]) evictHead() {
}
}

func (c *fifoCache[V]) created(key string, sizeBytes int64) {
func (c *lruCache[V]) created(key string, sizeBytes int64) {
if c.cfg.Ttl <= 0 {
c.cachedValues.Delete(key)
return
}
c.cachedMtx.Lock()
defer c.cachedMtx.Unlock()
c.cached.PushBack(key)
elem := c.cached.PushBack(key)
// Store the element reference in the promise for O(1) LRU access
if p, ok := c.cachedValues.Load(key); ok {
p.(*cacheEntryPromise[V]).elem = elem
}
c.cachedBytes += sizeBytes
}

func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
func (c *lruCache[V]) updateSize(oldSize, newSizeBytes int64) {
if oldSize == newSizeBytes {
return
}
Expand All @@ -508,6 +526,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
type cacheEntryPromise[V any] struct {
ts time.Time
sizeBytes int64
elem *list.Element

done chan struct{}
v V
Expand Down
110 changes: 107 additions & 3 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
MaxBytes: 10 << 20,
}
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
cache := newFifoCache[int](cfg, "test", m, time.Now)
cache := newLruCache[int](cfg, "test", m, time.Now)
calls := atomic.Int64{}
concurrency := 100
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestFifoCacheDisabled(t *testing.T) {
cfg.Enabled = false
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
cache := newFifoCache[int](cfg, "test", m, timeNow)
cache := newLruCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
return 1, 0, nil
})
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestFifoCacheExpire(t *testing.T) {
r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)
timeNow := time.Now
cache := newFifoCache[int](c.cfg, "test", m, timeNow)
cache := newLruCache[int](c.cfg, "test", m, timeNow)

for i := range numberOfKeys {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
Expand Down Expand Up @@ -292,3 +292,107 @@ func TestPostingsCacheFetchTimeout(t *testing.T) {

close(fetchShouldBlock)
}

func TestLruCacheEvictsLeastRecentlyUsed(t *testing.T) {
r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)

// Cache fits exactly 3 entries (each entry = 8 bytes value + 4 bytes key = 12 bytes)
cfg := PostingsCacheConfig{
Enabled: true,
Ttl: time.Hour,
MaxBytes: int64(3 * (8 + 4)),
}
cache := newLruCache[int](cfg, "test", m, time.Now)

// Insert 3 entries: A, B, C
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })
cache.getPromiseForKey("bbbb", func() (int, int64, error) { return 2, 8, nil })
cache.getPromiseForKey("cccc", func() (int, int64, error) { return 3, 8, nil })

require.True(t, cache.contains("aaaa"))
require.True(t, cache.contains("bbbb"))
require.True(t, cache.contains("cccc"))

// Access A to make it recently used (B is now least recently used)
cache.getPromiseForKey("aaaa", func() (int, int64, error) { return 1, 8, nil })

// Insert D — should evict B (least recently used), not A
cache.getPromiseForKey("dddd", func() (int, int64, error) { return 4, 8, nil })

require.True(t, cache.contains("aaaa"), "A should still be cached (recently accessed)")
require.False(t, cache.contains("bbbb"), "B should be evicted (least recently used)")
require.True(t, cache.contains("cccc"), "C should still be cached")
require.True(t, cache.contains("dddd"), "D should be cached (just inserted)")
}

func BenchmarkLruCacheHitUnderPressure(b *testing.B) {
// Simulates: 50 "hot" queries (rulers, every 30s) + many "cold" queries (ad-hoc)
// Cache fits 100 entries. With FIFO, cold queries push out hot ones.
// With LRU, hot queries stay cached because they're accessed frequently.

r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)

keySize := 20
cfg := PostingsCacheConfig{
Enabled: true,
Ttl: time.Hour,
MaxBytes: int64(100 * (8 + keySize)), // fits 100 entries
}
cache := newLruCache[int](cfg, "bench", m, time.Now)

// Pre-populate with 50 hot keys
hotKeys := make([]string, 50)
for i := range hotKeys {
hotKeys[i] = RepeatStringIfNeeded(fmt.Sprintf("hot-%d", i), keySize)
cache.getPromiseForKey(hotKeys[i], func() (int, int64, error) { return i, 8, nil })
}

coldIdx := 0
b.ReportAllocs()

for i := 0; b.Loop(); i++ {
if i%3 == 0 {
// 1/3 of accesses are hot queries (simulating ruler every 30s)
key := hotKeys[i%len(hotKeys)]
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
} else {
// 2/3 are cold unique queries (ad-hoc from Grafana)
key := RepeatStringIfNeeded(fmt.Sprintf("cold-%d", coldIdx), keySize)
coldIdx++
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 8, nil })
}
}

// Report hit rate for hot keys
hits := 0
for _, k := range hotKeys {
if cache.contains(k) {
hits++
}
}
b.ReportMetric(float64(hits)/float64(len(hotKeys))*100, "%hot-retained")
}

func BenchmarkCacheGetPromise(b *testing.B) {
r := prometheus.NewPedanticRegistry()
m := NewPostingCacheMetrics(r)
cfg := PostingsCacheConfig{Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20}
cache := newLruCache[int](cfg, "bench", m, time.Now)

// Pre-populate 1000 keys
for i := range 1000 {
key := fmt.Sprintf("key-%04d", i)
cache.getPromiseForKey(key, func() (int, int64, error) { return i, 100, nil })
}

b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
key := fmt.Sprintf("key-%04d", i%1000)
cache.getPromiseForKey(key, func() (int, int64, error) { return 1, 100, nil })
i++
}
})
}
Loading