Skip to content

Commit

Permalink
Subscription process hibernate after inactivity
Browse files Browse the repository at this point in the history
Add the `:subscription_hibernate_after` event store config setting used by subscription processes so they will automatically hibernate to save memory after a period of inactivity.

Hibernating a process will also force a garbage collection of the process' memory. As soon as a new message is received in the process mailbox the process will be resumed. This should have minimal impact on latency but can help to reduce memory leaks caused by large binaries.
  • Loading branch information
slashdotdash committed Aug 11, 2020
1 parent e1cad17 commit 73a03a9
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 31 deletions.
5 changes: 5 additions & 0 deletions guides/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ EventStore is [available in Hex](https://hex.pm/packages/eventstore) and can be
- `:schema` - define the Postgres schema to use (default: `public` schema).
- `:timeout` - set the default database query timeout in milliseconds (default: 15,000ms).

Subscription options:

- `:subscription_retry_interval` - interval between subscription connection retry attempts (default: 60,000ms).
- `:subscription_hibernate_after` - subscriptions will automatically hibernate to save memory after a period of inactivity (default: 15,000ms).

4. Add your event store module to the `event_stores` list for your app in mix config:

```elixir
Expand Down
1 change: 0 additions & 1 deletion guides/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ receive do
end
```


#### Mapping events

You can provide an event mapping function that maps each `RecordedEvent` before sending it to the subscriber:
Expand Down
23 changes: 15 additions & 8 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule EventStore do
## Defining an event store
We can define an event store in our own application as follows:
An event store module is defined in your own application as follows:
defmodule MyApp.EventStore do
use EventStore,
Expand Down Expand Up @@ -39,7 +39,8 @@ defmodule EventStore do
store to use Commanded's JSON serializer which provides additional support for
JSON decoding:
config :my_app, MyApp.EventStore, serializer: Commanded.Serialization.JsonSerializer
config :my_app, MyApp.EventStore,
serializer: Commanded.Serialization.JsonSerializer
The event store module defines a `start_link/1` function that needs to be
invoked before using the event store. In general, this function is not
Expand Down Expand Up @@ -189,11 +190,13 @@ defmodule EventStore do

serializer = Serializer.serializer(__MODULE__, config)
subscription_retry_interval = Subscriptions.retry_interval(__MODULE__, config)
subscription_hibernate_after = Subscriptions.hibernate_after(__MODULE__, config)

@otp_app otp_app
@config config
@serializer serializer
@subscription_retry_interval subscription_retry_interval
@subscription_hibernate_after subscription_hibernate_after

@all_stream "$all"
@default_batch_size 1_000
Expand Down Expand Up @@ -320,17 +323,17 @@ defmodule EventStore do
end

def subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ []) do
{conn, _opts} = opts(opts)
name = name(opts)
conn = conn(opts)

with {start_from, opts} <- Keyword.pop(opts, :start_from, :origin),
{:ok, start_from} <- Stream.start_from(conn, stream_uuid, start_from) do
name = name(opts)

