Skip to content

Commit

Permalink
Merge pull request ceph#18954 from adamemerson/wip-hole-in-the-bucket…
Browse files Browse the repository at this point in the history
…-dear-liza

rgw: Add try_refresh_bucket_info function
rgw: Add retry_raced_bucket_write
rgw: Handle stale bucket info in RGWPutMetadataBucket
rgw: Handle stale bucket info in RGWSetBucketVersioning
rgw: Handle stale bucket info in RGWSetBucketWebsite
rgw: Handle stale bucket info in RGWDeleteBucketWebsite
rgw: Handle stale bucket info in RGWPutBucketPolicy
rgw: Handle stale bucket info in RGWDeleteBucketPolicy
rgw: Expire entries in bucket info cache

Reviewed-by: Yehuda Sadeh <[email protected]>
  • Loading branch information
adamemerson committed Nov 29, 2017
2 parents 61123c4 + 4489cb5 commit d07588a
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 88 deletions.
22 changes: 21 additions & 1 deletion src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4300,7 +4300,7 @@ std::vector<Option> get_global_options() {
Option("debug_deliberately_leak_memory", Option::TYPE_BOOL, Option::LEVEL_DEV)
.set_default(false)
.set_description(""),

Option("debug_asserts_on_shutdown", Option::TYPE_BOOL,Option::LEVEL_DEV)
.set_default(false)
.set_description("Enable certain asserts to check for refcounting bugs on shutdown; see http://tracker.ceph.com/issues/21738"),
Expand Down Expand Up @@ -5540,6 +5540,26 @@ std::vector<Option> get_rgw_options() {
Option("rgw_reshard_thread_interval", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(10_min)
.set_description(""),

Option("rgw_bucket_info_cache_expiry_interval", Option::TYPE_UINT,
Option::LEVEL_ADVANCED)
.set_default(15_min)
.set_description("Number of seconds before entries in the bucket info "
"cache are assumed stale and re-fetched. Zero is never.")
.add_tag("performance")
.add_service("rgw")
.set_long_description("The Rados Gateway stores metadata about buckets in "
"an internal cache. This should be kept consistent "
"by the OSD's relaying notify events between "
"multiple watching RGW processes. In the event "
"that this notification protocol fails, bounding "
"the length of time that any data in the cache will "
"be assumed valid will ensure that any RGW instance "
"that falls out of sync will eventually recover. "
"This seems to be an issue mostly for large numbers "
"of RGW instances under heavy use. If you would like "
"to turn off cache expiry, set this value to zero."),

});
}

Expand Down
199 changes: 123 additions & 76 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,37 @@ void rgw_bucket_object_pre_exec(struct req_state *s)
dump_bucket_from_state(s);
}

// So! Now and then when we try to update bucket information, the
// bucket has changed during the course of the operation. (Or we have
// a cache consistency problem that Watch/Notify isn't ruling out
// completely.)
//
// When this happens, we need to update the bucket info and try
// again. We have, however, to try the right *part* again. We can't
// simply re-send, since that will obliterate the previous update.
//
// Thus, the callable passed to this function should contain everything
// that merges the requested changes into the bucket information, as
// well as the call that stores it, so that each retry re-applies the
// changes on top of the refreshed bucket information.
//
// The callable must return an integer, negative on error. In general,
// it should just return op_ret.
namespace {
/// Run `f` and, while it keeps reporting -ECANCELED, refresh the request's
/// bucket info/attrs and run it again.
///
/// @param g  store used to re-read the bucket info via try_refresh_bucket_info
/// @param s  request state; s->bucket_info and s->bucket_attrs are refreshed
///           in place before each retry
/// @param f  callable returning an integer status (negative on error); it is
///           invoked once up front and then once per successful refresh
/// @return   the last status produced by either `f` or the refresh call
template<typename F>
int retry_raced_bucket_write(RGWRados* g, req_state* s, const F& f) {
  // Upper bound on refresh attempts so a persistent race cannot spin forever.
  constexpr auto max_attempts = 15u;

  auto status = f();
  auto attempt = 0u;
  while (attempt < max_attempts && status == -ECANCELED) {
    ++attempt;
    status = g->try_refresh_bucket_info(s->bucket_info, nullptr,
                                        &s->bucket_attrs);
    if (status < 0) {
      // Refresh failed: the loop condition decides whether to retry the
      // refresh (-ECANCELED) or to give up and report this error.
      continue;
    }
    status = f();
  }
  return status;
}
} // anonymous namespace


