Skip to content

Commit

Permalink
qmanager: rename urgency to priority
Browse files Browse the repository at this point in the history
In a number of locations, urgency was used as a variable name instead of
priority to store the job priority.  Update variable names accordingly.
Update queue sorting to use maximum priority of 4294967295 instead of
maximum urgency of 31.

Fixes flux-framework#798
  • Loading branch information
chu11 authored and mergify-bot committed Jan 15, 2021
1 parent 7fe0cde commit 4469b5a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 15 deletions.
21 changes: 12 additions & 9 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,13 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg,
queue_name = qn_attr? qn_attr : ctx->opts.get_opt ().get_default_queue ();
json_decref (o);
queue = ctx->queues.at (queue_name);
// Note that RFC27 defines 31 as the max urgency. Because our queue policy
// layer sorts the pending jobs in lexicographical order
// (<urgency, t_submit, ...> and lower the better, we adjust urgency.
// Note that RFC27 defines 4294967295 as the max priority. Because
// our queue policy layer sorts the pending jobs in
// lexicographical order (<priority, t_submit, ...>) and lower the
// better, we adjust priority.
running_job = std::make_shared<job_t> (job_state_kind_t::RUNNING,
id, uid, 31 - prio, ts, R);
id, uid, 4294967295 - prio,
ts, R);

if ( (rc = queue->reconstruct (static_cast<void *> (h),
running_job, R_out)) < 0) {
Expand Down Expand Up @@ -199,7 +201,7 @@ void qmanager_cb_t::jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg,
if (flux_msg_unpack (msg,
"{s:I s:i s:i s:f s:o}",
"id", &job->id,
"priority", &job->urgency,
"priority", &job->priority,
"userid", &job->userid,
"t_submit", &job->t_submit,
"jobspec", &jobspec) < 0) {
Expand All @@ -216,10 +218,11 @@ void qmanager_cb_t::jobmanager_alloc_cb (flux_t *h, const flux_msg_t *msg,
queue_name = jobspec_obj.attributes.system.queue;
job->jobspec = jobspec_str;
free (jobspec_str);
// Note that RFC27 defines 31 as the max urgency. Because our queue policy
// layer sorts the pending jobs in lexicographical order
// (<urgency, t_submit, ...> and lower the better, we adjust the urgency.
job->urgency = 31 - job->urgency;
// Note that RFC27 defines 4294967295 as the max priority. Because
// our queue policy layer sorts the pending jobs in
// lexicographical order (<priority, t_submit, ...> and lower the
// better, we adjust the priority.
job->priority = 4294967295 - job->priority;
if (ctx->queues.find (queue_name) == ctx->queues.end ()) {
if (schedutil_alloc_respond_deny (ctx->schedutil, msg, NULL) < 0)
flux_log_error (h, "%s: schedutil_alloc_respond_deny",
Expand Down
6 changes: 3 additions & 3 deletions qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class job_t {
~job_t () { flux_msg_destroy (msg); }
job_t () = default;
job_t (job_state_kind_t s, flux_jobid_t jid,
uint32_t uid, unsigned int u, double t_s, const std::string &R)
uint32_t uid, unsigned int p, double t_s, const std::string &R)
: state (s), id (jid), userid (uid),
urgency (u), t_submit (t_s), schedule (R) { }
priority (p), t_submit (t_s), schedule (R) { }
job_t (job_t &&j) = default;
job_t (const job_t &j) = default;
job_t& operator= (job_t &&s) = default;
Expand All @@ -99,7 +99,7 @@ class job_t {
job_state_kind_t state = job_state_kind_t::INIT;
flux_jobid_t id = 0;
uint32_t userid = 0;
unsigned int urgency = 0;
unsigned int priority = 0;
double t_submit = 0.0f;;
std::string jobspec = "";
std::string note = "";
Expand Down
6 changes: 3 additions & 3 deletions qmanager/policies/base/queue_policy_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ int queue_policy_base_impl_t::insert (std::shared_ptr<job_t> job)
job->state = job_state_kind_t::PENDING;
job->t_stamps.pending_ts = m_pq_cnt++;
m_pending.insert (std::pair<std::vector<double>, flux_jobid_t> (
{static_cast<double> (job->urgency),
{static_cast<double> (job->priority),
static_cast<double> (job->t_submit),
static_cast<double> (job->t_stamps.pending_ts)}, job->id));
m_jobs.insert (std::pair<flux_jobid_t, std::shared_ptr<job_t>> (job->id,
Expand All @@ -276,7 +276,7 @@ int queue_policy_base_impl_t::remove (flux_jobid_t id)
job = m_jobs[id];
switch (job->state) {
case job_state_kind_t::PENDING:
m_pending.erase ({static_cast<double> (job->urgency),
m_pending.erase ({static_cast<double> (job->priority),
static_cast<double> (job->t_submit),
static_cast<double> (job->t_stamps.pending_ts)});
job->state = job_state_kind_t::CANCELED;
Expand Down Expand Up @@ -421,7 +421,7 @@ std::shared_ptr<job_t> queue_policy_base_impl_t::pending_pop ()
if (m_jobs.find (id) == m_jobs.end ())
return nullptr;
job = m_jobs[id];
m_pending.erase ({static_cast<double> (job->urgency),
m_pending.erase ({static_cast<double> (job->priority),
static_cast<double> (job->t_submit),
static_cast<double> (job->t_stamps.pending_ts)});
m_jobs.erase (id);
Expand Down

0 comments on commit 4469b5a

Please sign in to comment.