opts =
Keyword.merge(opts,
conn: conn,
event_store: name,
serializer: @serializer,
hibernate_after: @subscription_hibernate_after,
retry_interval: @subscription_retry_interval,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
Expand Down Expand Up @@ -392,14 +395,18 @@ defmodule EventStore do
end

defp opts(opts) do
name = name(opts)
conn = Module.concat([name, Postgrex])

conn = conn(opts)
timeout = timeout(opts)

{conn, [timeout: timeout, serializer: @serializer]}
end

defp conn(opts) do
name = name(opts)

Module.concat([name, Postgrex])
end

defp name(opts) do
case Keyword.get(opts, :name) do
nil ->
Expand Down
5 changes: 3 additions & 2 deletions lib/event_store/monitored_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule EventStore.MonitoredServer do
@moduledoc false

# Starts a `GenServer` process using a given module-fun-args tuple.
#
#
# Monitors the started process and attempts to restart it on terminate using
# an exponential backoff strategy. Allows interested processes to be informed
# when the process terminates.
Expand Down Expand Up @@ -32,7 +32,8 @@ defmodule EventStore.MonitoredServer do
end

def start_link(opts) do
{start_opts, monitor_opts} = Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt])
{start_opts, monitor_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

state = State.new(monitor_opts, start_opts)

Expand Down
6 changes: 4 additions & 2 deletions lib/event_store/notifications/broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ defmodule EventStore.Notifications.Broadcaster do
end

def start_link(opts) do
state = State.new(opts)
start_opts = Keyword.take(opts, [:name, :timeout, :debug, :spawn_opt])
{start_opts, broadcaster_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

state = State.new(broadcaster_opts)

GenStage.start_link(__MODULE__, state, start_opts)
end
Expand Down
7 changes: 4 additions & 3 deletions lib/event_store/notifications/listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ defmodule EventStore.Notifications.Listener do
defstruct [:listen_to, :schema, :ref, demand: 0, queue: :queue.new()]

def start_link(opts) do
listen_to = Keyword.fetch!(opts, :listen_to)
schema = Keyword.fetch!(opts, :schema)
{start_opts, listener_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

start_opts = Keyword.take(opts, [:name, :timeout, :debug, :spawn_opt])
listen_to = Keyword.fetch!(listener_opts, :listen_to)
schema = Keyword.fetch!(listener_opts, :schema)

state = %Listener{listen_to: listen_to, schema: schema}

Expand Down
6 changes: 4 additions & 2 deletions lib/event_store/notifications/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ defmodule EventStore.Notifications.Reader do
end

def start_link(opts) do
state = State.new(opts)
start_opts = Keyword.take(opts, [:name, :timeout, :debug, :spawn_opt])
{start_opts, reader_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

state = State.new(reader_opts)

GenStage.start_link(__MODULE__, state, start_opts)
end
Expand Down
27 changes: 21 additions & 6 deletions lib/event_store/notifications/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule EventStore.Notifications.Supervisor do
alias EventStore.Config
alias EventStore.MonitoredServer
alias EventStore.Notifications.{Listener, Reader, Broadcaster}
alias EventStore.Subscriptions

def child_spec({name, _serializer, _config} = init_arg) do
%{id: Module.concat(name, __MODULE__), start: {__MODULE__, :start_link, [init_arg]}}
Expand All @@ -23,10 +24,11 @@ defmodule EventStore.Notifications.Supervisor do
schema = Keyword.fetch!(config, :schema)

postgrex_config = Config.sync_connect_postgrex_opts(config)
hibernate_after = Subscriptions.hibernate_after(event_store, config)

listener_name = Module.concat([event_store, Listener])
reader_name = Module.concat([event_store, Reader])
broadcaster_name = Module.concat([event_store, Broadcaster])
listener_name = listener_name(event_store)
reader_name = reader_name(event_store)
broadcaster_name = broadcaster_name(event_store)
postgrex_listener_name = Module.concat([listener_name, Postgrex])
postgrex_reader_name = Module.concat([reader_name, Postgrex])

Expand All @@ -43,15 +45,28 @@ defmodule EventStore.Notifications.Supervisor do
mfa: {Postgrex, :start_link, [postgrex_config]}, name: postgrex_reader_name},
id: Module.concat([postgrex_reader_name, MonitoredServer])
),
{Listener, listen_to: postgrex_listener_name, schema: schema, name: listener_name},
{Listener,
listen_to: postgrex_listener_name,
schema: schema,
name: listener_name,
hibernate_after: hibernate_after},
{Reader,
conn: postgrex_reader_name,
serializer: serializer,
subscribe_to: listener_name,
name: reader_name},
{Broadcaster, event_store: event_store, subscribe_to: reader_name, name: broadcaster_name}
name: reader_name,
hibernate_after: hibernate_after},
{Broadcaster,
event_store: event_store,
subscribe_to: reader_name,
name: broadcaster_name,
hibernate_after: hibernate_after}
],
strategy: :one_for_all
)
end

def broadcaster_name(event_store), do: Module.concat([event_store, Broadcaster])
def listener_name(event_store), do: Module.concat([event_store, Listener])
def reader_name(event_store), do: Module.concat([event_store, Reader])
end
33 changes: 33 additions & 0 deletions lib/event_store/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,37 @@ defmodule EventStore.Subscriptions do
" config. Expected an integer in milliseconds but got: " <> inspect(invalid)
end
end

@doc """
Get the inactivity period, in milliseconds, after which a subscription process
will be automatically hibernated.
From Erlang/OTP 20, subscription processes will automatically hibernate to
save memory after `15_000` milliseconds of inactivity. This can be changed by
configuring the `:subscription_hibernate_after` option for the event store
module.
You can also set it to `:infinity` to fully disable it.
"""
def hibernate_after(event_store, config) do
case Keyword.get(config, :subscription_hibernate_after) do
interval when is_integer(interval) and interval >= 0 ->
interval