int RGWGetObj::verify_permission()
{
obj = rgw_obj(s->bucket, s->object);
Expand Down Expand Up @@ -2063,17 +2094,20 @@ void RGWSetBucketVersioning::execute()
}
}

if (versioning_status == VersioningEnabled) {
s->bucket_info.flags |= BUCKET_VERSIONED;
s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
} else if (versioning_status == VersioningSuspended) {
s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
} else {
return;
}
op_ret = retry_raced_bucket_write(store, s, [this] {
if (versioning_status == VersioningEnabled) {
s->bucket_info.flags |= BUCKET_VERSIONED;
s->bucket_info.flags &= ~BUCKET_VERSIONS_SUSPENDED;
} else if (versioning_status == VersioningSuspended) {
s->bucket_info.flags |= (BUCKET_VERSIONED | BUCKET_VERSIONS_SUSPENDED);
} else {
return op_ret;
}
op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
&s->bucket_attrs);
return op_ret;
});

op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(),
&s->bucket_attrs);
if (op_ret < 0) {
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name
<< " returned err=" << op_ret << dendl;
Expand Down Expand Up @@ -2123,10 +2157,14 @@ void RGWSetBucketWebsite::execute()
}
}

s->bucket_info.has_website = true;
s->bucket_info.website_conf = website_conf;
op_ret = retry_raced_bucket_write(store, s, [this] {
s->bucket_info.has_website = true;
s->bucket_info.website_conf = website_conf;
op_ret = store->put_bucket_instance_info(s->bucket_info, false,
real_time(), &s->bucket_attrs);
return op_ret;
});

op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
if (op_ret < 0) {
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
return;
Expand All @@ -2145,10 +2183,13 @@ void RGWDeleteBucketWebsite::pre_exec()

void RGWDeleteBucketWebsite::execute()
{
s->bucket_info.has_website = false;
s->bucket_info.website_conf = RGWBucketWebsiteConf();

op_ret = store->put_bucket_instance_info(s->bucket_info, false, real_time(), &s->bucket_attrs);
op_ret = retry_raced_bucket_write(store, s, [this] {
s->bucket_info.has_website = false;
s->bucket_info.website_conf = RGWBucketWebsiteConf();
op_ret = store->put_bucket_instance_info(s->bucket_info, false,
real_time(), &s->bucket_attrs);
return op_ret;
});
if (op_ret < 0) {
ldout(s->cct, 0) << "NOTICE: put_bucket_info on bucket=" << s->bucket.name << " returned err=" << op_ret << dendl;
return;
Expand Down Expand Up @@ -3990,55 +4031,61 @@ void RGWPutMetadataBucket::execute()
return;
}

/* Encode special metadata first as we're using std::map::emplace under
* the hood. This method will add the new items only if the map doesn't
* contain such keys yet. */
if (has_policy) {
if (s->dialect.compare("swift") == 0) {
auto old_policy = \
static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
new_policy->filter_merge(policy_rw_mask, old_policy);
policy = *new_policy;
}
buffer::list bl;
policy.encode(bl);
emplace_attr(RGW_ATTR_ACL, std::move(bl));
}
op_ret = retry_raced_bucket_write(store, s, [this] {
/* Encode special metadata first as we're using std::map::emplace under
* the hood. This method will add the new items only if the map doesn't
* contain such keys yet. */
if (has_policy) {
if (s->dialect.compare("swift") == 0) {
auto old_policy = \
static_cast<RGWAccessControlPolicy_SWIFT*>(s->bucket_acl.get());
auto new_policy = static_cast<RGWAccessControlPolicy_SWIFT*>(&policy);
new_policy->filter_merge(policy_rw_mask, old_policy);
policy = *new_policy;
}
buffer::list bl;
policy.encode(bl);
emplace_attr(RGW_ATTR_ACL, std::move(bl));
}

if (has_cors) {
buffer::list bl;
cors_config.encode(bl);
emplace_attr(RGW_ATTR_CORS, std::move(bl));
}
if (has_cors) {
buffer::list bl;
cors_config.encode(bl);
emplace_attr(RGW_ATTR_CORS, std::move(bl));
}

/* It's supposed that following functions WILL NOT change any special
* attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
populate_with_generic_attrs(s, attrs);
/* It's supposed that following functions WILL NOT change any
* special attributes (like RGW_ATTR_ACL) if they are already
* present in attrs. */
prepare_add_del_attrs(s->bucket_attrs, rmattr_names, attrs);
populate_with_generic_attrs(s, attrs);

/* According to the Swift's behaviour and its container_quota WSGI middleware
* implementation: anyone with write permissions is able to set the bucket
* quota. This stays in contrast to account quotas that can be set only by
* clients holding reseller admin privileges. */
op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
if (op_ret < 0) {
return;
}
/* According to the Swift's behaviour and its container_quota
* WSGI middleware implementation: anyone with write permissions
* is able to set the bucket quota. This stays in contrast to
* account quotas that can be set only by clients holding
* reseller admin privileges. */
op_ret = filter_out_quota_info(attrs, rmattr_names, s->bucket_info.quota);
if (op_ret < 0) {
return op_ret;
}

if (swift_ver_location) {
s->bucket_info.swift_ver_location = *swift_ver_location;
s->bucket_info.swift_versioning = (! swift_ver_location->empty());
}
if (swift_ver_location) {
s->bucket_info.swift_ver_location = *swift_ver_location;
s->bucket_info.swift_versioning = (!swift_ver_location->empty());
}

/* Web site of Swift API. */
filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();
/* Web site of Swift API. */
filter_out_website(attrs, rmattr_names, s->bucket_info.website_conf);
s->bucket_info.has_website = !s->bucket_info.website_conf.is_empty();

/* Setting attributes also stores the provided bucket info. Due to this
* fact, the new quota settings can be serialized with the same call. */
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
/* Setting attributes also stores the provided bucket info. Due
* to this fact, the new quota settings can be serialized with
* the same call. */
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
return op_ret;
});
}

int RGWPutMetadataObject::verify_permission()
Expand Down Expand Up @@ -6836,15 +6883,15 @@ void RGWPutBucketPolicy::execute()
}

