Skip to content

Commit

Permalink
application object, test renames
Browse files Browse the repository at this point in the history
  • Loading branch information
thesamet committed Dec 26, 2011
1 parent f373353 commit 0faaea3
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 42 deletions.
91 changes: 91 additions & 0 deletions include/zrpc/application.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: [email protected] <Nadav Samet>

#ifndef ZRPC_APPLICATION_H
#define ZRPC_APPLICATION_H

#include <string>
#include "zrpc/macros.h"

namespace zmq {
class context_t;
} // namespace zmq

namespace zrpc {
class ConnectionManager;
class EventManager;
class RpcChannel;
class Server;

// zrpc::Application is a simple interface that helps setting up a common
// ZRPC client or server application.
class Application {
public:
class Options {
public:
Options() : connection_manager_threads(1),
event_manager_threads(10),
zeromq_context(NULL),
zeromq_io_threads(1) {}

// Number of connection manager threads. Each connection manager thread will
// start a socket to any server we are talking to, so most applications
// should keep this set to 1.
int connection_manager_threads;

// Number of event manager threads. For servers, those threads are going to
// be used to process requests. For clients, those threads are going to be
// used to execute callbacks when the response from a server is available.
int event_manager_threads;

// ZeroMQ context to use for our application. If NULL, then Application will
// construct its own ZeroMQ context and own it. If you provide your own
// ZeroMQ context, Application will not take ownership of it. The ZeroMQ
// context must outlive the application.
zmq::context_t* zeromq_context;

// Number of ZeroMQ I/O threads, to be passed to zmq_init(). This value is
// ignored when you provide your own ZeroMQ context.
int zeromq_io_threads;
};

Application();

explicit Application(const Options& options);

virtual ~Application();

// Creates an RpcChannel to the given endpoint. Attach it to a Stub and you
// can start making calls through this channel from any thread. No locking
// needed. It is your responsibility to delete this object.
virtual RpcChannel* CreateRpcChannel(const std::string& endpoint);

// Creates a server that listens to connection on the provided endpoint.
// Call RegisterServer on the provided object to add services your
// implemented, and then call Start(). The calling thread will start serving
// requests (by forwarding them to the event manager).
virtual Server* CreateServer(const std::string& endpoint);

private:
void Init(const Options& options);

bool owns_context_;
zmq::context_t* context_;
scoped_ptr<ConnectionManager> connection_manager_;
scoped_ptr<EventManager> event_manager_;
};
} // namespace zrpc
#endif
10 changes: 3 additions & 7 deletions include/zrpc/connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class Connection;
class ConnectionThreadContext;
class MessageVector;
class RemoteResponse;
class RpcChannel;

