Skip to content

Commit

Permalink
Support specifying a retention policy for the graphite service
Browse files Browse the repository at this point in the history
The graphite service will attempt to create the retention policy and use
it. If the retention policy doesn't exist, it will be created with the
default options.

Fixes influxdata#5655.
  • Loading branch information
jsternberg committed Jun 8, 2016
1 parent 256f57a commit 4f37bc5
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [#6686](https://github.com/influxdata/influxdb/pull/6686): Optimize timestamp run-length decoding
- [#6713](https://github.com/influxdata/influxdb/pull/6713): Reduce allocations during query parsing.
- [#3733](https://github.com/influxdata/influxdb/issues/3733): Modify the default retention policy name and make it configurable.
- [#5655](https://github.com/influxdata/influxdb/issues/5655): Support specifying a retention policy for the graphite service.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions services/graphite/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
Protocol string `toml:"protocol"`
BatchSize int `toml:"batch-size"`
BatchPending int `toml:"batch-pending"`
Expand Down
3 changes: 3 additions & 0 deletions services/graphite/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func TestConfig_Parse(t *testing.T) {
if _, err := toml.Decode(`
bind-address = ":8080"
database = "mydb"
retention-policy = "myrp"
enabled = true
protocol = "tcp"
batch-size=100
Expand All @@ -31,6 +32,8 @@ tags=["region=us-east"]
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.Database != "mydb" {
t.Fatalf("unexpected database selected: %s", c.Database)
} else if c.RetentionPolicy != "myrp" {
t.Fatalf("unexpected retention policy selected: %s", c.RetentionPolicy)
} else if c.Enabled != true {
t.Fatalf("unexpected graphite enabled: %v", c.Enabled)
} else if c.Protocol != "tcp" {
Expand Down
60 changes: 38 additions & 22 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ func (c *tcpConnection) Close() {
type Service struct {
mu sync.Mutex

bindAddress string
database string
protocol string
batchSize int
batchPending int
batchTimeout time.Duration
udpReadBuffer int
bindAddress string
database string
retentionPolicy string
protocol string
batchSize int
batchPending int
batchTimeout time.Duration
udpReadBuffer int

batcher *tsdb.PointBatcher
parser *Parser
Expand All @@ -85,6 +86,10 @@ type Service struct {
}
MetaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
Database(name string) *meta.DatabaseInfo
RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
}
}

Expand All @@ -94,17 +99,18 @@ func NewService(c Config) (*Service, error) {
d := c.WithDefaults()

s := Service{
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
bindAddress: d.BindAddress,
database: d.Database,
retentionPolicy: d.RetentionPolicy,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
udpReadBuffer: d.UDPReadBuffer,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
}

parser, err := NewParserWithOptions(Options{
Expand Down Expand Up @@ -137,9 +143,19 @@ func (s *Service) Open() error {
s.Monitor.RegisterDiagnosticsClient(s.diagsKey, s)
}

if _, err := s.MetaClient.CreateDatabase(s.database); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, rpi); err != nil {
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
}
}
} else {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, rpi); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
}
}

s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchPending, s.batchTimeout)
Expand Down Expand Up @@ -355,7 +371,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
for {
select {
case batch := <-batcher.Out():
if err := s.PointsWriter.WritePoints(s.database, "", models.ConsistencyLevelAny, batch); err == nil {
if err := s.PointsWriter.WritePoints(s.database, s.retentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
s.statMap.Add(statBatchesTransmitted, 1)
s.statMap.Add(statPointsTransmitted, int64(len(batch)))
} else {
Expand Down
17 changes: 17 additions & 0 deletions services/graphite/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,23 @@ func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error
return nil, nil
}

func (d *DatabaseCreator) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}

func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
d.Created = true
return nil, nil
}

func (d *DatabaseCreator) Database(name string) *meta.DatabaseInfo {
return nil
}

func (d *DatabaseCreator) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}

// Test Helpers
func errstr(err error) string {
if err != nil {
Expand Down

0 comments on commit 4f37bc5

Please sign in to comment.