Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/wooga/eredis
Browse files Browse the repository at this point in the history
  • Loading branch information
Ransom Richardson committed Jul 6, 2012
2 parents a3ca819 + 4d35fcc commit 4803877
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 7 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG

## v1.0.2

* Fixed bug in eredis_sub where the socket was incorrectly set to
`{active, once}` twice. At large volumes of messages, this resulted
in too many messages from the socket and we would be unable to keep
up.

## v1.0

* Support added for pubsub thanks to Dave Peticolas
Expand Down Expand Up @@ -28,4 +35,4 @@
* Configurable reconnect sleep time, by Valentino Volonghi (dialtone)

* Support for using eredis as a poolboy worker, by Valentino Volonghi
(dialtone)
(dialtone)
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ Pubsub:
2> redis_sub:pub_example().
received {message,<<"foo">>,<<"bar">>,<0.34.0>}

Pattern Subscribe:

1> eredis_sub:psub_example().
received {subscribed,<<"foo*">>,<0.33.0>}
{<0.33.0>,<0.36.0>}
2> eredis_sub:ppub_example().
received {pmessage,<<"foo*">>,<<"foo123">>,<<"bar">>,<0.33.0>}
ok
3>

EUnit tests:

Expand Down Expand Up @@ -120,8 +129,9 @@ Subscriptions are managed using `eredis_sub:subscribe/2` and
subscription, a message is sent to the controlling process for each
channel.

For now, channel patterns are not supported, but it is relatively easy
to add support. Patches are welcome :)
eredis also supports Pattern Subscribe using `eredis_sub:psubscribe/2`
and `eredis_sub:unsubscribe/2`. As with normal subscriptions, a message
is sent to the controlling process for each channel.

## AUTH and SELECT

Expand Down
2 changes: 1 addition & 1 deletion src/eredis.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, eredis, [
{description, "Erlang Redis Client"},
{vsn, "1.0.1"},
{vsn, "1.0.2"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib]}
Expand Down
32 changes: 32 additions & 0 deletions src/eredis_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
controlling_process/1, controlling_process/2, controlling_process/3,
ack_message/1, subscribe/2, unsubscribe/2, channels/1]).

-export([psubscribe/2,punsubscribe/2]).

-export([receiver/1, sub_example/0, pub_example/0]).

-export([psub_example/0,ppub_example/0]).

%%
%% PUBLIC API
%%
Expand Down Expand Up @@ -114,9 +118,21 @@ ack_message(Client) ->
subscribe(Client, Channels) ->
gen_server:cast(Client, {subscribe, self(), Channels}).

%% @doc: Pattern subscribe to the given channels. Returns immediately. The
%% result will be delivered to the controlling process as any other
%% message. Delivers {subscribed, Channel::binary(), pid()}
-spec psubscribe(pid(), [channel()]) -> ok.
psubscribe(Client, Channels) ->
gen_server:cast(Client, {psubscribe, self(), Channels}).



unsubscribe(Client, Channels) ->
gen_server:cast(Client, {unsubscribe, self(), Channels}).

punsubscribe(Client, Channels) ->
gen_server:cast(Client, {punsubscribe, self(), Channels}).

%% @doc: Returns the channels the given client is currently
%% subscribing to. Note: this list is based on the channels at startup
%% and any channel added during runtime. It might not immediately
Expand Down Expand Up @@ -147,7 +163,23 @@ sub_example() ->
end),
{Sub, Receiver}.

psub_example() ->
{ok, Sub} = start_link(),
Receiver = spawn_link(fun () ->
controlling_process(Sub),
psubscribe(Sub, [<<"foo*">>]),
receiver(Sub)
end),
{Sub, Receiver}.

pub_example() ->
{ok, P} = eredis:start_link(),
eredis:q(P, ["PUBLISH", "foo", "bar"]),
eredis_client:stop(P).

ppub_example() ->
{ok, P} = eredis:start_link(),
eredis:q(P, ["PUBLISH", "foo123", "bar"]),
eredis_client:stop(P).


