Skip to content

Commit

Permalink
Add Aggressive cleanup when disk util is high (#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
xinlongz1 authored Jan 17, 2023
1 parent 2835a68 commit 89dd72b
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 6 deletions.
45 changes: 40 additions & 5 deletions lib/store/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/uber/kraken/lib/store/base"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/utils/diskspaceutil"
"github.com/uber/kraken/utils/log"

"github.com/andres-erbsen/clock"
Expand All @@ -29,19 +30,33 @@ import (

// CleanupConfig defines configuration for periodically cleaning up idle files.
//
// Zero-valued Interval, TTI and AggressiveTTL are replaced with defaults by
// applyDefaults; see the per-field notes below.
type CleanupConfig struct {
	// Disabled is read from the "disabled" YAML key.
	// NOTE(review): presumably switches the cleanup job off — confirm in addJob.
	Disabled bool `yaml:"disabled"`

	// Interval is how often cleanup runs. Defaults to 30 minutes when 0.
	Interval time.Duration `yaml:"interval"`

	// TTI is the time to idle based on last access time. Defaults to 6 hours
	// when 0.
	TTI time.Duration `yaml:"tti"`

	// TTL is the time to live regardless of access. If 0, disables TTL.
	TTL time.Duration `yaml:"ttl"`

	// AggressiveThreshold is the disk utilization (compared against the
	// integer percentage returned by the disk space probe) at or above which
	// aggressive cleanup is triggered. If 0, disables aggressive cleanup.
	AggressiveThreshold int `yaml:"aggressive_threshold"`

	// AggressiveTTL is the time to live regardless of access that replaces
	// TTL while aggressive cleanup is triggered. Defaults to 1 hour when 0 and
	// AggressiveThreshold is set.
	//
	// The YAML key is all lowercase, consistent with every other key of this
	// struct (it was previously misspelled "aggressive_ttL").
	AggressiveTTL time.Duration `yaml:"aggressive_ttl"`
}

// diskSpaceUtilFunc is the signature of a disk utilization probe. It is
// declared as its own type so that a fake probe can be injected in place of
// the real diskSpaceUtil function when mocking.
type diskSpaceUtilFunc func() (int, error)

// applyDefaults returns a copy of c in which unset (zero) durations have been
// replaced by their defaults:
//
//   - Interval:      30 minutes
//   - TTI:           6 hours
//   - AggressiveTTL: 1 hour, but only when AggressiveThreshold is non-zero
//
// The receiver is passed by value, so the caller's config is not modified.
func (c CleanupConfig) applyDefaults() CleanupConfig {
	const (
		defaultInterval      = 30 * time.Minute
		defaultTTI           = 6 * time.Hour
		defaultAggressiveTTL = 1 * time.Hour
	)

	if c.Interval == 0 {
		c.Interval = defaultInterval
	}
	if c.TTI == 0 {
		c.TTI = defaultTTI
	}
	// AggressiveTTL only matters when aggressive cleanup is enabled, so it is
	// left untouched while the threshold is 0.
	if c.AggressiveThreshold != 0 && c.AggressiveTTL == 0 {
		c.AggressiveTTL = defaultAggressiveTTL
	}
	return c
}

Expand Down Expand Up @@ -81,6 +96,10 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
log.Warnf("TTL disabled for %s", op)
}

if config.AggressiveThreshold == 0 {
log.Warnf("Aggressive cleanup disabled for %s", op)
}

ticker := m.clk.Ticker(config.Interval)

usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage")
Expand All @@ -90,7 +109,8 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
select {
case <-ticker.C:
log.Debugf("Performing cleanup of %s", op)
usage, err := m.scan(op, config.TTI, config.TTL)
ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil)
usage, err := m.scan(op, config.TTI, ttl)
if err != nil {
log.Errorf("Error scanning %s: %s", op, err)
}
Expand Down Expand Up @@ -153,3 +173,18 @@ func (m *cleanupManager) readyForDeletion(
}
return m.clk.Now().Sub(lat.Time) > tti, nil
}

// checkAggressiveCleanup selects the TTL to use for the next scan of op.
//
// It returns config.AggressiveTTL when aggressive cleanup is enabled
// (config.AggressiveThreshold != 0) and the value reported by util is at or
// above that threshold. In every other case — aggressive cleanup disabled,
// util below the threshold, or util returning an error (which is logged) — it
// returns config.TTL.
//
// util is injected rather than called directly so tests can supply a fake
// disk utilization probe.
func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration {
	if config.AggressiveThreshold == 0 {
		// Aggressive cleanup is disabled; do not probe the disk at all.
		return config.TTL
	}

	// Named usedPercent rather than diskspaceutil so the local does not shadow
	// the imported diskspaceutil package.
	usedPercent, err := util()
	if err != nil {
		// Best effort: a failed probe falls back to the regular TTL.
		log.Errorf("Error checking disk space util %s: %s", op, err)
		return config.TTL
	}
	if usedPercent < config.AggressiveThreshold {
		return config.TTL
	}

	log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, usedPercent)
	return config.AggressiveTTL
}
31 changes: 31 additions & 0 deletions lib/store/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package store

