Skip to content

Commit

Permalink
Add API for leaving and closing channels
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed May 11, 2015
1 parent 01b06b0 commit 3155d37
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 13 deletions.
15 changes: 14 additions & 1 deletion lib/phoenix/channel/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,21 @@ defmodule Phoenix.Channel.Server do
This event is synchronous as we want to guarantee
proper termination of the channel.
"""
def close(pid) do
def close(pid, timeout \\ 5000) do
# We need to guarantee that the channel has been closed
# otherwise the link in the transport will trigger it to
# crash.
ref = Process.monitor(pid)
GenServer.cast(pid, :close)
receive do
{:DOWN, ^ref, _, _, _} -> :ok
after
timeout ->
Process.exit(pid, :kill)
receive do
{:DOWN, ^ref, _, _, _} -> :ok
end
end
end

## Channel API
Expand Down
27 changes: 23 additions & 4 deletions lib/phoenix/test/channel_test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ defmodule Phoenix.ChannelTest do
ref
end

@doc """
Emulates the client leaving the channel.
"""
def leave(socket) do
ref = make_ref()
Server.leave(socket.channel_pid, ref)
ref
end

@doc """
Emulates the client closing the channel.
Closing channels is synchronous and has a default timeout
of 5000 miliseconds.
"""
def close(socket, timeout \\ 5000) do
Server.close(socket.channel_pid, timeout)
end

@doc """
Broadcast event from pid to all subscribers of the socket topic.
Expand Down Expand Up @@ -146,14 +165,14 @@ defmodule Phoenix.ChannelTest do
Notice event and payload are patterns. This means one can write:
assert_pushed "some_event", %{"data" => _}
assert_push "some_event", %{"data" => _}
In the assertion above, we don't particularly care about
the data being sent, as long as something was sent.
The timeout is in miliseconds and defaults to 100ms.
"""
defmacro assert_pushed(event, payload, timeout \\ 100) do
defmacro assert_push(event, payload, timeout \\ 100) do
quote do
assert_receive %Phoenix.Socket.Message{event: unquote(event),
payload: unquote(payload)}, unquote(timeout)
Expand All @@ -167,14 +186,14 @@ defmodule Phoenix.ChannelTest do
Notice status and payload are patterns. This means one can write:
ref = push channel, "some_event"
assert_replied ref, :ok, %{"data" => _}
assert_reply ref, :ok, %{"data" => _}
In the assertion above, we don't particularly care about
the data being sent, as long as something was replied.
The timeout is in miliseconds and defaults to 100ms.
"""
defmacro assert_replied(ref, status, payload \\ Macro.escape(%{}), timeout \\ 100) do
defmacro assert_reply(ref, status, payload \\ Macro.escape(%{}), timeout \\ 100) do
quote do
ref = unquote(ref)
assert_receive %Phoenix.Socket.Reply{status: unquote(status), ref: ^ref,
Expand Down
113 changes: 105 additions & 8 deletions test/phoenix/test/channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ defmodule Phoenix.Test.ChannelTest do
{:ok, socket}
end

def join("foo:timeout", _, socket) do
Process.flag(:trap_exit, true)
{:ok, socket}
end

def join("foo:socket", _, socket) do
{:ok, socket, socket}
end
Expand Down Expand Up @@ -65,6 +70,29 @@ defmodule Phoenix.Test.ChannelTest do
def handle_out(event, payload, socket) do
super(event, payload, socket)
end

def handle_info(:stop, socket) do
{:stop, :shutdown, socket}
end

def handle_info(:push, socket) do
push socket, "info", %{"reason" => "push"}
{:noreply, socket}
end

def handle_info(:broadcast, socket) do
broadcast_from socket, "info", %{"reason" => "broadcast"}
{:noreply, socket}
end

def terminate(_reason, %{topic: "foo:timeout"}) do
:timer.sleep(:infinity)
end

def terminate(reason, socket) do
send socket.transport_pid, {:terminate, reason}
:ok
end
end

@endpoint Endpoint
Expand Down Expand Up @@ -107,24 +135,25 @@ defmodule Phoenix.Test.ChannelTest do
test "pushes and receives pushed messages" do
{:ok, _, socket} = join(Channel, "foo:ok")
push socket, "noreply", %{"req" => "foo"}
assert_pushed "noreply", %{"resp" => "foo"}
assert_push "noreply", %{"resp" => "foo"}
end

test "pushes and receives replies" do
{:ok, _, socket} = join(Channel, "foo:ok")

ref = push socket, "reply", %{}
assert_replied ref, :ok
assert_reply ref, :ok

ref = push socket, "reply", %{"req" => "foo"}
assert_replied ref, :ok, %{"resp" => "foo"}
assert_reply ref, :ok, %{"resp" => "foo"}
end

test "pushes on stop" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(Channel, "foo:ok")
push socket, "stop", %{"reason" => :normal}
pid = socket.channel_pid
assert_receive {:terminate, :normal}
assert_receive {:EXIT, ^pid, :normal}

# Pushing after stop doesn't crash the client/transport
Expand All @@ -137,14 +166,16 @@ defmodule Phoenix.Test.ChannelTest do

{:ok, _, socket} = join(Channel, "foo:ok")
ref = push socket, "stop_and_reply", %{}
assert_replied ref, :ok
assert_reply ref, :ok
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}

