Heal buckets at node level (minio#18612)
Signed-off-by: Shubhendu Ram Tripathi <[email protected]>
shtripat authored Jan 10, 2024
1 parent f02d282 commit e31081d
Showing 12 changed files with 326 additions and 197 deletions.
171 changes: 17 additions & 154 deletions cmd/erasure-healing.go
@@ -128,159 +128,6 @@ func (er erasureObjects) listAndHeal(bucket, prefix string, scanMode madmin.Heal
return nil
}

// HealBucket heals a bucket if it doesn't exist on one of the disks; it also
// heals missing entries for the bucket metadata files
// `policy.json, notification.xml, listeners.json`.
func (er erasureObjects) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (
result madmin.HealResultItem, err error,
) {
storageDisks := er.getDisks()
storageEndpoints := er.getEndpoints()

// Heal bucket.
return er.healBucket(ctx, storageDisks, storageEndpoints, bucket, opts)
}

// Heal bucket - create the bucket on disks where it does not exist.
func (er erasureObjects) healBucket(ctx context.Context, storageDisks []StorageAPI, storageEndpoints []Endpoint, bucket string, opts madmin.HealOpts) (res madmin.HealResultItem, err error) {
// get write quorum for an object
writeQuorum := len(storageDisks) - er.defaultParityCount
if writeQuorum == er.defaultParityCount {
writeQuorum++
}

if globalTrace.NumSubscribers(madmin.TraceHealing) > 0 {
startTime := time.Now()
defer func() {
healTrace(healingMetricBucket, startTime, bucket, "", &opts, err, &res)
}()
}

// Initialize sync waitgroup.
g := errgroup.WithNErrs(len(storageDisks))

// Disk states slices
beforeState := make([]string, len(storageDisks))
afterState := make([]string, len(storageDisks))

// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if storageDisks[index] == nil {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return errDiskNotFound
}

beforeState[index] = madmin.DriveStateOk
afterState[index] = madmin.DriveStateOk

if bucket == minioReservedBucket {
return nil
}

if _, serr := storageDisks[index].StatVol(ctx, bucket); serr != nil {
if serr == errDiskNotFound {
beforeState[index] = madmin.DriveStateOffline
afterState[index] = madmin.DriveStateOffline
return serr
}
if serr != errVolumeNotFound {
beforeState[index] = madmin.DriveStateCorrupt
afterState[index] = madmin.DriveStateCorrupt
return serr
}

beforeState[index] = madmin.DriveStateMissing
afterState[index] = madmin.DriveStateMissing

// mutate only if not a dry-run
if opts.DryRun {
return nil
}

return serr
}
return nil
}, index)
}

errs := g.Wait()

// Initialize heal result info
res = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: len(storageDisks),
ParityBlocks: er.defaultParityCount,
DataBlocks: len(storageDisks) - er.defaultParityCount,
}

for i := range beforeState {
res.Before.Drives = append(res.Before.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: beforeState[i],
})
}

reducedErr := reduceReadQuorumErrs(ctx, errs, bucketOpIgnoredErrs, res.DataBlocks)
if errors.Is(reducedErr, errVolumeNotFound) && !opts.Recreate {
for i := range beforeState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: madmin.DriveStateOk,
})
}
return res, nil
}

// Initialize sync waitgroup.
g = errgroup.WithNErrs(len(storageDisks))

// Make a volume entry on all underlying storage disks.
for index := range storageDisks {
index := index
g.Go(func() error {
if beforeState[index] == madmin.DriveStateMissing {
makeErr := storageDisks[index].MakeVol(ctx, bucket)
if makeErr == nil {
afterState[index] = madmin.DriveStateOk
}
return makeErr
}
return errs[index]
}, index)
}

errs = g.Wait()

reducedErr = reduceWriteQuorumErrs(ctx, errs, bucketOpIgnoredErrs, writeQuorum)
if reducedErr != nil {
// If we have exactly half the drives not available,
// we should still allow HealBucket to not return an error;
// this is necessary for starting the server.
readQuorum := res.DataBlocks
switch reduceReadQuorumErrs(ctx, errs, nil, readQuorum) {
case nil:
case errDiskNotFound:
default:
return res, reducedErr
}
}

for i := range afterState {
res.After.Drives = append(res.After.Drives, madmin.HealDriveInfo{
UUID: "",
Endpoint: storageEndpoints[i].String(),
State: afterState[i],
})
}
return res, nil
}
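
The write-quorum arithmetic at the top of the deleted healBucket is worth pinning down: with N drives and parity P, the bucket write quorum is N - P, bumped by one when N - P equals P, so that exactly half the drives can never satisfy a write on their own. A minimal standalone restatement (illustrative values, not MinIO's API):

package main

import "fmt"

// bucketWriteQuorum restates the quorum rule from the deleted
// healBucket: quorum is the data-drive count, plus one when data
// and parity counts are equal (N == 2P), so that exactly half the
// drives can never satisfy a bucket write by themselves.
func bucketWriteQuorum(numDisks, parity int) int {
	quorum := numDisks - parity
	if quorum == parity {
		quorum++
	}
	return quorum
}

func main() {
	fmt.Println(bucketWriteQuorum(4, 2))  // 3: N == 2P, bumped by one
	fmt.Println(bucketWriteQuorum(12, 4)) // 8
}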

// listAllBuckets lists all buckets from all disks. It also
// returns the occurrence of each bucket across all disks
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
@@ -1019,6 +866,22 @@ func isAllNotFound(errs []error) bool {
return len(errs) > 0
}

// isAllBucketsNotFound will return true if all the errors
// are errVolumeNotFound.
// A 0 length slice will always return false.
func isAllBucketsNotFound(errs []error) bool {
if len(errs) == 0 {
return false
}
notFoundCount := 0
for _, err := range errs {
if err != nil && errors.Is(err, errVolumeNotFound) {
notFoundCount++
}
}
return len(errs) == notFoundCount
}
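
The new helper reports a drive set as bucket-less only when every error in a non-empty slice is errVolumeNotFound. A minimal standalone mirror of that contract (errVolumeNotFound here is a stand-in for MinIO's internal sentinel):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for MinIO's internal sentinel error.
var errVolumeNotFound = errors.New("volume not found")

// allBucketsNotFound mirrors isAllBucketsNotFound: true only when the
// slice is non-empty and every entry is errVolumeNotFound; a nil entry
// (bucket found on that drive) makes the whole answer false.
func allBucketsNotFound(errs []error) bool {
	if len(errs) == 0 {
		return false
	}
	for _, err := range errs {
		if err == nil || !errors.Is(err, errVolumeNotFound) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allBucketsNotFound([]error{errVolumeNotFound, errVolumeNotFound})) // true
	fmt.Println(allBucketsNotFound([]error{errVolumeNotFound, nil}))               // false
	fmt.Println(allBucketsNotFound(nil))                                           // false
}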

// ObjectDir is considered dangling/corrupted if and only
// if total disks - a combination of corrupted and missing
// files is lesser than N/2+1 number of disks.
@@ -1044,7 +907,7 @@ func isObjectDirDangling(errs []error) (ok bool) {
return found < notFound && found > 0
}

// Object is considered dangling/corrupted if any only
// Object is considered dangling/corrupted if and only
// if total disks - a combination of corrupted and missing
// files is lesser than number of data blocks.
func isObjectDangling(metaArr []FileInfo, errs []error, dataErrs []error) (validMeta FileInfo, ok bool) {
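
Both dangling checks above reduce to counting error classes across drives, as the retained return statement (found < notFound && found > 0) shows. A simplified standalone restatement of the ObjectDir rule; the real isObjectDirDangling also counts corrupt drives and non-empty directories, so treat this as a sketch (errFileNotFound is a stand-in sentinel):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for MinIO's internal sentinel error.
var errFileNotFound = errors.New("file not found")

// objectDirDangling restates the vote in isObjectDirDangling: the
// directory is dangling when it was found on at least one drive but
// is missing on a strict majority of the others.
func objectDirDangling(errs []error) bool {
	var found, notFound int
	for _, err := range errs {
		switch {
		case err == nil:
			found++
		case errors.Is(err, errFileNotFound):
			notFound++
		}
	}
	return found > 0 && found < notFound
}

func main() {
	fmt.Println(objectDirDangling([]error{nil, errFileNotFound, errFileNotFound})) // true
	fmt.Println(objectDirDangling([]error{nil, nil, errFileNotFound}))             // false
}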
4 changes: 2 additions & 2 deletions cmd/erasure-healing_test.go
@@ -362,7 +362,7 @@ func TestHealing(t *testing.T) {
t.Fatal(err)
}
// This would create the bucket.
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
DryRun: false,
Remove: false,
})
@@ -543,7 +543,7 @@ func TestHealingVersioned(t *testing.T) {
t.Fatal(err)
}
// This would create the bucket.
_, err = er.HealBucket(ctx, bucket, madmin.HealOpts{
_, err = obj.HealBucket(ctx, bucket, madmin.HealOpts{
DryRun: false,
Remove: false,
})
47 changes: 46 additions & 1 deletion cmd/erasure-server-pool.go
@@ -1941,8 +1941,53 @@ func (z *erasureServerPools) HealBucket(ctx context.Context, bucket string, opts
hopts.Recreate = false
defer z.HealObject(ctx, minioMetaBucket, pathJoin(bucketMetaPrefix, bucket, bucketMetadataFile), "", hopts)

type DiskStat struct {
VolInfos []VolInfo
Errs []error
}

for _, pool := range z.serverPools {
result, err := pool.HealBucket(ctx, bucket, opts)
// map of node-wise disk stats
diskStats := make(map[string]DiskStat)
for _, set := range pool.sets {
for _, disk := range set.getDisks() {
if disk == OfflineDisk {
continue
}
vi, err := disk.StatVol(ctx, bucket)
hostName := disk.Hostname()
if disk.IsLocal() {
hostName = "local"
}
ds, ok := diskStats[hostName]
if !ok {
newds := DiskStat{
VolInfos: []VolInfo{vi},
Errs: []error{err},
}
diskStats[hostName] = newds
} else {
ds.VolInfos = append(ds.VolInfos, vi)
ds.Errs = append(ds.Errs, err)
diskStats[hostName] = ds
}
}
}
nodeCount := len(diskStats)
bktNotFoundCount := 0
for _, ds := range diskStats {
if isAllBucketsNotFound(ds.Errs) {
bktNotFoundCount++
}
}
// if the bucket is not found on more than half the number of nodes, it's dangling
if bktNotFoundCount > nodeCount/2 {
opts.Remove = true
} else {
opts.Recreate = true
}

result, err := z.s3Peer.HealBucket(ctx, bucket, opts)
if err != nil {
if _, ok := err.(BucketNotFound); ok {
continue
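
This per-node grouping is the heart of the change: each node votes "bucket missing" only when all of its drives report errVolumeNotFound, and the bucket is declared dangling (opts.Remove) only when a strict majority of nodes vote that way; otherwise it is recreated (opts.Recreate). A condensed standalone sketch of that vote (hostnames and the sentinel are illustrative):

package main

import (
	"errors"
	"fmt"
)

// Stand-in for MinIO's internal sentinel error.
var errVolumeNotFound = errors.New("volume not found")

// bucketDangling groups per-drive StatVol errors by node and applies
// the vote from erasureServerPools.HealBucket: a node counts as
// "bucket missing" only when every one of its drives reports
// errVolumeNotFound; the bucket is dangling when more than half of
// the nodes say so.
func bucketDangling(errsByNode map[string][]error) bool {
	notFoundNodes := 0
	for _, errs := range errsByNode {
		if len(errs) == 0 {
			continue
		}
		missing := 0
		for _, err := range errs {
			if errors.Is(err, errVolumeNotFound) {
				missing++
			}
		}
		if missing == len(errs) {
			notFoundNodes++
		}
	}
	return notFoundNodes > len(errsByNode)/2
}

func main() {
	// Two of three nodes lost the bucket on every drive: a majority,
	// so heal removes the dangling bucket instead of recreating it.
	fmt.Println(bucketDangling(map[string][]error{
		"node1": {errVolumeNotFound, errVolumeNotFound},
		"node2": {errVolumeNotFound, errVolumeNotFound},
		"node3": {nil, nil},
	})) // true
}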
22 changes: 0 additions & 22 deletions cmd/erasure-sets.go
@@ -1186,28 +1186,6 @@ func (s *erasureSets) HealFormat(ctx context.Context, dryRun bool) (res madmin.H
return res, nil
}

// HealBucket - heals inconsistent buckets and bucket metadata on all sets.
func (s *erasureSets) HealBucket(ctx context.Context, bucket string, opts madmin.HealOpts) (result madmin.HealResultItem, err error) {
// Initialize heal result info
result = madmin.HealResultItem{
Type: madmin.HealItemBucket,
Bucket: bucket,
DiskCount: s.setCount * s.setDriveCount,
SetCount: s.setCount,
}

for _, set := range s.sets {
healResult, err := set.HealBucket(ctx, bucket, opts)
if err != nil {
return result, toObjectErr(err, bucket)
}
result.Before.Drives = append(result.Before.Drives, healResult.Before.Drives...)
result.After.Drives = append(result.After.Drives, healResult.After.Drives...)
}

return result, nil
}

// HealObject - heals inconsistent object on a hashedSet based on object name.
func (s *erasureSets) HealObject(ctx context.Context, bucket, object, versionID string, opts madmin.HealOpts) (madmin.HealResultItem, error) {
return s.getHashedSet(object).HealObject(ctx, bucket, object, versionID, opts)
12 changes: 7 additions & 5 deletions cmd/global-heal.go
@@ -143,10 +143,13 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
healBuckets := make([]string, len(buckets))
copy(healBuckets, buckets)

// Heal all buckets first in this erasure set - this is useful
// for new objects upload in different buckets to be successful
objAPI := newObjectLayerFn()
if objAPI == nil {
return errServerNotInitialized
}

for _, bucket := range healBuckets {
_, err := er.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
_, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{ScanMode: scanMode})
if err != nil {
// Log bucket healing error if any, we shall retry again.
logger.LogIf(ctx, err)
@@ -195,10 +198,9 @@ func (er *erasureObjects) healErasureSet(ctx context.Context, buckets []string,
}
tracker.setObject("")
tracker.setBucket(bucket)

// Heal current bucket again in case it failed
// in the beginning of erasure set healing
if _, err := er.HealBucket(ctx, bucket, madmin.HealOpts{
if _, err := objAPI.HealBucket(ctx, bucket, madmin.HealOpts{
ScanMode: scanMode,
}); err != nil {
logger.LogIf(ctx, err)
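
With erasureSets.HealBucket removed, per-set healing now routes bucket heals through the top-level object layer, so the node-level vote above applies even when healing starts inside a single erasure set. A rough standalone sketch of the resulting loop (the interface and names are illustrative, not MinIO's actual types):

package main

import (
	"context"
	"fmt"
)

// bucketHealer abstracts the one method the heal loop needs; in MinIO
// this is now the object layer's HealBucket, which performs the
// node-level dangling vote shown earlier.
type bucketHealer interface {
	HealBucket(ctx context.Context, bucket string) error
}

type noopHealer struct{}

func (noopHealer) HealBucket(context.Context, string) error { return nil }

// healAllBuckets mirrors the reworked healErasureSet flow: heal every
// bucket up front so new uploads to any bucket succeed while objects
// are still healing; each bucket is healed once more later, right
// before its objects are scanned, in case this first pass failed.
func healAllBuckets(ctx context.Context, h bucketHealer, buckets []string) {
	for _, bucket := range buckets {
		if err := h.HealBucket(ctx, bucket); err != nil {
			fmt.Println("bucket heal failed, will retry:", bucket, err)
		}
	}
}

func main() {
	healAllBuckets(context.Background(), noopHealer{}, []string{"photos", "logs"})
}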