Skip to content

Commit

Permalink
Merge pull request #3447 from googs1025/refector_preempt
Browse files Browse the repository at this point in the history
fix: log when outputting preempt error
  • Loading branch information
volcano-sh-bot authored May 23, 2024
2 parents 6b32f7b + e74d9c6 commit 0dc370d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
12 changes: 6 additions & 6 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,16 @@ func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map
ssn := alloc.session
for _, job := range ssn.Jobs {
// If the enqueue action is not configured, change a Pending PodGroup to Inqueue status to avoid blocking job scheduling.
if conf.EnabledActionMap["enqueue"] {
if job.IsPending() {
if job.IsPending() {
if conf.EnabledActionMap["enqueue"] {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: job status is pending.",
job.Namespace, job.Name, job.Queue)
continue
} else {
klog.V(4).Infof("Job <%s/%s> Queue <%s> status update from pending to inqueue, reason: no enqueue action is configured.",
job.Namespace, job.Name, job.Queue)
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
}
} else if job.IsPending() {
klog.V(4).Infof("Job <%s/%s> Queue <%s> status update from pending to inqueue, reason: no enqueue action is configured.",
job.Namespace, job.Name, job.Queue)
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
}

if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
Expand Down
17 changes: 11 additions & 6 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
preemptorJob := preemptors.Pop().(*api.JobInfo)

stmt := framework.NewStatement(ssn)
assigned := false
var assigned bool
var err error
for {
// If the job is not requesting more resources, stop preempting.
if !ssn.JobStarving(preemptorJob) {
Expand All @@ -112,7 +113,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)

if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err = preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand All @@ -130,8 +131,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
}
// Preempt other jobs within queue
return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
}, ph); preempted {
assigned = true
}, ph)
if err != nil {
klog.V(3).Infof("Preemptor <%s/%s> failed to preempt Task , err: %s", preemptor.Namespace, preemptor.Name, err)
}
}

Expand Down Expand Up @@ -167,7 +169,7 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

stmt := framework.NewStatement(ssn)
assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
assigned, err := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if !api.PreemptableStatus(task.Status) {
return false
Expand All @@ -179,6 +181,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
// Preempt tasks within job.
return preemptor.Job == task.Job
}, ph)
if err != nil {
klog.V(3).Infof("Preemptor <%s/%s> failed to preempt Task , err: %s", preemptor.Namespace, preemptor.Name, err)
}
stmt.Commit()

// If no preemption, next job.
Expand Down Expand Up @@ -228,7 +233,7 @@ func preempt(

job, found := ssn.Jobs[preemptor.Job]
if !found {
return false, fmt.Errorf("Job %s not found in SSN", preemptor.Job)
return false, fmt.Errorf("not found Job %s in Session", preemptor.Job)
}

currentQueue := ssn.Queues[job.Queue]
Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
}

if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name)
continue
} else if _, existed := queueMap[queue.UID]; !existed {
klog.V(4).Infof("Added Queue <%s> for Job <%s/%s>", queue.Name, job.Namespace, job.Name)
Expand Down Expand Up @@ -137,8 +136,7 @@ func (ra *Action) Execute(ssn *framework.Session) {
task.Namespace, task.Name, n.Name, statusSets.Message())
continue
}
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name)

var reclaimees []*api.TaskInfo
for _, task := range n.Tasks {
Expand Down

0 comments on commit 0dc370d

Please sign in to comment.