try {
Policy p(s->cct, s->bucket_tenant, in_data);
auto attrs = s->bucket_attrs;
attrs[RGW_ATTR_IAM_POLICY].clear();
attrs[RGW_ATTR_IAM_POLICY].append(p.text);
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
if (op_ret == -ECANCELED) {
op_ret = 0; /* lost a race, but it's ok because policies are immutable */
}
const Policy p(s->cct, s->bucket_tenant, in_data);
op_ret = retry_raced_bucket_write(store, s, [&p, this] {
auto attrs = s->bucket_attrs;
attrs[RGW_ATTR_IAM_POLICY].clear();
attrs[RGW_ATTR_IAM_POLICY].append(p.text);
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
return op_ret;
});
} catch (rgw::IAM::PolicyParseException& e) {
ldout(s->cct, 20) << "failed to parse policy: " << e.what() << dendl;
op_ret = -EINVAL;
Expand Down Expand Up @@ -6912,11 +6959,11 @@ int RGWDeleteBucketPolicy::verify_permission()

// Removes the IAM policy attribute (RGW_ATTR_IAM_POLICY) from a copy of the
// bucket's attributes and stores the result with rgw_bucket_set_attrs,
// leaving the outcome in op_ret.
//
// NOTE(review): this listing appears to come from a diff rendered without
// +/- markers. The first copy/erase/set_attrs sequence and its -ECANCELED
// check look like the removed implementation, and the
// retry_raced_bucket_write call looks like its replacement -- confirm
// against the actual file before relying on either half.
void RGWDeleteBucketPolicy::execute()
{
auto attrs = s->bucket_attrs;
attrs.erase(RGW_ATTR_IAM_POLICY);
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
if (op_ret == -ECANCELED) {
op_ret = 0; /* lost a race, but it's ok because policies are immutable */
}
// The lambda re-copies s->bucket_attrs on every invocation, so each call
// starts from whatever attributes the request state currently holds.
op_ret = retry_raced_bucket_write(store, s, [this] {
auto attrs = s->bucket_attrs;
attrs.erase(RGW_ATTR_IAM_POLICY);
op_ret = rgw_bucket_set_attrs(store, s->bucket_info, attrs,
&s->bucket_info.objv_tracker);
return op_ret;
});
}
1 change: 1 addition & 0 deletions src/rgw/rgw_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class RGWOp {
int do_aws4_auth_completion();

virtual int init_quota();

public:
RGWOp()
: s(nullptr),
Expand Down
45 changes: 42 additions & 3 deletions src/rgw/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11901,15 +11901,27 @@ int RGWRados::convert_old_bucket_info(RGWObjectCtx& obj_ctx,
return 0;
}

int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
const string& tenant, const string& bucket_name, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs)
int RGWRados::_get_bucket_info(RGWObjectCtx& obj_ctx,
const string& tenant,
const string& bucket_name,
RGWBucketInfo& info,
real_time *pmtime,
map<string, bufferlist> *pattrs,
boost::optional<obj_version> refresh_version)
{
bucket_info_entry e;
string bucket_entry;
rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);


if (binfo_cache->find(bucket_entry, &e)) {
if (refresh_version &&
e.info.objv_tracker.read_version.compare(&(*refresh_version))) {
lderr(cct) << "WARNING: The bucket info cache is inconsistent. This is "
<< "a failure that should be debugged. I am a nice machine, "
<< "so I will try to recover." << dendl;
binfo_cache->invalidate(bucket_entry);
}
info = e.info;
if (pattrs)
*pattrs = e.attrs;
Expand Down Expand Up @@ -11960,6 +11972,7 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
e.info.ep_objv = ot.read_version;
info = e.info;
if (ret < 0) {
lderr(cct) << "ERROR: get_bucket_instance_from_oid failed: " << ret << dendl;
info.bucket.tenant = tenant;
info.bucket.name = bucket_name;
// XXX and why return anything in case of an error anyway?
Expand All @@ -11981,9 +11994,35 @@ int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
ldout(cct, 20) << "couldn't put binfo cache entry, might have raced with data changes" << dendl;
}

if (refresh_version &&
refresh_version->compare(&info.objv_tracker.read_version)) {
lderr(cct) << "WARNING: The OSD has the same version I have. Something may "
<< "have gone squirrelly. An administrator may have forced a "
<< "change; otherwise there is a problem somewhere." << dendl;
}

return 0;
}

/// Look up bucket info by tenant and bucket name.
///
/// Forwards to _get_bucket_info without a refresh version, so none of the
/// refresh-version checks in that function apply to this call.
///
/// @param obj_ctx      object context forwarded to the lookup
/// @param tenant       tenant owning the bucket
/// @param bucket_name  name of the bucket
/// @param info         receives the bucket info
/// @param pmtime       forwarded unchanged to _get_bucket_info
/// @param pattrs       if non-null, receives the bucket attributes
/// @return             whatever _get_bucket_info returns
int RGWRados::get_bucket_info(RGWObjectCtx& obj_ctx,
                              const string& tenant, const string& bucket_name,
                              RGWBucketInfo& info,
                              real_time *pmtime, map<string, bufferlist> *pattrs)
{
  // An empty optional means "plain lookup, not a refresh".
  const boost::optional<obj_version> no_refresh_version = boost::none;

  return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime,
                          pattrs, no_refresh_version);
}

