Skip to content

Commit

Permalink
Merge PR ceph#22534 into master
Browse files Browse the repository at this point in the history
* refs/pull/22534/head:
	msg/Messenger: use safe_item_history<> for my_addrs
	msg/simple: fix set_myaddrs()
	msg/Messenger: use mutable_item_history<> for my_addrs
	common/item_history: container to support lockless reads, infrequent updates
	msg/simple/Accepter: fix my_addr == my_addrs.v[0] constraint
	msg/async: track target_addr for each connection
	msg/async: mark accepted connections with addr type (legacy or msgr2)
	msg/async: mark AsyncConnection with msgr2 flag
	msg/async: track connections by addrvec
	mon/Session: inst -> name and addrs
	osd/OSDMap: don't print hb addrs
	msg/DispatchQueue: myaddr -> myaddrs
	mgr: myaddr -> myaddrs
	msg: make set_addr_unknowns take an addrvec
	mon/LogMonitor: myaddr -> myaddrs
	librados: myaddr -> myaddrs
	common/LogClient: myaddr -> myaddrs
	client: myaddr -> myaddrs
	osd/OSDMap: is_blacklisted() for addrvecs
	osd: populate metadata with all addrs
	mds: addr -> addrvec
	mon/MonClient: get_myaddrs()
	mon/OSDMonitor: addrvec blacklist helper
	mds: use new pick_addresses
	ceph-osd: use new bindv()
	msg/async: bind to multiple addresses
	msg/async: (legacy) handshake using legacy addr
	msg/async: fix some debug prefixes
	msg/async: multiple listening ServerSockets
	osd/OSDMap: make cluster addrs addrvecs too
	msg/Messenger: my_addr -> my_addrs
	msg/Connection: peer_addr -> peer_addrs
	msg/msg_types: hash<> for entity_addrvec_t
	mgr/DaemonServer: use new bindv() and pick_addresses()
	msg/Messenger: new bindv() that takes an addrvec
	common/pick_address: fill in ipv4/6 and msgr1/msgr2 via config options
	common/pick_addresses: new addrvec-based pick_addresses()
	common/pick_addresses: add filtering by ipv4 and ipv6
	CEPH_MON_PORT -> CEPH_MON_PORT_LEGACY; define CEPH_MON_PORT_IANA
	osd: kill osd_heartbeat_addr option
	common/options: add addrvec option type

Reviewed-by: Ricardo Dias <[email protected]>
  • Loading branch information
liewegas committed Jul 4, 2018
2 parents 3222aa9 + 73c4d4a commit b12dfc1
Show file tree
Hide file tree
Showing 60 changed files with 1,502 additions and 641 deletions.
4 changes: 4 additions & 0 deletions PendingReleaseNotes
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@
filesystem ``cephfs_a``, use command:

- ``ceph auth caps client.foo mon 'allow r' osd 'allow rw tag cephfs data=cephfs_a' mds 'allow rw, allow rws path=/bar'``

* The ``osd_heartbeat_addr`` option has been removed as it served no
(good) purpose: the OSD should always check heartbeats on both the
public and cluster networks.
7 changes: 4 additions & 3 deletions src/ceph_mds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ int main(int argc, const char **argv)

Preforker forker;

pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC);
entity_addrvec_t addrs;
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC, &addrs);

// Normal startup
if (g_conf->name.has_default_id()) {
Expand Down Expand Up @@ -174,7 +175,7 @@ int main(int argc, const char **argv)
forker.exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);

cout << "starting " << g_conf->name << " at " << msgr->get_myaddr()
cout << "starting " << g_conf->name << " at " << msgr->get_myaddrs()
<< std::endl;
uint64_t required =
CEPH_FEATURE_OSDREPLYMUX;
Expand All @@ -188,7 +189,7 @@ int main(int argc, const char **argv)
msgr->set_policy(entity_name_t::TYPE_CLIENT,
Messenger::Policy::stateful_server(0));

