Skip to content

Commit

Permalink
Merge pull request ceph#55214 from AliMasarweh/wip-alimasa-notif-data…
Browse files Browse the repository at this point in the history
…-path-v2

RGW: Migrate topics to data path v2

reviewed-by: kchheda3, cbodley, yuvalif
  • Loading branch information
yuvalif authored Apr 3, 2024
2 parents 095ecb6 + c0e540e commit e533d2a
Show file tree
Hide file tree
Showing 13 changed files with 939 additions and 6 deletions.
4 changes: 3 additions & 1 deletion qa/suites/rgw/notifications/overrides.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ overrides:
rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo= testkey-2=aWIKTWFrZWZpbGUKbWFuCm91dApzcmMKVGVzdGluZwo=
rgw crypt require ssl: false
rgw:
realm: default
realm: MyRealm
zonegroup: MyZoneGroup
zone: MyZone
storage classes: LUKEWARM, FROZEN
1 change: 1 addition & 0 deletions qa/suites/rgw/notifications/tasks/others/0-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ tasks:
- openssl_keys:
- rgw:
client.0:
client.1:

overrides:
ceph:
Expand Down
2 changes: 1 addition & 1 deletion qa/tasks/notification_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def task(ctx,config):
{
'port':endpoint.port,
'host':endpoint.dns_name,
'zonegroup':'default',
'zonegroup':ctx.rgw.zonegroup,
'cluster':'noname',
'version':'v2'
},
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ set(librgw_common_srcs
driver/rados/rgw_user.cc
driver/rados/rgw_zone.cc
driver/rados/sync_fairness.cc
driver/rados/topic.cc)
driver/rados/topic.cc
driver/rados/topic_migration.cc)

list(APPEND librgw_common_srcs
driver/immutable_config/store.cc
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/driver/rados/rgw_notify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ int publish_reserve(const DoutPrefixProvider* dpp,
rgw_pubsub_topic result;
const RGWPubSub ps(res.store, res.user_tenant, site);
auto ret =
ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
ps.get_topic(res.dpp, topic_cfg.dest.arn_topic, result, res.yield, nullptr);
if (ret < 0) {
ldpp_dout(res.dpp, 1)
<< "INFO: failed to load topic: " << topic_cfg.name
Expand Down
13 changes: 13 additions & 0 deletions src/rgw/driver/rados/rgw_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
#include "rgw_realm_watcher.h"
#include "rgw_reshard.h"
#include "rgw_cr_rados.h"
#include "topic_migration.h"

#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
Expand Down Expand Up @@ -1100,6 +1101,7 @@ void RGWRados::finalize()

if (run_notification_thread) {
rgw::notify::shutdown();
v1_topic_migration.stop();
}
}

Expand Down Expand Up @@ -1357,6 +1359,17 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp, optional_yield y)
ret = rgw::notify::init(cct, driver, *svc.site, dpp);
if (ret < 0 ) {
ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;
return ret;
}

using namespace rgw;
if (svc.site->is_meta_master() &&
all_zonegroups_support(*svc.site, zone_features::notification_v2)) {
spawn::spawn(v1_topic_migration, [this] (spawn::yield_context yield) {
DoutPrefix dpp{cct, dout_subsys, "v1 topic migration: "};
rgwrados::topic_migration::migrate(&dpp, driver, v1_topic_migration, yield);
});
v1_topic_migration.start(1);
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/rgw/driver/rados/rgw_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "common/RefCountedObj.h"
#include "common/ceph_time.h"
#include "common/Timer.h"
#include "common/async/context_pool.h"
#include "rgw_common.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/version/cls_version_types.h"
Expand Down Expand Up @@ -390,6 +391,8 @@ class RGWRados
ceph::mutex meta_sync_thread_lock{ceph::make_mutex("meta_sync_thread_lock")};
ceph::mutex data_sync_thread_lock{ceph::make_mutex("data_sync_thread_lock")};

ceph::async::io_context_pool v1_topic_migration;

librados::IoCtx root_pool_ctx; // .rgw

ceph::mutex bucket_id_lock{ceph::make_mutex("rados_bucket_id")};
Expand Down
5 changes: 3 additions & 2 deletions src/rgw/driver/rados/rgw_sal_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ namespace rgw::sal {
// default number of entries to list with each bucket listing call
// (use marker to bridge between calls)
static constexpr size_t listing_max_entries = 1000;
static std::string pubsub_oid_prefix = "pubsub.";
const std::string pubsub_oid_prefix = "pubsub.";
const std::string pubsub_bucket_oid_infix = ".bucket.";

static int drain_aio(std::list<librados::AioCompletion*>& handles)
{
Expand Down Expand Up @@ -869,7 +870,7 @@ int RadosBucket::abort_multiparts(const DoutPrefixProvider* dpp,
}

std::string RadosBucket::topics_oid() const {
return pubsub_oid_prefix + get_tenant() + ".bucket." + get_name() + "/" + get_marker();
return pubsub_oid_prefix + get_tenant() + pubsub_bucket_oid_infix + get_name() + "/" + get_marker();
}

int RadosBucket::read_topics(rgw_pubsub_bucket_topics& notifications,
Expand Down
3 changes: 3 additions & 0 deletions src/rgw/driver/rados/rgw_sal_rados.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace rgw { namespace sal {

class RadosMultipartUpload;

extern const std::string pubsub_oid_prefix; // v1 topic metadata prefix
extern const std::string pubsub_bucket_oid_infix; // v1 notification in-fix

class RadosPlacementTier: public StorePlacementTier {
RadosStore* store;
RGWZoneGroupPlacementTier tier;
Expand Down
Loading

0 comments on commit e533d2a

Please sign in to comment.