Skip to content

Commit

Permalink
DAOS-14908 vos: Reduce aggregation conflicts (daos-stack#14143)
Browse files Browse the repository at this point in the history
Rather than blocking vos_obj_discard entirely whenever
discard or aggregation is running, block it only when
there is an actual conflict on the object being
discarded.

* Fix log messages to specify EC or VOS aggregation
* Add metrics for conflicts

Signed-off-by: Jeff Olivier <[email protected]>
  • Loading branch information
jolivier23 authored May 20, 2024
1 parent 2d8ff91 commit c0d9109
Show file tree
Hide file tree
Showing 18 changed files with 350 additions and 201 deletions.
36 changes: 19 additions & 17 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
* see pool_iv_pre_sync(), the IV fetch from the following
* ds_cont_csummer_init() will fail anyway.
*/
D_DEBUG(DB_EPC, DF_CONT": skip aggregation "
"No pool map yet or stopping %d\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
D_DEBUG(DB_EPC, DF_CONT ": skip %s aggregation: No pool map yet or stopping %d\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? "VOS" : "EC",
pool->sp_stopping);
return false;
}
Expand Down Expand Up @@ -207,15 +206,17 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (cont->sc_props.dcp_dedup_enabled ||
cont->sc_props.dcp_compress_enabled ||
cont->sc_props.dcp_encrypt_enabled) {
D_DEBUG(DB_EPC, DF_CONT": skip aggregation for "
"deduped/compressed/encrypted container\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid));
D_DEBUG(DB_EPC,
DF_CONT ": skip %s aggregation for deduped/compressed/encrypted"
" container\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), vos_agg ? "VOS" : "EC");
return false;
}

/* snapshot list isn't fetched yet */
if (cont->sc_aggregation_max == 0) {
D_DEBUG(DB_EPC, "No aggregation before snapshots fetched\n");
D_DEBUG(DB_EPC, "No %s aggregation before snapshots fetched\n",
vos_agg ? "VOS" : "EC");
/* fetch snapshot list */
if (dss_get_module_info()->dmi_tgt_id == 0)
ds_cont_tgt_snapshots_refresh(cont->sc_pool->spc_uuid,
Expand All @@ -238,8 +239,8 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,

if (pool->sp_reclaim == DAOS_RECLAIM_LAZY && dss_xstream_is_busy() &&
sched_req_space_check(req) == SCHED_SPACE_PRESS_NONE) {
D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is "
"busy and no space pressure\n");
D_DEBUG(DB_EPC, "Pool reclaim strategy is lazy, service is busy and no space"
" pressure\n");
return false;
}

Expand Down Expand Up @@ -450,9 +451,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
struct sched_request *req = cont2req(cont, param->ap_vos_agg);
int rc = 0;

D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT started\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
dmi->dmi_tgt_id);
D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT started\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id,
param->ap_vos_agg ? "VOS" : "EC");

if (req == NULL)
goto out;
Expand All @@ -474,8 +475,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
break; /* pool destroyed */
} else if (rc < 0) {
DL_CDEBUG(rc == -DER_BUSY, DB_EPC, DLOG_ERR, rc,
DF_CONT ": VOS aggregate failed",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid));
DF_CONT ": %s aggregate failed",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
param->ap_vos_agg ? "VOS" : "EC");
} else if (sched_req_space_check(req) != SCHED_SPACE_PRESS_NONE) {
/* Don't sleep too long when there is space pressure */
msecs = 2ULL * 100;
Expand All @@ -487,9 +489,9 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
sched_req_sleep(req, msecs);
}
out:
D_DEBUG(DB_EPC, DF_CONT"[%d]: Aggregation ULT stopped\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
dmi->dmi_tgt_id);
D_DEBUG(DB_EPC, DF_CONT "[%d]: %s Aggregation ULT stopped\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), dmi->dmi_tgt_id,
param->ap_vos_agg ? "VOS" : "EC");
}