int r = msgr->bind(g_conf->public_addr);
int r = msgr->bindv(addrs);
if (r < 0)
forker.exit(1);

Expand Down
10 changes: 5 additions & 5 deletions src/ceph_mon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ int main(int argc, const char **argv)
} else if (!g_conf->public_addr.is_blank_ip()) {
entity_addr_t a = g_conf->public_addr;
if (a.get_port() == 0)
a.set_port(CEPH_MON_PORT);
a.set_port(CEPH_MON_PORT_LEGACY);
if (monmap.contains(a)) {
string name;
monmap.get_addr_name(a, name);
Expand Down Expand Up @@ -633,7 +633,7 @@ int main(int argc, const char **argv)
mon_addr_str, true) == 0) {
if (conf_addr.parse(mon_addr_str.c_str())) {
if (conf_addr.get_port() == 0)
conf_addr.set_port(CEPH_MON_PORT);
conf_addr.set_port(CEPH_MON_PORT_LEGACY);
if (ipaddr != conf_addr) {
derr << "WARNING: 'mon addr' config option " << conf_addr
<< " does not match monmap file" << std::endl
Expand All @@ -650,7 +650,7 @@ int main(int argc, const char **argv)
if (!g_conf->public_addr.is_blank_ip()) {
ipaddr = g_conf->public_addr;
if (ipaddr.get_port() == 0)
ipaddr.set_port(CEPH_MON_PORT);
ipaddr.set_port(CEPH_MON_PORT_LEGACY);
dout(0) << "using public_addr " << g_conf->public_addr << " -> "
<< ipaddr << dendl;
} else {
Expand Down Expand Up @@ -722,7 +722,7 @@ int main(int argc, const char **argv)

// set the default port if not already set
if (bind_addr.get_port() == 0) {
bind_addr.set_port(CEPH_MON_PORT);
bind_addr.set_port(CEPH_MON_PORT_LEGACY);
}
}

Expand All @@ -742,7 +742,7 @@ int main(int argc, const char **argv)
// if the public and bind addr are different set the msgr addr
// to the public one, now that the bind is complete.
if (public_addr != bind_addr) {
msgr->set_addr(public_addr);
msgr->set_addrs(entity_addrvec_t(public_addr));
}

Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
Expand Down
49 changes: 17 additions & 32 deletions src/ceph_osd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,20 +470,6 @@ int main(int argc, const char **argv)
forker.exit(0);
}

pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC
|CEPH_PICK_ADDRESS_CLUSTER);

entity_addr_t paddr = g_conf->get_val<entity_addr_t>("public_addr");
entity_addr_t caddr = g_conf->get_val<entity_addr_t>("cluster_addr");


if (paddr.is_blank_ip() && !caddr.is_blank_ip()) {
derr << TEXT_YELLOW
<< " ** WARNING: specified cluster addr but not public addr; we recommend **\n"
<< " ** you specify neither or both. **"
<< TEXT_NORMAL << dendl;
}

std::string msg_type = g_conf->get_val<std::string>("ms_type");
std::string public_msg_type =
g_conf->get_val<std::string>("ms_public_type");
Expand Down Expand Up @@ -526,7 +512,6 @@ int main(int argc, const char **argv)
ms_hb_front_server->set_cluster_protocol(CEPH_OSD_PROTOCOL);

cout << "starting osd." << whoami
<< " at " << ms_public->get_myaddr()
<< " osd_data " << data_path
<< " " << ((journal_path.empty()) ?
"(no journal)" : journal_path)
Expand Down Expand Up @@ -574,10 +559,14 @@ int main(int argc, const char **argv)

ms_objecter->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));

if (ms_public->bind(paddr) < 0)
entity_addrvec_t public_addrs, cluster_addrs;
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_PUBLIC, &public_addrs);
pick_addresses(g_ceph_context, CEPH_PICK_ADDRESS_CLUSTER, &cluster_addrs);

if (ms_public->bindv(public_addrs) < 0)
forker.exit(1);

