Skip to content

Commit

Permalink
refactored retries
Browse files Browse the repository at this point in the history
  • Loading branch information
adranwit committed Mar 27, 2020
1 parent cecdf52 commit b311bc5
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 90 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## March 14 2020 2.0.3
* Refactored retries

## March 14 2020 2.0.2
* Patch transient table auto clustering and partitioning with table split option
* Added expiry time exclusion, when create table from a template
Expand Down
2 changes: 1 addition & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
- Update documentation/examples
- Add bqtail docker image
- Add conditional action
- Integrate with authly
- Integrate with authly
2 changes: 1 addition & 1 deletion Version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.2
2.0.3
69 changes: 69 additions & 0 deletions base/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package base

import (
"github.com/viant/bqtail/shared"
"github.com/viant/toolbox"
"math/rand"
"os"
"time"
)

// Retry holds the state of a randomized exponential back-off: each call to
// Pause returns a sleep duration and grows the ceiling used for the next call.
// A Retry is mutated by Pause, so a single value must not be shared between
// goroutines without external synchronization.
type Retry struct {
	// Count is not read or updated by Pause.
	Count int
	// Initial is the ceiling of the first pause; non-positive values default to one second.
	Initial time.Duration
	// Max is the upper bound for any pause; non-positive values default to 30 seconds.
	Max time.Duration
	// Multiplier is the growth factor applied to the ceiling after every pause;
	// values below 1 (and NaN) default to 2.
	Multiplier float64
	// duration is the ceiling that the next Pause call draws from.
	duration time.Duration
}

// Pause returns the next time.Duration that the caller should use to backoff.
//
// The returned value is drawn uniformly from [1ns, ceiling], where the ceiling
// starts at Initial, is multiplied by Multiplier after every call and never
// exceeds Max. Unset or invalid settings are replaced with defaults (and
// written back to the receiver) before the pause is computed.
func (b *Retry) Pause() time.Duration {
	if b.Initial <= 0 {
		b.Initial = time.Second
	}
	if b.Max <= 0 {
		b.Max = 30 * time.Second
	}
	// Written as a negated >= so that NaN is also replaced by the default.
	if !(b.Multiplier >= 1) {
		b.Multiplier = 2
	}
	if b.duration <= 0 {
		b.duration = b.Initial
	}
	// Honour Max on the very first pause too (Initial may be larger than Max).
	if b.duration > b.Max {
		b.duration = b.Max
	}

	// b.duration is strictly positive here, so Int63n cannot panic.
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	result := time.Duration(1 + rnd.Int63n(int64(b.duration)))

	// Compare in floating point before converting back: a large multiplier
	// could otherwise overflow int64 and yield a non-positive ceiling.
	if next := float64(b.duration) * b.Multiplier; next < float64(b.Max) {
		b.duration = time.Duration(next)
	} else {
		b.duration = b.Max
	}
	return result
}

// NewRetry creates a Retry with zero-valued settings; Pause fills in the
// defaults on first use.
func NewRetry() *Retry {
	return &Retry{}
}

// RunWithRetries calls f until it returns an error that IsRetryError does not
// report as retryable, or until the attempt budget from GetMaxRetries is used
// up. Between attempts it sleeps for a randomized, growing back-off produced
// by a fresh Retry. It returns the error of the last call to f.
//
// f is always invoked at least once, even if the configured budget is
// non-positive, so a misconfiguration can never be reported as success
// without f having run.
func RunWithRetries(f func() error) (err error) {
	maxRetries := GetMaxRetries()
	if maxRetries < 1 {
		maxRetries = 1
	}
	retry := NewRetry()
	for i := 0; i < maxRetries; i++ {
		// Back off only between attempts: never before the first one and
		// never after the last one, where the sleep would just delay the error.
		if i > 0 {
			time.Sleep(retry.Pause())
		}
		if err = f(); !IsRetryError(err) {
			return err
		}
	}
	return err
}

// GetMaxRetries returns the maximum number of attempts used by RunWithRetries.
// The value is read from the environment variable named by
// shared.MaxRetriesEnvKey; when it converts to zero or a negative number,
// shared.MaxRetries is returned instead.
func GetMaxRetries() int {
	maxRetries := toolbox.AsInt(os.Getenv(shared.MaxRetriesEnvKey))
	// A non-positive budget would make the retry loop skip the call entirely.
	if maxRetries <= 0 {
		return shared.MaxRetries
	}
	return maxRetries
}
15 changes: 3 additions & 12 deletions service/bq/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/viant/bqtail/shared"
"github.com/viant/bqtail/task"
"google.golang.org/api/bigquery/v2"
"time"
)

