Skip to content

Commit

Permalink
monc: Asifoact MonClient
Browse files Browse the repository at this point in the history
Of course now everyone has to feed an io_context into the MonClient.

Signed-off-by: Adam C. Emerson <[email protected]>
  • Loading branch information
adamemerson committed May 15, 2020
1 parent 7d73fa6 commit 306eebe
Show file tree
Hide file tree
Showing 35 changed files with 674 additions and 370 deletions.
8 changes: 7 additions & 1 deletion src/ceph_fuse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include <sys/utsname.h>
#include <iostream>
#include <string>
#include <optional>

#include "common/async/context_pool.h"
#include "common/config.h"
#include "common/errno.h"

Expand Down Expand Up @@ -45,6 +47,8 @@

#define dout_context g_ceph_context

ceph::async::io_context_pool icp;

static void fuse_usage()
{
const char* argv[] = {
Expand Down Expand Up @@ -234,7 +238,8 @@ int main(int argc, const char **argv, const char *envp[]) {
int tester_r = 0;
void *tester_rp = nullptr;

MonClient *mc = new MonClient(g_ceph_context);
icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
MonClient *mc = new MonClient(g_ceph_context, icp);
int r = mc->build_initial_monmap();
if (r == -EINVAL) {
cerr << "failed to generate initial mon list" << std::endl;
Expand Down Expand Up @@ -316,6 +321,7 @@ int main(int argc, const char **argv, const char *envp[]) {
client->unmount();
cfuse->finalize();
out_shutdown:
icp.stop();
client->shutdown();
out_init_failed:
unregister_async_signal_handler(SIGHUP, sighup_handler);
Expand Down
8 changes: 5 additions & 3 deletions src/ceph_mds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <iostream>
#include <string>

#include "common/async/context_pool.h"
#include "include/ceph_features.h"
#include "include/compat.h"
#include "include/random.h"
Expand Down Expand Up @@ -195,15 +196,16 @@ int main(int argc, const char **argv)
register_async_signal_handler(SIGHUP, sighup_handler);

// get monmap
MonClient mc(g_ceph_context);
ceph::async::io_context_pool ctxpool(2);
MonClient mc(g_ceph_context, ctxpool);
if (mc.build_initial_monmap() < 0)
forker.exit(1);
global_init_chdir(g_ceph_context);

msgr->start();

// start mds
mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc);
mds = new MDSDaemon(g_conf()->name.get_id().c_str(), msgr, &mc, ctxpool);

// in case we have to respawn...
mds->orig_argc = argc;
Expand Down Expand Up @@ -234,6 +236,7 @@ int main(int argc, const char **argv)
shutdown_async_signal_handler();

shutdown:
ctxpool.stop();
// yuck: grab the mds lock, so we can be sure that whoever in *mds
// called shutdown finishes what they were doing.
mds->mds_lock.lock();
Expand All @@ -257,4 +260,3 @@ int main(int argc, const char **argv)

return 0;
}

9 changes: 7 additions & 2 deletions src/ceph_osd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,10 @@ int main(int argc, const char **argv)

srand(time(NULL) + getpid());

MonClient mc(g_ceph_context);
ceph::async::io_context_pool poolctx(
cct->_conf.get_val<std::uint64_t>("osd_asio_thread_count"));

MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;
global_init_chdir(g_ceph_context);
Expand All @@ -691,7 +694,8 @@ int main(int argc, const char **argv)
ms_objecter,
&mc,
data_path,
journal_path);
journal_path,
poolctx);

int err = osdptr->pre_init();
if (err < 0) {
Expand Down Expand Up @@ -746,6 +750,7 @@ int main(int argc, const char **argv)
shutdown_async_signal_handler();

// done
poolctx.stop();
delete osdptr;
delete ms_public;
delete ms_hb_front_client;
Expand Down
9 changes: 6 additions & 3 deletions src/ceph_syn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/config.h"

#include "common/async/context_pool.h"
#include "client/SyntheticClient.h"
#include "client/Client.h"

Expand Down Expand Up @@ -50,7 +51,8 @@ int main(int argc, const char **argv, char *envp[])
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);

// get monmap
MonClient mc(g_ceph_context);
ceph::async::io_context_pool poolctx(1);
MonClient mc(g_ceph_context, poolctx);
if (mc.build_initial_monmap() < 0)
return -1;

Expand All @@ -64,7 +66,7 @@ int main(int argc, const char **argv, char *envp[])
messengers[i] = Messenger::create_client_messenger(g_ceph_context,
"synclient");
messengers[i]->bind(g_conf()->public_addr);
mclients[i] = new MonClient(g_ceph_context);
mclients[i] = new MonClient(g_ceph_context, poolctx);
mclients[i]->build_initial_monmap();
auto client = new StandaloneClient(messengers[i], mclients[i]);
client->set_filer_flags(syn_filer_flags);
Expand All @@ -79,6 +81,8 @@ int main(int argc, const char **argv, char *envp[])
++p)
(*p)->start_thread();