{:ok, _, socket} = join(Channel, "foo:ok")
ref = push socket, "stop_and_reply", %{"req" => "foo"}
assert_replied ref, :ok, %{"resp" => "foo"}
assert_reply ref, :ok, %{"resp" => "foo"}
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
end

Expand All @@ -156,17 +187,83 @@ defmodule Phoenix.Test.ChannelTest do

## handle_out

test "pushes broadcasts by default" do
test "push broadcasts by default" do
{:ok, _, socket} = subscribe_and_join(Channel, "foo:ok")
broadcast_from! socket, "default", %{"foo" => "bar"}
assert_pushed "default", %{"foo" => "bar"}
assert_push "default", %{"foo" => "bar"}
end

test "pushes broadcasts and stops" do
test "handles broadcasts and stops" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = subscribe_and_join(Channel, "foo:ok")
broadcast_from! socket, "stop", %{"foo" => "bar"}
pid = socket.channel_pid
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
end

## handle_info

test "handles messages and stops" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = subscribe_and_join(Channel, "foo:ok")
pid = socket.channel_pid
send pid, :stop
assert_receive {:terminate, :shutdown}
assert_receive {:EXIT, ^pid, :shutdown}
end

test "handles messages and pushes" do
{:ok, _, socket} = subscribe_and_join(Channel, "foo:ok")
send socket.channel_pid, :push
assert_push "info", %{"reason" => "push"}
end

test "handles messages and broadcasts" do
{:ok, _, socket} = subscribe_and_join(Channel, "foo:ok")
send socket.channel_pid, :broadcast
assert_broadcast "info", %{"reason" => "broadcast"}
end

## terminate

test "leaves the channel" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(Channel, "foo:ok")
ref = leave(socket)
assert_reply ref, :ok

pid = socket.channel_pid
assert_receive {:terminate, {:shutdown, :left}}
assert_receive {:EXIT, ^pid, {:shutdown, :left}}

# Leaving again doesn't crash
_ = leave(socket)
end

test "closes the channel" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(Channel, "foo:ok")
close(socket)

pid = socket.channel_pid
assert_receive {:terminate, {:shutdown, :closed}}
assert_receive {:EXIT, ^pid, {:shutdown, :closed}}

# Closing again doesn't crash
_ = close(socket)
end

test "kills the channel when we reach timeout on close" do
Process.flag(:trap_exit, true)
{:ok, _, socket} = join(Channel, "foo:timeout")
close(socket, 0)

pid = socket.channel_pid
assert_receive {:EXIT, ^pid, :killed}
refute_received {:terminate, :killed}

# Closing again doesn't crash
_ = close(socket)
end
end

0 comments on commit 3155d37

Please sign in to comment.