Skip to content

Commit

Permalink
Merge pull request esl#23 from Lastres/pr-fixes
Browse files Browse the repository at this point in the history
Re-send PR 15
  • Loading branch information
dcorbacho committed Jan 14, 2013
2 parents fb727ee + 7071283 commit ed9aa8b
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 24 deletions.
105 changes: 90 additions & 15 deletions src/lhttpc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@

-export([start/0, stop/0, request/4, request/5, request/6, request/9]).
-export([start/2, stop/1]).
-export([add_pool/1,
add_pool/2,
add_pool/3,
delete_pool/1]).
-export([
send_body_part/2,
send_body_part/3,
send_body_part/3,
send_trailers/2,
send_trailers/3
]).
Expand Down Expand Up @@ -94,6 +98,67 @@ start() ->
stop() ->
application:stop(lhttpc).

%% @spec (Name) -> {ok, Pid} | {error, Reason}
%% Name = atom()
%% Pid = pid()
%% Reason = term()
%% @doc
%% Add a new named httpc_manager pool to the supervisor tree
%% @end
-spec add_pool(atom()) ->
{ok, pid()} | {error, term()}.
add_pool(Name) when is_atom(Name) ->
{ok, ConnTimeout} = application:get_env(lhttpc, connection_timeout),
{ok, PoolSize} = application:get_env(lhttpc, pool_size),
add_pool(Name,
ConnTimeout,
PoolSize).

%% Add a new httpc_manager to the supervisor tree
-spec add_pool(atom(), non_neg_integer()) ->
{ok, pid()} | {error, term()}.
add_pool(Name, ConnTimeout) when is_atom(Name),
is_integer(ConnTimeout),
ConnTimeout > 0 ->
{ok, PoolSize} = application:get_env(lhttpc, pool_size),
add_pool(Name,
ConnTimeout,
PoolSize).

%% Add a new httpc_manager to the supervisor tree
-spec add_pool(atom(), non_neg_integer(), non_neg_integer() | atom()) ->
{ok, pid()} | {error, term()}.
add_pool(Name, ConnTimeout, PoolSize) ->
ChildSpec = {Name,
{lhttpc_manager, start_link, [[{name, Name},
{connection_timeout, ConnTimeout},
{pool_size, PoolSize}]]},
permanent, 10000, worker, [lhttpc_manager]},
case supervisor:start_child(lhttpc_sup, ChildSpec) of
{error, {already_started, _Pid}} ->
{error, already_exists};
{error, Error} ->
{error, Error};
{ok, Pid} ->
{ok, Pid};
{ok, Pid, _Info} ->
{ok, Pid}
end.

%% Delete a pool
-spec delete_pool(atom() | pid()) -> ok.
delete_pool(PoolPid) when is_pid(PoolPid) ->
{registered_name, Name} = erlang:process_info(PoolPid, registered_name),
delete_pool(Name);
delete_pool(PoolName) when is_atom(PoolName) ->
case supervisor:terminate_child(lhttpc_sup, PoolName) of
ok -> case supervisor:delete_child(lhttpc_sup, PoolName) of
ok -> ok;
{error, not_found} -> ok
end;
{error, Reason} -> {error, Reason}
end.

