Skip to content

Commit

Permalink
Merge pull request ceph#32601 from adamemerson/wip-objection-triumphant
Browse files Browse the repository at this point in the history
Asynchronous work on objecter and asynchronous RADOS library.

Reviewed-by: Patrick Donnelly <[email protected]>
  • Loading branch information
adamemerson authored May 22, 2020
2 parents 4d71b55 + f6727c7 commit f717065
Show file tree
Hide file tree
Showing 132 changed files with 11,510 additions and 2,222 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,5 @@ GTAGS
# mypy cache
.mypy_cache/

# Python building things where it shouldn't
/src/python-common/build/
6 changes: 5 additions & 1 deletion COPYING
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,8 @@ License: BSD 3-clause

Files: src/include/function2.hpp
Copyright: 2015-2018, Denis Blank
License: Boost Software License, Version 1.0
License: Boost Software License, Version 1.0

Files: src/include/expected.hpp
Copyright: 2017, Simon Brand
License: CC0
6 changes: 6 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,13 @@ set(libcommon_files
ceph_ver.c
global/global_context.cc
xxHash/xxhash.c
common/error_code.cc
log/Log.cc
mon/MonCap.cc
mon/MonClient.cc
mon/MonMap.cc
mon/MonSub.cc
mon/error_code.cc
mgr/MgrClient.cc
mon/PGMap.cc
mgr/ServiceMap.cc
Expand All @@ -335,12 +337,14 @@ set(libcommon_files
osd/OSDMap.cc
osd/OSDMapMapping.cc
osd/osd_types.cc
osd/error_code.cc
osd/PGPeeringEvent.cc
osd/OpRequest.cc
osd/ClassHandler.cc
osd/osd_op_util.cc
osdc/Striper.cc
osdc/Objecter.cc
osdc/error_code.cc
librbd/Features.cc
${mds_files})
set_source_files_properties(ceph_ver.c
Expand Down Expand Up @@ -373,6 +377,7 @@ set(ceph_common_deps
Boost::program_options
Boost::date_time
Boost::iostreams
fmt::fmt
StdFilesystem::filesystem
fmt::fmt
${BLKID_LIBRARIES}
Expand Down Expand Up @@ -485,6 +490,7 @@ option(WITH_LIBRADOSSTRIPER "build with libradosstriper support" ON)

add_subdirectory(include)
add_subdirectory(librados)
add_subdirectory(neorados)

if(WITH_LIBRADOSSTRIPER)
add_subdirectory(libradosstriper)
Expand Down
10 changes: 8 additions & 2 deletions src/ceph_fuse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include <sys/utsname.h>
#include <iostream>
#include <string>
#include <optional>

#include "common/async/context_pool.h"
#include "common/config.h"
#include "common/errno.h"

Expand Down Expand Up @@ -45,6 +47,8 @@

#define dout_context g_ceph_context

ceph::async::io_context_pool icp;

static void fuse_usage()
{
const char* argv[] = {
Expand Down Expand Up @@ -234,7 +238,8 @@ int main(int argc, const char **argv, const char *envp[]) {
int tester_r = 0;
void *tester_rp = nullptr;

MonClient *mc = new MonClient(g_ceph_context);
icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
MonClient *mc = new MonClient(g_ceph_context, icp);
int r = mc->build_initial_monmap();
if (r == -EINVAL) {
cerr << "failed to generate initial mon list" << std::endl;
Expand All @@ -249,7 +254,7 @@ int main(int argc, const char **argv, const char *envp[]) {
messenger->set_policy(entity_name_t::TYPE_MDS,
Messenger::Policy::lossless_client(0));

client = new StandaloneClient(messenger, mc);
client = new StandaloneClient(messenger, mc, icp);
if (filer_flags) {
client->set_filer_flags(filer_flags);
}
Expand Down Expand Up @@ -316,6 +321,7 @@ int main(int argc, const char **argv, const char *envp[]) {
client->unmount();
cfuse->finalize();
out_shutdown:
icp.stop();
client->shutdown();
out_init_failed:
unregister_async_signal_handler(SIGHUP, sighup_handler);
Expand Down
8 changes: 5 additions & 3 deletions src/ceph_mds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <iostream>
#include <string>

#include "common/async/context_pool.h"
#include "include/ceph_features.h"
#include "include/compat.h"
#include "include/random.h"
Expand Down Expand Up @@ -195,15 +196,16 @@ int main(int argc, const char **argv)
register_async_signal_handler(SIGHUP, sighup_handler);

// get monmap
MonClient mc(g_ceph_context);
ceph::async::io_context_pool ctxpool(2);
MonClient mc(g_ceph_context, ctxpool);
if (mc.build_initial_monmap() < 0)
forker.exit(1);
global_init_chdir(g_ceph_context);

msgr->start();

// start mds
mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc);
mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool);

// in case we have to respawn...
mds->orig_argc = argc;
Expand Down Expand Up @@ -234,6 +236,7 @@ int main(int argc, const char **argv)
shutdown_async_signal_handler();

shutdown:
ctxpool.stop();
// yuck: grab the mds lock, so we can be sure that whoever in *mds
// called shutdown finishes what they were doing.
mds->mds_lock.lock();
Expand All @@ -257,4 +260,3 @@ int main(int argc, const char **argv)

return 0;
}

9 changes: 7 additions & 2 deletions src/ceph_osd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,10 @@ int main(int argc, const char **argv)

srand(time(NULL) + getpid());

MonClient mc(g_ceph_context);
ceph::async::io_context_pool poolctx(
cct->_conf.get_val<std::uint64_t>("osd_asio_thread_count"));

MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;
global_init_chdir(g_ceph_context);
Expand All @@ -691,7 +694,8 @@ int main(int argc, const char **argv)
ms_objecter,
&mc,
data_path,
journal_path);
journal_path,
poolctx);

int err = osdptr->pre_init();
if (err < 0) {
Expand Down Expand Up @@ -746,6 +750,7 @@ int main(int argc, const char **argv)
shutdown_async_signal_handler();

// done
poolctx.stop();
delete osdptr;
delete ms_public;
delete ms_hb_front_client;
Expand Down
11 changes: 7 additions & 4 deletions src/ceph_syn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/config.h"

#include "common/async/context_pool.h"
#include "client/SyntheticClient.h"
#include "client/Client.h"

Expand Down Expand Up @@ -50,7 +51,8 @@ int main(int argc, const char **argv, char *envp[])
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);

// get monmap
MonClient mc(g_ceph_context);
ceph::async::io_context_pool poolctx(1);
MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;

Expand All @@ -64,9 +66,9 @@ int main(int argc, const char **argv, char *envp[])
messengers[i] = Messenger::create_client_messenger(g_ceph_context,
"synclient");
messengers[i]->bind(g_conf()->public_addr);
mclients[i] = new MonClient(g_ceph_context);
mclients[i] = new MonClient(g_ceph_context, poolctx);
mclients[i]->build_initial_monmap();
auto client = new StandaloneClient(messengers[i], mclients[i]);
auto client = new StandaloneClient(messengers[i], mclients[i], poolctx);
client->set_filer_flags(syn_filer_flags);
SyntheticClient *syn = new SyntheticClient(client);
clients.push_back(client);
Expand All @@ -79,6 +81,8 @@ int main(int argc, const char **argv, char *envp[])
++p)
(*p)->start_thread();

poolctx.stop();

//cout << "waiting for client(s) to finish" << std::endl;
while (!clients.empty()) {
Client *client = clients.front();
Expand All @@ -99,4 +103,3 @@ int main(int argc, const char **argv, char *envp[])
}
return 0;
}

50 changes: 28 additions & 22 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include <boost/lexical_cast.hpp>
#include <boost/fusion/include/std_pair.hpp>

#include "common/async/waiter.h"

#if defined(__FreeBSD__)
#define XATTR_CREATE 0x1
#define XATTR_REPLACE 0x2
Expand All @@ -44,6 +46,7 @@

#include "common/config.h"
#include "common/version.h"
#include "common/async/blocked_completion.h"

#include "mon/MonClient.h"

Expand Down Expand Up @@ -124,6 +127,9 @@

#define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)

namespace bs = boost::system;
namespace ca = ceph::async;

void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
{
Client *client = static_cast<Client*>(p);
Expand Down Expand Up @@ -5744,22 +5750,21 @@ int Client::authenticate()

int Client::fetch_fsmap(bool user)
{
int r;
// Retrieve FSMap to enable looking up daemon addresses. We need FSMap
// rather than MDSMap because no one MDSMap contains all the daemons, and
// a `tell` can address any daemon.
version_t fsmap_latest;
bs::error_code ec;
do {
C_SaferCond cond;
monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
client_lock.unlock();
r = cond.wait();
std::tie(fsmap_latest, std::ignore) =
monclient->get_version("fsmap", ca::use_blocked[ec]);
client_lock.lock();
} while (r == -EAGAIN);
} while (ec == bs::errc::resource_unavailable_try_again);

if (r < 0) {
lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl;
return r;
if (ec) {
lderr(cct) << "Failed to learn FSMap version: " << ec << dendl;
return ceph::from_error_code(ec);
}

ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl;
Expand Down Expand Up @@ -11747,9 +11752,8 @@ void Client::_setxattr_maybe_wait_for_osdmap(const char *name, const void *value
});

if (r == -ENOENT) {
C_SaferCond ctx;
objecter->wait_for_latest_osdmap(&ctx);
ctx.wait();
bs::error_code ec;
objecter->wait_for_latest_osdmap(ca::use_blocked[ec]);
}
}
}
Expand Down Expand Up @@ -14321,7 +14325,7 @@ int Client::check_pool_perm(Inode *in, int need)

C_SaferCond rd_cond;
ObjectOperation rd_op;
rd_op.stat(NULL, (ceph::real_time*)nullptr, NULL);
rd_op.stat(nullptr, nullptr, nullptr);

objecter->mutate(oid, OSDMap::file_to_object_locator(in->layout), rd_op,
nullsnapc, ceph::real_clock::now(), 0, &rd_cond);
Expand Down Expand Up @@ -14508,7 +14512,7 @@ void Client::set_session_timeout(unsigned timeout)
int Client::start_reclaim(const std::string& uuid, unsigned flags,
const std::string& fs_name)
{
std::lock_guard l(client_lock);
std::unique_lock l(client_lock);
if (!initialized)
return -ENOTCONN;

Expand Down Expand Up @@ -14584,13 +14588,14 @@ int Client::start_reclaim(const std::string& uuid, unsigned flags,

// use blacklist to check if target session was killed
// (config option mds_session_blacklist_on_evict needs to be true)
C_SaferCond cond;
if (!objecter->wait_for_map(reclaim_osd_epoch, &cond)) {
ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
client_lock.unlock();
cond.wait();
client_lock.lock();
}
ldout(cct, 10) << __func__ << ": waiting for OSD epoch " << reclaim_osd_epoch << dendl;
bs::error_code ec;
l.unlock();
objecter->wait_for_map(reclaim_osd_epoch, ca::use_blocked[ec]);
l.lock();

if (ec)
return ceph::from_error_code(ec);

bool blacklisted = objecter->with_osdmap(
[this](const OSDMap &osd_map) -> bool {
Expand Down Expand Up @@ -14714,8 +14719,9 @@ mds_rank_t Client::_get_random_up_mds() const
}


StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
: Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc,
boost::asio::io_context& ictx)
: Client(m, mc, new Objecter(m->cct, m, mc, ictx, 0, 0))
{
monclient->set_messenger(m);
objecter->set_client_incarnation(0);
Expand Down
2 changes: 1 addition & 1 deletion src/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ class Client : public Dispatcher, public md_config_obs_t {
class StandaloneClient : public Client
{
public:
StandaloneClient(Messenger *m, MonClient *mc);
StandaloneClient(Messenger *m, MonClient *mc, boost::asio::io_context& ictx);

~StandaloneClient() override;

Expand Down
2 changes: 1 addition & 1 deletion src/cls/rgw/cls_rgw_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void decode_packed_val(T& val, ceph::buffer::list::const_iterator& bl)
}
break;
default:
throw ceph::buffer::error();
throw ceph::buffer::malformed_input();
}
}

Expand Down
Loading

0 comments on commit f717065

Please sign in to comment.