diff --git a/README.md b/README.md index d0281af5..0f4276e3 100644 --- a/README.md +++ b/README.md @@ -114,8 +114,8 @@ requests, you may use `eredis:qp(Client::pid(), where the values are the redis responses in the same order as the commands you provided. -To start the client, use any of the `eredis:start_link/0,1,2,3,4,5` -functions. They all include sensible defaults. `start_link/5` takes +To start the client, use any of the `eredis:start_link/0,1,2,3,4,5,6,7` +functions. They all include sensible defaults. `start_link/7` takes the following arguments: * Host, dns name or ip adress as string; or unix domain socket as {local, Path} (available in OTP 19+) @@ -123,6 +123,8 @@ the following arguments: * Database, integer or 0 for default database * Password, string or empty string([]) for no password * Reconnect sleep, integer of milliseconds to sleep between reconnect attempts +* Connect timeout, timeout value in milliseconds to use in `gen_tcp:connect`, default is 5000 +* Socket options, proplist of options to be sent to `gen_tcp:connect`, default is `?SOCKET_OPTS` ## Reconnecting on Redis down / network failure / timeout / etc diff --git a/include/eredis.hrl b/include/eredis.hrl index 93bd12b5..33da2658 100644 --- a/include/eredis.hrl +++ b/include/eredis.hrl @@ -33,8 +33,9 @@ -define(NL, "\r\n"). --define(SOCKET_OPTS, [binary, {active, once}, {packet, raw}, {reuseaddr, false}, - {send_timeout, ?SEND_TIMEOUT}]). +-define(SOCKET_MODE, binary). +-define(SOCKET_OPTS, [{active, once}, {packet, raw}, {reuseaddr, false}, + {keepalive, false}, {send_timeout, ?SEND_TIMEOUT}]). -define(RECV_TIMEOUT, 5000). -define(SEND_TIMEOUT, 5000). diff --git a/src/eredis.erl b/src/eredis.erl index f4cd8981..4d93e044 100644 --- a/src/eredis.erl +++ b/src/eredis.erl @@ -14,7 +14,7 @@ -define(TIMEOUT, 5000). -export([start_link/0, start_link/1, start_link/2, start_link/3, start_link/4, - start_link/5, start_link/6, stop/1, q/2, q/3, qp/2, qp/3, q_noreply/2, + start_link/5, start_link/6, start_link/7, stop/1, q/2, q/3, qp/2, qp/3, q_noreply/2, q_async/2, q_async/3]). %% Exported for testing @@ -46,17 +46,21 @@ start_link(Host, Port, Database, Password) -> start_link(Host, Port, Database, Password, ReconnectSleep) -> start_link(Host, Port, Database, Password, ReconnectSleep, ?TIMEOUT). -start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout) +start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout) -> + start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, []). + +start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions) when is_list(Host) orelse (is_tuple(Host) andalso tuple_size(Host) =:= 2 andalso element(1, Host) =:= local), is_integer(Port), is_integer(Database) orelse Database == undefined, is_list(Password), is_integer(ReconnectSleep) orelse ReconnectSleep =:= no_reconnect, - is_integer(ConnectTimeout) -> + is_integer(ConnectTimeout), + is_list(SocketOptions) -> eredis_client:start_link(Host, Port, Database, Password, - ReconnectSleep, ConnectTimeout). + ReconnectSleep, ConnectTimeout, SocketOptions). %% @doc: Callback for starting from poolboy -spec start_link(server_args()) -> {ok, Pid::pid()} | {error, Reason::term()}. @@ -67,7 +71,9 @@ start_link(Args) -> Password = proplists:get_value(password, Args, ""), ReconnectSleep = proplists:get_value(reconnect_sleep, Args, 100), ConnectTimeout = proplists:get_value(connect_timeout, Args, ?TIMEOUT), - start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout). + SocketOptions = proplists:get_value(socket_options, Args, []), + + start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions). stop(Client) -> eredis_client:stop(Client). diff --git a/src/eredis_client.erl b/src/eredis_client.erl index d94c1a1d..fef052c9 100644 --- a/src/eredis_client.erl +++ b/src/eredis_client.erl @@ -25,7 +25,7 @@ -include("eredis.hrl"). %% API --export([start_link/6, stop/1, select_database/2]). +-export([start_link/7, stop/1, select_database/2]). -export([do_sync_command/2]). @@ -40,6 +40,7 @@ database :: binary() | undefined, reconnect_sleep :: reconnect_sleep() | undefined, connect_timeout :: integer() | undefined, + socket_options :: list(), socket :: port() | undefined, parser_state :: #pstate{} | undefined, @@ -55,11 +56,12 @@ Database::integer() | undefined, Password::string(), ReconnectSleep::reconnect_sleep(), - ConnectTimeout::integer() | undefined) -> + ConnectTimeout::integer() | undefined, + SocketOptions::list()) -> {ok, Pid::pid()} | {error, Reason::term()}. -start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout) -> +start_link(Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions) -> gen_server:start_link(?MODULE, [Host, Port, Database, Password, - ReconnectSleep, ConnectTimeout], []). + ReconnectSleep, ConnectTimeout, SocketOptions], []). stop(Pid) -> @@ -69,13 +71,14 @@ stop(Pid) -> %% gen_server callbacks %%==================================================================== -init([Host, Port, Database, Password, ReconnectSleep, ConnectTimeout]) -> +init([Host, Port, Database, Password, ReconnectSleep, ConnectTimeout, SocketOptions]) -> State = #state{host = Host, port = Port, database = read_database(Database), password = list_to_binary(Password), reconnect_sleep = ReconnectSleep, connect_timeout = ConnectTimeout, + socket_options = SocketOptions, parser_state = eredis_parser:init(), queue = queue:new()}, @@ -306,8 +309,11 @@ connect(State) -> local -> 0; _ -> State#state.port end, - case gen_tcp:connect(Addr, Port, - [AFamily | ?SOCKET_OPTS], State#state.connect_timeout) of + + SocketOptions = lists:ukeymerge(1, lists:keysort(1, State#state.socket_options), lists:keysort(1, ?SOCKET_OPTS)), + ConnectOptions = [AFamily | [?SOCKET_MODE | SocketOptions]], + + case gen_tcp:connect(Addr, Port, ConnectOptions, State#state.connect_timeout) of {ok, Socket} -> case authenticate(Socket, State#state.password) of ok -> diff --git a/src/eredis_sub_client.erl b/src/eredis_sub_client.erl index d7d08451..d924061c 100644 --- a/src/eredis_sub_client.erl +++ b/src/eredis_sub_client.erl @@ -310,7 +310,7 @@ queue_or_send(Msg, State) -> %% synchronous and if Redis returns something we don't expect, we %% crash. Returns {ok, State} or {error, Reason}. connect(State) -> - case gen_tcp:connect(State#state.host, State#state.port, ?SOCKET_OPTS) of + case gen_tcp:connect(State#state.host, State#state.port, [?SOCKET_MODE | ?SOCKET_OPTS]) of {ok, Socket} -> case authenticate(Socket, State#state.password) of ok -> diff --git a/test/eredis_sub_tests.erl b/test/eredis_sub_tests.erl index d6dbcb8f..f1544170 100644 --- a/test/eredis_sub_tests.erl +++ b/test/eredis_sub_tests.erl @@ -29,11 +29,6 @@ add_channels(Sub, Channels) -> end end, Channels). - - - - - pubsub_test() -> Pub = c(), Sub = s(), diff --git a/test/eredis_tests.erl b/test/eredis_tests.erl index 4c1ab113..1e3244bf 100644 --- a/test/eredis_tests.erl +++ b/test/eredis_tests.erl @@ -9,6 +9,10 @@ connect_test() -> ?assertMatch({ok, _}, eredis:start_link("127.0.0.1", 6379)), ?assertMatch({ok, _}, eredis:start_link("localhost", 6379)). +connect_socket_options_test() -> + ?assertMatch({ok, _}, eredis:start_link([{socket_options, [{keepalive, true}]}])), + ?assertMatch({ok, _}, eredis:start_link("localhost", 6379, 0, "",100, 5000, [{keepalive, true}])). + get_set_test() -> C = c(), ?assertMatch({ok, _}, eredis:q(C, ["DEL", foo])), @@ -156,20 +160,20 @@ connection_failure_during_start_no_reconnect_test() -> process_flag(trap_exit, true), Res = eredis:start_link("localhost", 6378, 0, "", no_reconnect), ?assertMatch({error, _}, Res), - IsDied = receive {'EXIT', _, _} -> died + IsDead = receive {'EXIT', _, _} -> died after 1000 -> still_alive end, process_flag(trap_exit, false), - ?assertEqual(died, IsDied). + ?assertEqual(died, IsDead). connection_failure_during_start_reconnect_test() -> process_flag(trap_exit, true), Res = eredis:start_link("localhost", 6378, 0, "", 100), ?assertMatch({ok, _}, Res), {ok, ClientPid} = Res, - IsDied = receive {'EXIT', ClientPid, _} -> died + IsDead = receive {'EXIT', ClientPid, _} -> died after 400 -> still_alive end, process_flag(trap_exit, false), - ?assertEqual(still_alive, IsDied). + ?assertEqual(still_alive, IsDead). tcp_closed_test() -> C = c(),