Skip to content
This repository has been archived by the owner on Jun 1, 2023. It is now read-only.

Commit

Permalink
added DELETE /api/v1/mock_server/connections/
Browse files Browse the repository at this point in the history
* and let SIGTERM close_all_connections()
  • Loading branch information
weigon committed Jun 14, 2018
1 parent c5a5ef2 commit baa2556
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class MOCK_SERVER_EXPORT MockServerComponent {
void init(std::shared_ptr<server_mock::MySQLServerMock> srv);

std::shared_ptr<MockServerGlobalScope> getGlobalScope();
void close_all_connections();
};

#endif
6 changes: 6 additions & 0 deletions src/mock_server/src/mock_server_component.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ MockServerComponent& MockServerComponent::getInstance() {
return instance;
}

void MockServerComponent::close_all_connections() {
// if we have a mock_server instance, call its close_all_connections()
if (auto srv = srv_.lock()) {
srv->close_all_connections();
}
}
39 changes: 39 additions & 0 deletions src/mock_server/src/mysql_protocol_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
# include <sys/socket.h>
# include <sys/types.h>
# include <unistd.h>
# include <poll.h>
# include <cstring>
#else
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
Expand Down Expand Up @@ -74,6 +76,43 @@ void read_packet(socket_t client_socket, uint8_t *data, size_t size, int flags)
ssize_t received = 0;
size_t buffer_offset = 0;
while (buffer_offset < size) {
// check if the current socket is readable/open
//
// allow interrupting the read() by closing the socket in another thread
#ifdef _WIN32
WSAPOLLFD
#else
struct pollfd
#endif
fds[1];
memset(fds, 0, sizeof(fds));

fds[0].fd = client_socket;
#ifdef _WIN32
fds[0].events = POLLRDNORM;
#else
fds[0].events = POLLIN|POLLHUP;
#endif

while (true) {
// check if someone closed our socket externally
#ifdef _WIN32
int r = ::WSAPoll(fds, 1, 100);
#else
int r = ::poll(fds, 1, 100);
#endif

if (r > 0) break;
if (r < 0) throw std::system_error(get_socket_errno(), std::system_category(), "poll() failed");

if (fds[0].revents & POLLNVAL) {
// another thread may have closed the socket
throw std::runtime_error("poll() reported: invalid socket");
}

// timeout, just wait a bit more
}

received = recv(client_socket, reinterpret_cast<char*>(data)+buffer_offset,
size-buffer_offset, flags);
if (received < 0) {
Expand Down
44 changes: 36 additions & 8 deletions src/mock_server/src/mysql_server_mock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ IMPORT_LOG_FUNCTIONS()
#include <system_error>
#include <deque>
#include <queue>
#include <set>

#ifndef _WIN32
# include <netdb.h>
Expand Down Expand Up @@ -89,13 +90,19 @@ MySQLServerMock::MySQLServerMock(

MySQLServerMock::~MySQLServerMock() {
if (listener_ > 0) {
#ifndef _WIN32
::shutdown(listener_, SHUT_RDWR);
#endif
close_socket(listener_);
}
}

// close all active connections
void MySQLServerMock::close_all_connections() {
std::lock_guard<std::mutex> active_fd_lock(active_fds_mutex_);
for (auto it = active_fds_.begin(); it != active_fds_.end(); ) {
close_socket(*it);
it = active_fds_.erase(it);
}
}

void MySQLServerMock::run(mysql_harness::PluginFuncEnv* env) {
setup_service();
handle_connections(env);
Expand Down Expand Up @@ -300,9 +307,6 @@ MySQLServerMockSession::MySQLServerMockSession(
}

MySQLServerMockSession::~MySQLServerMockSession() {
if (client_socket_ != -1) {
close_socket(client_socket_);
}
}

void MySQLServerMockSession::run() {
Expand Down Expand Up @@ -409,6 +413,7 @@ void MySQLServerMock::handle_connections(mysql_harness::PluginFuncEnv* env) {
log_info("Starting to handle connections on port: %d", bind_port_);

concurrent_queue<Work> work_queue;
concurrent_queue<socket_t> socket_queue;

auto connection_handler = [&]() -> void {
while (true) {
Expand Down Expand Up @@ -448,9 +453,21 @@ void MySQLServerMock::handle_connections(mysql_harness::PluginFuncEnv* env) {
MySQLProtocolEncoder().encode_error_message(
0, 1064, "", "reader error: " + std::string(e.what())),
0);
close_socket(work.client_socket);
log_error("%s", e.what());
}

// first remove the book-keeping, then close the socket
// to avoid a race between the acceptor and the worker thread
{
// socket is done, unregister it
std::lock_guard<std::mutex> active_fd_lock(active_fds_mutex_);
auto it = active_fds_.find(work.client_socket);
if (it != active_fds_.end()) {
// it should always be there
active_fds_.erase(it);
}
}
close_socket(work.client_socket);
}
};

Expand Down Expand Up @@ -502,12 +519,23 @@ void MySQLServerMock::handle_connections(mysql_harness::PluginFuncEnv* env) {
return;
}

{
// socket is new, register it
std::lock_guard<std::mutex> active_fd_lock(active_fds_mutex_);
active_fds_.emplace(client_socket);
}

// std::cout << "Accepted client " << client_socket << std::endl;
work_queue.push(Work {client_socket, expected_queries_file_, module_prefix_, debug_mode_});
}
}
}

// beware, this closes all sockets that are either in the work-queue or
// currently handled by worker-threads. As long as we don't reuse the file-handles
// for anything else before we leave this function, this approach is safe.
close_all_connections();

// std::cerr << "sending death-signal to threads" << std::endl;
for (size_t ndx = 0; ndx < worker_threads.size(); ndx++) {
work_queue.push(Work { kInvalidSocket, "", "", 0});
Expand All @@ -522,7 +550,7 @@ void MySQLServerMock::handle_connections(mysql_harness::PluginFuncEnv* env) {
bool MySQLServerMockSession::process_statements(socket_t client_socket) {
using mysql_protocol::Command;

while (true) {
while (!killed_) {
protocol_decoder_.read_message(client_socket);
auto cmd = protocol_decoder_.get_command_type();
switch (cmd) {
Expand Down
14 changes: 14 additions & 0 deletions src/mock_server/src/mysql_server_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#ifndef MYSQLD_MOCK_MYSQL_SERVER_MOCK_INCLUDED
#define MYSQLD_MOCK_MYSQL_SERVER_MOCK_INCLUDED

#include <memory>
#include <mutex>
#include <set>

#include "statement_reader.h"
#include "mysql_protocol_decoder.h"
#include "mysql_protocol_encoder.h"
Expand Down Expand Up @@ -69,7 +73,12 @@ class MySQLServerMockSession {
uint16_t warning_count=0);

void run();

void kill() {
killed_ = true;
}
private:
bool killed_ { false };
socket_t client_socket_;
MySQLProtocolEncoder protocol_encoder_;
MySQLProtocolDecoder protocol_decoder_;
Expand Down Expand Up @@ -110,6 +119,8 @@ class MySQLServerMock {
return shared_globals_;
}

void close_all_connections();

~MySQLServerMock();

private:
Expand All @@ -125,6 +136,9 @@ class MySQLServerMock {
std::string module_prefix_;

std::shared_ptr<MockServerGlobalScope> shared_globals_ {new MockServerGlobalScope};

std::mutex active_fds_mutex_;
std::set<socket_t> active_fds_;
};

} // namespace
Expand Down
45 changes: 40 additions & 5 deletions src/mock_server/src/rest_mock_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ IMPORT_LOG_FUNCTIONS()
#endif

static constexpr const char kSectionName[] { "rest_mock_server" };
static constexpr const char kHttpHandlerUri[] { "^/api/v1/mock_server/globals/$" };
static constexpr const char kRestGlobalsUri[] { "^/api/v1/mock_server/globals/$" };
static constexpr const char kRestConnectionsUri[] { "^/api/v1/mock_server/connections/$" };

// AddressSanitizer gets confused by the default, MemoryPoolAllocator
// Solaris sparc also gets crashes
Expand All @@ -68,9 +69,9 @@ using mysql_harness::PluginFuncEnv;
using mysql_harness::PLUGIN_ABI_VERSION;
using mysql_harness::Plugin;

class RestApiV1MockServer: public BaseRequestHandler {
class RestApiV1MockServerGlobals: public BaseRequestHandler {
public:
RestApiV1MockServer():
RestApiV1MockServerGlobals():
last_modified_(time(nullptr)) {}

// GET|PUT
Expand Down Expand Up @@ -202,6 +203,38 @@ class RestApiV1MockServer: public BaseRequestHandler {

};

class RestApiV1MockServerConnections: public BaseRequestHandler {
public:
// allow methods: DELETE
//
void handle_request(HttpRequest &req) override {
if (!((HttpMethod::Delete) & req.get_method())) {
req.get_output_headers().add("Allow", "DELETE");
req.send_reply(HttpStatusCode::MethodNotAllowed);
return;
}

if (req.get_input_headers().get("Content-Range")) {
req.send_reply(HttpStatusCode::NotImplemented);
return;
}

handle_connections_delete_all(req);
}
private:
/**
* close all connections.
*/
void handle_connections_delete_all(HttpRequest &req) {
// tell the mock_server to close all connections
MockServerComponent::getInstance().close_all_connections();

req.send_reply(HttpStatusCode::Ok);
}

};


static void init(PluginFuncEnv* env) {
const mysql_harness::AppInfo* info = get_app_info(env);

Expand All @@ -221,13 +254,15 @@ static void init(PluginFuncEnv* env) {
static void start(PluginFuncEnv*) {
auto &srv = HttpServerComponent::getInstance();

srv.add_route(kHttpHandlerUri, std::unique_ptr<RestApiV1MockServer>(new RestApiV1MockServer()));
srv.add_route(kRestGlobalsUri, std::unique_ptr<BaseRequestHandler>(new RestApiV1MockServerGlobals()));
srv.add_route(kRestConnectionsUri, std::unique_ptr<BaseRequestHandler>(new RestApiV1MockServerConnections()));
}

static void stop(PluginFuncEnv*) {
auto &srv = HttpServerComponent::getInstance();

srv.remove_route(kHttpHandlerUri);
srv.remove_route(kRestConnectionsUri);
srv.remove_route(kRestGlobalsUri);
}


Expand Down

0 comments on commit baa2556

Please sign in to comment.