//Drop drop source table
Expand All @@ -22,17 +21,9 @@ func (s *service) Drop(ctx context.Context, request *DropRequest, action *task.A
call := bigquery.NewTablesService(s.Service).Delete(table.ProjectId, table.DatasetId, table.TableId)
call.Context(ctx)
var err error
for i := 0; i < shared.MaxRetries; i++ {
if err = call.Do(); err == nil {
return err
}
if base.IsRetryError(err) {
//do extra sleep before retrying
time.Sleep(shared.RetrySleepInSec * time.Second)
continue
}
break
}
err = base.RunWithRetries(func() error {
return call.Do()
})
if base.IsNotFoundError(err) {
err = nil
}
Expand Down
15 changes: 4 additions & 11 deletions service/bq/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bq
import (
"context"
"github.com/viant/bqtail/base"
"github.com/viant/bqtail/shared"
"google.golang.org/api/bigquery/v2"
"time"
)
Expand All @@ -14,17 +13,11 @@ func (s *service) GetJob(ctx context.Context, location, projectID, jobID string)
call := jobService.Get(projectID, jobID)
call.Location(location)
call.Context(ctx)
for i := 0; i < shared.MaxRetries; i++ {

err = base.RunWithRetries(func() error {
job, err = call.Do()
if err == nil {
break
}
if base.IsRetryError(err) {
//do extra sleep before retrying
time.Sleep(shared.RetrySleepInSec * time.Second)
continue
}
}
return err
})
return job, err
}

Expand Down
21 changes: 5 additions & 16 deletions service/bq/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/viant/bqtail/base"
"github.com/viant/bqtail/shared"
"google.golang.org/api/bigquery/v2"
"time"
)

//Patch patch temp table
Expand Down Expand Up @@ -36,22 +34,13 @@ func (s *service) Patch(ctx context.Context, request *PatchRequest) (*bigquery.T
return nil, errors.Wrapf(err, "invalid get template table: %v", request.Table)
}
}

var table *bigquery.Table
call := s.Service.Tables.Patch(tableRef.ProjectId, tableRef.DatasetId, tableRef.TableId, request.TemplateTable)
call.Context(ctx)
var table *bigquery.Table
for i := 0; i < shared.MaxRetries; i++ {
call.Context(ctx)
if table, err = call.Do(); err == nil {
return nil, err
}
if base.IsRetryError(err) {
//do extra sleep before retrying
time.Sleep(shared.RetrySleepInSec * time.Second)
continue
}
break
}
err = base.RunWithRetries(func() error {
table, err = call.Do()
return err
})
return table, err
}

Expand Down
38 changes: 17 additions & 21 deletions service/bq/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/viant/bqtail/task"
"google.golang.org/api/bigquery/v2"
"strings"
"time"
)

