Skip to content
This repository has been archived by the owner on Jun 1, 2023. It is now read-only.

Commit

Permalink
WL#11954 Close connections to Primary we failover from
Browse files Browse the repository at this point in the history
Close client connections to MySQL Server if
- server is removed from cluster
- RW connections when server is demoted to Secondary
- RO connections when server is promoted from Secondary to Primary and
disconnect_on_promoted_to_primary is set to 'yes'
- when group is partitioned then connections to minority are closed
- when group is overloaded and disconnect_on_metadata_unavailable is set
to 'yes' then all connections to cluster are closed
  • Loading branch information
Andrzej Religa authored and Jacek Roman committed Jun 15, 2018
1 parent 5b2b7c7 commit da605a7
Show file tree
Hide file tree
Showing 43 changed files with 5,126 additions and 609 deletions.
4 changes: 4 additions & 0 deletions src/harness/include/mysql_router_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class HARNESS_EXPORT MySQLRouterThread {
* Allocates memory for thread of execution.
*
* @param thread_stack_size the memory size allocated to thread's stack
*
* @throw std::runtime_error if cannot adjust thread size
*/
MySQLRouterThread(size_t thread_stack_size = mysql_harness::kDefaultStackSizeInKiloBytes);

Expand All @@ -92,6 +94,8 @@ class HARNESS_EXPORT MySQLRouterThread {
* @param run_thread the pointer to the function that is executed in thread. It has to be non-member void*(void*) function
* @param args_ptr pointer to run_thread parameter
* @param detach true if thread is detached, false if thread is joinable
*
* @throw std::runtime_error if cannot create new thread of execution
*/
void run(thread_function run_thread, void* args_ptr, bool detach = false);

Expand Down
9 changes: 9 additions & 0 deletions src/harness/include/tcp_address.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ class HARNESS_EXPORT TCPAddress {
return (left.addr == right.addr) && (left.port == right.port);
}

/**
* @brief Function for performing comparision of TCPAddresses
*/
friend bool operator<(const TCPAddress& left, const TCPAddress& right) {
if (left.addr < right.addr) return true;
else if (left.addr > right.addr) return false;
return left.port < right.port;
}

/** @brief Returns whether the TCPAddress is valid
*
* Returns whether the address and port are valid. This function also
Expand Down
76 changes: 75 additions & 1 deletion src/metadata_cache/include/mysqlrouter/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <exception>
#include <vector>
#include <map>
#include <list>
#include <string>

#include "mysqlrouter/utils.h"
Expand Down Expand Up @@ -168,7 +169,59 @@ class METADATA_API LookupResult {
const std::vector<metadata_cache::ManagedInstance> instance_vector;
};

METADATA_API class MetadataCacheAPIBase {
/**
* @brief Abstract class that provides interface for listener on
* replicaset status changes.
*
* When state of replicaset is changed, notify function is called.
*/
class METADATA_API ReplicasetStateListenerInterface {
public:

/**
* @brief Callback function that is called when state of replicaset is changed.
*
* @param instances allowed nodes
* @param md_servers_reachable true if metadata changed, false if metadata unavailable
*/
virtual void notify(const LookupResult& instances, const bool md_servers_reachable) noexcept = 0;
virtual ~ReplicasetStateListenerInterface();
};

/**
* @brief Abstract class that provides interface for adding and removing
* observers on replicaset status changes.
*
* When state of replicaset is changed, then ReplicasetStateListenerInterface::notify
* function is called for every registered observer.
*/
class METADATA_API ReplicasetStateNotifierInterface {
public:

/**
* @brief Register observer that is notified when there is a change in the replicaset nodes setup/state
* discovered.
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that is notified when replicaset nodes state is changed.
*
* @throw std::runtime_error if metadata cache not initialized
*/
virtual void add_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) = 0;

/**
* @brief Unregister observer previously registered with add_listener()
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that should be unregistered.
*
* @throw std::runtime_error if metadata cache not initialized
*/
virtual void remove_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) = 0;
virtual ~ReplicasetStateNotifierInterface();
};

METADATA_API class MetadataCacheAPIBase : public ReplicasetStateNotifierInterface {
public:

/** @brief Initialize a MetadataCache object and start caching
Expand Down Expand Up @@ -247,6 +300,23 @@ METADATA_API class MetadataCacheAPIBase {
virtual bool wait_primary_failover(const std::string &replicaset_name,
int timeout) = 0;

/**
* @brief Register observer that is notified when there is a change in the replicaset nodes setup/state
* discovered.
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that is notified when replicaset nodes state is changed.
*/
virtual void add_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) = 0;

