Skip to content
This repository has been archived by the owner on Jan 29, 2019. It is now read-only.

Commit

Permalink
Remove connect and disconnect from the topology
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Dec 10, 2018
1 parent 0f3e7b8 commit 75ecb36
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 138 deletions.
36 changes: 0 additions & 36 deletions lib/firenest/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Firenest.Topology do
The topology is the building block in Firenest. It specifies:
* How nodes are connected and discovered
* How failures are handled (temporary and permanent)
* How messages are sent across nodes
* How messages are broadcast in the cluster
Expand Down Expand Up @@ -72,16 +71,6 @@ defmodule Firenest.Topology do
"""
@callback send(t(), node_ref(), name(), plane(), message :: term()) :: :ok | {:error, term()}

@doc """
Asks the topology to connect to the given node.
"""
@callback connect(t(), node()) :: true | false | :ignore

@doc """
Asks the topology to disconnect from the given node.
"""
@callback disconnect(t(), node()) :: true | false | :ignore

@doc """
Syncs the given `pid` across the topology using its name.
"""
Expand Down Expand Up @@ -168,31 +157,6 @@ defmodule Firenest.Topology do
adapter!(topology).send(topology, node_ref, name, :default, message)
end

@doc """
Asks the topology to connect to the given node.
It returns `true` in case of success (or if the node is already
connected), `false` in case of failure and `:ignored` if the node
is not online or if the operation is not supported.
"""
@spec connect(t(), node()) :: true | false | :ignored
def connect(topology, node) when is_atom(topology) and is_atom(node) do
adapter!(topology).connect(topology, node)
end

@doc """
Asks the topology to disconnect from the given node.
It returns `true` if the nodes are no longer connected. This
means it will also return `true` if nodes were never connected in
the first place. It returns `:ignored` if the node is not online
or if the operation is not supported.
"""
@spec disconnect(t(), node()) :: true | false | :ignored
def disconnect(topology, node) when is_atom(topology) and is_atom(node) do
adapter!(topology).disconnect(topology, node)
end

@doc """
Syncs the given `pid` across the topology using its name.
Expand Down
72 changes: 4 additions & 68 deletions lib/firenest/topology/erlang.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,6 @@ defmodule Firenest.Topology.Erlang do

defdelegate child_spec(opts), to: Firenest.Topology.Erlang.Server

def connect(topology, node) do
fn ->
ref = subscribe(topology, self())

case :net_kernel.connect_node(node) do
true -> node_connected?(topology, node) or wait_until({:nodeup, ref, node})
false -> false
:ignored -> :ignored
end
end
|> Task.async()
|> Task.await(:infinity)
end

def disconnect(topology, node) do
fn ->
ref = subscribe(topology, self())

case node_connected?(topology, node) and :erlang.disconnect_node(node) do
true -> wait_until({:nodedown, ref, node})
false -> false
:ignored -> :ignored
end
end
|> Task.async()
|> Task.await(:infinity)
end

defp wait_until(msg) do
receive do
^msg -> true
after
@timeout -> false
end
end

def broadcast(topology, name, :default, message) do
topology |> node_names() |> Enum.each(&Process.send({name, &1}, message, [:noconnect]))
end
Expand Down Expand Up @@ -103,22 +67,10 @@ defmodule Firenest.Topology.Erlang do
:ets.lookup_element(topology, :nodes, 3)
end

defp node_connected?(topology, node) do
ms = [{{{:connected, {node, :_}}, :"$1"}, [], [:"$1"]}]
:ets.select_count(topology, ms) == 1
end

defp node_ref_connected?(topology, node_ref) do
ms = [{{{:connected, node_ref}, :"$1"}, [], [:"$1"]}]
:ets.select_count(topology, ms) == 1
end

# Subscribing to the topology events is private right now,
# we can make it public if necessary but sync_named/3 should
# be enough for all purposes.
defp subscribe(topology, pid) do
GenServer.call(topology, {:subscribe, pid})
end
end

defmodule Firenest.Topology.Erlang.Server do
Expand Down Expand Up @@ -160,7 +112,6 @@ defmodule Firenest.Topology.Erlang.Server do
monitors: %{},
nodes: %{},
local_names: %{},
subscribers: %{},
topology: topology
}