func (s *service) setJobID(action *task.Action) (*bigquery.JobReference, error) {
Expand Down Expand Up @@ -102,28 +101,25 @@ func (s *service) post(ctx context.Context, job *bigquery.Job, action *task.Acti
call := jobService.Insert(projectID, job)
call.Context(ctx)
var callJob *bigquery.Job
for i := 0; i < shared.MaxRetries; i++ {
if callJob, err = call.Do(); err == nil {
break
}
if base.IsRetryError(err) {
//do extra sleep before retrying
time.Sleep(shared.RetrySleepInSec * time.Second)
continue
}
if i > 0 && base.IsDuplicateJobError(err) {
if shared.IsDebugLoggingLevel() {
shared.LogF("duplicate job: [%v]: %v\n", job.Id, err)
}
err = nil
callJob, _ = s.GetJob(ctx, job.JobReference.Location, job.JobReference.ProjectId, job.JobReference.JobId)
break
}
if err != nil {
detail, _ := json.Marshal(job)
err = errors.Wrapf(err, "failed to submit: %T %s", call, detail)

err = base.RunWithRetries(func() error {
callJob, err = call.Do()
return err
})

if base.IsDuplicateJobError(err) {
if shared.IsDebugLoggingLevel() {
shared.LogF("duplicate job: [%v]: %v\n", job.Id, err)
}
err = nil
callJob, _ = s.GetJob(ctx, job.JobReference.Location, job.JobReference.ProjectId, job.JobReference.JobId)
}

if err != nil {
detail, _ := json.Marshal(job)
err = errors.Wrapf(err, "failed to submit: %T %s", call, detail)
}

if err != nil || (callJob != nil && base.JobError(callJob) != nil) {
if shared.IsDebugLoggingLevel() && callJob != nil && callJob.Status != nil {
shared.LogLn(callJob.Status)
Expand Down
26 changes: 11 additions & 15 deletions service/bq/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/viant/bqtail/base"
"github.com/viant/bqtail/shared"
"google.golang.org/api/bigquery/v2"
"time"
)

//Table returns BigQuery table
Expand All @@ -16,19 +15,13 @@ func (s *service) Table(ctx context.Context, reference *bigquery.TableReference)
}
tableID := base.TableID(reference.TableId)
call := bigquery.NewTablesService(s.Service).Get(reference.ProjectId, reference.DatasetId, tableID)

for i := 0; i < shared.MaxRetries; i++ {
call.Context(ctx)
if table, err = call.Do(); err == nil {
return table, err
}
if base.IsRetryError(err) {
//do extra sleep before retrying
time.Sleep(shared.RetrySleepInSec * time.Second)
continue
}
call.Context(ctx)
err = base.RunWithRetries(func() error {
table, err = call.Do()
return err
})
if err != nil {
err = errors.Wrapf(err, "failed to lookup table schema: %v:%v.%v", reference.ProjectId, reference.DatasetId, tableID)
break
}
return table, err
}
Expand Down Expand Up @@ -66,14 +59,17 @@ func (s *service) CreateTableIfNotExist(ctx context.Context, table *bigquery.Tab
}
return nil
}

if shared.IsDebugLoggingLevel() {
shared.LogF("create table: %+v\n", table.TableReference)
shared.LogLn(table)
}
insertTableCall := srv.Insert(ref.ProjectId, ref.DatasetId, table)
insertTableCall.Context(ctx)
_, err = insertTableCall.Do()
return err
return base.RunWithRetries(func() error {
_, err = insertTableCall.Do()
return err
})
}

func isSchemaEqual(source []*bigquery.TableFieldSchema, template []*bigquery.TableFieldSchema) bool {
Expand Down
7 changes: 4 additions & 3 deletions shared/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (
ErrorKey = "Error"
//ConfigEnvKey config env key
ConfigEnvKey = "CONFIG"

//MaxRetriesEnvKey max retries env key
MaxRetriesEnvKey = "MAX_RETRIES"
)

//BigQuery job status
Expand Down Expand Up @@ -170,9 +173,7 @@ const (
//Waits and retries
const (
//MaxRetries defines max retries
MaxRetries = 3
//RetrySleepInSec sleep between retries
RetrySleepInSec = 3
MaxRetries = 4
//StorageListVisibilityDelay - list storage operation can be delay with actual put object state.
StorageListVisibilityDelay = 5000
)
Expand Down
14 changes: 9 additions & 5 deletions tail/batch/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,15 @@ func (s *service) getBaseURLS(ctx context.Context, rule *config.Rule, window *Wi
}

//MatchWindowDataURLs matches window data URLs; it waits for the window to end if needed
func (s *service) MatchWindowDataURLs(ctx context.Context, rule *config.Rule, window *Window) error {
before := window.End //inclusive
afeter := window.Start.Add(-1) //exclusive
modFilter := matcher.NewModification(&before, &afeter)
baseURLS, err := s.getBaseURLS(ctx, rule, window)
func (s *service) MatchWindowDataURLs(ctx context.Context, rule *config.Rule, window *Window) (err error) {
before := window.End //inclusive
after := window.Start.Add(-1) //exclusive
modFilter := matcher.NewModification(&before, &after)
var baseURLS []string
err = base.RunWithRetries(func() error {
baseURLS, err = s.getBaseURLS(ctx, rule, window)
return err
})
if err != nil {
return errors.Wrapf(err, "failed get batch location: %v", window.URL)
}
Expand Down
8 changes: 3 additions & 5 deletions task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,10 @@ func NewAction(action string, req interface{}) (*Action, error) {

//NewActionFromURL create a new actions from URL
func NewActionFromURL(ctx context.Context, fs afs.Service, URL string) (action *Action, err error) {
for i := 0; i < shared.MaxRetries; i++ {
err = base.RunWithRetries(func() error {
action, err = newActionFromURL(fs, ctx, URL)
if !base.IsRetryError(err) {
return
}
}
return err
})
return action, err
}

Expand Down

0 comments on commit b311bc5

Please sign in to comment.