Skip to content

Commit

Permalink
queue-policy: APIs to reconstruct the resource state
Browse files Browse the repository at this point in the history
Problem: we have an API to reconstruct the state
of running-job queue state for a job, but we don’t have
an interface to reconstruct the state of resource
data store for the job.

Add reconstruct_resource as a pure virtual method
to the base queue policy class (queue_policy_base_t).
Because the base queue policy classes (queue_policy_base_t
and queue_policy_base_impl_t) must manage various job
queues while being agnostic to the resources of jobs, this
pure virtual method must be implemented by the
derived classes, the queue policy layer that is aware
of the resources. This virtual interface allows
queue_policy_base_t to make a up-call to the methods
overridden by all of the derived classes. It then makes
this scheme future proof with respect to future
queueing-policy implementation classes.
  • Loading branch information
dongahn committed Jun 11, 2020
1 parent 924341d commit 8b31281
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 5 deletions.
39 changes: 36 additions & 3 deletions qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,28 @@ class queue_policy_base_t : public detail::queue_policy_base_impl_t
*/
virtual int run_sched_loop (void *h, bool use_alloced_queue) = 0;

/*! Resource reconstruct interface that must be implemented by
* derived classes.
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param job shared pointer to a running job whose resource
* state is being requested to be reconstructed.
* job->schedule.R is the requested R.
* \param ret_R Replied R (must be equal to job->schedule.R
* if succeeded).
* \return 0 on success; -1 on error.
* EINVAL: invalid argument.
* ENOMEM: out of memory.
* ERANGE: out of range request.
* EPROTO: job->schedule.R doesn't comply.
* ENOTSUP: job->schedule.R has unsupported feature.
*/
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &ret_R) = 0;

/*! Set queue parameters. Can be called multiple times.
*
* \param params comma-delimited key-value pairs string
Expand Down Expand Up @@ -241,12 +263,23 @@ class queue_policy_base_t : public detail::queue_policy_base_impl_t
/*! Append a job into the internal running-job queue to reconstruct
* the queue state.
*
* \param running_job
* a shared pointer pointing to a job_t object.
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param job shared pointer to a running job whose resource
* state is being requested to be reconstructed.
* job->schedule.R is the requested R.
* \param ret_R replied R (must be equal to job->schedule.R
* if succeeded).
* \return 0 on success; -1 on error.
* EINVAL: invalid argument.
* ENOMEM: out of memory.
* ERANGE: out of range request.
* EPROTO: job->schedule.R doesn't comply.
* ENOTSUP: job->schedule.R has unsupported feature.
*/
int reconstruct (std::shared_ptr<job_t> running_job);
int reconstruct (void *h, std::shared_ptr<job_t> job, std::string &R_out);

/*! Pop the first job from the pending job queue. The popped
* job is completely graduated from the queue policy layer.
Expand Down
8 changes: 6 additions & 2 deletions qmanager/policies/base/queue_policy_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,13 @@ const std::shared_ptr<job_t> queue_policy_base_t::lookup (flux_jobid_t id)
return detail::queue_policy_base_impl_t::lookup (id);
}

int queue_policy_base_t::reconstruct (std::shared_ptr<job_t> running_job)
int queue_policy_base_t::reconstruct (void *h, std::shared_ptr<job_t> job,
std::string &R_out)
{
return detail::queue_policy_base_impl_t::reconstruct_queue (running_job);
int rc = 0;
if ( (rc = reconstruct_resource (h, job, R_out)) < 0)
return rc;
return detail::queue_policy_base_impl_t::reconstruct_queue (job);
}

std::shared_ptr<job_t> queue_policy_base_t::pending_pop ()
Expand Down
2 changes: 2 additions & 0 deletions qmanager/policies/queue_policy_bf_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class queue_policy_bf_base_t : public queue_policy_base_t
public:
virtual ~queue_policy_bf_base_t ();
virtual int run_sched_loop (void *h, bool use_alloced_queue);
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &R_out);
virtual int apply_params ();

protected:
Expand Down
9 changes: 9 additions & 0 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ int queue_policy_bf_base_t<reapi_type>::run_sched_loop (void *h,
return rc;
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::reconstruct_resource (
void *h, std::shared_ptr< job_t> job, std::string &R_out)
{
return reapi_type::update_allocate (h, job->id, job->schedule.R,
job->schedule.at,
job->schedule.ov, R_out);
}

} // namespace Flux::queue_manager::detail
} // namespace Flux::queue_manager
} // namespace Flux
Expand Down
2 changes: 2 additions & 0 deletions qmanager/policies/queue_policy_fcfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class queue_policy_fcfs_t : public queue_policy_base_t
public:
virtual ~queue_policy_fcfs_t ();
virtual int run_sched_loop (void *h, bool use_alloced_queue);
virtual int reconstruct_resource (void *h, std::shared_ptr<job_t> job,
std::string &R_out);
virtual int apply_params ();

private:
Expand Down
10 changes: 10 additions & 0 deletions qmanager/policies/queue_policy_fcfs_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ int queue_policy_fcfs_t<reapi_type>::run_sched_loop (void *h,
return rc;
}

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::reconstruct_resource (
void *h, std::shared_ptr<job_t> job, std::string &R_out)
{
return reapi_type::update_allocate (h, job->id, job->schedule.R,
job->schedule.at,
job->schedule.ov, R_out);
}


} // namespace Flux::queue_manager::detail
} // namespace Flux::queue_manager
} // namespace Flux
Expand Down

0 comments on commit 8b31281

Please sign in to comment.