static int
Expand Down
26 changes: 14 additions & 12 deletions src/include/daos_srv/vos_types.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2015-2023 Intel Corporation.
* (C) Copyright 2015-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -353,29 +353,31 @@ D_CASSERT((VOS_USE_TIMESTAMPS & (VOS_GET_MAX | VOS_GET_MIN | VOS_GET_DKEY |

enum {
/** The absence of any flags means iterate all unsorted extents */
VOS_IT_RECX_ALL = 0,
VOS_IT_RECX_ALL = 0,
/** Include visible extents in sorted iteration */
VOS_IT_RECX_VISIBLE = (1 << 0),
VOS_IT_RECX_VISIBLE = (1 << 0),
/** Include covered extents, implies VOS_IT_RECX_VISIBLE */
VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE,
VOS_IT_RECX_COVERED = (1 << 1) | VOS_IT_RECX_VISIBLE,
/** Include hole extents in sorted iteration
* Only applicable if VOS_IT_RECX_COVERED is not set
*/
VOS_IT_RECX_SKIP_HOLES = (1 << 2),
VOS_IT_RECX_SKIP_HOLES = (1 << 2),
/** When sorted iteration is enabled, iterate in reverse */
VOS_IT_RECX_REVERSE = (1 << 3),
VOS_IT_RECX_REVERSE = (1 << 3),
/** The iterator is for purge operation */
VOS_IT_FOR_PURGE = (1 << 4),
VOS_IT_FOR_PURGE = (1 << 4),
/** The iterator is for data migration scan */
VOS_IT_FOR_MIGRATION = (1 << 5),
VOS_IT_FOR_MIGRATION = (1 << 5),
/** Iterate only over punched records in the interval */
VOS_IT_PUNCHED = (1 << 6),
VOS_IT_PUNCHED = (1 << 6),
/** Cleanup stale DTX entry. */
VOS_IT_FOR_DISCARD = (1 << 7),
VOS_IT_FOR_DISCARD = (1 << 7),
/** Entry is not committed */
VOS_IT_UNCOMMITTED = (1 << 8),
VOS_IT_UNCOMMITTED = (1 << 8),
/** The iterator is for an aggregation operation (EC or VOS) */
VOS_IT_FOR_AGG = (1 << 9),
/** Mask for all flags */
VOS_IT_MASK = (1 << 9) - 1,
VOS_IT_MASK = (1 << 10) - 1,
};

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ struct obj_pool_metrics {
struct d_tm_node_t *opm_update_ec_full;
/** Total number of EC partial update operations (type = counter) */
struct d_tm_node_t *opm_update_ec_partial;
/** Total number of EC agg conflicts with VOS aggregation or discard */
struct d_tm_node_t *opm_ec_agg_blocked;
};

void
Expand Down
9 changes: 9 additions & 0 deletions src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ obj_metrics_alloc_internal(const char *path, int tgt_id, bool server)
if (rc)
D_WARN("Failed to create EC partial update counter: " DF_RC "\n", DP_RC(rc));

/** Total number of times EC aggregation conflicts with discard or VOS
* aggregation
*/
rc = d_tm_add_metric(&metrics->opm_ec_agg_blocked, D_TM_COUNTER,
"total number of EC agg pauses due to VOS discard or agg", NULL,
"%s/EC_agg/blocked%s", path, tgt_path);
if (rc)
D_WARN("Failed to create EC agg blocked counter: " DF_RC "\n", DP_RC(rc));

return metrics;
}

Expand Down
28 changes: 20 additions & 8 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2608,10 +2608,12 @@ static int
cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
uint32_t flags, struct agg_param *agg_param)
{
struct obj_pool_metrics *opm;
struct ec_agg_param *ec_agg_param = agg_param->ap_data;
vos_iter_param_t iter_param = { 0 };
struct vos_iter_anchors anchors = { 0 };
int rc = 0;
int blocks = 0;

/*
* Avoid calling into vos_aggregate() when aborting aggregation
Expand Down Expand Up @@ -2645,24 +2647,21 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
goto update_hae;
}

rc = vos_aggregate_enter(cont->sc_hdl, epr);
if (rc)
goto update_hae;

iter_param.ip_hdl = cont->sc_hdl;
iter_param.ip_epr.epr_lo = epr->epr_lo;
iter_param.ip_epr.epr_hi = epr->epr_hi;
iter_param.ip_epc_expr = VOS_IT_EPC_RR;
iter_param.ip_flags = VOS_IT_RECX_VISIBLE;
iter_param.ip_flags = VOS_IT_RECX_VISIBLE | VOS_IT_FOR_AGG;
iter_param.ip_recx.rx_idx = 0ULL;
iter_param.ip_recx.rx_nr = ~PARITY_INDICATOR;
iter_param.ip_filter_cb = agg_filter;
iter_param.ip_filter_arg = ec_agg_param;

agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);
retry:
rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb,
agg_iterate_post_cb, ec_agg_param, NULL);

/* Post_cb may not be executed in some cases */
agg_clear_extents(&ec_agg_param->ap_agg_entry);
Expand All @@ -2681,7 +2680,20 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
sched_req_sleep(cont->sc_ec_agg_req, 5 * 1000);
}

vos_aggregate_exit(cont->sc_hdl);
if (rc == -DER_BUSY) {
/** Hit an object conflict with VOS aggregation or discard. Rather than exiting, let's
* yield and try again.
*/
opm = cont->sc_pool->spc_metrics[DAOS_OBJ_MODULE];
d_tm_inc_counter(opm->opm_ec_agg_blocked, 1);
blocks++;
/** Warn once, when the number of conflicts reaches 20 */
D_CDEBUG(blocks == 20, DLOG_WARN, DB_EPC,
"EC agg hit conflict with VOS agg or discard (nr=%d), retrying...\n",
blocks);
ec_aggregate_yield(ec_agg_param);
goto retry;
}

update_hae:
if (rc == 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/vos/tests/vts_aggregate.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -212,7 +212,7 @@ lookup_object(struct io_test_args *arg, daos_unit_oid_t oid)
vos_hdl2cont(arg->ctx.tc_co_hdl), oid, &epr, 0,
VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj, 0);
if (rc == 0)
vos_obj_release(vos_obj_cache_current(true), obj, false);
vos_obj_release(vos_obj_cache_current(true), obj, 0, false);
return rc;
}

