Skip to content
This repository has been archived by the owner on May 8, 2019. It is now read-only.

Commit

Permalink
Implement some methods
Browse files Browse the repository at this point in the history
- get_schema
- list_dbs
- transaction
  - select
  • Loading branch information
shun159 committed Mar 15, 2016
1 parent bfb5967 commit bf20163
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 32 deletions.
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
warn_unused_import,
warn_unused_vars]}.

{deps, [{jsx, ".*",
{git, "https://github.com/talentdeficit/jsx.git", {tag, "2.8.0"}}},
{shackle, ".*",
{git, "https://github.com/lpgauth/shackle.git", {tag, "0.2.0"}}}]}.
{deps, [{jsone, ".*",
{git, "https://github.com/sile/jsone.git",
{tag, "1.2.1"}}}
]}.

{deps_dir, "deps"}.
{lib_dirs, ["deps"]}.
Expand Down
84 changes: 70 additions & 14 deletions src/eovsdb_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@
-define(RETRY_CONNECT_TIME, 5000).
-define(STATE, eovsdb_client_state).

-record(?STATE, { mref :: reference(),
ipaddr :: inet:ip_address(),
port :: integer() }).
-record(?STATE, { mref :: reference(),
ipaddr :: inet:ip_address(),
port :: integer(),
database :: binary(),
conn_pid :: pid() }).

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------

