Streaming remote read (grafana#1735)
* implement read v2

* updated CHANGELOG.md

* extend maxBytesInFrame comment.

* addressed PR feedback

* addressed PR feedback

* addressed PR feedback

* use indexed xor chunk function to assert stream remote read tests

* updated CHANGELOG.md

Co-authored-by: Miguel Ángel Ortuño <[email protected]>
ortuman and Miguel Ángel Ortuño authored Apr 29, 2022
1 parent 62148b2 commit f33dde0
Showing 7 changed files with 1,925 additions and 274 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* [CHANGE] Store-gateway: enabled attributes in-memory cache by default. New default configuration is `-blocks-storage.bucket-store.chunks-cache.attributes-in-memory-max-items=50000`. #1727
* [CHANGE] Compactor: Removed the metric `cortex_compactor_garbage_collected_blocks_total` since it duplicates `cortex_compactor_blocks_marked_for_deletion_total`. #1728
* [CHANGE] All: Logs that used the `org_id` label now use the `user` label. #1634 #1758
* [FEATURE] Querier: Added support for [streaming remote read](https://prometheus.io/blog/2019/10/10/remote-read-meets-streaming/). Note that the benefits of chunking the response are partial here, since in a typical `query-frontend` setup responses are buffered until they've been completed. #1735 (A client-side sketch of the response-type negotiation follows this hunk.)
* [FEATURE] Ruler: Allow setting `evaluation_delay` for each rule group via rules group configuration file. #1474
* [FEATURE] Ruler: Added support for expression remote evaluation. #1536
* The following CLI flags (and their respective YAML config options) have been added:
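As context for the changelog entry above: a remote read client advertises the response types it can handle in `AcceptedResponseTypes`, and the server signals its choice through the response's `Content-Type` header. Below is a minimal, hedged sketch of the client-side dispatch, assuming the header values defined by the Prometheus remote read protocol; the `responseKind` helper is illustrative and not part of this commit.

package main

import (
	"fmt"
	"net/http"
	"strings"
)

// responseKind classifies a remote read response by its Content-Type.
// Per the Prometheus remote read protocol, streamed responses are served as
// "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
// while buffered SAMPLES responses are a single snappy-compressed protobuf
// served as "application/x-protobuf".
func responseKind(resp *http.Response) string {
	ct := resp.Header.Get("Content-Type")
	switch {
	case strings.HasPrefix(ct, "application/x-streamed-protobuf"):
		// Decode frame by frame, e.g. with remote.NewChunkedReader as in
		// the test added below.
		return "streamed-xor-chunks"
	case strings.HasPrefix(ct, "application/x-protobuf"):
		// Snappy-decode the entire body into a single prompb.ReadResponse.
		return "samples"
	default:
		return "unknown"
	}
}

func main() {
	resp := &http.Response{Header: http.Header{
		"Content-Type": []string{"application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"},
	}}
	fmt.Println(responseKind(resp)) // streamed-xor-chunks
}

This also explains the change to the existing test below: once the server honors `STREAMED_XOR_CHUNKS`, the buffered-path test must explicitly request `SAMPLES`.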
120 changes: 119 additions & 1 deletion integration/querier_remote_read_test.go
@@ -10,7 +10,9 @@ package integration
import (
	"bytes"
	"context"
	"io"
	"io/ioutil"
	"math/rand"
	"net/http"
	"testing"
	"time"
@@ -82,7 +84,7 @@ func TestQuerierRemoteRead(t *testing.T) {

	req := &prompb.ReadRequest{
		Queries:               []*prompb.Query{q},
-		AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
+		AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_SAMPLES},
	}

	data, err := proto.Marshal(req)
@@ -125,3 +127,119 @@ func TestQuerierRemoteRead(t *testing.T) {
	require.Equal(t, int64(expectedVectors[0].Timestamp), resp.Results[0].Timeseries[0].Samples[0].Timestamp)
	require.Equal(t, float64(expectedVectors[0].Value), resp.Results[0].Timeseries[0].Samples[0].Value)
}

func TestQuerierStreamingRemoteRead(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// The test pushes two minutes' worth of millisecond-resolution samples,
	// so raise the default ingestion limits.
	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-distributor.ingestion-rate-limit": "1048576",
		"-distributor.ingestion-burst-size": "1048576",
	})

	// Start dependencies.
	minio := e2edb.NewMinio(9000, blocksBucketName)

	consul := e2edb.NewConsul()
	require.NoError(t, s.StartAndWaitReady(minio, consul))

	// Start Mimir components for the write path.
	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
	require.NoError(t, s.StartAndWaitReady(distributor, ingester))

	// Wait until the distributor has updated the ring.
	// The distributor should have 512 tokens for the ingester ring and 1 for the distributor ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512+1), "cortex_ring_tokens_total"))

	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), BlocksStorageFlags())
	require.NoError(t, s.StartAndWaitReady(querier))

	// Wait until the querier has updated the ring.
	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Push a series to Mimir.
	now := time.Now()

	c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
	require.NoError(t, err)

	// Generate the series: one sample per millisecond across a two-minute window.
	startMs := now.Add(-time.Minute).Unix() * 1000
	endMs := now.Add(time.Minute).Unix() * 1000

	var samples []prompb.Sample
	for i := startMs; i < endMs; i++ {
		samples = append(samples, prompb.Sample{
			Value:     rand.Float64(),
			Timestamp: i,
		})
	}

	var series []prompb.TimeSeries
	series = append(series, prompb.TimeSeries{
		Labels: []prompb.Label{
			{Name: labels.MetricName, Value: "series_1"},
		},
		Samples: samples,
	})

	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "series_1")
	require.NoError(t, err)

	q, err := remote.ToQuery(startMs, endMs, []*labels.Matcher{matcher}, &storage.SelectHints{
		Step:  1,
		Start: startMs,
		End:   endMs,
	})
	require.NoError(t, err)

	// Build the remote read request, accepting only streamed XOR chunks in response.
	req := &prompb.ReadRequest{
		Queries:               []*prompb.Query{q},
		AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
	}

	// Remote read requests are snappy-compressed protobuf payloads.
	data, err := proto.Marshal(req)
	require.NoError(t, err)
	compressed := snappy.Encode(nil, data)

	// Call the remote read API endpoint with a timeout.
	httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+querier.HTTPEndpoint()+"/prometheus/api/v1/read", bytes.NewReader(compressed))
	require.NoError(t, err)
	httpReq.Header.Set("X-Scope-OrgID", "user-1")
	httpReq.Header.Set("User-Agent", "Prometheus/1.8.2")
	httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

	httpResp, err := http.DefaultClient.Do(httpReq)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, httpResp.StatusCode)

	// Fetch the streaming response, decoding it frame by frame until EOF.
	stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, nil)

	results := []prompb.ChunkedReadResponse{}
	for {
		var res prompb.ChunkedReadResponse
		err := stream.NextProto(&res)
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		results = append(results, res)
	}

	// Validate the returned remote read data.
	require.Len(t, results, 1)
	require.Len(t, results[0].ChunkedSeries, 1)
	require.Len(t, results[0].ChunkedSeries[0].Labels, 1)
	require.Equal(t, "series_1", results[0].ChunkedSeries[0].Labels[0].GetValue())
	require.True(t, len(results[0].ChunkedSeries[0].Chunks) > 0)
}
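The final assertion above only checks that at least one chunk came back. For completeness, here is a hedged sketch of decoding those XOR chunk payloads into samples, written as if it lived alongside the test. It assumes the `chunkenc` API of the Prometheus version vendored at the time (`Iterator.Next` returning a bool); the `decodeXORChunks` helper is illustrative, whereas the commit's own assertions use an indexed XOR chunk helper.

package integration

import (
	"testing"

	"github.com/prometheus/prometheus/prompb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/stretchr/testify/require"
)

// decodeXORChunks decodes raw XOR chunk payloads from a streamed remote read
// response into flat (timestamp, value) samples.
func decodeXORChunks(t *testing.T, chunks []prompb.Chunk) []prompb.Sample {
	var out []prompb.Sample
	for _, c := range chunks {
		// Assumption: the querier returns XOR-encoded chunks for this request.
		require.Equal(t, prompb.Chunk_XOR, c.Type)

		chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Data)
		require.NoError(t, err)

		it := chk.Iterator(nil)
		for it.Next() { // Iterator.Next returned a bool in Prometheus releases of this era.
			ts, v := it.At()
			out = append(out, prompb.Sample{Timestamp: ts, Value: v})
		}
		require.NoError(t, it.Err())
	}
	return out
}

Applied to the test above, decodeXORChunks(t, results[0].ChunkedSeries[0].Chunks) should recover the full two minutes of pushed samples in timestamp order.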