Skip to content

Commit

Permalink
Avoid double for loops in notification init (minio#8691)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana authored Dec 24, 2019
1 parent 54431b3 commit 99ad445
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 75 deletions.
5 changes: 4 additions & 1 deletion buildscripts/gateway-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ function main()
gw_pid="$(start_minio_gateway_s3)"

SERVER_ENDPOINT=127.0.0.1:24240 ENABLE_HTTPS=0 ACCESS_KEY=minio \
SECRET_KEY=minio123 MINT_MODE="full" /mint/entrypoint.sh
SECRET_KEY=minio123 MINT_MODE="full" /mint/entrypoint.sh \
aws-sdk-go aws-sdk-java aws-sdk-php aws-sdk-ruby awscli \
healthcheck mc minio-dotnet minio-go minio-java minio-py \
s3cmd security
rv=$?

kill "$sr_pid"
Expand Down
59 changes: 39 additions & 20 deletions cmd/config-encrypted.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os"
"strings"
"unicode/utf8"
Expand All @@ -39,7 +40,6 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {

// If its server mode or nas gateway, migrate the backend.
doneCh := make(chan struct{})
defer close(doneCh)

var encrypted bool
var err error
Expand All @@ -48,17 +48,27 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
if encrypted, err = checkBackendEncrypted(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
retryTimerCh := newRetryTimerSimple(doneCh)
var stop bool
for !stop {
select {
case <-retryTimerCh:
if encrypted, err = checkBackendEncrypted(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
}
close(doneCh)
return err
}
return err
stop = true
case <-globalOSSignalCh:
close(doneCh)
return fmt.Errorf("Config encryption process stopped gracefully")
}
break
}
close(doneCh)

if encrypted {
// backend is encrypted, but credentials are not specified
Expand All @@ -83,24 +93,33 @@ func handleEncryptedConfigBackend(objAPI ObjectLayer, server bool) error {
return err
}

doneCh = make(chan struct{})
defer close(doneCh)

retryTimerCh = newRetryTimerSimple(doneCh)

// Migrating Config backend needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Migrate IAM configuration
if err = migrateConfigPrefixToEncrypted(objAPI, activeCredOld, encrypted); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
for {
select {
case <-retryTimerCh:
// Migrate IAM configuration
if err = migrateConfigPrefixToEncrypted(objAPI, activeCredOld, encrypted); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for config backend to be encrypted..")
continue
}
return err
}
return err
return nil
case <-globalOSSignalCh:
return fmt.Errorf("Config encryption process stopped gracefully")
}
break
}
return nil
}

const (
Expand Down
27 changes: 17 additions & 10 deletions cmd/format-fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,24 @@ func formatFSFixDeploymentID(fsFormatPath string) error {
defer close(doneCh)

var wlk *lock.LockedFile
for range newRetryTimerSimple(doneCh) {
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again
logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime())
continue
retryCh := newRetryTimerSimple(doneCh)
var stop bool
for !stop {
select {
case <-retryCh:
wlk, err = lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR, 0)
if err == lock.ErrAlreadyLocked {
// Lock already present, sleep and attempt again
logger.Info("Another minio process(es) might be holding a lock to the file %s. Please kill that minio process(es) (elapsed %s)\n", fsFormatPath, getElapsedTime())
continue
}
if err != nil {
return err
}
stop = true
case <-globalOSSignalCh:
return fmt.Errorf("Initializing FS format stopped gracefully")
}
break
}
if err != nil {
return err
}

defer wlk.Close()
Expand Down
58 changes: 27 additions & 31 deletions cmd/iam.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -365,46 +366,41 @@ func (sys *IAMSys) Init(objAPI ObjectLayer) error {
doneCh := make(chan struct{})
defer close(doneCh)

// Migrating IAM needs a retry mechanism for
// Migrating IAM amd Loading IAM needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
retryCh := newRetryTimerSimple(doneCh)
for {
select {
case <-retryCh:
// Migrate IAM configuration
if err := sys.doIAMConfigMigration(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
}
return err
}
return err
}
break
}

sys.store.watch(sys)
sys.store.watch(sys)

// Initializing IAM needs a retry mechanism for
// the following reasons:
// - Read quorum is lost just after the initialization
// of the object layer.
for range newRetryTimerSimple(doneCh) {
// Load IAMSys once during boot. Need to pass in
// objAPI as server has not yet initialized.
if err := sys.store.loadAll(sys, objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
if err := sys.store.loadAll(sys, objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
strings.Contains(err.Error(), InsufficientWriteQuorum{}.Error()) {
logger.Info("Waiting for IAM subsystem to be initialized..")
continue
}
return err
}
return err
return nil
case <-globalOSSignalCh:
return fmt.Errorf("Initializing IAM sub-system gracefully stopped")
}
break
}

return nil
}

// DeletePolicy - deletes a canned policy from backend or etcd.
Expand Down
16 changes: 3 additions & 13 deletions cmd/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,7 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
// - Read quorum is lost just after the initialization
// of the object layer.
retryTimerCh := newRetryTimerSimple(doneCh)
stop := false
for !stop {
for {
select {
case <-retryTimerCh:
if err := sys.load(buckets, objAPI); err != nil {
Expand All @@ -731,17 +730,8 @@ func (sys *NotificationSys) Init(buckets []BucketInfo, objAPI ObjectLayer) error
}
return err
}
stop = true
case <-globalOSSignalCh:
return fmt.Errorf("Initializing Notification sub-system gracefully stopped")
}
}

// Initializing bucket retention config needs a retry mechanism if
// read quorum is lost just after the initialization of the object layer.
for {
select {
case <-retryTimerCh:
// Initializing bucket retention config needs a retry mechanism if
// read quorum is lost just after the initialization of the object layer.
if err := sys.initBucketObjectLockConfig(objAPI); err != nil {
if err == errDiskNotFound ||
strings.Contains(err.Error(), InsufficientReadQuorum{}.Error()) ||
Expand Down

0 comments on commit 99ad445

Please sign in to comment.