Skip to content

Commit

Permalink
renamed networkin -> io and rewritten raw objects, bored to death typ…
Browse files Browse the repository at this point in the history
…ing such a long words
  • Loading branch information
Kobolog committed Jun 9, 2012
1 parent 3d1cd97 commit 109af70
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 65 deletions.
6 changes: 3 additions & 3 deletions include/cocaine/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
// Has to be included after common.h
#include <ev++.h>

#include "cocaine/io.hpp"
#include "cocaine/master.hpp"
#include "cocaine/networking.hpp"

#include "cocaine/helpers/json.hpp"

Expand Down Expand Up @@ -156,7 +156,7 @@ class engine_t:
Packed& packed)
{
m_bus.send(
networking::protect(master.id()),
io::protect(master.id()),
ZMQ_SNDMORE
);

Expand Down Expand Up @@ -216,7 +216,7 @@ class engine_t:
ev::async m_notification;

// Slave RPC bus.
networking::channel_t m_bus;
io::channel_t m_bus;

// Engine's thread.
std::auto_ptr<boost::thread> m_thread;
Expand Down
74 changes: 37 additions & 37 deletions include/cocaine/networking.hpp → include/cocaine/io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
// limitations under the License.
//

#ifndef COCAINE_NETWORKING_HPP
#define COCAINE_NETWORKING_HPP
#ifndef COCAINE_IO_HPP
#define COCAINE_IO_HPP

#include <boost/mpl/int.hpp>
#include <boost/type_traits/remove_const.hpp>
#include <boost/tuple/tuple.hpp>
#include <msgpack.hpp>
#include <zmq.hpp>
Expand All @@ -27,7 +28,7 @@

#define HOSTNAME_MAX_LENGTH 256

namespace cocaine { namespace networking {
namespace cocaine { namespace io {

using namespace boost::tuples;

Expand Down Expand Up @@ -192,44 +193,37 @@ class scoped_option {
size_t size;
};

// Specialize this to disable specific type serialization.
template<class T> class raw;

template<> class raw<std::string> {
public:
raw(std::string& string):
m_string(string)
{ }

void pack(zmq::message_t& message) const {
message.rebuild(m_string.size());
memcpy(message.data(), m_string.data(), m_string.size());
}
// A wrapper class to disable automatic type serialization.
// --------------------------------------------------------

bool unpack(/* const */ zmq::message_t& message) {
m_string.assign(
static_cast<const char*>(message.data()),
message.size());
return true;
}
template<class T>
struct raw {
raw(T& value_):
value(value_)
{ }

private:
std::string& m_string;
T& value;
};

template<> class raw<const std::string> {
public:
raw(const std::string& string):
m_string(string)
{ }
// Specialize this to disable specific type serialization.
template<class T>
struct serialization_traits;

void pack(zmq::message_t& message) const {
message.rebuild(m_string.size());
memcpy(message.data(), m_string.data(), m_string.size());
}
template<>
struct serialization_traits<std::string> {
static void pack(zmq::message_t& message, const std::string& value) {
message.rebuild(value.size());
memcpy(message.data(), value.data(), value.size());
}

private:
const std::string& m_string;
static bool unpack(zmq::message_t& message, std::string& value) {
value.assign(
static_cast<const char*>(message.data()),
message.size()
);

return true;
}
};

template<class T>
Expand Down Expand Up @@ -276,7 +270,11 @@ class channel_t:
int flags = 0)
{
zmq::message_t message;
object.pack(message);

serialization_traits<
typename boost::remove_const<T>::type
>::pack(message, object.value);

return send(message, flags);
}

Expand Down Expand Up @@ -341,7 +339,9 @@ class channel_t:
return false;
}

return result.unpack(message);
return serialization_traits<
typename boost::remove_const<T>::type
>::unpack(message, result.value);
}

// Receives and unpacks a tuple.
Expand Down
2 changes: 1 addition & 1 deletion include/cocaine/rpc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#ifndef COCAINE_RPC_HPP
#define COCAINE_RPC_HPP

#include "cocaine/networking.hpp"
#include "cocaine/io.hpp"

namespace cocaine { namespace engine { namespace rpc {

Expand Down
7 changes: 3 additions & 4 deletions include/cocaine/server/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
#include <ev++.h>

#include "cocaine/auth.hpp"
#include "cocaine/context.hpp"
#include "cocaine/networking.hpp"
#include "cocaine/io.hpp"

#include "cocaine/helpers/json.hpp"

Expand Down Expand Up @@ -96,11 +95,11 @@ class server_t:
ev::timer m_pumper;

// System I/O.
networking::socket_t m_server;
io::socket_t m_server;

// Automatic discovery support.
std::auto_ptr<ev::timer> m_announce_timer;
std::auto_ptr<networking::socket_t> m_announces;
std::auto_ptr<io::socket_t> m_announces;

// Authorization subsystem.
crypto::auth_t m_auth;
Expand Down
5 changes: 2 additions & 3 deletions include/cocaine/slave/slave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
// Has to be included after common.h
#include <ev++.h>

#include "cocaine/context.hpp"
#include "cocaine/networking.hpp"
#include "cocaine/io.hpp"

#include "cocaine/interfaces/sandbox.hpp"

Expand Down Expand Up @@ -90,7 +89,7 @@ class slave_t:
m_suicide_timer;

// Engine RPC.
networking::channel_t m_bus;
io::channel_t m_bus;
};

}}
Expand Down
2 changes: 1 addition & 1 deletion plugins/native-server/native_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

using namespace cocaine::engine;
using namespace cocaine::engine::drivers;
using namespace cocaine::networking;
using namespace cocaine::io;

native_job_t::native_job_t(const std::string& event,
const blob_t& request,
Expand Down
12 changes: 6 additions & 6 deletions plugins/native-server/native_job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
#ifndef COCAINE_NATIVE_SERVER_JOB_HPP
#define COCAINE_NATIVE_SERVER_JOB_HPP

#include "cocaine/io.hpp"
#include "cocaine/job.hpp"
#include "cocaine/networking.hpp"

namespace cocaine { namespace engine { namespace drivers {

Expand All @@ -26,8 +26,8 @@ class native_job_t:
native_job_t(const std::string& event,
const blob_t& request,
const policy_t& policy,
networking::channel_t& channel,
const networking::route_t& route,
io::channel_t& channel,
const io::route_t& route,
const std::string& tag);

virtual void react(const events::chunk& event);
Expand All @@ -41,7 +41,7 @@ class native_job_t:
std::for_each(
m_route.begin(),
m_route.end(),
networking::route(m_channel)
io::route(m_channel)
);
} catch(const zmq::error_t& e) {
// NOTE: The client is down.
Expand All @@ -56,8 +56,8 @@ class native_job_t:
}

private:
networking::channel_t& m_channel;
const networking::route_t m_route;
io::channel_t& m_channel;
const io::route_t m_route;
const std::string m_tag;
};

Expand Down
2 changes: 1 addition & 1 deletion plugins/native-server/native_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

using namespace cocaine;
using namespace cocaine::engine::drivers;
using namespace cocaine::networking;
using namespace cocaine::io;

namespace msgpack {
inline engine::policy_t& operator >> (msgpack::object o,
Expand Down
4 changes: 2 additions & 2 deletions plugins/native-server/native_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
// Has to be included after common.h
#include <ev++.h>

#include "cocaine/io.hpp"
#include "cocaine/job.hpp"
#include "cocaine/networking.hpp"

#include "cocaine/interfaces/driver.hpp"

Expand Down Expand Up @@ -67,7 +67,7 @@ class native_server_t:
// reason doesn't trigger the socket's fd on message arrival (or I poll it in a wrong way).
ev::timer m_pumper;

networking::channel_t m_channel;
io::channel_t m_channel;

// Dynamic port.
uint16_t m_port;
Expand Down
6 changes: 3 additions & 3 deletions src/context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

#include "cocaine/context.hpp"

#include "cocaine/io.hpp"
#include "cocaine/logging.hpp"
#include "cocaine/networking.hpp"

#include "cocaine/storages/files.hpp"

Expand Down Expand Up @@ -114,8 +114,8 @@ config_t::config_t(const std::string& path):
spool_path = root["paths"].get("spool", defaults::spool_path).asString();
validate_path(spool_path);

// Networking configuration
// ------------------------
// IO configuration
// ----------------

ipc_path = root["paths"].get("ipc", defaults::ipc_path).asString();
validate_path(ipc_path);
Expand Down
2 changes: 1 addition & 1 deletion src/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
#include "cocaine/dealer/types.hpp"

using namespace cocaine::engine;
using namespace cocaine::networking;
using namespace cocaine::io;

void job_queue_t::push(const_reference job) {
if(job->policy.urgent) {
Expand Down
1 change: 1 addition & 0 deletions src/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <iostream>

#include "cocaine/config.hpp"
#include "cocaine/context.hpp"

#include "cocaine/server/server.hpp"

Expand Down
2 changes: 1 addition & 1 deletion src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ server_t::server_t(context_t& context, server_config_t config):
// -------------

if(!config.announce_endpoints.empty()) {
m_announces.reset(new networking::socket_t(m_context.io(), ZMQ_PUB));
m_announces.reset(new io::socket_t(m_context.io(), ZMQ_PUB));
m_announces->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));

for(std::vector<std::string>::const_iterator it = config.announce_endpoints.begin();
Expand Down
1 change: 1 addition & 0 deletions src/slave/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <iostream>

#include "cocaine/config.hpp"
#include "cocaine/context.hpp"

#include "cocaine/slave/slave.hpp"

Expand Down
5 changes: 3 additions & 2 deletions src/slave/slave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "cocaine/slave/slave.hpp"

#include "cocaine/context.hpp"
#include "cocaine/logging.hpp"
#include "cocaine/manifest.hpp"
#include "cocaine/rpc.hpp"
Expand Down Expand Up @@ -92,8 +93,8 @@ void slave_t::run() {
blob_t slave_t::read(int timeout) {
zmq::message_t message;

networking::scoped_option<
networking::options::receive_timeout
io::scoped_option<
io::options::receive_timeout
> option(m_bus, timeout);

m_bus.recv(&message);
Expand Down

0 comments on commit 109af70

Please sign in to comment.