:infinity ->
:infinity

nil ->
# Default to 15 seconds when not configured
15_000

invalid ->
raise ArgumentError,
message:
"Invalid `:subscription_hibernate_after` setting in " <>
inspect(event_store) <>
" config. Expected an integer (in milliseconds) or `:infinity` but got: " <>
inspect(invalid)
end
end
end
8 changes: 5 additions & 3 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,19 @@ defmodule EventStore.Subscriptions.Subscription do
:retry_interval
]

def start_link(opts \\ []) do
{start_opts, subscription_opts} = Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt])
def start_link(opts) do
{start_opts, subscription_opts} =
Keyword.split(opts, [:name, :timeout, :debug, :spawn_opt, :hibernate_after])

stream_uuid = Keyword.fetch!(subscription_opts, :stream_uuid)
subscription_name = Keyword.fetch!(subscription_opts, :subscription_name)
retry_interval = Keyword.fetch!(subscription_opts, :retry_interval)

state = %Subscription{
stream_uuid: stream_uuid,
subscription_name: subscription_name,
subscription: SubscriptionFsm.new(stream_uuid, subscription_name, subscription_opts),
retry_interval: Keyword.fetch!(subscription_opts, :retry_interval)
retry_interval: retry_interval
}

GenServer.start_link(__MODULE__, state, start_opts)
Expand Down
7 changes: 3 additions & 4 deletions lib/event_store/subscriptions/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ defmodule EventStore.Subscriptions.Supervisor do

supervisor = Module.concat(event_store, __MODULE__)

name = {:via, Registry, registry_name(event_store, stream_uuid, subscription_name)}
opts = Keyword.put(opts, :name, name)
spec = {Subscription, opts}
via_name = {:via, Registry, registry_name(event_store, stream_uuid, subscription_name)}
opts = Keyword.put(opts, :name, via_name)

DynamicSupervisor.start_child(supervisor, spec)
DynamicSupervisor.start_child(supervisor, {Subscription, opts})
end

def unsubscribe_from_stream(event_store, stream_uuid, subscription_name) do
Expand Down
57 changes: 57 additions & 0 deletions test/notifications/notifications_supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
defmodule EventStore.Notifications.NotificationsSupervisorTest do
use EventStore.StorageCase

alias EventStore.{EventFactory, Notifications, PubSub, Serializer, Wait}
alias TestEventStore, as: EventStore

describe "notifications supervisor" do
setup do
config = TestEventStore.config() |> Keyword.put(:subscription_hibernate_after, 0)
serializer = Serializer.serializer(TestEventStore, config)

start_supervised!({Notifications.Supervisor, {ES, serializer, config}})
for child_spec <- PubSub.child_spec(ES), do: start_supervised!(child_spec)

:ok
end

test "hibernate processes after inactivity" do
listener_pid = Notifications.Supervisor.listener_name(ES) |> Process.whereis()
reader_name_pid = Notifications.Supervisor.reader_name(ES) |> Process.whereis()
broadcaster_pid = Notifications.Supervisor.broadcaster_name(ES) |> Process.whereis()

# Listener processes should be hibernated after inactivity
Wait.until(fn ->
assert_hibernated(listener_pid)
assert_hibernated(reader_name_pid)
assert_hibernated(broadcaster_pid)
end)

stream_uuid = "example-stream"

:ok = PubSub.subscribe(ES, stream_uuid)

# Appending events to the event store should resume listener processes
:ok = append_events(stream_uuid, 3)

assert_receive {:events, events}

# Listener processes should be hibernated again after inactivity
Wait.until(fn ->
assert_hibernated(listener_pid)
assert_hibernated(reader_name_pid)
assert_hibernated(broadcaster_pid)
end)
end
end

defp assert_hibernated(pid) do
assert Process.info(pid, :current_function) == {:current_function, {:erlang, :hibernate, 3}}
end

defp append_events(stream_uuid, count, expected_version \\ 0) do
events = EventFactory.create_events(count, expected_version)

EventStore.append_to_stream(stream_uuid, expected_version, events)
end
end
Loading

0 comments on commit 73a03a9

Please sign in to comment.