forked from minio/minio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
background-heal-ops.go
180 lines (153 loc) · 4.45 KB
/
background-heal-ops.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"context"
"fmt"
"runtime"
"strconv"
"time"
"github.com/minio/madmin-go/v2"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/env"
)
// healTask represents what to heal along with options
//
// path: '/' => Heal disk formats along with metadata
// path: 'bucket/' or '/bucket/' => Heal bucket
// path: 'bucket/object' => Heal object
type healTask struct {
bucket string
object string
versionID string
opts madmin.HealOpts
// Healing response will be sent here
respCh chan healResult
}
// healResult represents a healing result with a possible error
type healResult struct {
result madmin.HealResultItem
err error
}
// healRoutine receives heal tasks, to heal buckets, objects and format.json
type healRoutine struct {
tasks chan healTask
workers int
}
func activeListeners() int {
// Bucket notification and http trace are not costly, it is okay to ignore them
// while counting the number of concurrent connections
return int(globalHTTPListen.Subscribers()) + int(globalTrace.Subscribers())
}
func waitForLowIO(maxIO int, maxWait time.Duration, currentIO func() int) {
// No need to wait run at full speed.
if maxIO <= 0 {
return
}
const waitTick = 100 * time.Millisecond
tmpMaxWait := maxWait
for currentIO() >= maxIO {
if tmpMaxWait > 0 {
if tmpMaxWait < waitTick {
time.Sleep(tmpMaxWait)
} else {
time.Sleep(waitTick)
}
tmpMaxWait -= waitTick
}
if tmpMaxWait <= 0 {
return
}
}
}
func currentHTTPIO() int {
httpServer := newHTTPServerFn()
if httpServer == nil {
return 0
}
return httpServer.GetRequestCount() - activeListeners()
}
func waitForLowHTTPReq() {
maxIO, maxWait, _ := globalHealConfig.Clone()
waitForLowIO(maxIO, maxWait, currentHTTPIO)
}
func initBackgroundHealing(ctx context.Context, objAPI ObjectLayer) {
// Run the background healer
globalBackgroundHealRoutine = newHealRoutine()
for i := 0; i < globalBackgroundHealRoutine.workers; i++ {
go globalBackgroundHealRoutine.AddWorker(ctx, objAPI)
}
globalBackgroundHealState.LaunchNewHealSequence(newBgHealSequence(), objAPI)
}
// Wait for heal requests and process them
func (h *healRoutine) AddWorker(ctx context.Context, objAPI ObjectLayer) {
for {
select {
case task, ok := <-h.tasks:
if !ok {
return
}
var res madmin.HealResultItem
var err error
switch task.bucket {
case nopHeal:
err = errSkipFile
case SlashSeparator:
res, err = healDiskFormat(ctx, objAPI, task.opts)
default:
if task.object == "" {
res, err = objAPI.HealBucket(ctx, task.bucket, task.opts)
} else {
res, err = objAPI.HealObject(ctx, task.bucket, task.object, task.versionID, task.opts)
}
}
if task.respCh != nil {
task.respCh <- healResult{result: res, err: err}
}
case <-ctx.Done():
return
}
}
}
func newHealRoutine() *healRoutine {
workers := runtime.GOMAXPROCS(0) / 2
if envHealWorkers := env.Get("_MINIO_HEAL_WORKERS", ""); envHealWorkers != "" {
if numHealers, err := strconv.Atoi(envHealWorkers); err != nil {
logger.LogIf(context.Background(), fmt.Errorf("invalid _MINIO_HEAL_WORKERS value: %w", err))
} else {
workers = numHealers
}
}
if workers == 0 {
workers = 4
}
return &healRoutine{
tasks: make(chan healTask),
workers: workers,
}
}
// healDiskFormat - heals format.json, return value indicates if a
// failure error occurred.
func healDiskFormat(ctx context.Context, objAPI ObjectLayer, opts madmin.HealOpts) (madmin.HealResultItem, error) {
res, err := objAPI.HealFormat(ctx, opts.DryRun)
// return any error, ignore error returned when disks have
// already healed.
if err != nil && err != errNoHealRequired {
return madmin.HealResultItem{}, err
}
return res, nil
}