%% @spec (URL, Method, Hdrs, Timeout) -> Result
%% URL = string()
%% Method = string() | atom()
Expand Down Expand Up @@ -124,7 +189,7 @@ request(URL, Method, Hdrs, Timeout) ->
%% Hdrs = [{Header, Value}]
%% Header = string() | binary() | atom()
%% Value = string() | binary()
%% RequestBody = iolist()
%% RequestBody = iodata()
%% Timeout = integer() | infinity
%% Result = {ok, {{StatusCode, ReasonPhrase}, Hdrs, ResponseBody}}
%% | {error, Reason}
Expand All @@ -137,7 +202,7 @@ request(URL, Method, Hdrs, Timeout) ->
%% `request(URL, Method, Hdrs, Body, Timeout, [])'.
%% @end
%% @see request/9
-spec request(string(), string() | atom(), headers(), iolist(),
-spec request(string(), string() | atom(), headers(), iodata(),
pos_integer() | infinity) -> result().
request(URL, Method, Hdrs, Body, Timeout) ->
request(URL, Method, Hdrs, Body, Timeout, []).
Expand All @@ -148,7 +213,7 @@ request(URL, Method, Hdrs, Body, Timeout) ->
%% Hdrs = [{Header, Value}]
%% Header = string() | binary() | atom()
%% Value = string() | binary()
%% RequestBody = iolist()
%% RequestBody = iodata()
%% Timeout = integer() | infinity
%% Options = [Option]
%% Option = {connect_timeout, Milliseconds | infinity} |
Expand Down Expand Up @@ -181,11 +246,11 @@ request(URL, Method, Hdrs, Body, Timeout) ->
%% request(Host, Port, Path, Ssl, Method, Hdrs, Body, Timeout, Options).
%% </pre>
%%
%% `URL' is expected to be a valid URL:
%% `URL' is expected to be a valid URL:
%% `scheme://host[:port][/path]'.
%% @end
%% @see request/9
-spec request(string(), string() | atom(), headers(), iolist(),
-spec request(string(), string() | atom(), headers(), iodata(),
pos_integer() | infinity, [option()]) -> result().
request(URL, Method, Hdrs, Body, Timeout, Options) ->
#lhttpc_url{
Expand Down Expand Up @@ -215,7 +280,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% Hdrs = [{Header, Value}]
%% Header = string() | binary() | atom()
%% Value = string() | binary()
%% RequestBody = iolist()
%% RequestBody = iodata()
%% Timeout = integer() | infinity
%% Options = [Option]
%% Option = {connect_timeout, Milliseconds | infinity} |
Expand Down Expand Up @@ -277,7 +342,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% choose to give up earlier than the connect timeout, in which case the
%% client will also give up. The default value is infinity, which means that
%% it will either give up when the TCP stack gives up, or when the overall
%% request timeout is reached.
%% request timeout is reached.
%%
%% `{connect_options, Options}' specifies options to pass to the socket at
%% connect time. This makes it possible to specify both SSL options and
Expand Down Expand Up @@ -351,7 +416,7 @@ request(URL, Method, Hdrs, Body, Timeout, Options) ->
%% list of all available options, please check OTP's ssl module manpage.
%% @end
-spec request(string(), 1..65535, true | false, string(), atom() | string(),
headers(), iolist(), pos_integer(), [option()]) -> result().
headers(), iodata(), pos_integer() | infinity, [option()]) -> result().
request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
verify_options(Options),
Args = [self(), Host, Port, Ssl, Path, Method, Hdrs, Body, Options],
Expand All @@ -369,7 +434,7 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
end.

%% @spec (UploadState :: UploadState, BodyPart :: BodyPart) -> Result
%% BodyPart = iolist() | binary()
%% BodyPart = iodata() | binary()
%% Timeout = integer() | infinity
%% Result = {error, Reason} | UploadState
%% Reason = connection_closed | connect_timeout | timeout
Expand All @@ -380,12 +445,12 @@ request(Host, Port, Ssl, Path, Method, Hdrs, Body, Timeout, Options) ->
%% Would be the same as calling
%% `send_body_part(UploadState, BodyPart, infinity)'.
%% @end
-spec send_body_part(upload_state(), iolist() | 'http_eob') -> result().
-spec send_body_part(upload_state(), iodata() | 'http_eob') -> result().
send_body_part({Pid, Window}, IoList) ->
send_body_part({Pid, Window}, IoList, infinity).

%% @spec (UploadState :: UploadState, BodyPart :: BodyPart, Timeout) -> Result
%% BodyPart = iolist() | binary()
%% BodyPart = iodata() | binary()
%% Timeout = integer() | infinity
%% Result = {error, Reason} | UploadState
%% Reason = connection_closed | connect_timeout | timeout
Expand All @@ -397,15 +462,15 @@ send_body_part({Pid, Window}, IoList) ->
%% milliseconds. If there is no acknowledgement received during that time the
%% the request is cancelled and `{error, timeout}' is returned.
%%
%% As long as the window size is larger than 0 the function will return
%% As long as the window size is larger than 0 the function will return
%% immediately after sending the body part to the request handling process.
%%
%%
%% The `BodyPart' `http_eob' signals an end of the entity body, the request
%% is considered sent and the response will be read from the socket. If
%% there is no response within `Timeout' milliseconds, the request is
%% canceled and `{error, timeout}' is returned.
%% @end
-spec send_body_part(upload_state(), iolist() | 'http_eob', timeout()) -> result().
-spec send_body_part(upload_state(), iodata() | 'http_eob', timeout()) -> result().
send_body_part({Pid, _Window}, http_eob, Timeout) when is_pid(Pid) ->
Pid ! {body_part, self(), http_eob},
read_response(Pid, Timeout);
Expand Down Expand Up @@ -581,6 +646,16 @@ verify_options([{proxy_ssl_options, List} | Options]) when is_list(List) ->
verify_options([{pool, PidOrName} | Options])
when is_pid(PidOrName); is_atom(PidOrName) ->
verify_options(Options);
verify_options([{pool_ensure, Bool} | Options])
when is_boolean(Bool) ->
verify_options(Options);
verify_options([{pool_connection_timeout, Size} | Options])
when is_integer(Size) ->
verify_options(Options);
verify_options([{pool_max_size, Size} | Options])
when is_integer(Size) orelse
Size =:= infinity->
verify_options(Options);
verify_options([Option | _Rest]) ->
erlang:error({bad_option, Option});
verify_options([]) ->
Expand Down
55 changes: 50 additions & 5 deletions src/lhttpc_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,8 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
Hdrs, Host, Port, Body, PartialUpload),
SocketRequest = {socket, self(), Host, Port, Ssl},
Pool = proplists:get_value(pool, Options, whereis(lhttpc_manager)),
Socket = case gen_server:call(Pool, SocketRequest, infinity) of
{ok, S} -> S; % Re-using HTTP/1.1 connections
no_socket -> undefined % Opening a new HTTP/1.1 connection
end,
%% Get a socket for the pool or exit
Socket = ensure_call(Pool, SocketRequest, Options),
State = #client_state{
host = Host,
port = Port,
Expand Down Expand Up @@ -171,6 +169,45 @@ execute(From, Host, Port, Ssl, Path, Method, Hdrs, Body, Options) ->
end,
{response, self(), Response}.