/**
* @brief Unregister observer previously registered with add_listener()
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that should be unregistered.
*/
virtual void remove_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) = 0;

virtual ~MetadataCacheAPIBase() {}
};

Expand All @@ -269,6 +339,10 @@ METADATA_API class MetadataCacheAPI: public MetadataCacheAPIBase {

bool wait_primary_failover(const std::string &replicaset_name,
int timeout) override;

void add_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) override;
void remove_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) override;

private:
MetadataCacheAPI() {}
MetadataCacheAPI(const MetadataCacheAPI&) = delete;
Expand Down
36 changes: 23 additions & 13 deletions src/metadata_cache/src/cache_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,21 @@ const std::string kDefaultMetadataCluster = ""; // blank cluster name means pick
const unsigned int kDefaultConnectTimeout = 30;
const unsigned int kDefaultReadTimeout = 30;

ReplicasetStateListenerInterface::~ReplicasetStateListenerInterface() = default;
ReplicasetStateNotifierInterface::~ReplicasetStateNotifierInterface() = default;

MetadataCacheAPIBase* MetadataCacheAPI::instance() {
static MetadataCacheAPI instance_;
return &instance_;
}

#define LOCK_METADATA_AND_CHECK_INITIALIZED() \
std::lock_guard<std::mutex> lock(g_metadata_cache_m); \
if (g_metadata_cache == nullptr) { \
throw std::runtime_error("Metadata Cache not initialized"); \
}


/**
* Initialize the metadata cache.
*
Expand Down Expand Up @@ -106,32 +116,32 @@ void MetadataCacheAPI::cache_stop() noexcept {
*
*/
LookupResult MetadataCacheAPI::lookup_replicaset(const std::string &replicaset_name) {
std::lock_guard<std::mutex> lock(g_metadata_cache_m);

if (g_metadata_cache == nullptr) {
throw std::runtime_error("Metadata Cache not initialized");
}
LOCK_METADATA_AND_CHECK_INITIALIZED();

return LookupResult(g_metadata_cache->replicaset_lookup(replicaset_name));
}


void MetadataCacheAPI::mark_instance_reachability(const std::string &instance_id,
InstanceStatus status) {
std::lock_guard<std::mutex> lock(g_metadata_cache_m);
if (g_metadata_cache == nullptr) {
throw std::runtime_error("Metadata Cache not initialized");
}
LOCK_METADATA_AND_CHECK_INITIALIZED();

g_metadata_cache->mark_instance_reachability(instance_id, status);
}

bool MetadataCacheAPI::wait_primary_failover(const std::string &replicaset_name, int timeout) {
std::lock_guard<std::mutex> lock(g_metadata_cache_m);
if (g_metadata_cache == nullptr) {
throw std::runtime_error("Metadata Cache not initialized");
}
LOCK_METADATA_AND_CHECK_INITIALIZED();

return g_metadata_cache->wait_primary_failover(replicaset_name, timeout);
}

void MetadataCacheAPI::add_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) {
LOCK_METADATA_AND_CHECK_INITIALIZED();
g_metadata_cache->add_listener(replicaset_name, listener);
}
void MetadataCacheAPI::remove_listener(const std::string& replicaset_name, ReplicasetStateListenerInterface* listener) {
LOCK_METADATA_AND_CHECK_INITIALIZED();
g_metadata_cache->remove_listener(replicaset_name, listener);
}

} // namespace metadata_cache
53 changes: 44 additions & 9 deletions src/metadata_cache/src/metadata_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,22 +612,30 @@ static const char *str_mode(metadata_cache::ServerMode mode) {
*/
void MetadataCache::refresh() {

// used when something unusual happens that prevents
// us from querying the metadata
auto clear_metadata_for_replicaset = [&]() {
bool clearing;
{
std::lock_guard<std::mutex> lock(cache_refreshing_mutex_);
clearing = !replicaset_data_.empty();
if (clearing)
replicaset_data_.clear();
}
if (clearing) {
log_info("... cleared current routing table as a precaution");
on_instances_changed(/*md_servers_reachable=*/false);
}
};

{
#if 0 // not used anywhere else so far
std::lock_guard<std::mutex> lock(metadata_servers_mutex_);
#endif
// TODO: connect() could really be called from inside of metadata_->fetch_instances()
if (!meta_data_->connect(metadata_servers_)) { // metadata_servers_ come from config file
log_error("Failed connecting to metadata servers");
bool clearing;
{
std::lock_guard<std::mutex> lock(cache_refreshing_mutex_);
clearing = !replicaset_data_.empty();
if (clearing)
replicaset_data_.clear();
}
if (clearing)
log_info("... cleared current routing table as a precaution");
clear_metadata_for_replicaset();
return;
}
}
Expand Down Expand Up @@ -681,6 +689,8 @@ void MetadataCache::refresh() {
}
}
}

