Skip to content

Commit

Permalink
WIP: engine work
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldix committed Oct 6, 2015
1 parent 12ea1cb commit 7555ccb
Show file tree
Hide file tree
Showing 12 changed files with 1,517 additions and 22 deletions.
2 changes: 2 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
tsdbStore := tsdb.NewStore(c.Data.Dir)
tsdbStore.EngineOptions.Config = c.Data

runtime.GOMAXPROCS(runtime.NumCPU())

s := &Server{
buildInfo: *buildInfo,
err: make(chan error),
Expand Down
2 changes: 1 addition & 1 deletion services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap
mux: pat.New(),
requireAuthentication: requireAuthentication,
Logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
loggingEnabled: loggingEnabled,
loggingEnabled: false,
WriteTrace: writeTrace,
statMap: statMap,
}
Expand Down
17 changes: 16 additions & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Engine interface {
Close() error

SetLogOutput(io.Writer)
LoadMetadataIndex(index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error
LoadMetadataIndex(shard *Shard, index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error

Begin(writable bool) (Tx, error)
WritePoints(points []models.Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error
Expand Down Expand Up @@ -60,6 +60,21 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
// Only bolt-based backends are currently supported so open it and check the format.
var format string
if err := func() error {
// if it's a dir then it's a pd1 engine
f, err := os.Open(path)
if err != nil {
return err
}
fi, err := f.Stat()
f.Close()
if err != nil {
return err
}
if fi.Mode().IsDir() {
format = "pd1"
return nil
}

db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/b1/b1.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (e *Engine) close() error {
func (e *Engine) SetLogOutput(w io.Writer) { e.LogOutput = w }

// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
return e.db.View(func(tx *bolt.Tx) error {
// load measurement metadata
meta := tx.Bucket([]byte("fields"))
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/bz1/bz1.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (e *Engine) close() error {
func (e *Engine) SetLogOutput(w io.Writer) {}

// LoadMetadataIndex loads the shard metadata into memory.
func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error {
if err := e.db.View(func(tx *bolt.Tx) error {
// Load measurement metadata
fields, err := e.readFields(tx)
Expand Down
4 changes: 2 additions & 2 deletions tsdb/engine/bz1/bz1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestEngine_LoadMetadataIndex_Series(t *testing.T) {

// Load metadata index.
index := tsdb.NewDatabaseIndex()
if err := e.LoadMetadataIndex(index, make(map[string]*tsdb.MeasurementFields)); err != nil {
if err := e.LoadMetadataIndex(nil, index, make(map[string]*tsdb.MeasurementFields)); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func TestEngine_LoadMetadataIndex_Fields(t *testing.T) {

// Load metadata index.
mfs := make(map[string]*tsdb.MeasurementFields)
if err := e.LoadMetadataIndex(tsdb.NewDatabaseIndex(), mfs); err != nil {
if err := e.LoadMetadataIndex(nil, tsdb.NewDatabaseIndex(), mfs); err != nil {
t.Fatal(err)
}

Expand Down
1 change: 1 addition & 0 deletions tsdb/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package engine
import (
_ "github.com/influxdb/influxdb/tsdb/engine/b1"
_ "github.com/influxdb/influxdb/tsdb/engine/bz1"
_ "github.com/influxdb/influxdb/tsdb/engine/pd1"
)
36 changes: 27 additions & 9 deletions tsdb/engine/pd1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,43 @@ package pd1

import (
"time"

"github.com/dgryski/go-tsz"
)

type FloatValue struct {
Time int64
Time time.Time
Value float64
}

// First 8 bytes should be the timestamp, second 8 bytes should be
// the first float value
type FloatValues []FloatValue

func (a FloatValues) Len() int { return len(a) }
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a FloatValues) Less(i, j int) bool { return a[i].Time.UnixNano() < a[j].Time.UnixNano() }

// TODO: make this work with nanosecond timestamps
func EncodeFloatBlock(buf []byte, values []FloatValue) []byte {
return nil
s := tsz.New(uint32(values[0].Time.Unix()))
for _, v := range values {
s.Push(uint32(v.Time.Unix()), v.Value)
}
s.Finish()
return s.Bytes()
}

func DecodeFloatBlock(block []byte) ([]FloatValue, error) {
return nil, nil
iter, _ := tsz.NewIterator(block)
a := make([]FloatValue, 0)
for iter.Next() {
t, f := iter.Values()
a = append(a, FloatValue{time.Unix(int64(t), 0), f})
}
return a, nil
}

type BoolValue struct {
Time int64
Time time.Time
Value bool
}

Expand All @@ -33,7 +51,7 @@ func DecodeBoolBlock(block []byte) ([]BoolValue, error) {
}

type Int64Value struct {
Time int64
Time time.Time
Value int64
}

Expand All @@ -46,10 +64,10 @@ func DecodeInt64Block(block []byte) ([]Int64Value, error) {
}

type StringValue struct {
Time int64
Time time.Time
Value string
}

func EncodeStringBlock(values []StringValue) []byte {
func EncodeStringBlock(buf []byte, values []StringValue) []byte {
return nil
}
9 changes: 5 additions & 4 deletions tsdb/engine/pd1/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ func TestEncoding_FloatBlock(t *testing.T) {
if err != nil {
t.Fatalf("error decoding: %s", err.Error)
}

if !reflect.DeepEqual(decodedValues, values) {
t.Fatalf("values not equal:\n\tgot: %s\n\texp: %s", values, decodedValues)
t.Fatalf("unexpected results:\n\tgot: %v\n\texp: %v\n", decodedValues, values)
}
}

func getTimes(n, step int, precision time.Duration) []int64 {
func getTimes(n, step int, precision time.Duration) []time.Time {
t := time.Now().Round(precision)
a := make([]int64, n)
a := make([]time.Time, n)
for i := 0; i < n; i++ {
a[i] = t.Add(60 * precision).UnixNano()
a[i] = t.Add(60 * precision)
}
return a
}
Loading

0 comments on commit 7555ccb

Please sign in to comment.