Skip to content

Commit

Permalink
Merge pull request cqerl#71 from bernardd/fix_client_startup_race
Browse files Browse the repository at this point in the history
Fix client startup race
  • Loading branch information
Mathieu D'Amours committed Feb 12, 2016
2 parents fb63399 + 0663a40 commit 546f5cc
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 92 deletions.
94 changes: 48 additions & 46 deletions src/cqerl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,11 @@ init([]) ->
handle_call(get_any_client, _From, State=#cqerl_state{client_stats=[]}) ->
{reply, {error, no_configured_node}, State#cqerl_state{retrying=false}};

handle_call(get_any_client, From, State=#cqerl_state{clients=Clients, retrying=Retrying}) ->
case select_client(Clients, #cql_client{busy=false, _ = '_'}, From, State) of
no_available_clients when Retrying ->
retry;

no_available_clients ->
erlang:send_after(?RETRY_INITIAL_DELAY, self(), {retry, get_any_client, From, ?RETRY_INITIAL_DELAY}),
{noreply, State#cqerl_state{retrying=false}};

{existing, _, _} ->
{noreply, State#cqerl_state{retrying=false}};

{new, _Pid} ->
{noreply, State#cqerl_state{retrying=false}}
end;
handle_call(get_any_client, From, State) ->
try_select_client(#cql_client{busy=false, _ = '_'}, get_any_client, From, State);

handle_call(Req={get_client, Node, Opts}, From,
State=#cqerl_state{clients=Clients, client_stats=Stats, retrying=Retrying, globalopts=GlobalOpts, named_nodes=NamedNodes}) ->
State=#cqerl_state{client_stats=Stats, globalopts=GlobalOpts, named_nodes=NamedNodes}) ->

NodeKey = if
is_atom(Node) ->
Expand All @@ -300,28 +287,16 @@ handle_call(Req={get_client, Node, Opts}, From,
error ->
State2 = new_pool(NodeKey, Opts, GlobalOpts, State),
case orddict:find(NodeKey, State2#cqerl_state.client_stats) of
error ->
{reply, {closed, process_died}, State2#cqerl_state{retrying=false}};
{ok, #cql_client_stats{count=0}} ->
{reply, {error, no_available_clients}, State2#cqerl_state{retrying=false}};
{reply, {closed, process_died}, State2#cqerl_state{retrying=false}};
_ ->
select_client(Clients, #cql_client{node=NodeKey, busy=false, pid='_'}, From, State),
{noreply, State2#cqerl_state{retrying=false}}
try_select_client(#cql_client{node=NodeKey, busy=false, pid='_'}, Req, From, State2)
end;

_ ->
case select_client(Clients, #cql_client{node=NodeKey, busy=false, pid='_'}, From, State) of
no_available_clients when Retrying ->
retry;

no_available_clients ->
erlang:send_after(?RETRY_INITIAL_DELAY, self(), {retry, Req, From, ?RETRY_INITIAL_DELAY}),
{noreply, State#cqerl_state{retrying=false}};

{existing, _, _} ->
{noreply, State#cqerl_state{retrying=false}};

{new, _Pid} ->
{noreply, State#cqerl_state{retrying=false}}
end
try_select_client(#cql_client{node=NodeKey, busy=false, pid='_'}, Req, From, State)
end;

handle_call(_Msg, _From, State) ->
Expand Down Expand Up @@ -392,12 +367,12 @@ handle_cast({client_asleep, Pid}, State=#cqerl_state{clients=Clients, client_sta
case orddict:find(NodeKey, Stats) of
{ok, #cql_client_stats{min_count=Min, count=Count}} when Count =< Min ->
{noreply, State};
{ok, CStats=#cql_client_stats{count=Count}} ->
{ok, #cql_client_stats{}} ->
Pool = pool_from_node(NodeKey),
pooler:return_member(Pool, Pid, ok),
unlink(Pid),
ets:delete(Clients, Pid),
Stats1 = orddict:store(NodeKey, CStats#cql_client_stats{count=Count-1}, Stats),
Stats1 = dec_stats_count(NodeKey, Stats),
{noreply, State#cqerl_state{client_stats=Stats1}};
error ->
{noreply, State}
Expand All @@ -416,8 +391,7 @@ handle_info({'EXIT', From, Reason}, State=#cqerl_state{clients=Clients, client_s
case ets:lookup(Clients, From) of
[#cql_client{node=NodeKey}] ->
ets:delete(Clients, From),
{ok, CStats=#cql_client_stats{count=Count}} = orddict:find(NodeKey, Stats),
{noreply, State#cqerl_state{client_stats = orddict:store(NodeKey, CStats#cql_client_stats{count = Count-1}, Stats)}};
{noreply, State#cqerl_state{client_stats = dec_stats_count(NodeKey, Stats)}};
[] ->
{stop, Reason, State}
end;
Expand Down Expand Up @@ -585,12 +559,10 @@ select_client(Clients, MatchClient = #cql_client{node=Node}, User, _State) ->
AvailableClients when length(AvailableClients) > 0 ->
RandIdx = random:uniform(length(AvailableClients)),
#cql_client{pid=Pid, node=NodeKey} = lists:nth(RandIdx, AvailableClients),
case is_process_alive(Pid) of
true ->
cqerl_client:new_user(Pid, User),
{existing, Pid, NodeKey};
false ->
no_available_clients
case cqerl_client:new_user(Pid, User) of
ok -> {existing, Pid, NodeKey};
{error, {closed, Reason}} -> {closed, Reason};
{error, _E} -> no_available_clients
end;

[] ->
Expand All @@ -603,10 +575,13 @@ select_client(Clients, MatchClient = #cql_client{node=Node}, User, _State) ->
no_available_clients;

Pid ->
link(Pid),
ets:insert(Clients, #cql_client{node=Node, busy=false, pid=Pid}),
cqerl_client:new_user(Pid, User),
{new, Pid}
case cqerl_client:new_user(Pid, User) of
ok ->
link(Pid),
{new, Pid};
{error, _E} -> no_available_clients
end
end
end.

Expand Down Expand Up @@ -649,3 +624,30 @@ pool_from_node({ Addr, Port, Keyspace }) when is_binary(Port) ->
pool_from_node({ Addr, binary_to_list(Port), Keyspace });
pool_from_node(Node = { Addr, Port, Keyspace }) when is_tuple(Addr) orelse is_list(Addr), is_integer(Port), is_atom(Keyspace) ->
binary_to_atom(base64:encode(term_to_binary(Node)), latin1).

%% Lower the client count recorded for NodeKey in the Stats orddict by one.
%%
%% - No entry for NodeKey: Stats is returned untouched.
%% - Entry whose count is exactly 1: the entry is erased from Stats.
%% - Any other entry: the entry is stored back with count - 1.
%%
%% Returns the resulting orddict.
dec_stats_count(NodeKey, Stats) ->
    case orddict:find(NodeKey, Stats) of
        error ->
            Stats;
        {ok, #cql_client_stats{count = 1}} ->
            orddict:erase(NodeKey, Stats);
        {ok, #cql_client_stats{count = Remaining} = Entry} ->
            Decremented = Entry#cql_client_stats{count = Remaining - 1},
            orddict:store(NodeKey, Decremented, Stats)
    end.

%% Run select_client/4 for the match pattern Client on behalf of From and
%% turn its result into a handle_call-style return value.
%%
%%   {existing, _, _} | {new, _}   -> {noreply, State'}; no reply is sent from
%%                                    here (presumably the selected client
%%                                    answers From itself - not visible here)
%%   {closed, Reason}              -> {reply, {closed, Reason}, State'}
%%   no_available_clients          -> the bare atom `retry` when the state's
%%                                    `retrying` flag is true; otherwise a
%%                                    {retry, Req, From, Delay} message is
%%                                    scheduled to self() after
%%                                    ?RETRY_INITIAL_DELAY and {noreply, State'}
%%                                    is returned
%%
%% State' is State with `retrying` reset to false.
try_select_client(Client, Req, From, State = #cqerl_state{clients = Clients, retrying = Retrying}) ->
    Settled = State#cqerl_state{retrying=false},
    case select_client(Clients, Client, From, State) of
        {existing, _, _} ->
            {noreply, Settled};

        {new, _Pid} ->
            {noreply, Settled};

        {closed, Reason} ->
            {reply, {closed, Reason}, Settled};

        no_available_clients when Retrying ->
            retry;

        no_available_clients ->
            erlang:send_after(?RETRY_INITIAL_DELAY, self(), {retry, Req, From, ?RETRY_INITIAL_DELAY}),
            {noreply, Settled}
    end.

67 changes: 28 additions & 39 deletions src/cqerl_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ start_link(Inet, Opts) ->
gen_fsm:start_link(?MODULE, [Inet, Opts], []).

new_user(Pid, From) ->
gen_fsm:send_event(Pid, {new_user, From}).
try
gen_fsm:sync_send_event(Pid, {new_user, From}, infinity)
catch
exit:_ -> {error, {closed, process_died}}
end.

%% Asynchronously send a {remove_user, ClientRef} event to the client
%% process ClientPid. Returns whatever gen_fsm:send_event/2 returns.
remove_user({ClientPid, ClientRef}) ->
    Event = {remove_user, ClientRef},
    gen_fsm:send_event(ClientPid, Event).
Expand Down Expand Up @@ -135,12 +139,12 @@ init([Inet, Opts]) ->



starting({new_user, From}, State=#client_state{users=Users}) ->
{next_state, starting, State#client_state{users=[From | Users]}};

starting(_Event, State) ->
{next_state, starting, State}.

%% Synchronous-event handler for the `starting` state.
%%
%% A {new_user, User} event prepends User to the `users` list held in the
%% state and is acknowledged with `ok`. Any other event is answered with
%% `unexpected_msg`. The machine stays in `starting` in both cases.
starting({new_user, NewUser}, _Caller, #client_state{users = Waiting} = State) ->
    Queued = State#client_state{users = [NewUser | Waiting]},
    {reply, ok, starting, Queued};

starting(_Other, _Caller, State) ->
    {reply, unexpected_msg, starting, State}.

Expand All @@ -157,10 +161,6 @@ live({batch_ready, Call, QueryBatch}, State=#client_state{available_slots=[], qu
live({batch_ready, Call, QueryBatch}, State) ->
{next_state, live, process_outgoing_query(Call, QueryBatch, State)};

live({new_user, From}, State=#client_state{users=Users}) ->
add_user(From, Users),
{next_state, live, State};

live({remove_user, Ref}, State) ->
{next_state, live, remove_user(Ref, State)};

Expand All @@ -187,6 +187,9 @@ live(_Event, State) ->
{next_state, live, State}.


live({new_user, User}, _From, State=#client_state{users=Users}) ->
add_user(User, Users),
{reply, ok, live, State};

live({send_query, Ref, Batch=#cql_query_batch{}}, From, State=#client_state{inet=Inet}) ->
cqerl_batch_sup:new_batch_coordinator(#cql_call{type=sync, caller=From, client=Ref}, Inet, Batch),
Expand All @@ -213,17 +216,17 @@ live(_Event, _From, State) ->



sleep({new_user, From}, State=#client_state{users=Users}) ->
add_user(From, Users),
{next_state, live, State};

sleep(timeout, State) ->
signal_asleep(),
{next_state, sleep, State};

sleep(_Event, State=#client_state{sleep=Duration}) ->
{next_state, sleep, State, Duration}.

%% Synchronous-event handler for the `sleep` state.
%%
%% A {new_user, User} event is passed to add_user/2 together with the `users`
%% field of the state, acknowledged with `ok`, and moves the machine to
%% `live`; the state record itself is returned unchanged. Any other event is
%% answered with `ok` and leaves the machine in `sleep`.
sleep({new_user, NewUser}, _Caller, #client_state{users = Registered} = State) ->
    add_user(NewUser, Registered),
    {reply, ok, live, State};

sleep(_Other, _Caller, State) ->
    {reply, ok, sleep, State}.

Expand Down Expand Up @@ -284,9 +287,8 @@ handle_info({preparation_failed, {_Inet, Statement}, Reason}, live,
{next_state, live, State}
end;

handle_info({ tcp_closed, _Socket }, starting, State = #client_state{users=Users}) ->
[ gen_server:reply(From, {error, connection_closed}) || From <- Users],
{stop, connection_closed, State};
handle_info({ tcp_closed, _Socket }, starting, State) ->
stop_during_startup({error, connection_closed}, State);

handle_info({ tcp_closed, _Socket }, live, State = #client_state{ queries = Queries }) ->
[ respond_to_user(Call, {error, connection_closed}) || {_, {Call, _}} <- Queries ],
Expand Down Expand Up @@ -319,8 +321,7 @@ handle_info({ Transport, Socket, BinaryMsg }, starting, State = #client_state{ s
#client_state{ authmod=AuthMod, authargs=AuthArgs, inet=Inet } = State,
case AuthMod:auth_init(AuthArgs, Body, Inet) of
{close, Reason} ->
close_socket(State),
{stop, {auth_client_closed, Reason}, State#client_state{socket=undefined}};
stop_during_startup({auth_client_closed, Reason}, State);

{reply, Reply, AuthState} ->
{ok, AuthFrame} = cqerl_protocol:auth_frame(base_frame(State), Reply),
Expand All @@ -333,9 +334,7 @@ handle_info({ Transport, Socket, BinaryMsg }, starting, State = #client_state{ s
#client_state{ authmod=AuthMod, authstate=AuthState } = State,
case AuthMod:auth_handle_challenge(Body, AuthState) of
{close, Reason} ->
close_socket(State),
{stop, {auth_client_closed, Reason}, State#client_state{socket=undefined}};

stop_during_startup({auth_client_closed, Reason}, State);
{reply, Reply, AuthState} ->
{ok, AuthFrame} = cqerl_protocol:auth_frame(base_frame(State), Reply),
send_to_db(State, AuthFrame),
Expand All @@ -346,21 +345,18 @@ handle_info({ Transport, Socket, BinaryMsg }, starting, State = #client_state{ s
{ok, #cqerl_frame{opcode=?CQERL_OP_ERROR}, {16#0100, AuthErrorDescription, _}, Delayed} ->
#client_state{ authmod=AuthMod, authstate=AuthState } = State,
AuthMod:auth_handle_error(AuthErrorDescription, AuthState),
close_socket(State),
{stop, {auth_server_refused, AuthErrorDescription}, State#client_state{socket=undefined}};
stop_during_startup({auth_server_refused, AuthErrorDescription}, State);

%% Server tells us that an error occurred
{ok, #cqerl_frame{opcode=?CQERL_OP_ERROR}, {ErrorCode, ErrorMessage, _}, Delayed} ->
close_socket(State),
{stop, {server_error, ErrorCode, ErrorMessage}, State#client_state{socket=undefined}};
stop_during_startup({server_error, ErrorCode, ErrorMessage}, State);

%% Server tells us the authentication went well, we can start shooting queries
{ok, #cqerl_frame{opcode=?CQERL_OP_AUTH_SUCCESS}, Body, Delayed} ->
#client_state{ authmod=AuthMod, authstate=AuthState} = State,
case AuthMod:auth_handle_success(Body, AuthState) of
{close, Reason} ->
close_socket(State),
{stop, {auth_client_closed, Reason}, State#client_state{socket=undefined}};
stop_during_startup({auth_client_closed, Reason}, State);

ok ->
{StateName, FinalState} = maybe_set_keyspace(State),
Expand Down Expand Up @@ -495,10 +491,8 @@ terminate(Reason, live, #client_state{queries=Queries}) ->
({_I, _}) -> ok
end, Queries);

terminate(Reason, starting, #client_state{users=Users}) ->
lists:foreach(fun (From) -> gen_server:reply(From, {closed, Reason}) end, Users),
timer:sleep(500).

terminate(_Reason, starting, _State) ->
ok.



Expand Down Expand Up @@ -736,15 +730,6 @@ create_socket({Addr, Port}, Opts) ->




close_socket(#client_state{trans=ssl, socket=Socket}) ->
ssl:close(Socket);
close_socket(#client_state{trans=tcp, socket=Socket}) ->
gen_tcp:close(Socket).




activate_socket(#client_state{socket=undefined}) ->
ok;
activate_socket(#client_state{trans=ssl, socket=Socket}) ->
Expand Down Expand Up @@ -844,3 +829,7 @@ get_sleep_duration(Opts) ->
{Amount, hour} -> Amount * 1000 * 60 * 60;
Amount when is_integer(Amount) -> Amount
end).

%% Abort a client that has not finished starting up.
%%
%% Every entry in the state's `users` list is sent {closed, Reason} through
%% gen_server:reply/2. The function then returns a stop tuple with exit
%% reason `normal` and the `socket` field reset to `undefined`.
stop_during_startup(Reason, State = #client_state{users = Users}) ->
    Notice = {closed, Reason},
    Notify = fun (Waiting) -> gen_server:reply(Waiting, Notice) end,
    lists:foreach(Notify, Users),
    {stop, normal, State#client_state{socket=undefined}}.
26 changes: 20 additions & 6 deletions test/integrity_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ suite() ->
%%--------------------------------------------------------------------
groups() -> [
{connection, [sequence], [
random_selection
random_selection,
failed_connection
]},
{database, [sequence], [
{initial, [sequence], [connect, create_keyspace]},
Expand Down Expand Up @@ -255,6 +256,13 @@ random_selection(Config) ->
MaxSize = length(DistinctPids),
lists:foreach(fun(Client) -> cqerl:close_client(Client) end, Clients).

% Requesting a client for a keyspace override that is expected to be invalid
% must yield {closed, _} every time, including when the same keyspace is
% requested again.
failed_connection(Config) ->
    Attempt = fun (Keyspace) -> maybe_get_client([{keyspace, Keyspace} | Config]) end,
    {closed, _} = Attempt(<<"not_a_real_keyspace">>),
    {closed, _} = Attempt(<<"another_fake_keyspace">>),
    % Regression check: an earlier bug made later calls time out when they
    % reused an invalid keyspace that had already been tried.
    {closed, _} = Attempt(<<"not_a_real_keyspace">>).

connect(Config) ->
{Pid, Ref} = get_client(Config),
true = is_pid(Pid),
Expand Down Expand Up @@ -617,19 +625,25 @@ batches_and_pages(Config) ->
ct:log("Time elapsed inserting ~B entries and fetching in batches of ~B: ~B ms", [N, Bsz, round(timer:now_diff(now(), T1)/1000)]),
cqerl:close_client(Client).

% Call when you're expecting a valid client.
% Delegates to maybe_get_client/1 with the same Config and returns the Client
% from its {ok, Client} result. Any other result fails the match below and
% crashes the caller with badmatch, which is the intended outcome for tests
% that require a working connection.
get_client(Config) ->
{ok, Client} = maybe_get_client(Config),
Client.

% Call to test new_client error cases
maybe_get_client(Config) ->
Host = proplists:get_value(host, Config),
SSL = proplists:get_value(prepared_ssl, Config),
Auth = proplists:get_value(auth, Config, undefined),
Keyspace = proplists:get_value(keyspace, Config),
PoolMinSize = proplists:get_value(pool_min_size, Config),
PoolMaxSize = proplists:get_value(pool_max_size, Config),

io:format("Options : ~w~n", [[
{ssl, SSL}, {auth, Auth}, {keyspace, Keyspace},
{pool_min_size, PoolMinSize}, {pool_max_size, PoolMaxSize}
]]),
{ok, Client} = cqerl:new_client(Host, [{ssl, SSL}, {auth, Auth}, {keyspace, Keyspace},
{pool_min_size, PoolMinSize}, {pool_max_size, PoolMaxSize} ]),
Client.

cqerl:new_client(Host, [{ssl, SSL}, {auth, Auth}, {keyspace, Keyspace},
{pool_min_size, PoolMinSize}, {pool_max_size, PoolMaxSize} ]).

2 changes: 1 addition & 1 deletion test/load_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%% default data values, not perform any other operations.
%%--------------------------------------------------------------------
suite() ->
[{timetrap, {seconds, 20}},
[{timetrap, {seconds, 30}},
{require, ssl, cqerl_test_ssl},
{require, auth, cqerl_test_auth},
% {require, keyspace, cqerl_test_keyspace},
Expand Down

0 comments on commit 546f5cc

Please sign in to comment.