Skip to content

Commit

Permalink
Refactoring of the Loki storage (grafana#5833)
Browse files Browse the repository at this point in the history
* Extract index interface from the series store

* moving package around

* Moving more things around

Signed-off-by: Cyril Tovena <[email protected]>

* Moving more things around

Signed-off-by: Cyril Tovena <[email protected]>

* moving package around

* Moving more things around again

Signed-off-by: Cyril Tovena <[email protected]>

* Moving more things around again

Signed-off-by: Cyril Tovena <[email protected]>

* improve composite store for a more generic index store

* fixes first round of tests and ensure coverage is still good

* implement series method

* Fixes storage tests

Signed-off-by: Cyril Tovena <[email protected]>

* Add test for GetSeries

Signed-off-by: Cyril Tovena <[email protected]>

* Make Loki compile

Signed-off-by: Cyril Tovena <[email protected]>

* lint the whole project

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes tests

Signed-off-by: Cyril Tovena <[email protected]>

* tsdb util package usage

Signed-off-by: Cyril Tovena <[email protected]>

* Add more information about why SetChunkFilterer exists for the index

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes ruler/base package

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes o2

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Apr 8, 2022
1 parent 46b552d commit 8f02495
Show file tree
Hide file tree
Showing 232 changed files with 5,857 additions and 9,188 deletions.
29 changes: 10 additions & 19 deletions cmd/migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
Expand Down Expand Up @@ -93,26 +93,17 @@ func main() {
}
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
clientMetrics := chunk_storage.NewClientMetrics()
sourceStore, err := chunk_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
}
s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.SchemaConfig, sourceStore, prometheus.DefaultRegisterer)
clientMetrics := storage.NewClientMetrics()
s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
}

// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
destStore, err := chunk_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
}
d, err := storage.NewStore(destConfig.StorageConfig, destConfig.SchemaConfig, destStore, prometheus.DefaultRegisterer)

d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits, clientMetrics, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
Expand Down Expand Up @@ -174,7 +165,7 @@ func main() {
log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges))