36 changes: 35 additions & 1 deletion src/eredis_sub_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,32 @@ handle_cast({subscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} =
ok = gen_tcp:send(State#state.socket, Command),
{noreply, State#state{channels = Channels ++ State#state.channels}};


handle_cast({psubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) ->
Command = eredis:create_multibulk(["PSUBSCRIBE" | Channels]),
ok = gen_tcp:send(State#state.socket, Command),
{noreply, State#state{channels = Channels ++ State#state.channels}};



handle_cast({unsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) ->
Command = eredis:create_multibulk(["UNSUBSCRIBE" | Channels]),
ok = gen_tcp:send(State#state.socket, Command),
NewChannels = lists:foldl(fun (C, Cs) -> lists:delete(C, Cs) end,
State#state.channels, Channels),
{noreply, State#state{channels = NewChannels}};



handle_cast({punsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) ->
Command = eredis:create_multibulk(["PUNSUBSCRIBE" | Channels]),
ok = gen_tcp:send(State#state.socket, Command),
NewChannels = lists:foldl(fun (C, Cs) -> lists:delete(C, Cs) end,
State#state.channels, Channels),
{noreply, State#state{channels = NewChannels}};



handle_cast({ack_message, _}, State) ->
{noreply, State};

Expand Down Expand Up @@ -210,7 +229,6 @@ handle_response(Data, #state{parser_state = ParserState} = State) ->
handle_response(Rest, NewState);

{continue, NewParserState} ->
inet:setopts(State#state.socket, [{active, once}]),
State#state{parser_state = NewParserState}
end.

Expand All @@ -219,13 +237,29 @@ handle_response(Data, #state{parser_state = ParserState} = State) ->
%% for later delivery.
reply({ok, [<<"message">>, Channel, Message]}, State) ->
queue_or_send({message, Channel, Message, self()}, State);

reply({ok, [<<"pmessage">>, Pattern, Channel, Message]}, State) ->
queue_or_send({pmessage, Pattern, Channel, Message, self()}, State);



reply({ok, [<<"subscribe">>, Channel, _]}, State) ->
queue_or_send({subscribed, Channel, self()}, State);

reply({ok, [<<"psubscribe">>, Channel, _]}, State) ->
queue_or_send({subscribed, Channel, self()}, State);


reply({ok, [<<"unsubscribe">>, Channel, _]}, State) ->
queue_or_send({unsubscribed, Channel, self()}, State);


reply({ok, [<<"punsubscribe">>, Channel, _]}, State) ->
queue_or_send({unsubscribed, Channel, self()}, State);
reply({ReturnCode, Value}, State) ->
throw({unexpected_response_from_redis, ReturnCode, Value, State}).


queue_or_send(Msg, State) ->
case State#state.msg_state of
need_ack ->
Expand Down
67 changes: 65 additions & 2 deletions test/eredis_sub_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ add_channels(Sub, Channels) ->
ok = eredis_sub:subscribe(Sub, Channels),
lists:foreach(
fun (C) ->
receive M -> ?assertEqual({subscribed, C, Sub}, M)
after 1 -> ok end
receive M ->
?assertEqual({subscribed, C, Sub}, M),
eredis_sub:ack_message(Sub)
end
end, Channels).






pubsub_test() ->
Pub = c(),
Sub = s(),
Expand Down Expand Up @@ -219,3 +226,59 @@ get_state([{data, [{"State", State}]} | _]) ->
State;
get_state([_|Rest]) ->
get_state(Rest).





% Tests for Pattern Subscribe

add_channels_pattern(Sub, Channels) ->
ok = eredis_sub:controlling_process(Sub),
ok = eredis_sub:psubscribe(Sub, Channels),
lists:foreach(
fun (C) ->
receive M ->
?assertEqual({subscribed, C, Sub}, M),
eredis_sub:ack_message(Sub)
end
end, Channels).





pubsub_pattern_test() ->
Pub = c(),
Sub = s(),
add_channels_pattern(Sub, [<<"chan1*">>, <<"chan2*">>]),
ok = eredis_sub:controlling_process(Sub),

?assertEqual({ok, <<"1">>}, eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>])),
receive
{pmessage, _Pattern, _Channel, _Message, _} = M ->
?assertEqual({pmessage, <<"chan1*">>,<<"chan123">>, <<"msg">>, Sub}, M)
after 10 ->
throw(timeout)
end,

eredis_sub:punsubscribe(Sub, [<<"chan1*">> , <<"chan2*">>]),
eredis_sub:ack_message(Sub),
eredis_sub:ack_message(Sub),
receive {unsubscribed,_,_} = M2 -> ?assertEqual({unsubscribed, <<"chan1*">>, Sub}, M2) end,
eredis_sub:ack_message(Sub),
receive {unsubscribed,_,_} = M3 -> ?assertEqual({unsubscribed, <<"chan2*">>, Sub}, M3) end,
eredis_sub:ack_message(Sub),

?assertEqual({ok, <<"0">>}, eredis:q(Pub, ["PUBLISH", <<"chan123">>, <<"msg">>])),
receive
Msg -> throw({unexpected_message, Msg})
after 10 ->
ok
end,

eredis_sub:stop(Sub).




0 comments on commit 4803877

Please sign in to comment.