Expand Down
65 changes: 49 additions & 16 deletions src/vos/tests/vts_io.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -996,11 +996,18 @@ io_obj_cache_test(void **state)
&objs[0], 0);
assert_rc_equal(rc, 0);

rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj1);
/** Hold object for discard */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD,
DAOS_INTENT_DISCARD, &obj1, 0);
assert_rc_equal(rc, 0);
/** Should be prevented because object already held for discard */
rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj2);
assert_rc_equal(rc, -DER_UPDATE_AGAIN);
/** Second discard should fail */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD,
DAOS_INTENT_DISCARD, &obj2, 0);
assert_rc_equal(rc, -DER_BUSY);
/** Should prevent simultaneous aggregation */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE,
DAOS_INTENT_PURGE, &obj2, 0);
assert_rc_equal(rc, -DER_BUSY);
/** Should prevent simultaneous hold for create as well */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0,
VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT,
Expand All @@ -1010,17 +1017,43 @@ io_obj_cache_test(void **state)
/** Need to be able to hold for read though or iteration won't work */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0,
VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj2, 0);
vos_obj_discard_release(occ, obj2);
vos_obj_discard_release(occ, obj1);
vos_obj_release(occ, obj2, 0, false);
vos_obj_release(occ, obj1, VOS_OBJ_DISCARD, false);

/** Hold object for aggregation */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE,
DAOS_INTENT_PURGE, &obj1, 0);
assert_rc_equal(rc, 0);
/** Discard should fail */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD,
DAOS_INTENT_DISCARD, &obj2, 0);
assert_rc_equal(rc, -DER_BUSY);
/** Second aggregation should fail */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_AGGREGATE,
DAOS_INTENT_PURGE, &obj2, 0);
assert_rc_equal(rc, -DER_BUSY);
/** Simultaneous create should work */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0,
VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &obj2, 0);
assert_rc_equal(rc, 0);
vos_obj_release(occ, obj2, 0, false);

/** Need to be able to hold for read though or iteration won't work */
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_VISIBLE,
DAOS_INTENT_DEFAULT, &obj2, 0);
vos_obj_release(occ, obj2, 0, false);
vos_obj_release(occ, obj1, VOS_OBJ_AGGREGATE, false);

/** Now that other one is done, this should work */
rc = vos_obj_discard_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &obj2);
rc = vos_obj_hold(occ, vos_hdl2cont(ctx->tc_co_hdl), oids[0], &epr, 0, VOS_OBJ_DISCARD,
DAOS_INTENT_DISCARD, &obj2, 0);
assert_rc_equal(rc, 0);
vos_obj_discard_release(occ, obj2);
vos_obj_release(occ, obj2, VOS_OBJ_DISCARD, false);

rc = umem_tx_end(ummg, 0);
assert_rc_equal(rc, 0);

vos_obj_release(occ, objs[0], false);
vos_obj_release(occ, objs[0], 0, false);

rc = umem_tx_begin(umml, NULL);
assert_rc_equal(rc, 0);
Expand All @@ -1029,7 +1062,7 @@ io_obj_cache_test(void **state)
VOS_OBJ_CREATE | VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT,
&objs[0], 0);
assert_rc_equal(rc, 0);
vos_obj_release(occ, objs[0], false);
vos_obj_release(occ, objs[0], 0, false);

rc = umem_tx_end(umml, 0);
assert_rc_equal(rc, 0);
Expand All @@ -1047,20 +1080,20 @@ io_obj_cache_test(void **state)
VOS_OBJ_VISIBLE, DAOS_INTENT_DEFAULT, &objs[16], 0);
assert_rc_equal(rc, 0);

vos_obj_release(occ, objs[16], false);
vos_obj_release(occ, objs[16], 0, false);

for (i = 0; i < 5; i++)
vos_obj_release(occ, objs[i], false);
vos_obj_release(occ, objs[i], 0, false);
for (i = 10; i < 15; i++)
vos_obj_release(occ, objs[i], false);
vos_obj_release(occ, objs[i], 0, false);

rc = hold_objects(objs, occ, &l_coh, &oids[1], 15, 20, true, 0);
assert_int_equal(rc, 0);

for (i = 5; i < 10; i++)
vos_obj_release(occ, objs[i], false);
vos_obj_release(occ, objs[i], 0, false);
for (i = 15; i < 20; i++)
vos_obj_release(occ, objs[i], false);
vos_obj_release(occ, objs[i], 0, false);

rc = vos_cont_close(l_coh);
assert_rc_equal(rc, 0);
Expand Down
Loading

0 comments on commit c0d9109

Please sign in to comment.