Expand All @@ -170,12 +121,6 @@ defmodule Firenest.Topology.Erlang.Server do

## Local messages

def handle_call({:subscribe, pid}, _from, state) do
ref = Process.monitor(pid)
state = put_in(state.subscribers[ref], pid)
{:reply, ref, state}
end

# Receives the sync monitor command from a local process and broadcast
# this monitor is up in all known instances of this topology.
def handle_call({:sync_named, pid, name}, _from, state) do
Expand Down Expand Up @@ -213,12 +158,8 @@ defmodule Firenest.Topology.Erlang.Server do

def handle_info({:DOWN, ref, _, _, _}, %{monitors: monitors} = state) do
case monitors do
%{^ref => _} ->
{:noreply, remove_dead_monitor(state, ref)}

%{} ->
{_, state} = pop_in(state.subscribers[ref])
{:noreply, state}
%{^ref => _} -> {:noreply, remove_dead_monitor(state, ref)}
%{} -> {:noreply, state}
end
end

Expand Down Expand Up @@ -385,16 +326,14 @@ defmodule Firenest.Topology.Erlang.Server do
%{
topology: topology,
nodes: nodes,
local_names: local_names,
subscribers: subscribers
local_names: local_names
} = state

# Add the node, notify the node, notify the services.
nodes = Map.put(nodes, node, {id, clock, Process.monitor(pid), remote_names})
node_ref = {node, id}
persist_nodes_adding(topology, nodes, node_ref)

_ = for {ref, pid} <- subscribers, do: send(pid, {:nodeup, ref, node})
_ = for {_, name} <- monitors, do: local_monitor_up(local_names, node_ref, name)
%{state | nodes: nodes}
end
Expand All @@ -403,8 +342,7 @@ defmodule Firenest.Topology.Erlang.Server do
%{
topology: topology,
nodes: nodes,
local_names: local_names,
subscribers: subscribers
local_names: local_names
} = state

# Notify the services, remove the node, notify the node.
Expand All @@ -413,8 +351,6 @@ defmodule Firenest.Topology.Erlang.Server do
_ = for {name, _} <- remote_names, do: local_monitor_down(local_names, node_ref, name)

persist_nodes_removing(topology, nodes, node_ref)

_ = for {ref, pid} <- subscribers, do: send(pid, {:nodedown, ref, node})
%{state | nodes: nodes}
end

Expand Down
37 changes: 3 additions & 34 deletions test/shared/topology_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -110,38 +110,6 @@ defmodule Firenest.TopologyTest do
end
end

describe "connection" do
@describetag :connection

@node :"[email protected]"
test "may be set and managed explicitly", %{topology: topology} do
# No node yet
refute T.disconnect(topology, @node)
refute @node in Keyword.keys(T.nodes(topology))

# Start the node but not firenest
Firenest.Test.spawn_nodes([@node])
refute @node in Keyword.keys(T.nodes(topology))

# Finally start firenest
Firenest.Test.start_firenest([@node], adapter: T.adapter!(topology))
assert T.connect(topology, @node)
assert @node in Keyword.keys(T.nodes(topology))

# Connect should still return true
assert T.connect(topology, @node)

# Now let's diconnect
assert T.disconnect(topology, @node)
refute @node in Keyword.keys(T.nodes(topology))

# And we can't connect it back because it is permanently down
refute T.connect(topology, @node)
after
T.disconnect(topology, @node)
end
end

describe "sync_named/2" do
@describetag :sync_named

Expand Down Expand Up @@ -208,8 +176,9 @@ defmodule Firenest.TopologyTest do
start_sync_named_on(topology, node_ref, evaluator, test)
assert_receive {:named_up, ^node_ref, ^test}

# And now let's disconnect from it
assert T.disconnect(topology, @node)
# And now let's shut it down
cmd = quote do: System.halt(0)
T.send(topology, node_ref, evaluator, {:eval_quoted, cmd})
assert_receive {:named_down, ^node_ref, ^test}

# And now let's kill the named process running on third
Expand Down

0 comments on commit 75ecb36

Please sign in to comment.