Skip to content

Commit

Permalink
Add (dis)connect hook options, document pooling (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
kristofka authored Dec 10, 2022
1 parent 36397b1 commit 45b25c1
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 3 deletions.
163 changes: 163 additions & 0 deletions guides/pooling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# Pooling

While "checkout" or "exclusive" pools are not recommended for HTTP/2
connections, so-called "routing" pools may lead to better performances
by balancing the calls between multiple connections. A more detailed
explanation of routing pools in Elixir can be found in Andrea Leopardi's
blog post [Process pools with Elixir's Registry](https://andrealeopardi.com/posts/process-pools-with-elixirs-registry/).

Since it's difficult to provide a one-size-fits-all routing strategy,
`Spear` doesn't provide such a pooling mechanism out of the box.
However,thanks to the `:on_connect` and `:on_disconnect` configuration
options passed to `Spear.Connection.start_link`, it's fairly easy to
implement your own routing pool.

This guide provides an example of a simple round-robin routing pool
inspired by the aforementioned blog post.

### Writing the pool

First let's write a few utility functions that will allow us to keep
track of the connections in the pool :

```elixir
defmodule MyApp.Spear.Pool.PoolUtils do

def init() do
# Create a `persistent_term` that holds an atomic counter if it
# doesn't already exist
maybe_create_persistent_term()
end

# Increment and read our counter
def read_and_increment() do
counter_ref = :persistent_term.get(__MODULE__)
:atomics.add_get(counter_ref, 1, 1)
end

# Create a fun that we'll pass to `:on_connect`
def on_connect_fun(registry_name, key, value \\ nil) do
# The fun will be called in the context of the connection which is
# what we want
fn -> Registry.register(registry_name, key, value) end
end

# Create a fun that we'll pass to `:on_disconnect`
def on_disconnect_fun(registry_name, key) do
fn -> Registry.unregister(registry_name, key) end
end

defp maybe_create_persistent_term() do
case :persistent_term.get(__MODULE__, nil) do
counter when is_reference(counter) ->
# We don't really care if we start at the 2nd connection, since it's simple round robin
:atomics.put(counter, 1, 0)
:ok

_ ->
counter = :atomics.new(1, signed: false)
:atomics.put(counter, 1, 0)
:persistent_term.put(__MODULE__, counter)
end
end
end
```

Now, we can add a `Supervisor` that will manage our registry and our
connections pool :

```elixir
defmodule MyApp.Spear.Pool.Supervisor do
use Supervisor

def start_link(args) do
Supervisor.start_link(__MODULE__, args)
end

@impl Supervisor
def init(args) do
init_args = Keyword.get(args, :connection_args)
num_members = Keyword.get(args, :num_members, 1)

connection_args =
Keyword.merge(init_args,
on_connect: MyApp.Spear.Pool.PoolUtils.on_connect_fun(MyApp.SpearPoolRegistry, :connections),
on_disconnect: MyApp.Spear.Pool.PoolUtils.on_disconnect_fun(MyApp.SpearPoolRegistry, :connections)
)

connections_specs =
for index <- 1..num_members do
Supervisor.child_spec({Spear.Connection, connection_args}, id: {Spear.Connection, index})
end

connections_supervisor_spec = %{
id: :connections_supervisor,
type: :supervisor,
start: {Supervisor, :start_link, [connections_specs, [strategy: :one_for_one]]}
}

MyApp.Spear.Pool.PoolUtils.init()

children = [
{Registry, name: MyApp.SpearPoolRegistry, keys: :duplicate},
connections_supervisor_spec
]

Supervisor.init(children, strategy: :rest_for_one)
end

def get_conn() do
connections = Registry.lookup(MyApp.SpearPoolRegistry, :connections)
next_index = MyApp.Spear.Pool.PoolUtils.read_and_increment()

# We get the connection in the list at the incremented index, modulo
# the number of connections in the list (so that we wrap around).
{pid, _value = nil} = Enum.at(connections, rem(next_index, length(connections)))
{:ok, pid}
end
end
```

### Using the pool

After starting the pool (usually through your app's supervision tree) :

```elixir
{:ok, pool} =
MyApp.Spear.Pool.Supervisor.start_link(
num_members: 10,
connection_args: [connection_string: "esdb://admin:[email protected]:2113"]
)
```

You can obtain a `Spear.Connection` from the pool :

```elixir
{:ok, conn} = MyApp.Spear.Pool.Supervisor.get_conn()
```

And use it with the `Spear` module :

```elixir
e = Spear.Event.new("test-event", %{title: "some title"})
Spear.append([e], conn, "stream1")
```

### Caveats

HTTP/2 and Mint, thus Spear, allow to process efficiently multiple requests
on a single connection. A routing pool could actually degrade your app's
performances. Therefore, when using such a pool, it is important to
carefully benchmark the results for your use-case, maybe with different
routing strategies.

As a rule of thumb, a routing pool might be beneficial when you perform
many concurrent reads (eg through `Spear.stream!/3`) or many concurrent
writes of large events/list of events. In any case, benchmarking is
always recommended.






15 changes: 15 additions & 0 deletions lib/spear/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ defmodule Spear.Connection do
def connect(_, s) do
case do_connect(s.config) do
{:ok, conn} ->
run_function(s.config.on_connect)

{:ok, %__MODULE__{s | conn: conn, keep_alive_timer: KeepAliveTimer.start(s.config)}}

{:error, _reason} ->
Expand All @@ -190,6 +192,8 @@ defmodule Spear.Connection do
keep_alive_timer: KeepAliveTimer.clear(s.keep_alive_timer)
}

run_function(s.config.on_disconnect)

case info do
{:close, from} ->
Connection.reply(from, {:ok, :closed})
Expand Down Expand Up @@ -486,4 +490,15 @@ defmodule Spear.Connection do
end

defp read_only_check(_request, _s), do: :ok

defp run_function(f) when is_function(f, 0), do: f.()

defp run_function({m, f, args}) when is_list(args) do
case function_exported?(m, f, Enum.count(args)) do
true -> apply(m, f, args)
_ -> nil
end
end

defp run_function(_), do: nil
end
14 changes: 12 additions & 2 deletions lib/spear/connection/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ defmodule Spear.Connection.Configuration do
be limited to read-only functionality. The list of read-only APIs can be
queried with `Spear.Connection.read_apis/0`
* `:on_connect` - (default: `nil`) a 0-arity fun or MFA to be called when the
connection is established.
* `:on_disconnect` - (default: `nil`) a 0-arity fun or MFA to be called when the
connection is lost.
See the `Spear.Connection` module docs for more information about keep-alive.
"""
@moduledoc since: "0.2.0"
Expand All @@ -72,7 +78,9 @@ defmodule Spear.Connection.Configuration do
mint_opts: Keyword.t(),
valid?: boolean(),
errors: Keyword.t(),
read_only?: boolean()
read_only?: boolean(),
on_connect: fun() | {module(), atom(), [any()]} | nil,
on_disconnect: fun() | {module(), atom(), [any()]} | nil
}

defstruct scheme: :http,
Expand All @@ -86,7 +94,9 @@ defmodule Spear.Connection.Configuration do
mint_opts: [],
valid?: true,
errors: [],
read_only?: false
read_only?: false,
on_connect: nil,
on_disconnect: nil

@doc false
def credentials(%__MODULE__{username: username, password: password}) do
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ defmodule Spear.MixProject do
"guides/writing_events.md",
"guides/streams.md",
"guides/link_resolution.md",
"guides/security.md"
"guides/security.md",
"guides/pooling.md"
],
groups_for_extras: [
Guides: Path.wildcard("guides/*.md")
Expand Down
64 changes: 64 additions & 0 deletions test/spear/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,70 @@ defmodule Spear.ConnectionTest do
assert Spear.ping(conn) == :pong
end

test "a connection started with a 0-arity :on_connect fun invokes the fun at each connection" do
my_pid = self()

f = fn -> send(my_pid, :fun_invoked) end
config = [{:on_connect, f} | @good_config]

conn = start_supervised!({Spear.Connection, config})

assert_receive(:fun_invoked)
assert Connection.call(conn, :close) == {:ok, :closed}
assert Connection.cast(conn, :connect) == :ok
assert_receive(:fun_invoked)
end

test "a connection started with a 0-arity :on_disconnect fun invokes the fun at each disconnection" do
my_pid = self()

f = fn -> send(my_pid, :fun_invoked) end
config = [{:on_disconnect, f} | @good_config]

conn = start_supervised!({Spear.Connection, config})

assert Connection.call(conn, :close) == {:ok, :closed}
assert_receive(:fun_invoked)
assert Connection.cast(conn, :connect) == :ok
assert Connection.call(conn, :close) == {:ok, :closed}
assert_receive(:fun_invoked)
end

test "a connection started with a :on_connect MFA invokes it at each connection" do
defmodule MfaTest do
def send_me(pid), do: send(pid, :mfa_invoked)
end

my_pid = self()

config = [{:on_connect, {MfaTest, :send_me, [my_pid]}} | @good_config]

conn = start_supervised!({Spear.Connection, config})

assert_receive(:mfa_invoked)
assert Connection.call(conn, :close) == {:ok, :closed}
assert Connection.cast(conn, :connect) == :ok
assert_receive(:mfa_invoked)
end

test "a connection started with a :on_disconnect MFA invokes it at each disconnection" do
defmodule MfaTest do
def send_me(pid), do: send(pid, :mfa_invoked)
end

my_pid = self()

config = [{:on_disconnect, {MfaTest, :send_me, [my_pid]}} | @good_config]

conn = start_supervised!({Spear.Connection, config})

assert Connection.call(conn, :close) == {:ok, :closed}
assert_receive(:mfa_invoked)
assert Connection.cast(conn, :connect) == :ok
assert Connection.call(conn, :close) == {:ok, :closed}
assert_receive(:mfa_invoked)
end

test "a connection can noop random info messages" do
conn = start_supervised!({Spear.Connection, @good_config})

Expand Down

0 comments on commit 45b25c1

Please sign in to comment.