namespace internal {
struct ThreadContext;
Expand All @@ -62,9 +61,9 @@ class ConnectionManager {
public:
// Constructs an EventManager that uses the provided ZeroMQ context and
// has nthreads worker threads. The ConnectionManager does not take ownership
// of the given ZeroMQ context. The provided event_manager is used for
// executing user-supplied closures. If the event_manager is NULL then the
// closure parameter supplied to SendRequest must be NULL.
// of the given ZeroMQ context and event manager. The provided event_manager
// is used for executing user-supplied closures. If the event_manager is NULL
// then the closure parameter supplied to SendRequest must be NULL.
ConnectionManager(zmq::context_t* context, EventManager* event_manager,
int nthreads=1);

Expand Down Expand Up @@ -119,9 +118,6 @@ class Connection {

virtual ~Connection() {};

// Creates an RpcChannel for this connection.
virtual RpcChannel* MakeChannel() = 0;

virtual zmq::socket_t* CreateConnectedSocket(zmq::context_t* context) = 0;

protected:
Expand Down
3 changes: 3 additions & 0 deletions include/zrpc/rpc_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class MethodDescriptor;

namespace zrpc {
class Closure;
class Connection;
class RPC;

class RpcChannel {
Expand All @@ -51,6 +52,8 @@ class RpcChannel {
std::string* response,
Closure* done) = 0;

static RpcChannel* Create(Connection* connection, bool owns_connection=false);

virtual ~RpcChannel() {};
};
} // namespace
Expand Down
25 changes: 22 additions & 3 deletions include/zrpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,35 @@ class EventManager;
class ServerImpl;
class Service;

// Server receives incoming RPC requests on a socket. When such a request
// arrives it forwards it to an event manager for processing, and later passes
// the response back to the caller. This class is not thread-safe: construct it
// and use it from a single thread.
class Server {
public:
Server(zmq::socket_t* socket, EventManager* event_manager);
// Creates a server that will receive requests from the given socket that has
// already been bound to some endpoint. We assume that the socket is of ROUTER
// type. The provided event manager will be used to handle the requests.
// The Server does not take ownership of the socket, unless owns_socket is
// true. It also does not take ownership of event_manager.
Server(zmq::socket_t* socket, EventManager* event_manager,
bool owns_socket=false);

~Server();

void Start();

// Registers an RPC Service with this server. All registrations must occur
// before Start() is called.
void RegisterService(Service *service);

// Starts serving requests. The calling thread starts forwarding requests
// from the socket to the event manager for processings. Only one thread may
// call this function. The calling thread gets blocked until this function
// returns.
// Currently, the only way to get the control back to the executing thread is
// call InstallSignalHandler() and send the process a SIGTERM or SIGINT
// (Ctrl-C).
void Start();

private:
scoped_ptr<ServerImpl> server_impl_;
DISALLOW_COPY_AND_ASSIGN(Server);
Expand Down
9 changes: 5 additions & 4 deletions src/zrpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ protobuf_generate_cpp(ZRPC_PB_SRCS ZRPC_PB_HDRS proto/zrpc.proto)
set(PROTO_SOURCES ${ZRPC_PB_SRCS} ${ZRPC_PB_HDRS})

include_directories(${PROJECT_BINARY_DIR})
set(ZRPC_SOURCES clock.cc function_server.cc remote_response.cc
connection_manager.cc rpc_channel_impl.cc rpc.cc server.cc
reactor.cc zmq_utils.cc sync_event.cc event_manager.cc
${PROTO_SOURCES})
set(ZRPC_SOURCES
application.cc clock.cc connection_manager.cc event_manager.cc
function_server.cc reactor.cc remote_response.cc rpc.cc
rpc_channel_impl.cc server.cc sync_event.cc zmq_utils.cc
${PROTO_SOURCES})
set(ZRPC_LIB_DEPS ${ZEROMQ_LIBRARIES}
${GFLAGS_LIBRARIES} ${GLOG_LIBRARIES} ${PROTOBUF_LIBRARY}
${Boost_THREAD_LIBRARIES})
Expand Down
70 changes: 70 additions & 0 deletions src/zrpc/application.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2011 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: [email protected] <Nadav Samet>

#include <string>
#include "zmq.hpp"
#include "zrpc/application.h"
#include "zrpc/event_manager.h"
#include "zrpc/connection_manager.h"
#include "zrpc/rpc_channel.h"
#include "zrpc/server.h"

namespace zrpc {

Application::Application() {
Init(Options());
};

Application::Application(const Application::Options& options) {
Init(options);
};

Application::~Application() {
connection_manager_.reset();
event_manager_.reset();
if (owns_context_) {
delete context_;
}
}

void Application::Init(const Application::Options& options) {
if (options.zeromq_context) {
context_ = options.zeromq_context;
owns_context_ = false;
} else {
context_ = new zmq::context_t(options.zeromq_io_threads);
owns_context_ = true;
}
event_manager_.reset(new EventManager(context_,
options.event_manager_threads));
connection_manager_.reset(new ConnectionManager(
context_,
event_manager_.get(),
options.connection_manager_threads));
}

RpcChannel* Application::CreateRpcChannel(const std::string& endpoint) {
return RpcChannel::Create(
connection_manager_->Connect(endpoint), true);
}

Server* Application::CreateServer(const std::string& endpoint) {
zmq::socket_t* socket = new zmq::socket_t(*context_, ZMQ_ROUTER);
socket->bind(endpoint.c_str());
Server* server = new Server(socket, event_manager_.get(), true);
return server;
}
} // namespace zrpc
6 changes: 0 additions & 6 deletions src/zrpc/connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
#include "zrpc/macros.h"
#include "zrpc/reactor.h"
#include "zrpc/remote_response.h"
#include "zrpc/rpc_channel.h"
#include "zrpc/rpc_channel_impl.h"
#include "zrpc/sync_event.h"

namespace zrpc {
Expand Down Expand Up @@ -232,10 +230,6 @@ class ConnectionImpl : public Connection {

virtual ~ConnectionImpl() {}

virtual RpcChannel* MakeChannel() {
return new RpcChannelImpl(this);
}

virtual void SendRequest(
MessageVector* request,
RemoteResponse* response,
Expand Down
11 changes: 9 additions & 2 deletions src/zrpc/rpc_channel_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@

namespace zrpc {

RpcChannelImpl::RpcChannelImpl(Connection* connection)
: connection_(connection) {
RpcChannel* RpcChannel::Create(Connection* connection, bool owns_connection) {
return new RpcChannelImpl(connection, owns_connection);
}

RpcChannelImpl::RpcChannelImpl(Connection* connection, bool owns_connection)
: connection_(connection), owns_connection_(owns_connection) {
}

RpcChannelImpl::~RpcChannelImpl() {
if (owns_connection_) {
delete connection_;
}
}

struct RpcResponseContext {
Expand Down
3 changes: 2 additions & 1 deletion src/zrpc/rpc_channel_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct RpcResponseContext;

class RpcChannelImpl: public RpcChannel {
public:
RpcChannelImpl(Connection* connection);
RpcChannelImpl(Connection* connection, bool owns_connection=false);

virtual ~RpcChannelImpl();

Expand Down Expand Up @@ -56,6 +56,7 @@ class RpcChannelImpl: public RpcChannel {
Closure* done);

Connection* connection_;
bool owns_connection_;
};
} // namespace zrpc
#endif /* ZRPC_SIMPLE_RPC_CHANNEL_IMPL_H_ */
16 changes: 12 additions & 4 deletions src/zrpc/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ void FinalizeResponse(

class ServerImpl {
public:
ServerImpl(zmq::socket_t* socket, FunctionServer* function_server)
: socket_(socket), function_server_(function_server) {}
ServerImpl(zmq::socket_t* socket, FunctionServer* function_server,
bool owns_socket)
: socket_(socket), function_server_(function_server),
owns_socket_(owns_socket) {}

void Start() {
// The reactor owns all sockets.
Expand All @@ -125,6 +127,9 @@ class ServerImpl {
this, &ServerImpl::HandleFunctionResponse,
fs_socket));
reactor.Loop();
if (owns_socket_) {
delete socket_;
}
VLOG(2) << "Server shutdown.";
}

Expand Down Expand Up @@ -221,13 +226,16 @@ class ServerImpl {

zmq::socket_t* socket_;
FunctionServer* function_server_;
bool owns_socket_;
typedef std::map<std::string, zrpc::Service*> ServiceMap;
ServiceMap service_map_;
DISALLOW_COPY_AND_ASSIGN(ServerImpl);
};

Server::Server(zmq::socket_t* socket, EventManager* event_manager)
: server_impl_(new ServerImpl(socket, event_manager->GetFunctionServer())) {
Server::Server(zmq::socket_t* socket, EventManager* event_manager,
bool owns_socket)
: server_impl_(new ServerImpl(socket, event_manager->GetFunctionServer(),
owns_socket)) {
}

Server::~Server() {
Expand Down
9 changes: 3 additions & 6 deletions src/zrpc/zsendrpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/text_format.h>
#include "zrpc/connection_manager.h"
#include "zrpc/application.h"
#include "zrpc/rpc_channel.h"
#include "zrpc/service.h"
#include "zrpc/rpc.h"
#include <zmq.hpp>


static bool ValidateNotEmpty(const char* flagname, const std::string& value) {
Expand Down Expand Up @@ -96,10 +95,8 @@ void RunCall(const std::string& endpoint,
return;
}

zmq::context_t context(1);
zrpc::ConnectionManager cm(&context, NULL, 1);
scoped_ptr<Connection> connection(cm.Connect(endpoint));
scoped_ptr<RpcChannel> channel(connection->MakeChannel());
Application app;
scoped_ptr<RpcChannel> channel(app.CreateRpcChannel(endpoint));
RPC rpc;
::google::protobuf::Message *reply = factory.GetPrototype(
method_desc->output_type())->New();
Expand Down
3 changes: 2 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ zrpc_test(function_server_test SRCS function_server_test.cc)
zrpc_test(callback_test SRCS callback_test.cc)
zrpc_test(connection_manager_test SRCS connection_manager_test.cc)
zrpc_test(event_manager_test SRCS event_manager_test.cc)
zrpc_test(rpc_test SRCS search_main.cc LIBS search_pb)
zrpc_test(client_server_test SRCS client_server_test.cc LIBS search_pb)
zrpc_test(application_test SRCS application_test.cc LIBS search_pb)
Loading

0 comments on commit 0faaea3

Please sign in to comment.