Skip to content

Commit

Permalink
Merge branch 'cm-channel-processes'
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismccord committed Mar 20, 2015
2 parents 2f1031c + 42e88af commit 1d94097
Show file tree
Hide file tree
Showing 14 changed files with 563 additions and 338 deletions.
6 changes: 3 additions & 3 deletions lib/phoenix/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ defmodule Phoenix.Channel do
Phoenix.Channel.broadcast_from(socket.pubsub_server, socket, event, msg)
end
def broadcast_from(pubsub_server, socket = %Socket{}, event, message) do
broadcast_from(pubsub_server, socket.pid, socket.topic, event, message)
broadcast_from(pubsub_server, self, socket.topic, event, message)
{:ok, socket}
end
def broadcast_from(pubsub_server, from, topic, event, message) when is_map(message) do
Expand All @@ -238,7 +238,7 @@ defmodule Phoenix.Channel do
Phoenix.Channel.broadcast_from!(socket.pubsub_server, socket, event, msg)
end
def broadcast_from!(pubsub_server, socket = %Socket{}, event, message) do
broadcast_from!(pubsub_server, socket.pid, socket.topic, event, message)
broadcast_from!(pubsub_server, self, socket.topic, event, message)
{:ok, socket}
end
def broadcast_from!(pubsub_server, from, topic, event, message) when is_map(message) do
Expand All @@ -254,7 +254,7 @@ defmodule Phoenix.Channel do
Sends Dict, JSON serializable message to socket.
"""
def reply(socket, event, message) when is_map(message) do
send socket.pid, {:socket_reply, %Message{
send socket.transport_pid, {:socket_reply, %Message{
topic: socket.topic,
event: event,
payload: message
Expand Down
93 changes: 93 additions & 0 deletions lib/phoenix/channel/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
defmodule Phoenix.Channel.Server do
use GenServer
alias Phoenix.PubSub

@moduledoc """
Handles `%Phoenix.Socket{}` state and invokes channel callbacks.
## handle_info/2
Regular Elixir messages are forwarded to the socket channel's
`handle_info/2` callback.
"""

def start_link(socket, auth_payload) do
GenServer.start_link(__MODULE__, [socket, auth_payload])
end

@doc """
Initializes the Socket server for `Phoenix.Channel` joins.
To start the server, return `{:ok, socket}`.
To ignore the join request, return `:ignore`
Any other result will exit with `:badarg`
See `Phoenix.Channel.join/3` documentation.
"""
def init([socket, auth_payload]) do
case socket.channel.join(socket.topic, auth_payload, socket) do
{:ok, socket} ->
{:ok, socket}
PubSub.subscribe(socket.pubsub_server, self, socket.topic, link: true)

{:ok, socket}

:ignore -> :ignore

result ->
{:stop, {:badarg, result}}
end
end

def handle_cast({:handle_in, "leave", payload}, socket) do
leave_and_stop(payload, socket)
end

@doc """
Forwards incoming client messages through `handle_in/3` callbacks
"""
def handle_cast({:handle_in, event, payload}, socket) when event != "join" do
event
|> socket.channel.handle_in(payload, socket)
|> handle_result
end

@doc """
Forwards broadcast through `handle_out/3` callbacks
"""
def handle_info({:socket_broadcast, msg}, socket) do
msg.event
|> socket.channel.handle_out(msg.payload, socket)
|> handle_result
end

@doc """
Forwards regular Elixir messages through `handle_info/2` callbacks
"""
def handle_info(msg, socket) do
msg
|> socket.channel.handle_info(socket)
|> handle_result
end


defp handle_result({:ok, socket}), do: {:noreply, socket}
defp handle_result({:leave, socket}), do: leave_and_stop(:normal, socket)
defp handle_result({:error, reason, socket}) do
{:stop, {:error, reason}, socket}
end
defp handle_result(result) do
raise """
Expected callback to return `{:ok, socket} | {:error, reason, socket} || {:leave, socket}`,
got #{inspect result}
"""
end

defp leave_and_stop(reason, socket) do
{:ok, socket} = socket.channel.leave(reason, socket)

PubSub.unsubscribe(socket.pubsub_server, self, socket.topic)

{:stop, :normal, :left}
end
end
25 changes: 25 additions & 0 deletions lib/phoenix/channel/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Phoenix.Channel.Supervisor do

@moduledoc false

use Supervisor

def start_link() do
Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end

def start_child(%Phoenix.Socket{} = socket, auth_payload) do
Supervisor.start_child(__MODULE__, [socket, auth_payload])
end

def terminate_child(child) do
Supervisor.terminate_child(__MODULE__, child)
end

def init(:ok) do
children = [
worker(Phoenix.Channel.Server, [], restart: :temporary)
]
supervise(children, strategy: :simple_one_for_one)
end
end
165 changes: 36 additions & 129 deletions lib/phoenix/channel/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule Phoenix.Channel.Transport do
require Logger
alias Phoenix.Socket
alias Phoenix.Socket.Message
alias Phoenix.PubSub


@moduledoc """
Expand All @@ -20,14 +19,14 @@ defmodule Phoenix.Channel.Transport do
* Handle receiving incoming, encoded `%Phoenix.Socket.Message{}`'s from
remote clients, then deserialing and fowarding message through
`Phoenix.Transport.dispatch/2`. Finish by keeping state of returned
HashDict of `%Phoenix.Socket{}`s. Message keys must be deserialized as strings.
`Phoenix.Transport.dispatch/2`. Message keys must be deserialized as strings.
* Handle receiving outgoing `{:socket_reply, %Phoenix.Socket.Message{}}` as
Elixir process messages, then encoding and fowarding to remote client.
* Handle receiving outgoing `{:socket_broadcast, %Phoenix.Socket.Message{}}` as
Elixir process messages, then forwarding message through
`Phoenix.Transport.dispatch_broadcast/2`. Finish by keeping state of returned
HashDict of `%Phoenix.Socket{}`s.
* Handle receiving `{:put_socket, topic, socket_pid}` messages and storing a
HashDict of a string topics to Pid matches. The HashDict of topic => pids
is dispatched through the transport layer's `Phoenix.Transport.dispatch/2`.
* Handle receiving `{:delete_socket, topic}` messages and delete the entry
from the kept HashDict of socket processes.
* Handle remote client disconnects and relaying event through
`Phoenix.Transport.dispatch_leave/2`
Expand All @@ -45,61 +44,22 @@ defmodule Phoenix.Channel.Transport do
See `web/static/js/phoenix.js` for an example transport client implementation.
"""

defmodule InvalidReturn do
defexception [:message]
def exception(msg) do
%InvalidReturn{message: "Invalid Handler return: #{inspect msg}"}
end
end


@doc """
Dispatches `%Phoenix.Socket.Message{}` to Channel. All serialized, remote client messages
should be deserialized and forwarded through this function by adapters.
The following return signatures must be handled by transport adapters:
* `{:ok, sockets}` - Successful dispatch, with updated `HashDict` of sockets
* `{:error, reason, sockets}` - Failed dispatched with updatd `HashDict` of sockets
* `{:ok, socket_pid}` - Successful dispatch, with pid of new socket
* `{:error, reason}` - Failed dispatch
* `:ignore` - Unauthorized or unmatched dispatch
The returned `HashDict` of `%Phoenix.Socket{}`s must be held by the adapter
"""
def dispatch(msg = %Message{}, sockets, adapter_pid, router, pubsub_server, transport) do
socket = %Socket{pid: adapter_pid,
router: router,
pubsub_server: pubsub_server,
topic: msg.topic,
transport: transport}

def dispatch(%Message{} = msg, sockets, transport_pid, router, pubsub_server, transport) do
sockets
|> HashDict.get(msg.topic, socket)
|> dispatch(msg.topic, msg.event, msg.payload)
|> transport_response(sockets)
end
defp transport_response({:ok, socket}, sockets) do
{:ok, HashDict.put(sockets, socket.topic, socket)}
end
defp transport_response({:leave, socket}, sockets) do
{:ok, HashDict.delete(sockets, socket.topic)}
end
defp transport_response({:heartbeat, _socket}, sockets) do
{:ok, sockets}
end
defp transport_response(:ignore, sockets) do
{:ok, sockets}
end
defp transport_response({:error, reason, %Socket{} = socket}, sockets) do
Logger.error fn ->
"""
Crashed dispatching topic \"#{inspect socket.topic}\" to #{inspect(socket.channel || socket.router)}
Reason: #{inspect(reason)}
Router: #{inspect(socket.router)}
State: #{inspect(socket)}
"""
end
{:error, reason, HashDict.delete(sockets, socket.topic)}
|> HashDict.get(msg.topic)
|> dispatch(msg.topic, msg.event, msg.payload, transport_pid, router, pubsub_server, transport)
end


@doc """
Dispatches `%Phoenix.Socket.Message{}` in response to a heartbeat message sent from the client.
Expand All @@ -111,87 +71,35 @@ defmodule Phoenix.Channel.Transport do
The server will respond to heartbeats with the same message
"""
def dispatch(socket, "phoenix", "heartbeat", _msg) do
send socket.pid, {:socket_reply, %Message{topic: "phoenix", event: "heartbeat", payload: %{}}}

{:heartbeat, socket}
def dispatch(_, "phoenix", "heartbeat", _payload, transport_pid, _router, _pubsub_server, _transport) do
send transport_pid, {:socket_reply, %Message{topic: "phoenix", event: "heartbeat", payload: %{}}}
end
def dispatch(socket, topic, "join", msg) do
case socket.router.channel_for_topic(topic, socket.transport) do
nil ->
Logger.debug fn -> "Ignoring unmatched topic \"#{socket.topic}\" in #{inspect(socket.router)}" end
handle_result(:ignore, "join")
def dispatch(nil, topic, "join", payload, transport_pid, router, pubsub_server, transport) do
case router.channel_for_topic(topic, transport) do
nil -> log_ignore(topic, router)
channel ->
topic
|> channel.join(msg, Socket.put_channel(socket, channel))
|> handle_result("join")
socket = %Socket{transport_pid: transport_pid,
router: router,
pubsub_server: pubsub_server,
topic: topic,
channel: channel,
transport: transport}

Phoenix.Channel.Server.start_link(socket, payload)
end
end
def dispatch(socket, topic, event, msg) do
if Socket.authorized?(socket, topic) do
case event do
"leave" -> socket.channel.leave(msg, socket)
event -> socket.channel.handle_in(event, msg, socket)
end |> handle_result(event)
else
handle_result({:error, :unauthenticated, socket}, event)
end
def dispatch(nil, topic, _event, _payload, _adapter_pid, router, _pubsub_server, _transport) do
log_ignore(topic, router)
:ignore
end

defp handle_result({:ok, %Socket{} = socket}, "join") do
PubSub.subscribe(socket.pubsub_server, socket.pid, socket.topic, link: true)
{:ok, Socket.authorize(socket, socket.topic)}
end
defp handle_result({:ok, %Socket{} = socket}, "leave") do
PubSub.unsubscribe(socket.pubsub_server, socket.pid, socket.topic)
{:leave, Socket.deauthorize(socket)}
end
defp handle_result({:ok, %Socket{} = socket}, _event) do
{:ok, socket}
end
defp handle_result(:ignore, "join"), do: :ignore
defp handle_result({:leave, %Socket{} = socket}, event)
when not event in ["join", "leave"] do

socket.channel.leave(%{reason: :leave}, socket) |> handle_result("leave")
end
defp handle_result({:error, reason, %Socket{} = socket}, _event) do
{:error, reason, socket}
end
defp handle_result(bad_return, event) when event == "join" do
raise InvalidReturn, message: """
expected `join` to return `{:ok, %Socket{}} | :ignore | {:error, reason, socket}` got `#{inspect bad_return}`
"""
end
defp handle_result(bad_return, event) when event == "leave" do
raise InvalidReturn, message: """
expected `leave` to return `{:ok, %Socket{}} | {:error, reason, %Socket{}}` got `#{inspect bad_return}`
"""
end
defp handle_result(bad_return, event) do
raise InvalidReturn, message: """
expected `#{event}` to return `{:ok, %Socket{}} | {:leave, %Socket{}} | {:error, reason, %Socket{}}` got `#{inspect bad_return}`
"""
def dispatch(socket_pid, _topic, event, payload, _adapter_pid, _router, _pubsub_server, _transport) do
GenServer.cast(socket_pid, {:handle_in, event, payload})
:ok
end

@doc """
When an Adapter receives `{:socket_broadcast, %Message{}}`, it dispatches to this
function with its socket state.
The message is routed to the intended channel's outgoing/3 callback.
"""
def dispatch_broadcast(sockets, %Message{event: event, payload: payload} = msg) do
sockets
|> HashDict.get(msg.topic)
|> case do
nil ->
{:ok, sockets}
socket ->
event
|> socket.channel.handle_out(payload, socket)
|> handle_result(event)
|> transport_response(sockets)
end
defp log_ignore(topic, router) do
Logger.debug fn -> "Ignoring unmatched topic \"#{topic}\" in #{inspect(router)}" end
:ignore
end

@doc """
Expand All @@ -201,10 +109,9 @@ defmodule Phoenix.Channel.Transport do
Most adapters shutdown after this dispatch as they client has disconnected
"""
def dispatch_leave(sockets, reason) do
Enum.each sockets, fn {_, socket} ->
socket.channel.leave(%{reason: reason}, socket) |> handle_result("leave")
Enum.each sockets, fn {_, socket_pid} ->
GenServer.cast(socket_pid, {:handle_in, "leave", reason})
end
:ok
end

@doc """
Expand Down
Loading

0 comments on commit 1d94097

Please sign in to comment.