Skip to content

Commit

Permalink
deduped socket listener functions
Browse files Browse the repository at this point in the history
  • Loading branch information
iamaleksey committed Feb 2, 2010
1 parent 96b5dce commit 2dc6a61
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 180 deletions.
92 changes: 2 additions & 90 deletions src/gen_esme_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@
%%% CODE UPDATE EXPORTS
-export([code_change/4]).

%%% INTERNAL EXPORTS
-export([wait_listen/3, wait_recv/4, recv_loop/4]).

%%% RECORDS
-record(st,
{esme,
Expand Down Expand Up @@ -207,7 +204,7 @@ init([Mod, Esme, Opts]) ->

init_open(Mod, Esme, Sock, Tmr, Log) ->
Self = self(),
Pid = spawn_link(fun() -> wait_recv(Self, Sock, Log) end),
Pid = spawn_link(smpp_session, wait_recv, [Self, Sock, Log]),
{ok, open, #st{esme = Esme,
mod = Mod,
log = Log,
Expand All @@ -221,7 +218,7 @@ init_open(Mod, Esme, Sock, Tmr, Log) ->

init_listen(Mod, Esme, LSock, Tmr, Log) ->
Self = self(),
Pid = spawn_link(fun() -> wait_listen(Self, LSock, Log) end),
Pid = spawn_link(smpp_session, wait_listen, [Self, LSock, Log]),
{ok, listen, #st{esme = Esme,
mod = Mod,
log = Log,
Expand Down Expand Up @@ -684,88 +681,3 @@ send_request(CmdId, Params, From, St) ->

send_response(CmdId, Status, SeqNum, Params, Sock, Log) ->
send_pdu(Sock, smpp_operation:new(CmdId, Status, SeqNum, Params), Log).

%%%-----------------------------------------------------------------------------
%%% SOCKET LISTENER FUNCTIONS
%%%-----------------------------------------------------------------------------
handle_accept(Pid, Sock) ->
ok = gen_tcp:controlling_process(Sock, Pid),
case inet:peername(Sock) of
{ok, {Addr, _Port}} ->
gen_fsm:send_event(Pid, {accept, Sock, Addr}),
true;
{error, _Reason} -> % Most probably the socket is closed
false
end.


handle_input(Pid, <<CmdLen:32, Rest/binary>> = Buffer, Lapse, N, Log) ->
Now = now(), % PDU received. PDU handling starts now!
Len = CmdLen - 4,
case Rest of
<<PduRest:Len/binary-unit:8, NextPdus/binary>> ->
BinPdu = <<CmdLen:32, PduRest/binary>>,
case catch smpp_operation:unpack(BinPdu) of
{ok, Pdu} ->
smpp_log_mgr:pdu(Log, BinPdu),
CmdId = smpp_operation:get_value(command_id, Pdu),
Event = {input, CmdId, Pdu, (Lapse div N), Now},
gen_fsm:send_all_state_event(Pid, Event);
{error, _CmdId, _Status, _SeqNum} = Event ->
gen_fsm:send_all_state_event(Pid, Event);
{'EXIT', _What} ->
Event = {error, 0, ?ESME_RUNKNOWNERR, 0},
gen_fsm:send_all_state_event(Pid, Event)
end,
% The buffer may carry more than one SMPP PDU.
handle_input(Pid, NextPdus, Lapse, N + 1, Log);
_IncompletePdu ->
Buffer
end;
handle_input(_Pid, Buffer, _Lapse, _N, _Log) ->
Buffer.


wait_listen(Pid, LSock, Log) ->
case gen_tcp:accept(LSock) of
{ok, Sock} ->
case handle_accept(Pid, Sock) of
true ->
wait_recv(Pid, Sock, Log);
false ->
?MODULE:wait_listen(Pid, LSock, Log)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {listen_error, Reason})
end.


wait_recv(Pid, Sock, Log) ->
?MODULE:wait_recv(Pid, Sock, <<>>, Log).

wait_recv(Pid, Sock, Buffer, Log) ->
Timestamp = now(),
case gen_tcp:recv(Sock, 0) of
{ok, Input} ->
L = timer:now_diff(now(), Timestamp), % Time lapse
B = handle_input(Pid, concat_binary([Buffer, Input]), L, 1, Log),
case recv_loop(Pid, Sock, B, Log) of
{ok, NewBuffer} ->
?MODULE:wait_recv(Pid, Sock, NewBuffer, Log);
RecvError ->
gen_fsm:send_all_state_event(Pid, RecvError)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {sock_error, Reason})
end.

recv_loop(Pid, Sock, Buffer, Log) ->
case gen_tcp:recv(Sock, 0, 0) of
{ok, Input} -> % Some input waiting already
B = handle_input(Pid, concat_binary([Buffer, Input]), 0, 1, Log),
?MODULE:recv_loop(Pid, Sock, B, Log);
{error, timeout} -> % No data inmediately available
{ok, Buffer};
{error, Reason} ->
{sock_error, Reason}
end.
92 changes: 2 additions & 90 deletions src/gen_mc_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@
%%% CODE UPDATE EXPORTS
-export([code_change/4]).

%%% INTERNAL EXPORTS
-export([wait_listen/3, wait_recv/4, recv_loop/4]).

%%% MACROS
-define(BOUND(B),
if
Expand Down Expand Up @@ -168,7 +165,7 @@ init([Mod, Mc, Opts]) ->

init_open(Mod, Mc, Sock, Tmr, Log) ->
Self = self(),
Pid = spawn_link(fun() -> wait_recv(Self, Sock, Log) end),
Pid = spawn_link(smpp_session, wait_recv, [Self, Sock, Log]),
{ok, open, #st{mc = Mc,
mod = Mod,
log = Log,
Expand All @@ -181,7 +178,7 @@ init_open(Mod, Mc, Sock, Tmr, Log) ->

init_listen(Mod, Mc, LSock, Tmr, Log) ->
Self = self(),
Pid = spawn_link(fun() -> wait_listen(Self, LSock, Log) end),
Pid = spawn_link(smpp_session, wait_listen, [Self, LSock, Log]),
{ok, listen, #st{mc = Mc,
mod = Mod,
log = Log,
Expand Down Expand Up @@ -692,88 +689,3 @@ send_request(CmdId, Params, From, St) ->

send_response(CmdId, Status, SeqNum, Params, Sock, Log) ->
send_pdu(Sock, smpp_operation:new(CmdId, Status, SeqNum, Params), Log).

%%%-----------------------------------------------------------------------------
%%% SOCKET LISTENER FUNCTIONS
%%%-----------------------------------------------------------------------------
handle_accept(Pid, Sock) ->
ok = gen_tcp:controlling_process(Sock, Pid),
case inet:peername(Sock) of
{ok, {Addr, _Port}} ->
gen_fsm:send_event(Pid, {accept, Sock, Addr}),
true;
{error, _Reason} -> % Most probably the socket is closed
false
end.


handle_input(Pid, <<CmdLen:32, Rest/binary>> = Buffer, Lapse, N, Log) ->
Now = now(), % PDU received. PDU handling starts now!
Len = CmdLen - 4,
case Rest of
<<PduRest:Len/binary-unit:8, NextPdus/binary>> ->
BinPdu = <<CmdLen:32, PduRest/binary>>,
case catch smpp_operation:unpack(BinPdu) of
{ok, Pdu} ->
smpp_log_mgr:pdu(Log, BinPdu),
CmdId = smpp_operation:get_value(command_id, Pdu),
Event = {input, CmdId, Pdu, (Lapse div N), Now},
gen_fsm:send_all_state_event(Pid, Event);
{error, _CmdId, _Status, _SeqNum} = Event ->
gen_fsm:send_all_state_event(Pid, Event);
{'EXIT', _What} ->
Event = {error, 0, ?ESME_RUNKNOWNERR, 0},
gen_fsm:send_all_state_event(Pid, Event)
end,
% The buffer may carry more than one SMPP PDU.
handle_input(Pid, NextPdus, Lapse, N + 1, Log);
_IncompletePdu ->
Buffer
end;
handle_input(_Pid, Buffer, _Lapse, _N, _Log) ->
Buffer.


wait_listen(Pid, LSock, Log) ->
case gen_tcp:accept(LSock) of
{ok, Sock} ->
case handle_accept(Pid, Sock) of
true ->
wait_recv(Pid, Sock, Log);
false ->
?MODULE:wait_listen(Pid, LSock, Log)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {listen_error, Reason})
end.


wait_recv(Pid, Sock, Log) ->
?MODULE:wait_recv(Pid, Sock, <<>>, Log).

wait_recv(Pid, Sock, Buffer, Log) ->
Timestamp = now(),
case gen_tcp:recv(Sock, 0) of
{ok, Input} ->
L = timer:now_diff(now(), Timestamp),
B = handle_input(Pid, concat_binary([Buffer, Input]), L, 1, Log),
case recv_loop(Pid, Sock, B, Log) of
{ok, NewBuffer} ->
?MODULE:wait_recv(Pid, Sock, NewBuffer, Log);
RecvError ->
gen_fsm:send_all_state_event(Pid, RecvError)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {sock_error, Reason})
end.

recv_loop(Pid, Sock, Buffer, Log) ->
case gen_tcp:recv(Sock, 0, 0) of
{ok, Input} -> % Some input waiting already
B = handle_input(Pid, concat_binary([Buffer, Input]), 0, 1, Log),
?MODULE:recv_loop(Pid, Sock, B, Log);
{error, timeout} -> % No data inmediately available
{ok, Buffer};
{error, Reason} ->
{sock_error, Reason}
end.
88 changes: 88 additions & 0 deletions src/smpp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
%%% EXTERNAL EXPORTS
-export([congestion/3, connect/1, listen/1]).

%%% SOCKET LISTENER FUNCTIONS EXPORTS
-export([wait_listen/3, wait_recv/3, wait_recv/4, recv_loop/4]).

%%% MACROS
-define(CONNECT_OPTS, [binary, {packet, 0}, {active, false}]).
-define(CONNECT_TIME, 30000).
Expand Down Expand Up @@ -106,6 +109,91 @@ listen(Opts) ->
end
end.

%%%-----------------------------------------------------------------------------
%%% SOCKET LISTENER FUNCTIONS
%%%-----------------------------------------------------------------------------
handle_accept(Pid, Sock) ->
ok = gen_tcp:controlling_process(Sock, Pid),
case inet:peername(Sock) of
{ok, {Addr, _Port}} ->
gen_fsm:send_event(Pid, {accept, Sock, Addr}),
true;
{error, _Reason} -> % Most probably the socket is closed
false
end.


handle_input(Pid, <<CmdLen:32, Rest/binary>> = Buffer, Lapse, N, Log) ->
Now = now(), % PDU received. PDU handling starts now!
Len = CmdLen - 4,
case Rest of
<<PduRest:Len/binary-unit:8, NextPdus/binary>> ->
BinPdu = <<CmdLen:32, PduRest/binary>>,
case catch smpp_operation:unpack(BinPdu) of
{ok, Pdu} ->
smpp_log_mgr:pdu(Log, BinPdu),
CmdId = smpp_operation:get_value(command_id, Pdu),
Event = {input, CmdId, Pdu, (Lapse div N), Now},
gen_fsm:send_all_state_event(Pid, Event);
{error, _CmdId, _Status, _SeqNum} = Event ->
gen_fsm:send_all_state_event(Pid, Event);
{'EXIT', _What} ->
Event = {error, 0, ?ESME_RUNKNOWNERR, 0},
gen_fsm:send_all_state_event(Pid, Event)
end,
% The buffer may carry more than one SMPP PDU.
handle_input(Pid, NextPdus, Lapse, N + 1, Log);
_IncompletePdu ->
Buffer
end;
handle_input(_Pid, Buffer, _Lapse, _N, _Log) ->
Buffer.


wait_listen(Pid, LSock, Log) ->
case gen_tcp:accept(LSock) of
{ok, Sock} ->
case handle_accept(Pid, Sock) of
true ->
wait_recv(Pid, Sock, Log);
false ->
?MODULE:wait_listen(Pid, LSock, Log)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {listen_error, Reason})
end.


wait_recv(Pid, Sock, Log) ->
?MODULE:wait_recv(Pid, Sock, <<>>, Log).

wait_recv(Pid, Sock, Buffer, Log) ->
Timestamp = now(),
case gen_tcp:recv(Sock, 0) of
{ok, Input} ->
L = timer:now_diff(now(), Timestamp),
B = handle_input(Pid, concat_binary([Buffer, Input]), L, 1, Log),
case recv_loop(Pid, Sock, B, Log) of
{ok, NewBuffer} ->
?MODULE:wait_recv(Pid, Sock, NewBuffer, Log);
RecvError ->
gen_fsm:send_all_state_event(Pid, RecvError)
end;
{error, Reason} ->
gen_fsm:send_all_state_event(Pid, {sock_error, Reason})
end.

recv_loop(Pid, Sock, Buffer, Log) ->
case gen_tcp:recv(Sock, 0, 0) of
{ok, Input} -> % Some input waiting already
B = handle_input(Pid, concat_binary([Buffer, Input]), 0, 1, Log),
?MODULE:recv_loop(Pid, Sock, B, Log);
{error, timeout} -> % No data inmediately available
{ok, Buffer};
{error, Reason} ->
{sock_error, Reason}
end.

%%%-----------------------------------------------------------------------------
%%% INTERNAL FUNCTIONS
%%%-----------------------------------------------------------------------------
Expand Down

0 comments on commit 2dc6a61

Please sign in to comment.