diff --git a/.seqbench/baseline.env b/.seqbench/baseline.env index ee12dfcb..eea4bb9c 100644 --- a/.seqbench/baseline.env +++ b/.seqbench/baseline.env @@ -1,8 +1,20 @@ GOGC=100 -SEQDB_STORAGE_FRAC_SIZE=16MiB +SEQDB_STORAGE_FRAC_SIZE=1MiB SEQDB_STORAGE_TOTAL_SIZE=10GiB +SEQDB_COMPACTION_ENABLED=true +SEQDB_COMPACTION_WORKERS=4 +SEQDB_COMPACTION_TIME_WINDOW=1h +SEQDB_COMPACTION_TICK_INTERVAL=1s + +SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4 +SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8 +SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB + +SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5 +SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5 + SEQDB_LIMITS_QUERY_RATE=1024 SEQDB_LIMITS_SEARCH_REQUESTS=1024 SEQDB_LIMITS_BULK_REQUESTS=128 diff --git a/.seqbench/comparison.env b/.seqbench/comparison.env index ee12dfcb..eea4bb9c 100644 --- a/.seqbench/comparison.env +++ b/.seqbench/comparison.env @@ -1,8 +1,20 @@ GOGC=100 -SEQDB_STORAGE_FRAC_SIZE=16MiB +SEQDB_STORAGE_FRAC_SIZE=1MiB SEQDB_STORAGE_TOTAL_SIZE=10GiB +SEQDB_COMPACTION_ENABLED=true +SEQDB_COMPACTION_WORKERS=4 +SEQDB_COMPACTION_TIME_WINDOW=1h +SEQDB_COMPACTION_TICK_INTERVAL=1s + +SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4 +SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8 +SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB + +SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5 +SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5 + SEQDB_LIMITS_QUERY_RATE=1024 SEQDB_LIMITS_SEARCH_REQUESTS=1024 SEQDB_LIMITS_BULK_REQUESTS=128 diff --git a/.seqbench/continuous.env b/.seqbench/continuous.env index 96a870ac..9fb9c7fc 100644 --- a/.seqbench/continuous.env +++ b/.seqbench/continuous.env @@ -2,9 +2,21 @@ GOGC=100 SEQDB_RESOURCES_SKIP_FSYNC=true -SEQDB_STORAGE_FRAC_SIZE=16MiB +SEQDB_STORAGE_FRAC_SIZE=1MiB SEQDB_STORAGE_TOTAL_SIZE=10GiB +SEQDB_COMPACTION_ENABLED=true +SEQDB_COMPACTION_WORKERS=4 +SEQDB_COMPACTION_TIME_WINDOW=1h +SEQDB_COMPACTION_TICK_INTERVAL=1s + +SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4 +SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8 +SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB + +SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5 +SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5 + SEQDB_LIMITS_QUERY_RATE=1024 SEQDB_LIMITS_SEARCH_REQUESTS=1024 SEQDB_LIMITS_BULK_REQUESTS=128 diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index de29ac4c..25b65f32 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -19,6 +19,7 @@ import ( "github.com/ozontech/seq-db/asyncsearcher" "github.com/ozontech/seq-db/buildinfo" + "github.com/ozontech/seq-db/compaction" "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" @@ -289,6 +290,7 @@ func startStore( OffloadingRetention: cfg.Offloading.Retention, OffloadingRetryDelay: cfg.Offloading.RetryDelay, OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100), + CompactionEnabled: cfg.Compaction.Enabled, }, API: storeapi.APIConfig{ StoreMode: configMode, @@ -324,6 +326,20 @@ func startStore( Workers: cfg.SkipMaskManager.Workers, CacheSizeLimit: uint64(cfg.SkipMaskManager.CacheSize), }, + Compaction: compaction.Config{ + Enabled: cfg.Compaction.Enabled, + + MergeTrigger: cfg.Compaction.STCS.MergeTrigger, + MergeFanIn: cfg.Compaction.STCS.MergeFanIn, + MergeFanOutSize: uint64(cfg.Compaction.STCS.MergeFanOutSize), + + BucketLowerbound: cfg.Compaction.STCS.BucketLowerbound, + BucketUpperbound: cfg.Compaction.STCS.BucketUpperbound, + + Workers: cfg.Compaction.Workers, + TimeWindow: cfg.Compaction.TimeWindow, + TickInterval: cfg.Compaction.TickInterval, + }, } s3cli := initS3Client(cfg) diff --git a/compaction/executor.go b/compaction/executor.go new file mode 100644 index 00000000..b0cc6db9 --- /dev/null +++ b/compaction/executor.go @@ -0,0 +1,76 @@ +package compaction + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" + "github.com/ozontech/seq-db/logger" +) + +type Executor struct { + params common.SealParams + + workers int + wg sync.WaitGroup + + p *planner +} + +func NewExecutor(workers int, params common.SealParams, p *planner) *Executor { + e := Executor{params: params, workers: workers, p: p} + e.init() + return &e +} + +func (e *Executor) Stop() { + e.p.stop() + e.wg.Wait() +} + +func (e *Executor) init() { + for range e.workers { + e.wg.Go(func() { + for t := range e.p.tasks { + start := time.Now() + + result, err := e.compact(t) + compactionDurationSeconds. + WithLabelValues(t.bucketSize). + Observe(time.Since(start).Seconds()) + + t.onComplete(result, err) + } + }) + } +} + +func (e *Executor) compact(t task) (*sealed.PreloadedData, error) { + var ( + names []string + srcs []Source + ) + + for _, f := range t.snapshot.Fractions() { + names = append(names, f.Info().Name()) + srcs = append(srcs, frac.NewSealedSource(f)) + + compactionBytesTotal. + WithLabelValues(t.bucketSize). + Add(float64(f.Info().IndexOnDisk)) + } + + logger.Info( + "compacting fractions", + zap.Time("bin", t.bin), + zap.Strings("names", names), + zap.String("bucket_size", t.bucketSize), + ) + + preloaded, err := Merge(t.filename, e.params, srcs...) + return preloaded, err +} diff --git a/compaction/merge.go b/compaction/merge.go index 928b3044..30aa3ab8 100644 --- a/compaction/merge.go +++ b/compaction/merge.go @@ -11,13 +11,13 @@ import ( ) func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) { - writer := indexwriter.New(params) + w := indexwriter.New(params) src := NewMergeSource(filename, srcs) if err := createAndWrite( filename+consts.OffsetsTmpFileSuffix, filename+consts.OffsetsFileSuffix, - func(f *os.File) error { return writer.WriteOffsetsFile(f, src) }, + func(f *os.File) error { return w.WriteOffsetsFile(f, src) }, ); err != nil { return nil, err } @@ -25,7 +25,7 @@ func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.P if err := createAndWrite( filename+consts.IDTmpFileSuffix, filename+consts.IDFileSuffix, - func(f *os.File) error { return writer.WriteIDFile(f, src) }, + func(f *os.File) error { return w.WriteIDFile(f, src) }, ); err != nil { return nil, err } @@ -35,7 +35,7 @@ func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.P filename+consts.TokenFileSuffix, filename+consts.LIDTmpFileSuffix, filename+consts.LIDFileSuffix, - func(tf, lf *os.File) error { return writer.WriteTokenTriplet(tf, lf, src) }, + func(tf, lf *os.File) error { return w.WriteTokenTriplet(tf, lf, src) }, ); err != nil { return nil, err } @@ -43,7 +43,7 @@ func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.P if err := createAndWrite( filename+consts.InfoTmpFileSuffix, filename+consts.InfoFileSuffix, - func(f *os.File) error { return writer.WriteInfoFile(f, src) }, + func(f *os.File) error { return w.WriteInfoFile(f, src) }, ); err != nil { return nil, err } @@ -69,13 +69,13 @@ func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.P info.IndexOnDisk += uint64(st.Size()) } - lidsTable := writer.LIDsTable() + lidsTable := w.LIDsTable() preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: writer.TokenTable(), + TokenTable: w.TokenTable(), BlocksData: sealed.BlocksData{ LIDsTable: &lidsTable, - IDsTable: writer.IDsTable(), + IDsTable: w.IDsTable(), BlocksOffsets: src.BlockOffsets(), }, } diff --git a/compaction/metrics.go b/compaction/metrics.go new file mode 100644 index 00000000..a6fe88fd --- /dev/null +++ b/compaction/metrics.go @@ -0,0 +1,46 @@ +package compaction + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/ozontech/seq-db/metric" +) + +var ( + compactionSkipped = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "skipped_total", + Help: "Tick-triggered tasks dropped because all workers were busy or no candidates were found", + }) + + compactionBins = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "bins", + Help: "Number of active time-bins considered for compaction", + }) + + compactionDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "duration_seconds", + Help: "Time spent executing a single compaction", + Buckets: metric.SecondsBuckets, + }, []string{"bucket"}) + + compactionBytesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "bytes_total", + Help: "Total index bytes merged across all compactions", + }, []string{"bucket"}) + + compactionResultTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "seq_db_store", + Subsystem: "compaction", + Name: "result_total", + Help: "Compaction outcomes by result (success, empty, error)", + }, []string{"bucket", "result"}) +) diff --git a/compaction/planner.go b/compaction/planner.go new file mode 100644 index 00000000..da0c0008 --- /dev/null +++ b/compaction/planner.go @@ -0,0 +1,239 @@ +package compaction + +import ( + "cmp" + "context" + "maps" + "math/bits" + "slices" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" +) + +type Config struct { + Enabled bool + + MergeTrigger int + MergeFanIn int + MergeFanOutSize uint64 + + BucketLowerbound float64 + BucketUpperbound float64 + + Workers int + TimeWindow time.Duration + TickInterval time.Duration +} + +type fraction interface { + Info() *common.Info +} + +type task struct { + bin time.Time + bucketSize string + + filename string + snapshot *fracmanager.CompactionSnapshot + + onComplete func(*sealed.PreloadedData, error) +} + +type planner struct { + cfg Config + ctx context.Context + + wg sync.WaitGroup + done chan struct{} + + fm *fracmanager.FracManager + tasks chan task +} + +func NewPlanner(ctx context.Context, fm *fracmanager.FracManager, cfg Config) *planner { + p := planner{ + cfg: cfg, + ctx: ctx, + + done: make(chan struct{}), + fm: fm, + + tasks: make(chan task), + } + + if cfg.Enabled { + p.init() + return &p + } + + return &p +} + +func (p *planner) init() { + p.wg.Go(func() { + t := time.NewTicker(p.cfg.TickInterval) + + for { + select { + case <-p.ctx.Done(): + close(p.tasks) + return + + case <-p.done: + close(p.tasks) + return + + case <-t.C: + task, ok := p.pick() + if !ok { + compactionSkipped.Inc() + continue + } + + select { + case p.tasks <- task: + case <-time.NewTimer(time.Second).C: + // If all executor workers are busy for some long period of time, + // we want to drop the task because it might contain stale decision. + compactionSkipped.Inc() + } + } + } + }) +} + +func (p *planner) stop() { + close(p.done) + if !p.cfg.Enabled { + close(p.tasks) + } +} + +func (p *planner) pick() (task, bool) { + fractions := p.fm.SealedFractionsSnapshot() + snapshot := make([]fraction, len(fractions)) + + for i := range fractions { + snapshot[i] = fractions[i] + } + + bins := p.distribute(p.cfg.TimeWindow, snapshot) + compactionBins.Set(float64(len(bins))) + times := p.prioritize(bins) + + for _, t := range times { + picked := strategySTCS{ + mergeTrigger: p.cfg.MergeTrigger, + mergeFanIn: p.cfg.MergeFanIn, + mergeFanOutSize: p.cfg.MergeFanOutSize, + bucketLowerbound: p.cfg.BucketLowerbound, + bucketUpperbound: p.cfg.BucketUpperbound, + }.Pick(bins[t]) + + if len(picked.fracs) == 0 { + // No candidates were found. + continue + } + + bucketSize := util.SizeStr(powerOfTwo(picked.sizeAvg)) + csnapshot, err := p.fm.ClaimForCompaction(names(picked.fracs)) + if err != nil { + continue + } + + return task{ + bin: t, + bucketSize: bucketSize, + + filename: p.fm.FractionName(), + snapshot: csnapshot, + + onComplete: func(s *sealed.PreloadedData, err error) { + if err != nil { + compactionResultTotal.WithLabelValues(bucketSize, "error").Inc() + + logger.Error( + "failed to compact fractions", + zap.Error(err), + zap.Any("snapshot", names(csnapshot.Fractions())), + ) + + return + } + + if s == nil { + logger.Info( + "compaction did not produce fraction", + zap.Any("snapshot", names(csnapshot.Fractions())), + ) + return + } + + compactionResultTotal.WithLabelValues(bucketSize, "success").Inc() + // TODO(dkharms): Is it fine to substitute and delete? + // We need somehow substitute and delete atomically. + p.fm.SubstituteWithSealed(s, csnapshot) + csnapshot.Destroy() + }, + }, true + } + + return task{}, false +} + +func (p *planner) distribute(window time.Duration, fracs []fraction) map[time.Time][]fraction { + bins := make(map[time.Time][]fraction) + + for _, f := range fracs { + // TODO(dkharms): Group by time-range fraction cover. + // + // Once we implement timestamp-binning, we need to group fractions into bins + // not by creation time, but by time-range they cover. + creation := time.UnixMilli(int64(f.Info().CreationTime)) + + bin := creation.Truncate(window) + bins[bin] = append(bins[bin], f) + } + + return bins +} + +func (p *planner) prioritize(bins map[time.Time][]fraction) []time.Time { + ordered := slices.Collect(maps.Keys(bins)) + + // Prioritize bins with the most fractions above target since they hurt search the most. + // Older bins are preferred on ties since they have been sitting above target longer. + slices.SortFunc(ordered, func(x, y time.Time) int { + xcount := len(bins[x]) + ycount := len(bins[y]) + if xcount == ycount { + return -x.Compare(y) + } + return -cmp.Compare(xcount, ycount) + }) + + return ordered +} + +func names[T interface{ Info() *common.Info }, S ~[]T](fracs S) []string { + fnames := make([]string, len(fracs)) + for i := range fracs { + fnames[i] = fracs[i].Info().Name() + } + return fnames +} + +func powerOfTwo(v uint64) uint64 { + if v == 0 { + return 1 + } + return 1 << bits.Len64(v-1) +} diff --git a/compaction/stcs.go b/compaction/stcs.go new file mode 100644 index 00000000..3baa9d6b --- /dev/null +++ b/compaction/stcs.go @@ -0,0 +1,112 @@ +package compaction + +import ( + "cmp" + "slices" +) + +type strategySTCS struct { + // To trigger compaction of bucket there must be + // at least [mergeTrigger] fractions. + mergeTrigger int + + // At most this many fractions are compacted from a single bucket + // per compaction iteration. + mergeFanIn int + mergeFanOutSize uint64 + + // Fraction size must be within [bucketLowerbound, bucketUpperbound] * avg(bucket) + // to be considered part of the bucket. + bucketLowerbound float64 + bucketUpperbound float64 +} + +type bucket struct { + sizeAvg uint64 + fracs []fraction +} + +func (s strategySTCS) Pick(candidates []fraction) bucket { + if len(candidates) < s.mergeTrigger { + return bucket{} + } + + sorted := slices.Clone(candidates) + slices.SortFunc(sorted, func(a, b fraction) int { + return cmp.Compare(a.Info().IndexOnDisk, b.Info().IndexOnDisk) + }) + + buckets := s.group(sorted) + // We are interested in buckets with the most amount of fractions. + // Usually, these are the lowest tiers where all freshly sealed fractions end up. + slices.SortFunc(buckets, func(x, y bucket) int { + return -cmp.Compare(len(x.fracs), len(y.fracs)) + }) + + for _, b := range buckets { + if len(b.fracs) < s.mergeTrigger { + continue + } + + b.fracs = b.fracs[:min(len(b.fracs), s.mergeFanIn)] + if picked := s.takeUntilSize(b); len(picked.fracs) >= s.mergeTrigger { + return picked + } + } + + return bucket{} +} + +func (s strategySTCS) group(sorted []fraction) []bucket { + var ( + sum uint64 + current []fraction + buckets []bucket + ) + + for _, f := range sorted { + size := f.Info().IndexOnDisk + + if len(current) == 0 { + current = append(current, f) + sum = size + continue + } + + avg := float64(sum) / float64(len(current)) + fsize := float64(size) + + lower := avg * s.bucketLowerbound + upper := avg * s.bucketUpperbound + + if lower <= fsize && fsize <= upper { + current = append(current, f) + sum += size + continue + } + + buckets = append(buckets, bucket{uint64(avg), current}) + current = []fraction{f} + sum = size + } + + if len(current) > 0 { + avg := float64(sum) / float64(len(current)) + buckets = append(buckets, bucket{uint64(avg), current}) + } + + return buckets +} + +func (s strategySTCS) takeUntilSize(b bucket) bucket { + var picked uint64 + + for i := range b.fracs { + picked += b.fracs[i].Info().IndexOnDisk + if picked >= s.mergeFanOutSize { + return bucket{b.sizeAvg, b.fracs[:i]} + } + } + + return b +} diff --git a/compaction/stcs_test.go b/compaction/stcs_test.go new file mode 100644 index 00000000..d1800bea --- /dev/null +++ b/compaction/stcs_test.go @@ -0,0 +1,74 @@ +package compaction + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/frac/common" +) + +type mockFraction struct { + indexOnDisk uint64 +} + +func (m *mockFraction) Info() *common.Info { + return &common.Info{IndexOnDisk: m.indexOnDisk} +} + +func makeFracs(sizes ...uint64) []fraction { + out := make([]fraction, len(sizes)) + for i, s := range sizes { + out[i] = &mockFraction{indexOnDisk: s} + } + return out +} + +func TestSTCS_Pick(t *testing.T) { + s := strategySTCS{ + mergeTrigger: 4, + mergeFanIn: 32, + mergeFanOutSize: math.MaxUint64, + bucketLowerbound: 0.5, + bucketUpperbound: 1.5, + } + + t.Run("not-enough-candidates", func(t *testing.T) { + for n := range s.mergeTrigger { + require.Nil(t, s.Pick(makeFracs(make([]uint64, n)...))) + } + }) + + t.Run("requirement-not-met", func(t *testing.T) { + // Each Fraction size is 10x the previous. + // They land in different buckets and no bucket with [mergeTrigger] fractions exists. + require.Nil(t, s.Pick(makeFracs(100, 1000, 10000, 100000))) + }) + + t.Run("one-bucket", func(t *testing.T) { + require.Len(t, s.Pick(makeFracs(1000, 1000, 1000, 1000)), 4) + }) + + t.Run("largest-bucket", func(t *testing.T) { + b := s.Pick(makeFracs( + 1000, 1000, + 100000, 100000, 100000, 100000, 100000, // Will take this bucket. + )) + + require.Len(t, b, 5) + for _, f := range b.fracs { + require.Equal(t, uint64(100000), f.Info().IndexOnDisk) + } + }) + + t.Run("cap-at-fan-in", func(t *testing.T) { + sizes := make([]uint64, s.mergeFanIn+10) + + for i := range sizes { + sizes[i] = 5000 + } + + require.Len(t, s.Pick(makeFracs(sizes...)), s.mergeFanIn) + }) +} diff --git a/config.example.yaml b/config.example.yaml index c5bcb24b..e4cf0a9a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -10,13 +10,25 @@ storage: frac_size: 16MiB total_size: 1GiB +compaction: + enabled: true + workers: 4 + time_window: 24h + tick_interval: 1s + stcs: + merge_trigger: 4 + merge_fan_in: 32 + merge_fan_out_size: 256MiB + bucket_lowerbound: 0.5 + bucket_upperbound: 1.5 + # For testing or developments purposes you can run MinIO S3 compatible object storage locally. # # docker run -p 9000:9000 -p 9001:9001 \ # quay.io/minio/minio server /data --console-address ":9001" offloading: - enabled: true + enabled: false retention: 5m endpoint: http://localhost:9000/ bucket: remote-storage diff --git a/config/config.go b/config/config.go index 0d929a7f..d023aeb8 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,7 @@ func Parse(path string) (Config, error) { } /* Set computed defaults if user did not override them */ + c.Compaction.Workers = cmp.Or(c.Compaction.Workers, NumCPU) c.Resources.ReaderWorkers = cmp.Or(c.Resources.ReaderWorkers, NumCPU) c.Resources.SearchWorkers = cmp.Or(c.Resources.SearchWorkers, NumCPU) @@ -59,7 +60,7 @@ type Config struct { // DataDir is a path to a directory where fractions will be stored. DataDir string `config:"data_dir"` // FracSize specifies the maximum size of an active fraction before it gets sealed. - FracSize Bytes `config:"frac_size" default:"128MiB"` + FracSize Bytes `config:"frac_size" default:"16MiB"` // TotalSize specifies upper bound of how much disk space can be occupied // by sealed fractions before they get deleted (or offloaded). TotalSize Bytes `config:"total_size" default:"1GiB"` @@ -209,6 +210,20 @@ type Config struct { DocBlockZstdCompressionLevel int `config:"doc_block_zstd_compression_level" default:"3"` } `config:"compression"` + Compaction struct { + STCS struct { + MergeTrigger int `config:"merge_trigger" default:"4"` + MergeFanIn int `config:"merge_fan_in" default:"32"` + MergeFanOutSize Bytes `config:"merge_fan_out_size" default:"512MiB"` + BucketLowerbound float64 `config:"bucket_lowerbound" default:"0.5"` + BucketUpperbound float64 `config:"bucket_upperbound" default:"1.5"` + } `config:"stcs"` + Enabled bool `config:"enabled"` + Workers int `config:"workers"` + TimeWindow time.Duration `config:"time_window" default:"1h"` + TickInterval time.Duration `config:"tick_interval" default:"1s"` + } `config:"compaction"` + Indexing struct { MaxTokenSize int `config:"max_token_size" default:"72"` CaseSensitive bool `config:"case_sensitive"` diff --git a/config/validation.go b/config/validation.go index c305a706..a09cb23e 100644 --- a/config/validation.go +++ b/config/validation.go @@ -73,6 +73,17 @@ func (c *Config) storeValidations() []validateFn { inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent), greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck), + + greaterThan("compaction.stcs.merge_trigger", 0, c.Compaction.STCS.MergeTrigger), + greaterThan("compaction.stcs.merge_fan_out_size", 0, c.Compaction.STCS.MergeFanOutSize), + greaterOrEqThan("compaction.stcs.merge_fan_in", c.Compaction.STCS.MergeTrigger, c.Compaction.STCS.MergeFanIn), + + greaterThan("compaction.stcs.bucket_lowerbound", 0, c.Compaction.STCS.BucketLowerbound), + greaterOrEqThan("compaction.stcs.bucket_upperbound", c.Compaction.STCS.BucketLowerbound, c.Compaction.STCS.BucketUpperbound), + + greaterOrEqThan("compaction.workers", 0, c.Compaction.Workers), + greaterThan("compaction.time_window", 0, c.Compaction.TimeWindow), + greaterThan("compaction.tick_interval", 0, c.Compaction.TickInterval), } if c.Offloading.Enabled { @@ -112,7 +123,19 @@ func lessOrEqThan[T cmp.Ordered](field string, base, v T) validateFn { return func() error { if v > base { return fmt.Errorf( - "field %q must be greater than %v", + "field %q must be less or equal than %v", + field, base, + ) + } + return nil + } +} + +func greaterOrEqThan[T cmp.Ordered](field string, base, v T) validateFn { + return func() error { + if v < base { + return fmt.Errorf( + "field %q must be greater or equal than %v", field, base, ) } diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index a1200a7c..812b2763 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -1,4 +1,4 @@ -package frac +package frac_test import ( "bytes" @@ -12,6 +12,7 @@ import ( "go.uber.org/zap/zapcore" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric/stopwatch" @@ -76,20 +77,20 @@ func getTestProcessor() *indexer.Processor { func BenchmarkIndexer(b *testing.B) { logger.SetLevel(zapcore.FatalLevel) - idx, stop := NewActiveIndexer(8, 8) + idx, stop := frac.NewActiveIndexer(8, 8) defer stop() allLogs, err := readFileAllAtOnce(filepath.Join(common.TestDataDir, "k8s.logs")) readers := splitLogsToBulks(allLogs, 1000) assert.NoError(b, err) - active := NewActive( + active := frac.NewActive( filepath.Join(b.TempDir(), "test"), idx, storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go index 05f89696..1d0c7b4a 100644 --- a/frac/common/seal_params.go +++ b/frac/common/seal_params.go @@ -6,8 +6,8 @@ type SealParams struct { TokenListZstdLevel int DocsPositionsZstdLevel int TokenTableZstdLevel int + DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. - DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. - LIDBlockSize int - DocBlockSize int // DocBlockSize is decompressed payload size of document block. + LIDBlockSize int + DocBlockSize int // DocBlockSize is decompressed payload size of document block. } diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 40024a6e..dd85ae91 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -1,4 +1,4 @@ -package frac +package frac_test import ( "fmt" @@ -14,6 +14,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/indexer" @@ -39,16 +40,16 @@ func TestConcurrentAppendAndQuery(t *testing.T) { fracPath := filepath.Join(tmpDir, "test_fraction") defer testcommon.RemoveDir(fracPath) - activeIndexer, stop := NewActiveIndexer(numIndexWorkers, 1000) + activeIndexer, stop := frac.NewActiveIndexer(numIndexWorkers, 1000) defer stop() - active := NewActive( + active := frac.NewActive( fracPath, activeIndexer, storage.NewReadLimiter(numReaders/2, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) @@ -154,7 +155,7 @@ const ( kafka = "kafka" ) -func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) { +func readTest(t *testing.T, fraction frac.Fraction, numReaders, numQueries int, docs []*testDoc, fromTime, toTime time.Time, mapping seq.Mapping) { readersGroup, ctx := errgroup.WithContext(t.Context()) type queryFilter func(doc *testDoc) bool @@ -332,7 +333,7 @@ func generatesMessages(numMessages, bulkSize int) ([]*testDoc, [][]string, time. return docs, bulks, fromTime, toTime } -func seal(active *Active) (*Sealed, error) { +func seal(active *frac.Active) (*frac.Sealed, error) { sealParams := common.SealParams{ IDsZstdLevel: 1, LIDsZstdLevel: 1, @@ -343,7 +344,7 @@ func seal(active *Active) (*Sealed, error) { DocBlockSize: 128 * int(units.KiB), LIDBlockSize: 512, } - activeSealingSource, err := NewActiveSealingSource(active, sealParams) + activeSealingSource, err := frac.NewActiveSealingSource(active, sealParams) if err != nil { return nil, err } @@ -352,13 +353,13 @@ func seal(active *Active) (*Sealed, error) { return nil, err } - sealed := NewSealedPreloaded( + sealed := frac.NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) diff --git a/frac/fraction_test.go b/frac/fraction_test.go index d5fc39ed..b89e5fb2 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1,7 +1,9 @@ -package frac +package frac_test import ( "context" + cryptorand "crypto/rand" + "encoding/hex" "fmt" "math" "math/rand/v2" @@ -21,6 +23,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/compaction" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" "github.com/ozontech/seq-db/indexer" @@ -46,20 +50,20 @@ func (testSkipMaskProvider) RemoveFrac(_ string) {} type FractionTestSuite struct { suite.Suite tmpDir string - config *Config + config *frac.Config mapping seq.Mapping tokenizers map[seq.TokenizerType]tokenizer.Tokenizer - activeIndexer *ActiveIndexer + activeIndexer *frac.ActiveIndexer stopIndexer func() sealParams common.SealParams - fraction Fraction + fraction frac.Fraction insertDocuments func(docs ...[]string) } func (s *FractionTestSuite) SetupSuiteCommon() { - s.activeIndexer, s.stopIndexer = NewActiveIndexer(4, 10) + s.activeIndexer, s.stopIndexer = frac.NewActiveIndexer(4, 10) } func (s *FractionTestSuite) TearDownSuiteCommon() { @@ -67,7 +71,7 @@ func (s *FractionTestSuite) TearDownSuiteCommon() { } func (s *FractionTestSuite) SetupTestCommon() { - s.config = &Config{} + s.config = &frac.Config{} s.tokenizers = map[seq.TokenizerType]tokenizer.Tokenizer{ seq.TokenizerTypeKeyword: tokenizer.NewKeywordTokenizer(20, false, true), seq.TokenizerTypeText: tokenizer.NewTextTokenizer(20, false, true, 100), @@ -115,6 +119,12 @@ func (s *FractionTestSuite) TearDownTestCommon() { s.NoError(err, "Failed to remove tmp dir") } +func randomHex(n int) string { + b := make([]byte, (n+1)/2) + cryptorand.Read(b) + return hex.EncodeToString(b)[:n] +} + func (s *FractionTestSuite) TestSearchKeyword() { docs := []string{ /*0*/ `{"timestamp":"2000-01-01T13:00:25Z","service":"service_a","message":"first message some text","trace_id":"abcdef","source":"prod01","level":"1"}`, @@ -1856,7 +1866,7 @@ func (s *FractionTestSuite) TestMIDDistribution() { s.insertDocuments(docs) - _, ok := s.fraction.(*Active) + _, ok := s.fraction.(*frac.Active) if ok { s.Require().Nil(s.fraction.Info().Distribution, "active fraction has MID distribution") return @@ -1895,15 +1905,15 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match") switch s.fraction.(type) { - case *Active: + case *frac.Active: s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400), "meta on disk doesn't match. actual value: %d", info.MetaOnDisk) s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") - case *Sealed: + case *frac.Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) - case *Remote: + case *frac.Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) @@ -2102,9 +2112,10 @@ func (s *FractionTestSuite) AssertHist( } } -func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { - baseName := filepath.Join(s.tmpDir, "test_fraction") - active := NewActive( +func (s *FractionTestSuite) newActive(bulks ...[]string) *frac.Active { + baseName := filepath.Join(s.tmpDir, randomHex(12)) + + active := frac.NewActive( baseName, s.activeIndexer, storage.NewReadLimiter(1, nil), @@ -2148,20 +2159,20 @@ func (s *FractionTestSuite) newActive(bulks ...[]string) *Active { return active } -func (s *FractionTestSuite) newSealed(bulks ...[]string) *Sealed { +func (s *FractionTestSuite) newSealed(bulks ...[]string) *frac.Sealed { active := s.newActive(bulks...) - activeSealingSource, err := NewActiveSealingSource(active, s.sealParams) + activeSealingSource, err := frac.NewActiveSealingSource(active, s.sealParams) s.Require().NoError(err, "Sealing source creation failed") preloaded, err := sealing.Seal(activeSealingSource, s.sealParams) s.Require().NoError(err, "Sealing failed") - sealed := NewSealedPreloaded( + sealed := frac.NewSealedPreloaded( active.BaseFileName, preloaded, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), s.config, testSkipMaskProvider{}, @@ -2194,7 +2205,7 @@ func (s *ActiveFractionTestSuite) SetupTest() { } func (s *ActiveFractionTestSuite) TearDownTest() { - if active, ok := s.fraction.(*Active); ok { + if active, ok := s.fraction.(*frac.Active); ok { active.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Active type") @@ -2212,7 +2223,7 @@ ActiveReplayedFractionTestSuite run tests for active fraction which was replayed */ type ActiveReplayedFractionTestSuite struct { FractionTestSuite - originalFrac *Active + originalFrac *frac.Active } func (s *ActiveReplayedFractionTestSuite) SetupSuite() { @@ -2233,26 +2244,29 @@ func (s *ActiveReplayedFractionTestSuite) SetupTest() { } } -func (s *ActiveReplayedFractionTestSuite) Replay(frac *Active) Fraction { - fracFileName := frac.BaseFileName - s.originalFrac = frac - replayedFrac := NewActive( +func (s *ActiveReplayedFractionTestSuite) Replay(f *frac.Active) frac.Fraction { + s.originalFrac = f + fracFileName := f.BaseFileName + + replayedFrac := frac.NewActive( fracFileName, s.activeIndexer, storage.NewReadLimiter(1, nil), cache.NewCache[[]byte](nil, nil), cache.NewCache[[]byte](nil, nil), - &Config{}, + &frac.Config{}, testSkipMaskProvider{}, ) + err := replayedFrac.Replay(context.Background()) s.Require().NoError(err, "replay failed") + return replayedFrac } func (s *ActiveReplayedFractionTestSuite) TearDownTest() { s.originalFrac.Release() - if active, ok := s.fraction.(*Active); ok { + if active, ok := s.fraction.(*frac.Active); ok { active.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Active type") @@ -2287,7 +2301,7 @@ func (s *SealedFractionTestSuite) SetupTest() { } func (s *SealedFractionTestSuite) TearDownTest() { - if sealed, ok := s.fraction.(*Sealed); ok { + if sealed, ok := s.fraction.(*frac.Sealed); ok { sealed.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Sealed type") @@ -2323,7 +2337,7 @@ func (s *SealedLoadedFractionTestSuite) SetupTest() { } func (s *SealedLoadedFractionTestSuite) TearDownTest() { - if sealed, ok := s.fraction.(*Sealed); ok { + if sealed, ok := s.fraction.(*frac.Sealed); ok { sealed.Release() } else { s.Require().Nil(s.fraction, "fraction is not of Sealed type") @@ -2335,14 +2349,14 @@ func (s *SealedLoadedFractionTestSuite) TearDownSuite() { s.TearDownSuiteCommon() } -func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *Sealed { +func (s *SealedLoadedFractionTestSuite) newSealedLoaded(bulks ...[]string) *frac.Sealed { sealed := s.newSealed(bulks...) sealed.Release() - sealed = NewSealed( + sealed = frac.NewSealed( sealed.BaseFileName, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), nil, s.config, @@ -2399,13 +2413,13 @@ func (s *RemoteFractionTestSuite) SetupTest() { s.Require().NoError(err, "offload failed") s.Require().True(offloaded, "didn't offload frac") - remoteFrac := NewRemote( + remoteFrac := frac.NewRemote( context.Background(), sealed.BaseFileName, storage.NewReadLimiter(1, nil), - newIndexCache(), + frac.NewIndexCache(), cache.NewCache[[]byte](nil, nil), - sealed.info, + sealed.Info(), s.config, s3cli, testSkipMaskProvider{}, @@ -2417,7 +2431,7 @@ func (s *RemoteFractionTestSuite) SetupTest() { } func (s *RemoteFractionTestSuite) TearDownTest() { - if remote, ok := s.fraction.(*Remote); ok { + if remote, ok := s.fraction.(*frac.Remote); ok { remote.Suicide() } else { s.Require().Nil(s.fraction, "fraction is not of Remote type") @@ -2431,6 +2445,113 @@ func (s *RemoteFractionTestSuite) TearDownSuite() { s.s3server.Close() } +type CompactedFractionTestSuite struct { + FractionTestSuite +} + +func (s *CompactedFractionTestSuite) SetupSuite() { + s.SetupSuiteCommon() +} + +func (s *CompactedFractionTestSuite) SetupTest() { + s.SetupTestCommon() + + s.insertDocuments = func(bulks ...[]string) { + if s.fraction != nil { + s.Require().Fail("can insert docs only once") + } + s.fraction = s.newCompacted(bulks...) + } +} + +func (s *CompactedFractionTestSuite) TearDownTest() { + if sealed, ok := s.fraction.(*frac.Sealed); ok { + sealed.Release() + } else { + s.Require().Nil(s.fraction, "fraction is not of Sealed type") + } + s.TearDownTestCommon() +} + +func (s *CompactedFractionTestSuite) TearDownSuite() { + s.TearDownSuiteCommon() +} + +// newCompacted flattens all bulks into one doc list, splits it in half, +// seals each half as a separate fraction, and merges them with compaction.Merge. +func (s *CompactedFractionTestSuite) newCompacted(bulks ...[]string) *frac.Sealed { + // Flatten all documents because we are going to reorganize it. + var docs []string + for _, b := range bulks { + docs = append(docs, b...) + } + + var ( + reorganized [][]string + bulkSize = max(len(docs)/32, 1) + ) + + for i := 0; i < len(docs); i += bulkSize { + reorganized = append( + reorganized, + docs[i:min(i+bulkSize, len(docs))], + ) + } + + merged := s.newSealed(reorganized[0]) + for i, bulk := range reorganized[1:] { + current := s.newSealed(bulk) + + mergedBase := filepath.Join( + s.tmpDir, + fmt.Sprintf("merged-%d", i), + ) + + preloaded, err := compaction.Merge( + mergedBase, s.sealParams, + frac.NewSealedSource(merged), + frac.NewSealedSource(current), + ) + + s.Require().NoError(err) + merged = frac.NewSealedPreloaded( + mergedBase, + preloaded, + storage.NewReadLimiter(1, nil), + frac.NewIndexCache(), + cache.NewCache[[]byte](nil, nil), + s.config, + testSkipMaskProvider{}, + ) + } + + return merged +} + +// TestFractionInfo overrides the base test because DocsOnDisk is larger in a +// merged fraction (sum of two source docs files) and MIDsDistribution is not +// populated by compaction.Merge. +func (s *CompactedFractionTestSuite) TestFractionInfo() { + docs := []string{ + `{"timestamp":"2000-01-01T13:00:25Z","service":"service_a","message":"first message some text", "container":"gateway"}`, + `{"timestamp":"2000-01-01T13:00:32Z","service":"service_b","message":"second message other text", "container":"kube-proxy"}`, + `{"timestamp":"2000-01-01T13:00:43Z","service":"service_c","message":"third message other text", "container":"gateway"}`, + `{"timestamp":"2000-01-01T13:00:53Z","service":"service_a","message":"fourth message some text", "container":"kube-proxy"}`, + `{"timestamp":"2000-01-01T13:00:54Z","service":"service_c","message":"apple","container":"kube-scheduler"}`, + } + + s.insertDocuments(docs) + + info := s.fraction.Info() + + s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match") + s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match") + s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match") + s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match") + s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match") + s.Require().True(info.IndexOnDisk > 0, "index on disk should be non-zero") +} + func TestActiveFractionTestSuite(t *testing.T) { suite.Run(t, new(ActiveFractionTestSuite)) } @@ -2450,3 +2571,7 @@ func TestSealedLoadedFractionTestSuite(t *testing.T) { func TestRemoteFractionTestSuite(t *testing.T) { suite.Run(t, new(RemoteFractionTestSuite)) } + +func TestCompactedFractionTestSuite(t *testing.T) { + suite.Run(t, new(CompactedFractionTestSuite)) +} diff --git a/frac/index_cache.go b/frac/index_cache.go index 043e8c5c..f270f209 100644 --- a/frac/index_cache.go +++ b/frac/index_cache.go @@ -7,7 +7,7 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" ) -func newIndexCache() *IndexCache { +func NewIndexCache() *IndexCache { return &IndexCache{ LegacyRegistry: cache.NewCache[[]byte](nil, nil), diff --git a/fracmanager/config.go b/fracmanager/config.go index e295aada..6bc8acf2 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -35,6 +35,8 @@ type Config struct { OffloadingQueueSize uint64 OffloadingRetention time.Duration OffloadingRetryDelay time.Duration + + CompactionEnabled bool } func FillConfigWithDefault(config *Config) *Config { diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 81960502..bdd411a1 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -13,6 +13,7 @@ import ( "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/sealed" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" @@ -116,6 +117,12 @@ func (cs *CompactionSnapshot) Destroy() { } } +func (fm *FracManager) FractionName() string { + filePath := fileBasePattern + fm.lc.provider.nextFractionID() + baseFilePath := filepath.Join(fm.lc.provider.config.DataDir, filePath) + return baseFilePath +} + func (fm *FracManager) SealedFractionsSnapshot() []*frac.Sealed { return fm.lc.registry.sealedSnapshot() } @@ -128,8 +135,11 @@ func (fm *FracManager) ClaimForCompaction(names []string) (*CompactionSnapshot, return &CompactionSnapshot{claimed: claimed}, nil } -func (fm *FracManager) SubstituteWithSealed(produced *frac.Sealed, snapshot *CompactionSnapshot) { - fm.lc.registry.substituteWithSealed(produced, snapshot.claimed...) +func (fm *FracManager) SubstituteWithSealed(produced *sealed.PreloadedData, snapshot *CompactionSnapshot) { + fm.lc.registry.substituteWithSealed( + fm.lc.provider.NewSealedPreloaded(produced.Info.Path, produced), + snapshot.claimed..., + ) } func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 0445d8cc..143616d3 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -5,6 +5,7 @@ import ( "io" "math/rand" "path/filepath" + "sync" "time" "github.com/RoaringBitmap/roaring/v2" @@ -39,8 +40,10 @@ type fractionProvider struct { cacheProvider *CacheMaintainer // Cache provider for data access optimization activeIndexer *frac.ActiveIndexer // Indexer for active fractions readLimiter *storage.ReadLimiter // Read rate limiter - ulidEntropy io.Reader // Entropy source for ULID generation skipMaskProvider skipMaskProvider + + mu sync.Mutex + ulidEntropy io.Reader // Entropy source for ULID generation } func newFractionProvider( @@ -115,6 +118,8 @@ func (fp *fractionProvider) NewRemote(ctx context.Context, name string, cachedIn // IMPORTANT: This method is not thread-safe. When used in concurrent environments, // external synchronization must be provided to avoid ID collisions func (fp *fractionProvider) nextFractionID() string { + fp.mu.Lock() + defer fp.mu.Unlock() return ulid.MustNew(ulid.Timestamp(time.Now()), fp.ulidEntropy).String() } @@ -136,7 +141,21 @@ func (fp *fractionProvider) Seal(a *frac.Active) (*frac.Sealed, error) { if err != nil { return nil, err } - preloaded, err := sealing.Seal(src, fp.config.SealParams) + + params := fp.config.SealParams + // NOTE(dkharms): If compaction is enabled we do not want to waste CPU on compression. + // + // Sealed fractions will be picked up by compaction workers almost instantly, + // and that will trigger compression again. + if fp.config.CompactionEnabled { + params = common.SealParams{ + DocBlocksZstdLevel: params.DocBlocksZstdLevel, + LIDBlockSize: params.LIDBlockSize, + DocBlockSize: params.DocBlockSize, + } + } + + preloaded, err := sealing.Seal(src, params) if err != nil { return nil, err } diff --git a/indexwriter/index.go b/indexwriter/index.go index c28c7b6e..78144ff3 100644 --- a/indexwriter/index.go +++ b/indexwriter/index.go @@ -205,10 +205,15 @@ func newIndexBlock(raw []byte) indexBlock { } func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { + if level <= 0 { + return newIndexBlock(raw) + } + s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) if len(s.buf2) < len(raw) { return indexBlock{codec: storage.CodecZSTD, rawLen: uint32(len(raw)), payload: s.buf2} } + return newIndexBlock(raw) } diff --git a/storeapi/store.go b/storeapi/store.go index dd53079e..8cdd76b0 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -8,6 +8,7 @@ import ( "go.uber.org/atomic" + "github.com/ozontech/seq-db/compaction" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/logger" @@ -30,7 +31,8 @@ type Store struct { FracManager *fracmanager.FracManager fracManagerStop func() - SkipMaskManager *skipmaskmanager.SkipMaskManager + SkipMaskManager *skipmaskmanager.SkipMaskManager + CompactionExecutor *compaction.Executor isStopped atomic.Bool } @@ -39,6 +41,7 @@ type StoreConfig struct { API APIConfig FracManager fracmanager.Config SkipMaskManagerConfig skipmaskmanager.Config + Compaction compaction.Config } func (c *StoreConfig) setDefaults() error { @@ -66,23 +69,26 @@ func NewStore( } skipMaskManager := skipmaskmanager.New(ctx, c.SkipMaskManagerConfig, skipMaskParams, mappingProvider) - fracManager, stop, err := fracmanager.New(ctx, &c.FracManager, s3cli, skipMaskManager) if err != nil { return nil, fmt.Errorf("loading fractions error: %w", err) } + planner := compaction.NewPlanner(ctx, fracManager, c.Compaction) + executor := compaction.NewExecutor(c.Compaction.Workers, c.FracManager.SealParams, planner) + skipMaskManager.Start(fracManager) return &Store{ Config: c, // We will set grpcAddr later in Start() - grpcAddr: "", - grpcServer: newGRPCServer(c.API, fracManager, mappingProvider), - FracManager: fracManager, - fracManagerStop: stop, - SkipMaskManager: skipMaskManager, - isStopped: atomic.Bool{}, + grpcAddr: "", + grpcServer: newGRPCServer(c.API, fracManager, mappingProvider), + FracManager: fracManager, + fracManagerStop: stop, + SkipMaskManager: skipMaskManager, + CompactionExecutor: executor, + isStopped: atomic.Bool{}, }, nil } @@ -107,6 +113,7 @@ func (s *Store) Stop() { s.grpcServer.Stop(ctx) s.fracManagerStop() s.SkipMaskManager.Stop() + s.CompactionExecutor.Stop() logger.Info("store stopped") }