Skip to content

Commit

Permalink
Notify all consumers awaiting results upon receiving tcp_closed message
Browse files Browse the repository at this point in the history
Consumers in the redis_client's queue will be sent a `{error,
tcp_closed}` message if the connection to redis goes down. Since the
queue is reset as part of reconnect, this avoids consumers waiting for
a response they will never receive and hitting the gen_server:call
timeout.

This patch includes a small adjustment to the handle_info callback for
receiving TCP data in order to allow test code to fake a TCP closed
message. The fake close is achieved by sending a raw {tcp_closed,
fake_socket} message to redis_client. Since the unit tests use a real
connection to redis which will not have been closed, we have to be
able to handle additional data being received on the socket while
redis_client is in the process of handling the close and reconnect.
  • Loading branch information
Seth Falcon committed Jun 15, 2013
1 parent 4bcddfd commit af2ce0a
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/eredis.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, eredis, [
{description, "Erlang Redis Client"},
{vsn, "1.0.5"},
{vsn, "1.0.6"},
{modules, [eredis, eredis_client, eredis_parser, eredis_sub, eredis_sub_client]},
{registered, []},
{applications, [kernel, stdlib]}
Expand Down
45 changes: 39 additions & 6 deletions src/eredis_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,21 @@ handle_cast({request, Req}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

%% Receive data from socket, see handle_response/2
handle_info({tcp, _Socket, Bs}, State) ->
inet:setopts(State#state.socket, [{active, once}]),
%% Receive data from socket, see handle_response/2. Match `Socket' to
%% enforce sanity.
handle_info({tcp, Socket, Bs}, #state{socket = Socket} = State) ->
inet:setopts(Socket, [{active, once}]),
{noreply, handle_response(Bs, State)};

handle_info({tcp, Socket, _}, #state{socket = OurSocket} = State)
when OurSocket =/= Socket ->
%% Ignore tcp messages when the socket in message doesn't match
%% our state. In order to test behavior around receiving
%% tcp_closed message with clients waiting in queue, we send a
%% fake tcp_close message. This allows us to ignore messages that
%% arrive after that while we are reconnecting.
{noreply, State};

handle_info({tcp_error, _Socket, _Reason}, State) ->
%% This will be followed by a close
{noreply, State};
Expand All @@ -121,14 +131,20 @@ handle_info({tcp_error, _Socket, _Reason}, State) ->
%% clients. If desired, spawn of a new process which will try to reconnect and
%% notify us when Redis is ready. In the meantime, we can respond with
%% an error message to all our clients.
handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect} = State) ->
%% If we aren't going to reconnect, then there is nothing else for this process to do.
handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect,
queue = Queue} = State) ->
reply_all({error, tcp_closed}, Queue),
%% If we aren't going to reconnect, then there is nothing else for
%% this process to do.
{stop, normal, State#state{socket = undefined}};

handle_info({tcp_closed, _Socket}, State) ->
handle_info({tcp_closed, _Socket}, #state{queue = Queue} = State) ->
Self = self(),
spawn(fun() -> reconnect_loop(Self, State) end),

%% tell all of our clients what has happened.
reply_all({error, tcp_closed}, Queue),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
Expand Down Expand Up @@ -241,6 +257,23 @@ reply(Value, Queue) ->
throw(empty_queue)
end.

%% @doc Send `Value' to each client in queue. Only useful for sending
%% an error message. Any in-progress reply data is ignored.
-spec reply_all(any(), queue()) -> ok.
reply_all(Value, Queue) ->
case queue:peek(Queue) of
empty ->
ok;
{value, Item} ->
safe_reply(receipient(Item), Value),
reply_all(Value, queue:drop(Queue))
end.

receipient({_, From}) ->
From;
receipient({_, From, _}) ->
From.

safe_reply(undefined, _Value) ->
ok;
safe_reply(From, Value) ->
Expand Down
54 changes: 54 additions & 0 deletions test/eredis_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ c() ->
{ok, C} = Res,
C.

c_no_reconnect() ->
Res = eredis:start_link("127.0.0.1", 6379, 0, "", no_reconnect),
?assertMatch({ok, _}, Res),
{ok, C} = Res,
C.

multibulk_test_() ->
[?_assertEqual(<<"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n">>,
list_to_binary(create_multibulk(["SET", "foo", "bar"]))),
Expand All @@ -129,3 +135,51 @@ multibulk_test_() ->

undefined_database_test() ->
?assertMatch({ok,_}, eredis:start_link("localhost", 6379, undefined)).

tcp_closed_test() ->
C = c(),
tcp_closed_rig(C).

tcp_closed_no_reconnect_test() ->
C = c_no_reconnect(),
tcp_closed_rig(C).

tcp_closed_rig(C) ->
%% fire async requests to add to redis client queue and then trick
%% the client into thinking the connection to redis has been
%% closed. This behavior can be observed when Redis closes an idle
%% connection just as a traffic burst starts.
DoSend = fun(tcp_closed) ->
C ! {tcp_closed, fake_socket};
(Cmd) ->
eredis:q(C, Cmd)
end,
%% attach an id to each message for later
Msgs = [{1, ["GET", "foo"]},
{2, ["GET", "bar"]},
{3, tcp_closed}],
Pids = [ remote_query(DoSend, M) || M <- Msgs ],
Results = gather_remote_queries(Pids),
?assertEqual({error, tcp_closed}, proplists:get_value(1, Results)),
?assertEqual({error, tcp_closed}, proplists:get_value(2, Results)).

remote_query(Fun, {Id, Cmd}) ->
Parent = self(),
spawn(fun() ->
Result = Fun(Cmd),
Parent ! {self(), Id, Result}
end).

gather_remote_queries(Pids) ->
gather_remote_queries(Pids, []).

gather_remote_queries([], Acc) ->
Acc;
gather_remote_queries([Pid | Rest], Acc) ->
receive
{Pid, Id, Result} ->
gather_remote_queries(Rest, [{Id, Result} | Acc])
after
10000 ->
error({gather_remote_queries, timeout})
end.

0 comments on commit af2ce0a

Please sign in to comment.