Merge pull request #15 from pendo-io/oot-taskkeys
replace numeric taskkeys with named ones to remove need for Allocate…
jshore1296 authored Sep 3, 2019
2 parents e847e0a + a90b765 commit 1a9e268
Showing 4 changed files with 17 additions and 49 deletions.
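
In short, this commit replaces datastore-allocated numeric task IDs with deterministic named keys built from the job ID and the task index, which removes the AllocateIDSet round-trip (and its requirement that the allocated IDs be consecutive) before tasks can be created. A minimal sketch of the two naming schemes, using a simplified stand-in for *appwrap.DatastoreKey (the stand-in type and helper functions below are illustrative, not the appwrap API):

package main

import "fmt"

// fakeKey is a simplified stand-in for *appwrap.DatastoreKey: a key carries
// either a numeric ID (old scheme) or a string name (new scheme), never both.
type fakeKey struct {
    Kind  string
    Name  string // new scheme: "<jobId>.task<n>"
    IntID int64  // old scheme: numeric ID reserved via AllocateIDSet
}

// oldTaskKey mirrors the removed behavior: mkIds reserved a consecutive block
// of IDs from the datastore, and task n became firstId+n.
func oldTaskKey(firstId int64, n int) fakeKey {
    return fakeKey{Kind: "TaskEntity", IntID: firstId + int64(n)}
}

// newTaskKey mirrors makeTaskKeys after this commit: the key name is derived
// locally from the job ID and the task number, so nothing has to be allocated.
func newTaskKey(jobId int64, n int) fakeKey {
    return fakeKey{Kind: "TaskEntity", Name: fmt.Sprintf("%d.task%d", jobId, n)}
}

func main() {
    fmt.Printf("%+v\n", oldTaskKey(5000, 3)) // {Kind:TaskEntity Name: IntID:5003}
    fmt.Printf("%+v\n", newTaskKey(42, 3))   // {Kind:TaskEntity Name:42.task3 IntID:0}
}

The removed mkIds helper in mapreduce.go below shows everything the old scheme needed: an AllocateIDSet call plus a check that the returned IDs were consecutive, both of which become unnecessary once keys are named.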
console.go: 2 changes (1 addition & 1 deletion)
@@ -157,7 +157,7 @@ func ConsoleHandler(w http.ResponseWriter, r *http.Request) {
             return
         } else {
             tasks = tl
-            taskKeys = makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+            taskKeys = makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
         }
     default:
         jobKey := ds.NewKey(JobEntity, "", id, nil)
map.go: 9 changes (1 addition & 8 deletions)
@@ -67,21 +67,14 @@ func mapMonitorTask(c context.Context, ds appwrap.Datastore, pipeline MapReduceP
         }
     }

-    firstId, err := mkIds(ds, TaskEntity, len(job.WriterNames))
-    if err != nil {
-        jobFailed(c, ds, pipeline, jobKey, fmt.Errorf("failed to allocate ids for reduce tasks: %s", err.Error()), log)
-        return 200
-    }
-    taskKeys := makeTaskKeys(ds, firstId, len(job.WriterNames))
+    taskKeys := makeTaskKeys(ds, job.Id, 0, len(job.WriterNames))
     tasks := make([]JobTask, 0, len(job.WriterNames))

     for shard := range job.WriterNames {
         if shards := storageNames[shard]; len(shards) > 0 {
             url := fmt.Sprintf("%s/reduce?taskKey=%s;shard=%d;writer=%s",
                 job.UrlPrefix, taskKeys[len(tasks)].Encode(), shard, url.QueryEscape(job.WriterNames[shard]))

-            firstId++
-
             shardJson, _ := json.Marshal(shards)
             shardZ := &bytes.Buffer{}
             w := zlib.NewWriter(shardZ)
mapreduce.go: 34 changes (1 addition & 33 deletions)
@@ -18,7 +18,6 @@ package mapreduce
 import (
     "fmt"
     "net/http"
-    "sort"
     "strings"
     "time"

@@ -159,12 +158,7 @@ func Run(c context.Context, ds appwrap.Datastore, job MapReduceJob) (int64, erro
         return 0, fmt.Errorf("creating job: %s", err)
     }

-    firstId, err := mkIds(ds, TaskEntity, len(readerNames))
-    if err != nil {
-        return 0, fmt.Errorf("allocating keys: %s", err)
-    }
-
-    taskKeys := makeTaskKeys(ds, firstId, len(readerNames))
+    taskKeys := makeTaskKeys(ds, appwrap.KeyIntID(jobKey), 0, len(readerNames))
     tasks := make([]JobTask, len(readerNames))

     for i, readerName := range readerNames {
@@ -303,29 +297,3 @@ func tryAgainIfNonFatal(err error) error {
     }
     return nil
 }
-
-func mkIds(ds appwrap.Datastore, kind string, count int) (int64, error) {
-    incomplete := make([]*appwrap.DatastoreKey, count)
-    for i := range incomplete {
-        incomplete[i] = ds.NewKey(kind, "", 0, nil)
-    }
-
-    if completeKeys, err := ds.AllocateIDSet(incomplete); err != nil {
-        return 0, fmt.Errorf("reserving keys: %s", err)
-    } else {
-        ids := make([]int, len(completeKeys))
-        for i, k := range completeKeys {
-            ids[i] = int(appwrap.KeyIntID(k))
-        }
-
-        sort.Sort(sort.IntSlice(ids))
-
-        for i := 0; i < len(ids) - 1; i++ {
-            if ids[i] != ids[i+1]-1 {
-                return 0, fmt.Errorf("nonconsecutive keys allocated")
-            }
-        }
-
-        return int64(ids[0]), nil
-    }
-}
tasks.go: 21 changes (14 additions & 7 deletions)
@@ -70,7 +70,7 @@ type JobInfo struct {
     UpdatedAt time.Time
     StartTime time.Time
     TaskCount int `datastore:"TasksRunning,noindex"`
-    FirstTaskId int64 `datastore:",noindex"`
+    FirstTaskId int64 `datastore:",noindex"` // 0 here means to use task keys like "%d.%d" (Id, taskNum)
     RetryCount int `datastore:",noindex"`
     SeparateReduceItems bool `datastore:",noindex"`
     OnCompleteUrl string `datastore:",noindex"`
@@ -181,7 +181,7 @@ func createTasks(ds appwrap.Datastore, jobKey *appwrap.DatastoreKey, taskKeys []
     }

     job.TaskCount = len(tasks)
-    job.FirstTaskId = firstId
+    job.FirstTaskId = 0 // use new style ids
     job.Stage = newStage

     _, err := ds.Put(jobKey, &job)
@@ -284,6 +284,7 @@ func jobStageComplete(ds appwrap.Datastore, jobKey *appwrap.DatastoreKey, taskKe

         job.Stage = nextStage
         job.UpdatedAt = time.Now()
+        job.Id = appwrap.KeyIntID(jobKey)

         _, err := ds.Put(jobKey, &job)
         stageChanged = (err == nil)
@@ -443,17 +444,23 @@ func RemoveJob(ds appwrap.Datastore, jobId int64) error {
     return nil
 }

-func makeTaskKeys(ds appwrap.Datastore, firstId int64, count int) []*appwrap.DatastoreKey {
+func makeTaskKeys(ds appwrap.Datastore, jobId int64, firstId int64, count int) []*appwrap.DatastoreKey {
     taskKeys := make([]*appwrap.DatastoreKey, count)
-    for i := 0; i < count; i++ {
-        taskKeys[i] = ds.NewKey(TaskEntity, "", firstId+int64(i), nil)
+    if firstId == 0 {
+        for i := 0; i < count; i++ {
+            taskKeys[i] = ds.NewKey(TaskEntity, fmt.Sprintf("%d.task%d", jobId, i), 0, nil)
+        }
+    } else {
+        for i := 0; i < count; i++ {
+            taskKeys[i] = ds.NewKey(TaskEntity, "", firstId+int64(i), nil)
+        }
     }

     return taskKeys
 }

 func gatherTasks(ds appwrap.Datastore, job JobInfo) ([]JobTask, error) {
-    taskKeys := makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+    taskKeys := makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
     tasks := make([]JobTask, len(taskKeys))

     i := 0
@@ -557,7 +564,7 @@ func doWaitForStageCompletion(c context.Context, ds appwrap.Datastore, taskIntf
         return JobInfo{}, err
     } else {
         job = j
-        taskKeys = makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+        taskKeys = makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
     }

     start := time.Now()
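
Worth noting from the makeTaskKeys change above: FirstTaskId == 0 acts as the sentinel for the new named keys, while jobs persisted before this commit still carry a non-zero FirstTaskId and resolve to their original numeric key range. A hedged sketch of that dispatch (the helper below is illustrative; only the naming rule and the zero sentinel come from the diff):

package main

import "fmt"

// taskKeyLabel reproduces the branch in the new makeTaskKeys: a zero firstId
// selects named keys of the form "<jobId>.task<n>", anything else falls back
// to the legacy consecutive numeric IDs starting at firstId.
func taskKeyLabel(jobId, firstId int64, n int) string {
    if firstId == 0 {
        return fmt.Sprintf("name=%d.task%d", jobId, n)
    }
    return fmt.Sprintf("id=%d", firstId+int64(n))
}

func main() {
    // New-style job: createTasks now writes FirstTaskId = 0, so keys are named.
    fmt.Println(taskKeyLabel(42, 0, 0), taskKeyLabel(42, 0, 1)) // name=42.task0 name=42.task1

    // Pre-existing job: a non-zero FirstTaskId keeps the old numeric keys.
    fmt.Println(taskKeyLabel(42, 5000, 0), taskKeyLabel(42, 5000, 1)) // id=5000 id=5001
}

That is also why gatherTasks, the console handler, and doWaitForStageCompletion now pass job.Id alongside job.FirstTaskId: either style of job can still have its task keys rebuilt and read back.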
