Skip to content

Commit

Permalink
Allow to customize BucketStore query gate (thanos-io#2798)
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Jun 23, 2020
1 parent b17ed32 commit 686c49b
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 27 deletions.
15 changes: 14 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand Down Expand Up @@ -276,16 +278,27 @@ func runStore(
return errors.Wrap(err, "meta fetcher")
}

// Limit the concurrency on queries against the Thanos store.
if maxConcurrency < 0 {
return errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrency)
}

queriesGate := gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrency)
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
}).Set(float64(maxConcurrency))

bs, err := store.NewBucketStore(
logger,
reg,
bkt,
metaFetcher,
dataDir,
indexCache,
queriesGate,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrency,
verbose,
blockSyncConcurrency,
filterConf,
Expand Down
32 changes: 11 additions & 21 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
Expand Down Expand Up @@ -101,7 +100,6 @@ type bucketStoreMetrics struct {
resultSeriesCount prometheus.Summary
chunkSizeBytes prometheus.Histogram
queriesDropped prometheus.Counter
queriesLimit prometheus.Gauge
seriesRefetches prometheus.Counter

cachedPostingsCompressions *prometheus.CounterVec
Expand Down Expand Up @@ -184,10 +182,6 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
Name: "thanos_bucket_store_queries_dropped_total",
Help: "Number of queries that were dropped due to the sample limit.",
})
m.queriesLimit = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_bucket_store_queries_concurrent_max",
Help: "Number of maximum concurrent queries.",
})
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
Expand Down Expand Up @@ -273,9 +267,9 @@ func NewBucketStore(
fetcher block.MetadataFetcher,
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
maxChunkPoolBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
filterConfig *FilterConfig,
Expand All @@ -288,10 +282,6 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

if maxConcurrent < 0 {
return nil, errors.Errorf("max concurrency value cannot be lower than 0 (got %v)", maxConcurrent)
}

chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
if err != nil {
return nil, errors.Wrap(err, "create chunk pool")
Expand All @@ -310,7 +300,7 @@ func NewBucketStore(
debugLogging: debugLogging,
blockSyncConcurrency: blockSyncConcurrency,
filterConfig: filterConfig,
queryGate: gate.NewKeeper(extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg)).NewGate(maxConcurrent),
queryGate: queryGate,
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
enableCompatibilityLabel: enableCompatibilityLabel,
Expand All @@ -324,8 +314,6 @@ func NewBucketStore(
return nil, errors.Wrap(err, "create dir")
}

s.metrics.queriesLimit.Set(float64(maxConcurrent))

return s, nil
}

Expand Down Expand Up @@ -844,14 +832,16 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
})
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}
if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
})
if err != nil {
return errors.Wrapf(err, "failed to wait for turn")
}

defer s.queryGate.Done()
defer s.queryGate.Done()
}

matchers, err := promclient.TranslateMatchers(req.Matchers)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
metaFetcher,
dir,
s.cache,
nil,
0,
maxSampleCount,
20,
false,
20,
filterConf,
Expand Down
8 changes: 4 additions & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ func TestBucketStore_Info(t *testing.T) {
nil,
dir,
noopCache{},
nil,
2e5,
0,
0,
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -825,9 +825,9 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
metaFetcher,
dir,
noopCache{},
nil,
0,
0,
99,
false,
20,
allowAllFilterConf,
Expand Down Expand Up @@ -1776,9 +1776,9 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
fetcher,
tmpDir,
indexCache,
nil,
1000000,
10000,
10,
false,
10,
nil,
Expand Down Expand Up @@ -1885,9 +1885,9 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
fetcher,
tmpDir,
indexCache,
nil,
1000000,
10000,
10,
false,
10,
nil,
Expand Down

0 comments on commit 686c49b

Please sign in to comment.