// Pass dest schema config, the destination determines the new chunk external keys using potentially a different schema config.
cm := newChunkMover(ctx, destConfig.SchemaConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch)
cm := newChunkMover(ctx, destConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch)
syncChan := make(chan *syncRange)
errorChan := make(chan error)
statsChan := make(chan stats)
Expand Down Expand Up @@ -267,7 +258,7 @@ type stats struct {

type chunkMover struct {
ctx context.Context
schema chunk.SchemaConfig
schema config.SchemaConfig
source storage.Store
dest storage.Store
sourceUser string
Expand All @@ -276,7 +267,7 @@ type chunkMover struct {
batch int
}

func newChunkMover(ctx context.Context, s chunk.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover {
func newChunkMover(ctx context.Context, s config.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover {
cm := &chunkMover{
ctx: ctx,
schema: s,
Expand Down Expand Up @@ -325,10 +316,10 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <

// FetchChunks requires chunks to be ordered by external key.
sort.Slice(chunks, func(x, y int) bool {
return m.schema.ExternalKey(chunks[x]) < m.schema.ExternalKey(chunks[y])
return m.schema.ExternalKey(chunks[x].ChunkRef) < m.schema.ExternalKey(chunks[y].ChunkRef)
})
for _, chk := range chunks {
key := m.schema.ExternalKey(chk)
key := m.schema.ExternalKey(chk.ChunkRef)
keys = append(keys, key)
chks = append(chks, chk)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func NewReader(writer io.Writer,
labelVal string,
streamName string,
streamValue string,
interval time.Duration) *Reader {
interval time.Duration,
) *Reader {
h := http.Header{}
if user != "" {
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
Expand Down Expand Up @@ -441,11 +442,11 @@ func (r *Reader) closeAndReconnect() {
func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
sp := strings.Split(entry.Line, " ")
if len(sp) != 2 {
return nil, errors.Errorf("received invalid entry: %s\n", entry.Line)
return nil, errors.Errorf("received invalid entry: %s", entry.Line)
}
ts, err := strconv.ParseInt(sp[0], 10, 64)
if err != nil {
return nil, errors.Errorf("failed to parse timestamp: %s\n", sp[0])
return nil, errors.Errorf("failed to parse timestamp: %s", sp[0])
}
t := time.Unix(0, ts)
return &t, nil
Expand Down
26 changes: 13 additions & 13 deletions pkg/chunkenc/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import (

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/chunk/encoding"
"github.com/grafana/loki/pkg/storage/chunk"
)

// GzipLogChunk is a cortex encoding type for our chunks.
// Deprecated: the chunk encoding/compression format is inside the chunk data.
const GzipLogChunk = encoding.Encoding(128)
const GzipLogChunk = chunk.Encoding(128)

// LogChunk is a cortex encoding type for our chunks.
const LogChunk = encoding.Encoding(129)
const LogChunk = chunk.Encoding(129)

func init() {
encoding.MustRegisterEncoding(GzipLogChunk, "GzipLogChunk", func() encoding.Chunk {
chunk.MustRegisterEncoding(GzipLogChunk, "GzipLogChunk", func() chunk.Data {
return &Facade{}
})
encoding.MustRegisterEncoding(LogChunk, "LogChunk", func() encoding.Chunk {
chunk.MustRegisterEncoding(LogChunk, "LogChunk", func() chunk.Data {
return &Facade{}
})
}
Expand All @@ -29,19 +29,19 @@ type Facade struct {
c Chunk
blockSize int
targetSize int
encoding.Chunk
chunk.Data
}

// NewFacade makes a new Facade.
func NewFacade(c Chunk, blockSize, targetSize int) encoding.Chunk {
func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data {
return &Facade{
c: c,
blockSize: blockSize,
targetSize: targetSize,
}
}

// Marshal implements encoding.Chunk.
// Marshal implements chunk.Chunk.
func (f Facade) Marshal(w io.Writer) error {
if f.c == nil {
return nil
Expand All @@ -52,15 +52,15 @@ func (f Facade) Marshal(w io.Writer) error {
return nil
}

// UnmarshalFromBuf decodes buf into the wrapped Loki chunk, using the
// block/target sizes this Facade was constructed with. Any decode
// error from NewByteChunk is returned unchanged.
func (f *Facade) UnmarshalFromBuf(buf []byte) error {
	var err error
	f.c, err = NewByteChunk(buf, f.blockSize, f.targetSize)
	return err
}
}

// Encoding implements encoding.Chunk.
func (Facade) Encoding() encoding.Encoding {
// Encoding implements chunk.Chunk.
func (Facade) Encoding() chunk.Encoding {
return LogChunk
}

Expand All @@ -86,7 +86,7 @@ func (f Facade) LokiChunk() Chunk {
return f.c
}

func (f Facade) Rebound(start, end model.Time) (encoding.Chunk, error) {
func (f Facade) Rebound(start, end model.Time) (chunk.Data, error) {
newChunk, err := f.c.Rebound(start.Time(), end.Time())
if err != nil {
return nil, err
Expand All @@ -97,7 +97,7 @@ func (f Facade) Rebound(start, end model.Time) (encoding.Chunk, error) {
}

// UncompressedSize is a helper function to hide the type assertion kludge when wanting the uncompressed size of the Cortex interface encoding.Chunk.
func UncompressedSize(c encoding.Chunk) (int, bool) {
func UncompressedSize(c chunk.Data) (int, bool) {
f, ok := c.(*Facade)

if !ok || f.c == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk/encoding"
"github.com/grafana/loki/pkg/storage/chunk"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -937,7 +937,7 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
}

if newChunk.Size() == 0 {
return nil, encoding.ErrSliceNoDataInRange
return nil, chunk.ErrSliceNoDataInRange
}

if err := newChunk.Close(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/chunk/encoding"

"github.com/grafana/loki/pkg/chunkenc/testdata"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk"
)

var testEncoding = []Encoding{
Expand Down Expand Up @@ -1176,7 +1175,7 @@ func TestMemChunk_Rebound(t *testing.T) {
},
{
name: "slice out of bounds without overlap",
err: encoding.ErrSliceNoDataInRange,
err: chunk.ErrSliceNoDataInRange,
sliceFrom: chkThrough.Add(time.Minute),
sliceTo: chkThrough.Add(time.Hour),
},
Expand Down
9 changes: 5 additions & 4 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/validation"
)

Expand Down Expand Up @@ -332,17 +333,17 @@ func (s *testStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, nil
}

func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
return nil, nil, nil
}

func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {
func (s *testStore) GetSchemaConfigs() []config.PeriodConfig {
return nil
}

// Stop is a no-op shutdown hook: the in-test store double holds no resources to release.
func (s *testStore) Stop() {}

// SetChunkFilterer is a no-op on the test double; it ignores the filterer.
func (s *testStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
userIDs := []string{"1", "2", "3"}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/series"
)

const DefaultIndexShards = 32
Expand Down Expand Up @@ -288,9 +288,9 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint
if matcher.Type == labels.MatchEqual {
fps := values.fps[matcher.Value]
toIntersect = append(toIntersect, fps.fps...) // deliberate copy
} else if matcher.Type == labels.MatchRegexp && len(chunk.FindSetMatches(matcher.Value)) > 0 {
} else if matcher.Type == labels.MatchRegexp && len(series.FindSetMatches(matcher.Value)) > 0 {
// The lookup is of the form `=~"a|b|c|d"`
set := chunk.FindSetMatches(matcher.Value)
set := series.FindSetMatches(matcher.Value)
for _, value := range set {
toIntersect = append(toIntersect, values.fps[value].fps...)
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func (shard *indexShard) allFPs() model.Fingerprints {
}

var result model.Fingerprints
var m = map[model.Fingerprint]struct{}{}
m := map[model.Fingerprint]struct{}{}
for _, fp := range fps {
if _, ok := m[fp]; !ok {
m[fp] = struct{}{}
Expand Down
Loading

0 comments on commit 8f02495

Please sign in to comment.