-export([connect/2, signal_connect/1]).
-export([connect/2,
signal_connect/1,
list_dbs/1,
get_schema/1,
get_schema/2,
transaction/2,
transaction/3]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
Expand All @@ -30,20 +38,70 @@
%% API Function Definitions
%% ------------------------------------------------------------------

list_dbs(Pid) ->
gen_server:call(Pid, list_dbs).

get_schema(Pid) ->
gen_server:call(Pid, get_schema).
get_schema(Pid, DB) ->
gen_server:call(Pid, {get_schema, DB}).

transaction(Pid, Op) ->
gen_server:call(Pid, {transaction, Op}).
transaction(Pid, DB, Op) ->
gen_server:call(Pid, {transaction, DB, Op}).

signal_connect(Pid) ->
gen_server:cast(Pid, connect).

connect(Host, Port) ->
gen_server:start_link(?MODULE, [Host, Port], []).
connect(Host, Opts) when is_list(Host) ->
HostBin = list_to_binary(Host),
[AddrBin, PortBin] = binary:split(HostBin, <<":">>),
Port = binary_to_integer(PortBin),
Addr0 = binary_to_list(AddrBin),
Addr1 = case inet:parse_address(Addr0) of
{error, einval} ->
case inet_gethost_native:gethostbyname(Addr0) of
{error, _} = E ->
?ERR("[~p] couldn't connect to ~p~n", [?MODULE, Host]),
error(E);
{ok, Res} ->
[TmpAddr|_T] = element(6, Res),
TmpAddr
end;
{ok, TmpAddr} -> TmpAddr
end,
gen_server:start_link(?MODULE, [Addr1, Port, Opts], []).

%% ------------------------------------------------------------------
%% callbacks
%% ------------------------------------------------------------------

init([Host, Port]) ->
init([Host, Port, Opts]) ->
signal_connect(self()),
{ok, #?STATE{ipaddr = Host, port = Port}}.

DB = proplists:get_value(database, Opts),
{ok, #?STATE{ipaddr = Host, port = Port, database = DB}}.

handle_call(list_dbs, _From,
State = #?STATE{ conn_pid = Conn }) ->
Reply = eovsdb_protocol:list_dbs(Conn),
{reply, Reply, State};
handle_call(get_schema, _From,
State = #?STATE{ conn_pid = Conn, database = DB }) ->
Reply = eovsdb_protocol:get_schema(Conn, DB),
{reply, Reply, State};
handle_call({get_schema, DB}, _From,
State = #?STATE{ conn_pid = Conn }) ->
Reply = eovsdb_protocol:get_schema(Conn, DB),
{reply, Reply, State};
handle_call({transaction, Ops},
_From, State = #?STATE{ conn_pid = Conn, database = DB }) ->
Reply = eovsdb_protocol:transaction(Conn, DB, Ops),
{reply, Reply, State};
handle_call({transaction, DB, Ops},
_From, State = #?STATE{ conn_pid = Conn }) ->
Reply = eovsdb_protocol:transaction(Conn, DB, Ops),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.

Expand All @@ -57,18 +115,16 @@ handle_cast(connect, State = #?STATE{ ipaddr = Host, port = Port }) ->
{ok, Conn} ->
gen_tcp:controlling_process(Socket, Conn),
MRef = erlang:monitor(process, Conn),
State#?STATE{ mref = MRef };
State#?STATE{ mref = MRef, conn_pid = Conn };
{error, ChildReason} ->
HostStr = inet_parse:ntoa(Host),
?WARN("can't start eovsdb_protocol for ~s:~p: ~p\n",
[HostStr, Port, ChildReason]),
?WARN("can't start eovsdb_protocol for ~s:~p: ~p~n", [HostStr, Port, ChildReason]),
retry_connect(self(), ?RETRY_CONNECT_TIME),
State
end;
{error, TcpReason} ->
HostStr = inet_parse:ntoa(Host),
?WARN("tcp error connecting to ~s:~p: ~p\n",
[HostStr, Port, TcpReason]),
?WARN("tcp error connecting to ~s:~p: ~p~n", [HostStr, Port, TcpReason]),
retry_connect(self(), ?RETRY_CONNECT_TIME),
State
end,
Expand Down
1 change: 1 addition & 0 deletions src/eovsdb_logger.hrl
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
-define(INFO(Fmt, Arg), error_logger:info_msg(Fmt, Arg)).
-define(WARN(Fmt, Arg), error_logger:warning_msg(Fmt, Arg)).
-define(ERR(Fmt, Arg), error_logger:error_msg(Fmt, Arg)).
30 changes: 30 additions & 0 deletions src/eovsdb_methods.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-module(eovsdb_methods).

-export([q/3]).

q(echo, Id, Params) ->
#{ id => Id, method => echo, params => Params };
q(echo_reply, Id, Params) ->
#{ id => Id, error => null, result => Params };
q(list_dbs, Id, Params) ->
#{ id => Id, method => list_dbs, params => Params };
q(get_schema, Id, Params) ->
#{ id => Id, method => get_schema, params => Params };
q(transact, Id, Params) ->
#{ id => Id, method => transact, params => Params };
q(cancel, Id, Params) ->
#{ id => Id, method => cancel, params => Params };
q(monitor, Id, Params) ->
#{ id => Id, method => monitor, params => Params };
q(update, Id, Params) ->
#{ id => Id, method => update, params => Params };
q(monitor_cancel, Id, Params) ->
#{ id => Id, method => monitor_cancel, params => Params };
q(lock, Id, Params) ->
#{ id => Id, method => lock, params => Params };
q(steal, Id, Params) ->
#{ id => Id, method => steal, params => Params };
q(unlock, Id, Params) ->
#{ id => Id, method => unlock, params => Params };
q(Invalid, _, _) ->
{ error, { not_supported, Invalid }}.
23 changes: 23 additions & 0 deletions src/eovsdb_op.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-module(eovsdb_op).

-export([select/3]).

select("*", Table, Conds) ->
Where = [[C, F, V] || {C, F, V} <- Conds],
#{ op => select,
table => Table,
where => Where };
select(Cols, Table, Conds) ->
Where = [[C, F, V] || {C, F, V} <- Conds],
#{ op => select,
table => Table,
where => Where,
columns => Cols }.

%% Transact API
%% 1.select
%% [A, B, C] = eovsdb_methods:transaction([
%% eovsdb_op:select([], Tbl),
%% eovsdb_op:select([Col1, Col2], Tbl, [{Col1, <<">">>, 10}]),
%% eovsdb_op:select([Col1, Col2], Tbl, [{Col1, <<">">>, 10}])
%% ]
97 changes: 85 additions & 12 deletions src/eovsdb_protocol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@
-export([start_link/1]).

%% Protocol API
-export([echo/1]).
-export([echo_reply/2, list_dbs/1, get_schema/2, transaction/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(SYNC_SEND_TIMEOUT, 5000).
-define(STATE, eovsdb_protocol_state).

-record(?STATE, { socket :: inet:socket(),
address :: inet:ip_address(),
port :: integer() }).
-record(?STATE, { socket :: inet:socket(),
address :: inet:ip_address(),
port :: integer(),
protocol = tcp :: atom(),
buffer = <<>> :: binary(),
pending_message = maps:new() :: map() }).

%%%===================================================================
%%% API
Expand All @@ -35,8 +39,23 @@
start_link(Socket) ->
gen_server:start_link(?MODULE, [Socket], []).

echo(_Xid) ->
not_implemented.
list_dbs(Pid) ->
Json = eovsdb_methods:q(list_dbs, 0, []),
gen_server:call(Pid, {sync_send, Json}).

get_schema(Pid, DB) ->
Json = eovsdb_methods:q(get_schema, 0, [DB]),
gen_server:call(Pid, {sync_send, Json}).

transaction(Pid, DB, Op) when not is_list(Op) ->
transaction(Pid, DB, [Op]);
transaction(Pid, DB, Ops) when is_list(Ops) ->
Json = eovsdb_methods:q(transact, 0, [DB] ++ Ops),
gen_server:call(Pid, {sync_send, Json}).

echo_reply(Id, Params) ->
Json = eovsdb_methods:q(echo_reply, Id, Params),
ok = gen_server:cast(self(), {send_json, Json}).

%%%===================================================================
%%% gen_server callbacks
Expand All @@ -54,11 +73,11 @@ echo(_Xid) ->
%% @end
%%--------------------------------------------------------------------
init([Socket]) ->
{ ok, { Address, Port } } = inet:peername(Socket),
eovsdb_util:setopts(tcp, Socket, [{ active, once }]),
{ok, {Address, Port}} = inet:peername(Socket),
eovsdb_util:setopts(tcp, Socket, [{active, once}]),
AddrStr = inet_parse:ntoa(Address),
?INFO("connected to ~s:~p pid:~p\n", [AddrStr, Port, self()]),
{ok, #?STATE{ socket = Socket }}.
{ok, #?STATE{socket = Socket, protocol = tcp}}.

%%--------------------------------------------------------------------
%% @private
Expand All @@ -74,6 +93,15 @@ init([Socket]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({sync_send, Data0}, From,
State = #?STATE{ socket = Socket,
protocol = Proto,
pending_message = PM0 }) ->
Id = eovsdb_util:rand_id(),
Data = Data0#{ id => Id },
PM = maps:put(Id, #{ from => From, reply => noreply }, PM0),
eovsdb_util:send_json(Proto, Socket, Data),
{noreply, State#?STATE{ pending_message = PM }};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
Expand All @@ -88,7 +116,9 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
handle_cast({send_json, Data},
State = #?STATE{socket = Socket, protocol = Proto}) ->
eovsdb_util:send_json(Proto, Socket, Data),
{noreply, State}.

%%--------------------------------------------------------------------
Expand All @@ -101,8 +131,12 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
handle_info({tcp, Socket, Data},
State = #?STATE{socket = Socket, protocol = Proto}) ->
eovsdb_util:setopts(Proto, Socket, [{active, once}]),
handle_tcp(State, Data);
handle_info({tcp_closed, _Socket}, State) ->
terminate_connection(State, tcp_closed).

%%--------------------------------------------------------------------
%% @private
Expand Down Expand Up @@ -132,3 +166,42 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================

handle_tcp(State, <<>>) ->
{noreply, State};
handle_tcp(State = #?STATE{ buffer = Buf }, Data0) ->
Data1 = <<Buf/binary, Data0/binary>>,
case jsone:try_decode(Data1) of
{ok, Term, Left} ->
NewState = handle_message(Term, State),
handle_tcp(NewState#?STATE{buffer = <<>>}, Left);
{error, _Error} ->
{noreply, State#?STATE{buffer = Data1}}
end.

handle_message(#{ <<"method">> := <<"echo">> } = Data, State) ->
#{ <<"id">> := Id, <<"params">> := Params } = Data,
ok = echo_reply(Id, Params),
State;
handle_message(Data, State = #?STATE{pending_message = PM0}) ->
#{ <<"id">> := Id,
<<"result">> := Result,
<<"error">> := Err } = Data,
case maps:get(Id, PM0, not_found) of
not_found -> State;
#{ from := From } ->
ReplyData = case Err of
null -> { ok, Result };
_ -> { error, { Err, Result }}
end,
gen_server:reply(From, ReplyData),
PM = maps:remove(Id, PM0),
State#?STATE{pending_message = PM}
end.

terminate_connection(State = #?STATE{ socket = Socket,
protocol = Proto },
Reason) ->
?WARN("[~p] terminating: ~p~n", [?MODULE, Reason]),
ok = eovsdb_util:close(Proto, Socket),
{stop, normal, State#?STATE{socket = undefined}}.
25 changes: 24 additions & 1 deletion src/eovsdb_util.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
-module(eovsdb_util).

-export([setopts/3]).
-export([send_json/3,
setopts/3,
close/2,
rand_id/0]).

send_json(Protocol, Socket, Map) ->
Json = jsone:encode(Map),
send(Protocol, Socket, Json).

setopts(tcp, Socket, Opts) ->
inet:setopts(Socket, Opts);
setopts(tls, Socket, Opts) ->
ssl:setopts(Socket, Opts).

send(tcp, Socket, Data) ->
gen_tcp:send(Socket, Data);
send(tls, Socket, Data) ->
ssl:send(Socket, Data).

close(_, undefined) ->
ok;
close(tcp, Socket) ->
gen_tcp:close(Socket);
close(tls, Socket) ->
ssl:close(Socket).

rand_id() ->
IdBin = crypto:rand_bytes(4),
binary:decode_unsigned(IdBin, big).
2 changes: 1 addition & 1 deletion start_dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

erl $@ -pa $PWD/ebin $PWD/deps/*/ebin $PWD/test \
-eval "application:start(eovsdb),\
eovsdb_client:connect(localhost, 6632)."
Pid = eovsdb_client:connect(\"localhost:6632\", [<<\"Open_vSwitch\">>])."

0 comments on commit bf20163

Please sign in to comment.