if (ms_cluster->bind(caddr) < 0)
if (ms_cluster->bindv(cluster_addrs) < 0)
forker.exit(1);

bool is_delay = g_conf->get_val<bool>("osd_heartbeat_use_min_delay_socket");
Expand All @@ -588,26 +577,22 @@ int main(int argc, const char **argv)
ms_hb_front_server->set_socket_priority(SOCKET_PRIORITY_MIN_DELAY);
}

// hb back should bind to same ip as cluster_addr (if specified)
entity_addr_t haddr = g_conf->get_val<entity_addr_t>("osd_heartbeat_addr");
if (haddr.is_blank_ip()) {
haddr = caddr;
if (haddr.is_ip())
haddr.set_port(0);
entity_addrvec_t hb_front_addrs = public_addrs;
for (auto& a : hb_front_addrs.v) {
a.set_port(0);
}

if (ms_hb_back_server->bind(haddr) < 0)
if (ms_hb_front_server->bindv(hb_front_addrs) < 0)
forker.exit(1);
if (ms_hb_back_client->client_bind(haddr) < 0)
if (ms_hb_front_client->client_bind(hb_front_addrs.front()) < 0)
forker.exit(1);

// hb front should bind to same ip as public_addr
entity_addr_t hb_front_addr = paddr;
if (hb_front_addr.is_ip())
hb_front_addr.set_port(0);
if (ms_hb_front_server->bind(hb_front_addr) < 0)
entity_addrvec_t hb_back_addrs = cluster_addrs;
for (auto& a : hb_back_addrs.v) {
a.set_port(0);
}
if (ms_hb_back_server->bindv(hb_back_addrs) < 0)
forker.exit(1);
if (ms_hb_front_client->client_bind(hb_front_addr) < 0)
if (ms_hb_back_client->client_bind(hb_back_addrs.front()) < 0)
forker.exit(1);

// install signal handlers
Expand Down
19 changes: 14 additions & 5 deletions src/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2417,8 +2417,17 @@ void Client::handle_osd_map(MOSDMap *m)
std::set<entity_addr_t> new_blacklists;
objecter->consume_blacklist_events(&new_blacklists);

const auto myaddr = messenger->get_myaddr();
if (!blacklisted && new_blacklists.count(myaddr)) {
const auto myaddrs = messenger->get_myaddrs();
bool new_blacklist = false;
if (!blacklisted) {
for (auto& a : myaddrs.v) {
if (new_blacklists.count(a)) {
new_blacklist = true;
break;
}
}
}
if (new_blacklist) {
auto epoch = objecter->with_osdmap([](const OSDMap &o){
return o.get_epoch();
});
Expand Down Expand Up @@ -2455,8 +2464,8 @@ void Client::handle_osd_map(MOSDMap *m)

} else if (blacklisted) {
// Handle case where we were blacklisted but no longer are
blacklisted = objecter->with_osdmap([myaddr](const OSDMap &o){
return o.is_blacklisted(myaddr);});
blacklisted = objecter->with_osdmap([myaddrs](const OSDMap &o){
return o.is_blacklisted(myaddrs);});
}

if (objecter->osdmap_full_flag()) {
Expand Down Expand Up @@ -13520,7 +13529,7 @@ int Client::get_local_osd()

objecter->with_osdmap([this](const OSDMap& o) {
if (o.get_epoch() != local_osd_epoch) {
local_osd = o.find_osd_on_ip(messenger->get_myaddr());
local_osd = o.find_osd_on_ip(messenger->get_myaddrs().front());
local_osd_epoch = o.get_epoch();
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/common/LogClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ uint64_t LogClient::get_next_seq()

entity_addrvec_t LogClient::get_myaddrs()
{
return entity_addrvec_t(messenger->get_myaddr());
return messenger->get_myaddrs();
}

entity_name_t LogClient::get_myrank()
Expand Down
5 changes: 4 additions & 1 deletion src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ struct md_config_impl {
double md_config_t::*,
bool md_config_t::*,
entity_addr_t md_config_t::*,
entity_addrvec_t md_config_t::*,
uuid_d md_config_t::*> member_ptr_t;

// For use when intercepting configuration updates
Expand Down Expand Up @@ -130,7 +131,7 @@ struct md_config_impl {

typedef enum {
OPT_INT, OPT_LONGLONG, OPT_STR, OPT_DOUBLE, OPT_FLOAT, OPT_BOOL,
OPT_ADDR, OPT_U32, OPT_U64, OPT_UUID
OPT_ADDR, OPT_ADDRVEC, OPT_U32, OPT_U64, OPT_UUID
} opt_type_t;

// Create a new md_config_t structure.
Expand Down Expand Up @@ -364,6 +365,7 @@ struct md_config_impl {
#define OPTION_OPT_FLOAT(name) double name;
#define OPTION_OPT_BOOL(name) bool name;
#define OPTION_OPT_ADDR(name) entity_addr_t name;
#define OPTION_OPT_ADDRVEC(name) entity_addrvec_t name;
#define OPTION_OPT_U32(name) uint64_t name;
#define OPTION_OPT_U64(name) uint64_t name;
#define OPTION_OPT_UUID(name) uuid_d name;
Expand All @@ -381,6 +383,7 @@ struct md_config_impl {
#undef OPTION_OPT_FLOAT
#undef OPTION_OPT_BOOL
#undef OPTION_OPT_ADDR
#undef OPTION_OPT_ADDRVEC
#undef OPTION_OPT_U32
#undef OPTION_OPT_U64
#undef OPTION_OPT_UUID
Expand Down
103 changes: 103 additions & 0 deletions src/common/item_history.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <list>
#include <mutex>

/*
Keep a history of item values so that readers can dereference the pointer to
the latest value and continue using it as long as they want. This container
is only appropriate for values that are updated a handful of times over their
total lifetime.
There is a prune() method to throw out old values, but it should only be used
if the caller has some way of knowing all readers are done.
*/

template<class T>
class mutable_item_history {
private:
std::mutex lock;
std::list<T> history;
T *current = nullptr;

public:
mutable_item_history() {
history.emplace_back(T());
current = &history.back();
}

// readers are lock-free
const T& operator*() const {
return *current;
}
const T *operator->() const {
return current;
}

// non-const variants (be careful!)
T& operator*() {
return *current;
}
T *operator->() {
return current;
}

// writes are serialized
const T& operator=(const T& other) {
std::lock_guard<std::mutex> l(lock);
history.push_back(other);
current = &history.back();
return *current;
}

void prune() {
// note: this is not necessarily thread-safe wrt readers
std::lock_guard<std::mutex> l(lock);
while (history.size() > 1) {
history.pop_front();
}
}
};

template<class T>
class safe_item_history {
private:
std::mutex lock;
std::list<T> history;
T *current = nullptr;

public:
safe_item_history() {
history.emplace_back(T());
current = &history.back();
}

// readers are lock-free
const T& operator*() const {
return *current;
}
const T *operator->() const {
return current;
}

// writes are serialized
const T& operator=(const T& other) {
std::lock_guard<std::mutex> l(lock);
history.push_back(other);
current = &history.back();
return *current;
}

void prune() {
// note: this is not necessarily thread-safe wrt readers
std::lock_guard<std::mutex> l(lock);
while (history.size() > 1) {
history.pop_front();
}
}
};
1 change: 0 additions & 1 deletion src/common/legacy_config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,6 @@ OPTION(osd_remove_thread_timeout, OPT_INT)
OPTION(osd_remove_thread_suicide_timeout, OPT_INT)
OPTION(osd_command_thread_timeout, OPT_INT)
OPTION(osd_command_thread_suicide_timeout, OPT_INT)
OPTION(osd_heartbeat_addr, OPT_ADDR)
OPTION(osd_heartbeat_interval, OPT_INT) // (seconds) how often we ping peers

// (seconds) how long before we decide a peer has failed
Expand Down
Loading

0 comments on commit b12dfc1

Please sign in to comment.