Skip to content

Commit

Permalink
rpc: allow handler to receive non default constructable types
Browse files Browse the repository at this point in the history
Currently it is not possible because a type is instantiated before been
deserialized. Fix that by allocating space for an object by using
std::aligned_storage.
  • Loading branch information
gleb-cloudius authored and avikivity committed May 3, 2015
1 parent a28f0ef commit 2ec7535
Showing 1 changed file with 65 additions and 19 deletions.
84 changes: 65 additions & 19 deletions rpc/rpc_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,33 @@ inline std::enable_if_t<N != sizeof...(T), future<>> marshall(Serializer& serial
});
}

template<std::size_t N, typename Serializer, typename... T>
inline std::enable_if_t<N == sizeof...(T), future<>> unmarshall(Serializer&, input_stream<char>&, std::tuple<T&...>&&) {
// ArgsReady is a functor that will be called after each element is deserialized.
// It gets element's position in a tuple as a parameter. It is used by argument
// desererialization to mark already deserialized element as containing valid values
// that needs to be destroyed by a destructor.
template<std::size_t N, typename Serializer, typename ArgReady, typename... T>
inline std::enable_if_t<N == sizeof...(T), future<>> unmarshall(Serializer&, input_stream<char>&, std::tuple<T&...>&&, ArgReady&& argready) {
return make_ready_future<>();
}

template<std::size_t N = 0, typename Serializer, typename... T>
inline std::enable_if_t<N != sizeof...(T), future<>> unmarshall(Serializer& deserialize, input_stream<char>& in, std::tuple<T&...>&& args) {
template<std::size_t N = 0, typename Serializer, typename ArgReady, typename... T>
inline std::enable_if_t<N != sizeof...(T), future<>> unmarshall(Serializer& deserialize, input_stream<char>& in, std::tuple<T&...>&& args, ArgReady&& argready) {
// And you may ask yourself "What is that beautiful house?"^H^H^H^H "Why
// make_ready_future() here?". And there answer would be to convert
// exception thrown by deserialize info a future
return make_ready_future().then([&deserialize, &in, args = std::move(args)] {
return deserialize(in, std::get<N>(args)).then([&deserialize, &in, args = std::move(args)] () mutable {
return unmarshall<N + 1>(deserialize, in, std::move(args));
return make_ready_future().then([&deserialize, &in, args = std::move(args), argready = std::forward<ArgReady>(argready)] () mutable {
return deserialize(in, std::get<N>(args)).then([&deserialize, &in, args = std::move(args), argready = std::forward<ArgReady>(argready)] () mutable {
argready(N);
return unmarshall<N + 1>(deserialize, in, std::move(args), std::forward<ArgReady>(argready));
});
});
}

template<typename Serializer, typename... T>
inline future<> unmarshall(Serializer& deserializer, input_stream<char>& in, std::tuple<T&...>&& args) {
return unmarshall(deserializer, in, std::move(args), [](std::size_t n){});
}

// ref_tuple gets tuple and returns another tuple with references to members of received tuple
template<typename... T, std::size_t... I>
inline std::tuple<T&...> ref_tuple_impl(std::tuple<T...>& t, std::index_sequence<I...>) {
Expand Down Expand Up @@ -147,13 +157,44 @@ struct rcv_reply<Serializer, MsgType, void> : rcv_reply_base<void, void> {
}
};

// structure to hold outgoing message parameters on a client side
// while they are serialized
template<typename MsgType, typename... T>
struct message {
struct out_message {
MsgType t;
id_type id = 0;
std::tuple<T...> args;
message() = default;
message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward<T>(xargs)...) {}
out_message() = delete;
out_message(MsgType xt, id_type xid, T&&... xargs) : t(xt), id(xid), args(std::forward<T>(xargs)...) {}
};

// structure to desrialize incoming message parameters to on a server side
template<typename MsgType, typename... T>
struct in_message {
using args_type = std::tuple<T...>;
id_type id = 0;
bool ready[sizeof...(T)] = {};
union U {
U() {}
~U() {}
typename std::aligned_storage<sizeof(args_type), alignof(args_type)>::type storage;
args_type args;
} u;

void set_ready(std::size_t n) {
assert(n < sizeof...(T));
ready[n] = true;
}

template<std::size_t... I>
inline void deleter(std::index_sequence<I...>) {
// this contraption calls tuple's element destructor if correspondent ready == true
int _[] = {0, (ready[I] && (std::get<I>(u.args).std::tuple_element<I, args_type>::type::~type(), true))...}; (void)_;
}

~in_message() {
deleter(std::make_index_sequence<sizeof...(T)>());
}
};

template<typename T1, typename T2>
Expand Down Expand Up @@ -254,7 +295,7 @@ auto send_helper(MsgType t, std::index_sequence<I...>) {

// send message
auto msg_id = dst.next_message_id();
auto m = std::make_unique<message<MsgType, typename std::tuple_element<I, types>::type...>>(t, msg_id, std::forward<decltype(args)>(args)...);
auto m = std::make_unique<out_message<MsgType, typename std::tuple_element<I, types>::type...>>(t, msg_id, std::forward<decltype(args)>(args)...);
auto xargs = std::tie(m->t, m->id, std::get<I>(m->args)...); // holds references to all message elements
promise<> sent; // will be fulfilled when data is sent
auto fsent = sent.get_future();
Expand Down Expand Up @@ -338,17 +379,17 @@ inline future<> reply(std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>& r, t

// build callback arguments tuple depending on whether it gets client_info as a first parameter
template<bool Info, typename MsgType, typename... M>
inline auto make_apply_args(client_info& info, std::unique_ptr<message<MsgType, M...>>& m, std::enable_if_t<!Info, void*> = nullptr) {
return std::move(m->args);
inline auto make_apply_args(client_info& info, std::unique_ptr<in_message<MsgType, M...>>& m, std::enable_if_t<!Info, void*> = nullptr) {
return std::move(m->u.args);
}

template<bool Info, typename MsgType, typename... M>
inline auto make_apply_args(client_info& info, std::unique_ptr<message<MsgType, M...>>& m, std::enable_if_t<Info, void*> = nullptr) {
return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->args));
inline auto make_apply_args(client_info& info, std::unique_ptr<in_message<MsgType, M...>>& m, std::enable_if_t<Info, void*> = nullptr) {
return std::tuple_cat(std::make_tuple(std::cref(info)), std::move(m->u.args));
}

template<typename Ret, bool Info, typename Serializer, typename MsgType, typename Func, typename... M>
inline future<std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>> apply(Func& func, client_info& info, std::unique_ptr<message<MsgType, M...>>&& m) {
inline future<std::unique_ptr<snd_reply<Serializer, MsgType, Ret>>> apply(Func& func, client_info& info, std::unique_ptr<in_message<MsgType, M...>>&& m) {
using futurator = futurize<Ret>;
auto r = std::make_unique<snd_reply<Serializer, MsgType, Ret>>(m->id);
try {
Expand Down Expand Up @@ -386,9 +427,14 @@ template<typename F, typename Serializer, typename MsgType, bool Info, typename
auto recv_helper(std::index_sequence<I...>, Func&& func) {
return [func = lref_to_cref(std::forward<Func>(func))](lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client) mutable {
// create message to hold all received values
auto m = std::make_unique<message<MsgType, typename F::template arg<I>::type...>>();
auto xargs = std::tie(m->id, std::get<I>(m->args)...); // holds reference to all message elements
return unmarshall(client->serializer(), client->in(), std::move(xargs)).then([client, m = std::move(m), &func] () mutable {
auto m = std::make_unique<in_message<MsgType, typename F::template arg<I>::type...>>();
auto argready = [mptr = m.get()] (std::size_t n) {
if (n) { // skip first element since it is not part of a message tuple
mptr->set_ready(n - 1);
}
};
auto xargs = std::tie(m->id, std::get<I>(m->u.args)...); // holds reference to all message elements
return unmarshall(client->serializer(), client->in(), std::move(xargs), std::move(argready)).then([client, m = std::move(m), &func] () mutable {
// note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
apply<typename F::return_type, Info, Serializer>(func, client->info(), std::move(m)).then([client] (std::unique_ptr<snd_reply<Serializer, MsgType, typename F::return_type>>&& r) {
client->out_ready() = client->out_ready().then([client, r = std::move(r)] () mutable {
Expand Down

0 comments on commit 2ec7535

Please sign in to comment.