on_instances_changed(/*md_servers_reachable=*/true);
}

/* Not sure about this, the metadata server could be stored elsewhere
Expand All @@ -696,7 +706,22 @@ void MetadataCache::refresh() {
metadata_servers_ = metadata_servers_temp_;
}*/
} catch (const std::runtime_error &exc) {
// fetching the meatadata failed
log_error("Failed fetching metadata: %s", exc.what());
clear_metadata_for_replicaset();
}
}

void MetadataCache::on_instances_changed(const bool md_servers_reachable) {
std::lock_guard<std::mutex> lock(replicaset_instances_change_callbacks_mtx_);

for (auto& replicaset_clb: listeners_) {
const std::string replicaset_name = replicaset_clb.first;
auto res = replicaset_lookup(replicaset_name);

for(auto each : listeners_[replicaset_name]) {
each->notify(res, md_servers_reachable);
}
}
}

Expand Down Expand Up @@ -766,3 +791,13 @@ bool MetadataCache::wait_primary_failover(const std::string &replicaset_name,
}
return false;
}

void MetadataCache::add_listener(const std::string& replicaset_name, metadata_cache::ReplicasetStateListenerInterface* listener) {
std::lock_guard<std::mutex> lock(replicaset_instances_change_callbacks_mtx_);
listeners_[replicaset_name].insert(listener);
}

void MetadataCache::remove_listener(const std::string& replicaset_name, metadata_cache::ReplicasetStateListenerInterface* listener) {
std::lock_guard<std::mutex> lock(replicaset_instances_change_callbacks_mtx_);
listeners_[replicaset_name].erase(listener);
}
31 changes: 30 additions & 1 deletion src/metadata_cache/src/metadata_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ClusterMetadata;
* MySQL Server.
*
*/
class METADATA_API MetadataCache {
class METADATA_API MetadataCache : public metadata_cache::ReplicasetStateNotifierInterface {

public:

Expand Down Expand Up @@ -121,6 +121,25 @@ class METADATA_API MetadataCache {

/** @brief run refresh thread */
static void* run_thread(void* context);


/**
* @brief Register observer that is notified when there is a change in the replicaset nodes setup/state
* discovered.
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that is notified when replicaset nodes state is changed.
*/
void add_listener(const std::string& replicaset_name, metadata_cache::ReplicasetStateListenerInterface* listener) override;

/**
* @brief Unregister observer previously registered with add_listener()
*
* @param replicaset_name name of the replicaset
* @param listener Observer object that should be unregistered.
*/
void remove_listener(const std::string& replicaset_name, metadata_cache::ReplicasetStateListenerInterface* listener) override;

private:

/** @brief Refreshes the cache
Expand All @@ -129,6 +148,10 @@ class METADATA_API MetadataCache {
*/
void refresh();

// Called each time the metadata has changed and we need to notify
// the subscribed observers
void on_instances_changed(const bool md_servers_reachable);

// Stores the list replicasets and their server instances.
// Keyed by replicaset name
std::map<std::string, metadata_cache::ManagedReplicaSet> replicaset_data_;
Expand Down Expand Up @@ -173,6 +196,12 @@ class METADATA_API MetadataCache {
// Flag used to terminate the refresh thread.
std::atomic_bool terminate_;

// map of lists (per each replicaset name) of registered callbacks to be called
// on selected replicaset instances change event
std::mutex replicaset_instances_change_callbacks_mtx_;

std::map<std::string, std::set<metadata_cache::ReplicasetStateListenerInterface*>> listeners_;

#ifdef FRIEND_TEST
FRIEND_TEST(FailoverTest, basics);
FRIEND_TEST(FailoverTest, primary_failover);
Expand Down
4 changes: 4 additions & 0 deletions src/routing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ set(ROUTING_SOURCE_FILES
${CMAKE_CURRENT_SOURCE_DIR}/src/dest_round_robin.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/routing.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/protocol/classic_protocol.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/connection.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/context.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/mysql_routing_common.cc
${CMAKE_CURRENT_SOURCE_DIR}/src/connection_container.cc
${ROUTING_SOURCE_FILES_X_PROTOCOL}
)

Expand Down
Loading

0 comments on commit da605a7

Please sign in to comment.