import (
"errors"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -228,3 +229,33 @@ func TestCleanupManageDiskUsage(t *testing.T) {
require.NoError(err)
require.Equal(int64(500), usage)
}

// TestCleanupManagerAggressive verifies which TTL checkAggressiveCleanup
// selects for a given disk utilization reading: the aggressive TTL at or above
// the threshold, and the regular TTL below it or when the probe fails.
func TestCleanupManagerAggressive(t *testing.T) {
	require := require.New(t)

	config := CleanupConfig{
		AggressiveThreshold: 80,
		TTL:                 10 * time.Second,
		AggressiveTTL:       5 * time.Second,
	}

	clk := clock.NewMock()
	m, err := newCleanupManager(clk, tally.NoopScope)
	require.NoError(err)
	defer m.stop()

	_, op, cleanup := fileOpFixture(clk)
	defer cleanup()

	tests := []struct {
		desc     string
		util     int
		utilErr  error
		expected time.Duration
	}{
		{"above threshold", 90, nil, 5 * time.Second},
		{"at threshold", 80, nil, 5 * time.Second},
		{"below threshold", 60, nil, 10 * time.Second},
		{"probe error", 0, errors.New("fake error"), 10 * time.Second},
	}
	for _, test := range tests {
		test := test // Pin for the closure below (pre-Go 1.22 loop semantics).
		probe := func() (int, error) {
			return test.util, test.utilErr
		}
		// testify's Equal takes (expected, actual); keeping that order makes
		// failure output label the two values correctly.
		require.Equal(test.expected, m.checkAggressiveCleanup(op, config, probe), test.desc)
	}
}
2 changes: 1 addition & 1 deletion lib/store/upload_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"fmt"
"os"

"github.com/andres-erbsen/clock"
"github.com/uber/kraken/lib/store/base"
"github.com/uber/kraken/lib/store/metadata"
"github.com/andres-erbsen/clock"
)

// uploadStore provides basic upload file operations. Intended to be embedded
Expand Down
35 changes: 35 additions & 0 deletions utils/diskspaceutil/diskspaceutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diskspaceutil

import (
"syscall"
)

// path is the location whose containing filesystem is inspected by
// DiskSpaceUtil.
const path = "/"

// DiskSpaceUtil returns the utilization of the filesystem containing path as
// an integer percentage, rounded down.
//
// Utilization is computed from the statfs total and free block counts
// (Blocks and Bfree). It returns a non-nil error only when the statfs call
// itself fails, in which case the returned percentage is 0.
func DiskSpaceUtil() (int, error) {
	var fs syscall.Statfs_t
	if err := syscall.Statfs(path, &fs); err != nil {
		return 0, err
	}
	return usedPercent(fs.Blocks, fs.Bfree), nil
}

// usedPercent returns the share of total that is not free, as an integer
// percentage rounded down.
//
// It works directly on block counts: the block size is a common factor of
// numerator and denominator, so multiplying by it does not change the result
// and only brings the intermediate product closer to overflowing.
//
// A filesystem reporting zero total blocks, or more free blocks than total
// blocks, is reported as 0% used instead of dividing by zero or underflowing.
func usedPercent(total, free uint64) int {
	if total == 0 || free >= total {
		return 0
	}
	return int((total - free) * 100 / total)
}
17 changes: 17 additions & 0 deletions utils/diskspaceutil/diskspaceutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package diskspaceutil_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/uber/kraken/utils/diskspaceutil"
)

// TestDiskSpaceUtil checks that DiskSpaceUtil succeeds on the host running the
// test and reports a valid percentage.
//
// The bounds are inclusive: an almost empty filesystem legitimately rounds
// down to 0 and a full one reports 100, so strict inequalities would make the
// test depend on the state of the machine's disk.
func TestDiskSpaceUtil(t *testing.T) {
	util, err := diskspaceutil.DiskSpaceUtil()
	require.NoError(t, err)

	require.True(t, util >= 0, "disk util %d must not be negative", util)
	require.True(t, util <= 100, "disk util %d must not exceed 100", util)
}

0 comments on commit 89dd72b

Please sign in to comment.