%% If call contains pool_ensure option, dynamically create the pool with
%% configured parameters.
ensure_call(Pool, SocketRequest, Options) ->
try gen_server:call(Pool, SocketRequest, infinity) of
{ok, S} ->
%% Re-using HTTP/1.1 connections
S;
no_socket ->
%% Opening a new HTTP/1.1 connection
undefined
catch
exit:{noproc, Reason} ->
case proplists:get_value(pool_ensure, Options, false) of
true ->
{ok, DefaultTimeout} = application:get_env(
lhttpc,
connection_timeout),
ConnTimeout = proplists:get_value(pool_connection_timeout,
Options,
DefaultTimeout),
{ok, DefaultMaxPool} = application:get_env(
lhttpc,
pool_size),
PoolMaxSize = proplists:get_value(pool_max_size,
Options,
DefaultMaxPool),
case lhttpc:add_pool(Pool, ConnTimeout, PoolMaxSize) of
{ok, _Pid} ->
ensure_call(Pool, SocketRequest, Options);
_ ->
%% Failed to create pool, exit as expected
exit({noproc, Reason})
end;
false ->
%% No dynamic pool creation, exit as expected
exit({noproc, Reason})
end
end.

send_request(#client_state{attempts = 0}) ->
% Don't try again if the number of allowed attempts is 0.
throw(connection_closed);
Expand All @@ -187,16 +224,24 @@ send_request(#client_state{socket = undefined} = State) ->
ConnectOptions0
end,
SocketOptions = [binary, {packet, http}, {active, false} | ConnectOptions],
case lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
try lhttpc_sock:connect(Host, Port, SocketOptions, Timeout, Ssl) of
{ok, Socket} ->
send_request(State#client_state{socket = Socket});
{error, etimedout} ->
% TCP stack decided to give up
throw(connect_timeout);
{error, timeout} ->
throw(connect_timeout);
{error, "record overflow"} ->
throw(ssl_error);
{error, Reason} ->
erlang:error(Reason)
catch
exit:{{{badmatch, {error, {asn1, _}}}, _}, _} ->
throw(ssl_decode_error);
Type:Error ->
error_logger:error_msg("Socket connection error: ~p ~p, ~p",
[Type, Error, erlang:get_stacktrace()])
end;
send_request(#client_state{proxy = #lhttpc_url{}, proxy_setup = false} = State) ->
#lhttpc_url{
Expand Down
14 changes: 11 additions & 3 deletions src/lhttpc_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,23 @@ header_value(Hdr, Hdrs) ->
-spec header_value(string(), [{header(), Value}], Default) ->
Default | Value.
header_value(Hdr, [{Hdr, Value} | _], _) ->
Value;
case is_list(Value) of
true -> string:strip(Value);
false -> Value
end;

header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) when is_atom(ThisHdr) ->
header_value(Hdr, [{atom_to_list(ThisHdr), Value}| Hdrs], Default);
header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) when is_binary(ThisHdr) ->
header_value(Hdr, [{binary_to_list(ThisHdr), Value}| Hdrs], Default);
header_value(Hdr, [{ThisHdr, Value}| Hdrs], Default) ->
case string:equal(string:to_lower(ThisHdr), Hdr) of
true -> Value;
false -> header_value(Hdr, Hdrs, Default)
true -> case is_list(Value) of
true -> string:strip(Value);
false -> Value
end;
false ->
header_value(Hdr, Hdrs, Default)
end;
header_value(_, [], Default) ->
Default.
Expand Down
45 changes: 44 additions & 1 deletion src/lhttpc_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@
-export([
start_link/0,
start_link/1,
client_count/1,
connection_count/1,
connection_count/2,
update_connection_timeout/2
update_connection_timeout/2,
dump_settings/1,
list_pools/0,
set_max_pool_size/2
]).
-export([
init/1,
Expand All @@ -61,6 +65,39 @@
timeout = 300000 :: non_neg_integer()
}).

%% @spec (PoolPidOrName) -> list()
%% @doc Returns the current settings in state for the
%% specified lhttpc pool (manager).
%% @end
-spec dump_settings(pid() | atom()) -> list().
dump_settings(PidOrName) ->
gen_server:call(PidOrName, dump_settings).

-spec set_max_pool_size(pid() | atom(), non_neg_integer()) -> ok.
set_max_pool_size(PidOrName, Size) when is_integer(Size), Size > 0 ->
gen_server:cast(PidOrName, {set_max_pool_size, Size}).

-spec list_pools() -> term().
list_pools() ->
Children = supervisor:which_children(lhttpc_sup),
lists:foldl(fun(In, Acc) ->
case In of
{N, P, _, [lhttpc_manager]} ->
[{N, dump_settings(P)} | Acc];
_ ->
Acc
end
end, [], Children).

%% @spec (PoolPidOrName) -> Count
%% Count = integer()
%% @doc Returns the total number of active clients maintained by the
%% specified lhttpc pool (manager).
%% @end
-spec client_count(pid() | atom()) -> non_neg_integer().
client_count(PidOrName) ->
gen_server:call(PidOrName, client_count).

%% @spec (PoolPidOrName) -> Count
%% Count = integer()
%% @doc Returns the total number of active connections maintained by the
Expand Down Expand Up @@ -156,6 +193,10 @@ handle_call({socket, Pid, Host, Port, Ssl}, {Pid, _Ref} = From, State) ->
{reply, no_socket, monitor_client(Dest, From, State2)}
end
end;
handle_call(dump_settings, _, State) ->
{reply, [{max_pool_size, State#httpc_man.max_pool_size}, {timeout, State#httpc_man.timeout}], State};
handle_call(client_count, _, State) ->
{reply, dict:size(State#httpc_man.clients), State};
handle_call(connection_count, _, State) ->
{reply, dict:size(State#httpc_man.sockets), State};
handle_call({connection_count, Destination}, _, State) ->
Expand All @@ -179,6 +220,8 @@ handle_call(_, _, State) ->
-spec handle_cast(any(), #httpc_man{}) -> {noreply, #httpc_man{}}.
handle_cast({update_timeout, Milliseconds}, State) ->
{noreply, State#httpc_man{timeout = Milliseconds}};
handle_cast({set_max_pool_size, Size}, State) ->
{noreply, State#httpc_man{max_pool_size = Size}};
handle_cast(_, State) ->
{noreply, State}.

Expand Down
Loading

0 comments on commit ed9aa8b

Please sign in to comment.