Skip to content

Commit

Permalink
This commit moves initial connection process out of init to avoid
Browse files Browse the repository at this point in the history
blocking or crashing supervisor. Besides if reconnect is allowed
it will be used even if connection failed right after init.
  • Loading branch information
sumerman authored and Kwisatx committed Sep 18, 2017
1 parent c24dfb2 commit dd0d7e9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
59 changes: 34 additions & 25 deletions src/eredis_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,8 @@ init([Host, Port, Database, Password, ReconnectSleep, ConnectTimeout]) ->
parser_state = eredis_parser:init(),
queue = queue:new()},

case connect(State) of
{ok, NewState} ->
{ok, NewState};
{error, Reason} ->
{stop, Reason}
end.
self() ! initiate_connection,
{ok, State}.

handle_call({request, Req}, From, State) ->
do_request(Req, From, State);
Expand Down Expand Up @@ -143,24 +139,8 @@ handle_info({tcp_error, _Socket, _Reason}, State) ->
%% clients. If desired, spawn of a new process which will try to reconnect and
%% notify us when Redis is ready. In the meantime, we can respond with
%% an error message to all our clients.
handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect,
queue = Queue} = State) ->
reply_all({error, tcp_closed}, Queue),
%% If we aren't going to reconnect, then there is nothing else for
%% this process to do.
{stop, normal, State#state{socket = undefined}};

handle_info({tcp_closed, _Socket}, #state{queue = Queue} = State) ->
Self = self(),
spawn(fun() -> reconnect_loop(Self, State) end),

%% tell all of our clients what has happened.
reply_all({error, tcp_closed}, Queue),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
{noreply, State#state{socket = undefined, queue = queue:new()}};
handle_info({tcp_closed, _Socket}, State) ->
maybe_reconnect(tcp_closed, State);

%% Redis is ready to accept requests, the given Socket is a socket
%% already connected and authenticated.
Expand All @@ -172,6 +152,14 @@ handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
handle_info(stop, State) ->
{stop, shutdown, State};

handle_info(initiate_connection, #state{socket = undefined} = State) ->
case connect(State) of
{ok, NewState} ->
{noreply, NewState};
{error, Reason} ->
maybe_reconnect(Reason, State)
end;

handle_info(_Info, State) ->
{stop, {unhandled_message, _Info}, State}.

Expand Down Expand Up @@ -266,7 +254,7 @@ reply(Value, Queue) ->
{empty, Queue} ->
%% Oops
error_logger:info_msg("Nothing in queue, but got value from parser~n"),
throw(empty_queue)
exit(empty_queue)
end.

%% @doc Send `Value' to each client in queue. Only useful for sending
Expand Down Expand Up @@ -369,6 +357,27 @@ do_sync_command(Socket, Command) ->
{error, Reason}
end.

maybe_reconnect(Reason, #state{reconnect_sleep = no_reconnect, queue = Queue} = State) ->
reply_all({error, Reason}, Queue),
%% If we aren't going to reconnect, then there is nothing else for
%% this process to do.
Reason1 = case Reason of
tcp_closed -> normal;
_Else -> Reason
end,
{stop, Reason1, State#state{socket = undefined}};
maybe_reconnect(Reason, #state{queue = Queue} = State) ->
Self = self(),
spawn_link(fun() -> reconnect_loop(Self, State) end),

%% tell all of our clients what has happened.
reply_all({error, Reason}, Queue),

%% Throw away the socket and the queue, as we will never get a
%% response to the requests sent on the old socket. The absence of
%% a socket is used to signal we are "down"
{noreply, State#state{socket = undefined, queue = queue:new()}}.

%% @doc: Loop until a connection can be established, this includes
%% successfully issuing the auth and select calls. When we have a
%% connection, give the socket to the redis client.
Expand Down
10 changes: 10 additions & 0 deletions test/eredis_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ multibulk_test_() ->
undefined_database_test() ->
?assertMatch({ok,_}, eredis:start_link("localhost", 6379, undefined)).

connection_failure_on_start_no_reconnect_test() ->
Res = eredis:start_link("this_host_does_not_exist", 6379, 0, "", no_reconnect),
?assertMatch({ok, _}, Res),
{ok, ClientPid} = Res,
process_flag(trap_exit, true),
IsDied = receive {'EXIT', ClientPid, _} -> died
after 1000 -> still_alive end,
process_flag(trap_exit, false),
?assertEqual(died, IsDied).

tcp_closed_test() ->
C = c(),
tcp_closed_rig(C).
Expand Down

0 comments on commit dd0d7e9

Please sign in to comment.