poolctx.stop();

//cout << "waiting for client(s) to finish" << std::endl;
while (!clients.empty()) {
Client *client = clients.front();
Expand All @@ -99,4 +103,3 @@ int main(int argc, const char **argv, char *envp[])
}
return 0;
}

21 changes: 12 additions & 9 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include "common/config.h"
#include "common/version.h"
#include "common/async/blocked_completion.h"

#include "mon/MonClient.h"

Expand Down Expand Up @@ -124,6 +125,9 @@

#define DEBUG_GETATTR_CAPS (CEPH_CAP_XATTR_SHARED)

namespace bs = boost::system;
namespace ca = ceph::async;

void client_flush_set_callback(void *p, ObjectCacher::ObjectSet *oset)
{
Client *client = static_cast<Client*>(p);
Expand Down Expand Up @@ -5703,22 +5707,21 @@ int Client::authenticate()

int Client::fetch_fsmap(bool user)
{
int r;
// Retrieve FSMap to enable looking up daemon addresses. We need FSMap
// rather than MDSMap because no one MDSMap contains all the daemons, and
// a `tell` can address any daemon.
version_t fsmap_latest;
bs::error_code ec;
do {
C_SaferCond cond;
monclient->get_version("fsmap", &fsmap_latest, NULL, &cond);
client_lock.unlock();
r = cond.wait();
std::tie(fsmap_latest, std::ignore) =
monclient->get_version("fsmap", ca::use_blocked[ec]);
client_lock.lock();
} while (r == -EAGAIN);
} while (ec == bs::errc::resource_unavailable_try_again);

if (r < 0) {
lderr(cct) << "Failed to learn FSMap version: " << cpp_strerror(r) << dendl;
return r;
if (ec) {
lderr(cct) << "Failed to learn FSMap version: " << ec << dendl;
return ceph::from_error_code(ec);
}

ldout(cct, 10) << __func__ << " learned FSMap version " << fsmap_latest << dendl;
Expand Down Expand Up @@ -14669,7 +14672,7 @@ mds_rank_t Client::_get_random_up_mds() const


StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
: Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
: Client(m, mc, new Objecter(m->cct, m, mc, nullptr, 0, 0))
{
monclient->set_messenger(m);
objecter->set_client_incarnation(0);
Expand Down
17 changes: 16 additions & 1 deletion src/common/ceph_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
#include <typeinfo>
#include <typeindex>

#include "include/common_fwd.h"
#include <boost/intrusive_ptr.hpp>

#include "include/any.h"
#include "include/common_fwd.h"

#include "common/cmdparse.h"
#include "common/code_environment.h"
Expand Down Expand Up @@ -375,4 +377,17 @@ class CephContext {
#endif
#endif // WITH_SEASTAR

#if !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
namespace ceph::common {
inline void intrusive_ptr_add_ref(CephContext* cct)
{
cct->get();
}

inline void intrusive_ptr_release(CephContext* cct)
{
cct->put();
}
}
#endif // !(defined(WITH_SEASTAR) && !defined(WITH_ALIEN)) && defined(__cplusplus)
#endif
24 changes: 24 additions & 0 deletions src/common/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5357,6 +5357,18 @@ std::vector<Option> get_global_options() {
.set_default(0)
.set_description("Override 60 second periods for testing only"),

Option("librados_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(2)
.set_min(1)
.set_description("Size of thread pool for Objecter")
.add_tag("client"),

Option("osd_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(2)
.set_min(1)
.set_description("Size of thread pool for ASIO completions")
.add_tag("osd"),

// ----------------------------
// Crimson specific options

Expand Down Expand Up @@ -8215,6 +8227,12 @@ std::vector<Option> get_mds_options() {
.set_flag(Option::FLAG_RUNTIME)
.set_description("max snapshots per directory")
.set_long_description("maximum number of snapshots that can be created per directory"),

Option("mds_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(2)
.set_min(1)
.set_description("Size of thread pool for ASIO completions")
.add_tag("mds")
});
}

Expand Down Expand Up @@ -8457,6 +8475,12 @@ std::vector<Option> get_mds_client_options() {
Option("debug_allow_any_pool_priority", Option::TYPE_BOOL, Option::LEVEL_DEV)
.set_default(false)
.set_description("Allow any pool priority to be set to test conversion to new range"),

Option("client_asio_thread_count", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(2)
.set_min(1)
.set_description("Size of thread pool for ASIO completions")
.add_tag("client")
});
}

Expand Down
16 changes: 5 additions & 11 deletions src/global/global_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*
*/

#include "common/async/context_pool.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/config.h"
Expand Down Expand Up @@ -343,13 +344,16 @@ global_init(const std::map<std::string,std::string> *defaults,
// make sure our mini-session gets legacy values
g_conf().apply_changes(nullptr);

MonClient mc_bootstrap(g_ceph_context);
ceph::async::io_context_pool cp(1);
MonClient mc_bootstrap(g_ceph_context, cp);
if (mc_bootstrap.get_monmap_and_config() < 0) {
cp.stop();
g_ceph_context->_log->flush();
cerr << "failed to fetch mon config (--no-mon-config to skip)"
<< std::endl;
_exit(1);
}
cp.stop();
}

// Expand metavariables. Invoke configuration observers. Open log file.
Expand Down Expand Up @@ -408,17 +412,7 @@ global_init(const std::map<std::string,std::string> *defaults,

return boost::intrusive_ptr<CephContext>{g_ceph_context, false};
}
namespace TOPNSPC::common {
void intrusive_ptr_add_ref(CephContext* cct)
{
cct->get();
}

void intrusive_ptr_release(CephContext* cct)
{
cct->put();
}
}
void global_print_banner(void)
{
output_ceph_version();
Expand Down
6 changes: 1 addition & 5 deletions src/global/global_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <map>
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
#include "common/ceph_context.h"
#include "common/code_environment.h"
#include "common/common_init.h"

Expand All @@ -38,11 +39,6 @@ global_init(
const char *data_dir_option = 0,
bool run_pre_init = true);

namespace TOPNSPC::common {
void intrusive_ptr_add_ref(CephContext* cct);
void intrusive_ptr_release(CephContext* cct);
}

// just the first half; enough to get config parsed but doesn't start up the
// cct or log.
void global_pre_init(const std::map<std::string,std::string> *defaults,
Expand Down
8 changes: 6 additions & 2 deletions src/libcephfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "auth/Crypto.h"
#include "client/Client.h"
#include "librados/RadosClient.h"
#include "common/async/context_pool.h"
#include "common/ceph_argparse.h"
#include "common/common_init.h"
#include "common/config.h"
Expand All @@ -36,6 +37,7 @@
#define DEFAULT_UMASK 002

static mode_t umask_cb(void *);
ceph::async::io_context_pool icp;

struct ceph_mount_info
{
Expand Down Expand Up @@ -83,8 +85,9 @@ struct ceph_mount_info
cct->_log->start();
}

icp.start(cct->_conf.get_val<std::uint64_t>("client_asio_thread_count"));
{
MonClient mc_bootstrap(cct);
MonClient mc_bootstrap(cct, icp);
ret = mc_bootstrap.get_monmap_and_config();
if (ret < 0)
return ret;
Expand All @@ -93,7 +96,7 @@ struct ceph_mount_info
common_init_finish(cct);

//monmap
monclient = new MonClient(cct);
monclient = new MonClient(cct, icp);
ret = -CEPHFS_ERROR_MON_MAP_BUILD; //defined in libcephfs.h;
if (monclient->build_initial_monmap() < 0)
goto fail;
Expand Down Expand Up @@ -202,6 +205,7 @@ struct ceph_mount_info
delete messenger;
messenger = nullptr;
}
icp.stop();
if (monclient) {
delete monclient;
monclient = nullptr;
Expand Down
Loading

0 comments on commit 306eebe

Please sign in to comment.