/// Re-read the bucket info for the bucket described by `info`, passing the
/// version currently held in `info` as the refresh version.
///
/// @param info    in/out: identifies the bucket (tenant + name) and carries
///                the version being refreshed; receives the re-read info
/// @param pmtime  forwarded unchanged to _get_bucket_info
/// @param pattrs  if non-null, receives the bucket attributes
/// @return        whatever _get_bucket_info returns
int RGWRados::try_refresh_bucket_info(RGWBucketInfo& info,
                                      ceph::real_time *pmtime,
                                      map<string, bufferlist> *pattrs)
{
  RGWObjectCtx obj_ctx(this);

  // Snapshot the lookup keys and the stale version before the call.
  // _get_bucket_info takes tenant/bucket_name by const reference and also
  // assigns to `info`; passing info.bucket.tenant / info.bucket.name directly
  // would make those references alias the output object, so they would change
  // underneath the callee as soon as it overwrites `info` (notably on its
  // error path, which writes tenant/bucket_name back into `info`).
  const string tenant = info.bucket.tenant;
  const string bucket_name = info.bucket.name;
  const auto stale_version = info.objv_tracker.read_version;

  return _get_bucket_info(obj_ctx, tenant, bucket_name, info, pmtime, pattrs,
                          stale_version);
}

int RGWRados::put_bucket_entrypoint_info(const string& tenant_name, const string& bucket_name, RGWBucketEntryPoint& entry_point,
bool exclusive, RGWObjVersionTracker& objv_tracker, real_time mtime,
map<string, bufferlist> *pattrs)
Expand Down
Loading

0 comments on commit d07588a

Please sign in to comment.