diff --git a/console.go b/console.go
index f460cf1..f034c75 100644
--- a/console.go
+++ b/console.go
@@ -157,7 +157,7 @@ func ConsoleHandler(w http.ResponseWriter, r *http.Request) {
 			return
 		} else {
 			tasks = tl
-			taskKeys = makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+			taskKeys = makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
 		}
 	default:
 		jobKey := ds.NewKey(JobEntity, "", id, nil)
diff --git a/map.go b/map.go
index 2f7de5b..3c22e16 100644
--- a/map.go
+++ b/map.go
@@ -67,12 +67,7 @@ func mapMonitorTask(c context.Context, ds appwrap.Datastore, pipeline MapReduceP
 		}
 	}
 
-	firstId, err := mkIds(ds, TaskEntity, len(job.WriterNames))
-	if err != nil {
-		jobFailed(c, ds, pipeline, jobKey, fmt.Errorf("failed to allocate ids for reduce tasks: %s", err.Error()), log)
-		return 200
-	}
-	taskKeys := makeTaskKeys(ds, firstId, len(job.WriterNames))
+	taskKeys := makeTaskKeys(ds, job.Id, 0, len(job.WriterNames))
 	tasks := make([]JobTask, 0, len(job.WriterNames))
 
 	for shard := range job.WriterNames {
@@ -80,8 +75,6 @@
 		url := fmt.Sprintf("%s/reduce?taskKey=%s;shard=%d;writer=%s", job.UrlPrefix,
 			taskKeys[len(tasks)].Encode(), shard, url.QueryEscape(job.WriterNames[shard]))
 
-		firstId++
-
 		shardJson, _ := json.Marshal(shards)
 		shardZ := &bytes.Buffer{}
 		w := zlib.NewWriter(shardZ)
diff --git a/mapreduce.go b/mapreduce.go
index 016f213..85cd060 100644
--- a/mapreduce.go
+++ b/mapreduce.go
@@ -18,7 +18,6 @@ package mapreduce
 import (
 	"fmt"
 	"net/http"
-	"sort"
 	"strings"
 	"time"
 
@@ -159,12 +158,7 @@ func Run(c context.Context, ds appwrap.Datastore, job MapReduceJob) (int64, erro
 		return 0, fmt.Errorf("creating job: %s", err)
 	}
 
-	firstId, err := mkIds(ds, TaskEntity, len(readerNames))
-	if err != nil {
-		return 0, fmt.Errorf("allocating keys: %s", err)
-	}
-
-	taskKeys := makeTaskKeys(ds, firstId, len(readerNames))
+	taskKeys := makeTaskKeys(ds, appwrap.KeyIntID(jobKey), 0, len(readerNames))
 	tasks := make([]JobTask, len(readerNames))
 
 	for i, readerName := range readerNames {
@@ -303,29 +297,3 @@ func tryAgainIfNonFatal(err error) error {
 	}
 	return nil
 }
-
-func mkIds(ds appwrap.Datastore, kind string, count int) (int64, error) {
-	incomplete := make([]*appwrap.DatastoreKey, count)
-	for i := range incomplete {
-		incomplete[i] = ds.NewKey(kind, "", 0, nil)
-	}
-
-	if completeKeys, err := ds.AllocateIDSet(incomplete); err != nil {
-		return 0, fmt.Errorf("reserving keys: %s", err)
-	} else {
-		ids := make([]int, len(completeKeys))
-		for i, k := range completeKeys {
-			ids[i] = int(appwrap.KeyIntID(k))
-		}
-
-		sort.Sort(sort.IntSlice(ids))
-
-		for i := 0; i < len(ids)-1; i++ {
-			if ids[i] != ids[i+1]-1 {
-				return 0, fmt.Errorf("nonconsecutive keys allocated")
-			}
-		}
-
-		return int64(ids[0]), nil
-	}
-}
diff --git a/tasks.go b/tasks.go
index e92eeb2..ff6bf11 100644
--- a/tasks.go
+++ b/tasks.go
@@ -70,7 +70,7 @@ type JobInfo struct {
 	UpdatedAt           time.Time
 	StartTime           time.Time
 	TaskCount           int    `datastore:"TasksRunning,noindex"`
-	FirstTaskId         int64  `datastore:",noindex"`
+	FirstTaskId         int64  `datastore:",noindex"` // 0 here means to use task keys named "%d.task%d" (Id, taskNum)
 	RetryCount          int    `datastore:",noindex"`
 	SeparateReduceItems bool   `datastore:",noindex"`
 	OnCompleteUrl       string `datastore:",noindex"`
@@ -181,7 +181,7 @@ func createTasks(ds appwrap.Datastore, jobKey *appwrap.DatastoreKey, taskKeys []
 	}
 
 	job.TaskCount = len(tasks)
-	job.FirstTaskId = firstId
+	job.FirstTaskId = 0 // use new style ids
 	job.Stage = newStage
 
 	_, err := ds.Put(jobKey, &job)
@@ -284,6 +284,7 @@ func jobStageComplete(ds appwrap.Datastore, jobKey *appwrap.DatastoreKey, taskKe
 
 	job.Stage = nextStage
 	job.UpdatedAt = time.Now()
+	job.Id = appwrap.KeyIntID(jobKey)
 	_, err := ds.Put(jobKey, &job)
 	stageChanged = (err == nil)
 
@@ -443,17 +444,23 @@ func RemoveJob(ds appwrap.Datastore, jobId int64) error {
 	return nil
 }
 
-func makeTaskKeys(ds appwrap.Datastore, firstId int64, count int) []*appwrap.DatastoreKey {
+func makeTaskKeys(ds appwrap.Datastore, jobId int64, firstId int64, count int) []*appwrap.DatastoreKey {
 	taskKeys := make([]*appwrap.DatastoreKey, count)
-	for i := 0; i < count; i++ {
-		taskKeys[i] = ds.NewKey(TaskEntity, "", firstId+int64(i), nil)
+	if firstId == 0 {
+		for i := 0; i < count; i++ {
+			taskKeys[i] = ds.NewKey(TaskEntity, fmt.Sprintf("%d.task%d", jobId, i), 0, nil)
+		}
+	} else {
+		for i := 0; i < count; i++ {
+			taskKeys[i] = ds.NewKey(TaskEntity, "", firstId+int64(i), nil)
+		}
 	}
 
 	return taskKeys
 }
 
 func gatherTasks(ds appwrap.Datastore, job JobInfo) ([]JobTask, error) {
-	taskKeys := makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+	taskKeys := makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
 	tasks := make([]JobTask, len(taskKeys))
 
 	i := 0
@@ -557,7 +564,7 @@ func doWaitForStageCompletion(c context.Context, ds appwrap.Datastore, taskIntf
 		return JobInfo{}, err
 	} else {
 		job = j
-		taskKeys = makeTaskKeys(ds, job.FirstTaskId, job.TaskCount)
+		taskKeys = makeTaskKeys(ds, job.Id, job.FirstTaskId, job.TaskCount)
 	}
 
 	start := time.Now()
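In short, the patch stops reserving blocks of consecutive numeric task IDs via mkIds/AllocateIDSet and instead derives task keys deterministically from the job ID and the task index; FirstTaskId == 0 marks a job that uses the new naming scheme, while jobs written before the change keep a nonzero FirstTaskId and fall back to the old numeric keys. The sketch below only illustrates the naming rule and is not part of the patch: the helper taskKeyName and the job ID 42 are invented for the example, and the real code passes the generated name to ds.NewKey(TaskEntity, name, 0, nil).

package main

import "fmt"

// taskKeyName mirrors the new-style key names that makeTaskKeys builds when
// FirstTaskId == 0: the datastore key name is "<jobId>.task<taskNum>".
// (Illustrative helper only; it does not exist in the library.)
func taskKeyName(jobId int64, taskNum int) string {
	return fmt.Sprintf("%d.task%d", jobId, taskNum)
}

func main() {
	// With an invented job ID of 42 and three tasks, the names come out as
	// "42.task0", "42.task1", "42.task2": stable across retries, with no
	// AllocateIDSet round trip or consecutive-ID check needed.
	for i := 0; i < 3; i++ {
		fmt.Println(taskKeyName(42, i))
	}
}

Because the names are a pure function of the job ID and task number, ConsoleHandler, gatherTasks, and doWaitForStageCompletion can rebuild exactly the same keys later by passing job.Id alongside FirstTaskId, which is presumably why jobStageComplete now sets job.Id from appwrap.KeyIntID(jobKey) before saving.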