Skip to content

Commit

Permalink
refactor: Kadabra.Config struct
Browse files Browse the repository at this point in the history
  • Loading branch information
hpopp committed May 2, 2018
1 parent a094cfd commit 3de4248
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 131 deletions.
19 changes: 19 additions & 0 deletions lib/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Kadabra.Config do
@moduledoc false

defstruct client: nil,
supervisor: nil,
ref: nil,
uri: nil,
socket: nil,
opts: []

@type t :: %__MODULE__{
client: pid,
supervisor: pid,
ref: term,
uri: URI.t(),
socket: pid,
opts: Keyword.t()
}
end
120 changes: 52 additions & 68 deletions lib/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@ defmodule Kadabra.Connection do
@moduledoc false

defstruct buffer: "",
client: nil,
config: nil,
flow_control: nil,
opts: [],
ref: nil,
socket: nil,
supervisor: nil,
uri: nil,
queue: nil

use GenStage
require Logger

alias Kadabra.{
Config,
Connection,
ConnectionQueue,
Encodable,
Expand All @@ -26,7 +22,7 @@ defmodule Kadabra.Connection do
StreamSupervisor
}

alias Kadabra.Connection.Socket
alias Kadabra.Connection.{FlowControl, Socket}

alias Kadabra.Frame.{
Continuation,
Expand All @@ -41,12 +37,8 @@ defmodule Kadabra.Connection do

@type t :: %__MODULE__{
buffer: binary,
client: pid,
flow_control: term,
opts: Keyword.t(),
ref: reference,
socket: sock,
uri: URI.t()
config: term,
flow_control: term
}

@type sock :: {:sslsocket, any, pid | {any, any}}
Expand All @@ -62,38 +54,33 @@ defmodule Kadabra.Connection do
| WindowUpdate.t()
| Continuation.t()

def start_link(uri, pid, sup, ref, opts \\ []) do
def start_link(%Config{supervisor: sup} = config) do
name = via_tuple(sup)
start_opts = {:ok, uri, pid, sup, ref, opts}
GenStage.start_link(__MODULE__, start_opts, name: name)
GenStage.start_link(__MODULE__, config, name: name)
end

def via_tuple(ref) do
{:via, Registry, {Registry.Kadabra, {ref, __MODULE__}}}
end

def init({:ok, uri, pid, sup, ref, opts}) do
def init(%Config{supervisor: sup, uri: uri, opts: opts} = config) do
case Socket.connect(uri, opts) do
{:ok, socket} ->
send_preface_and_settings(socket, opts[:settings])
state = initial_state(socket, uri, pid, sup, ref, opts)
config = %{config | socket: socket}
state = initial_state(config)
{:consumer, state, subscribe_to: [ConnectionQueue.via_tuple(sup)]}

{:error, error} ->
{:stop, error}
end
end

defp initial_state(socket, uri, pid, sup, ref, opts) do
defp initial_state(%Config{opts: opts} = config) do
settings = Keyword.get(opts, :settings, Connection.Settings.default())

%__MODULE__{
ref: ref,
client: pid,
uri: uri,
opts: opts,
socket: socket,
supervisor: sup,
config: config,
flow_control: %Connection.FlowControl{
settings: settings
}
Expand Down Expand Up @@ -145,19 +132,17 @@ defmodule Kadabra.Connection do

def handle_call(:close, _from, %Connection{} = state) do
%Connection{
client: pid,
flow_control: flow,
socket: socket,
supervisor: sup
config: config
} = state

bin = flow.stream_id |> Goaway.new() |> Encodable.to_bin()
:ssl.send(socket, bin)
:ssl.send(config.socket, bin)

send(pid, {:closed, sup})
send(config.client, {:closed, config.supervisor})

Task.Supervisor.start_child(Kadabra.Tasks, fn ->
Kadabra.Supervisor.stop(state.supervisor)
Kadabra.Supervisor.stop(config.supervisor)
end)

{:stop, :normal, :ok, state}
Expand All @@ -166,9 +151,9 @@ defmodule Kadabra.Connection do
# sendf

@spec sendf(:goaway | :ping, t) :: {:noreply, [], t}
def sendf(:ping, %Connection{socket: socket} = state) do
def sendf(:ping, %Connection{config: config} = state) do
bin = Ping.new() |> Encodable.to_bin()
Socket.send(socket, bin)
Socket.send(config.socket, bin)
{:noreply, [], state}
end

Expand All @@ -184,8 +169,8 @@ defmodule Kadabra.Connection do
{:noreply, [], state}
end

def recv(%Frame.Ping{ack: true}, %{client: pid} = state) do
send(pid, {:pong, self()})
def recv(%Frame.Ping{ack: true}, %{config: config} = state) do
send(config.client, {:pong, self()})
{:noreply, [], state}
end

Expand All @@ -196,10 +181,10 @@ defmodule Kadabra.Connection do

# nil settings means use default
def recv(%Frame.Settings{ack: false, settings: nil}, state) do
%{flow_control: flow} = state
%{flow_control: flow, config: config} = state

bin = Frame.Settings.ack() |> Encodable.to_bin()
Socket.send(state.socket, bin)
Socket.send(config.socket, bin)

case flow.settings.max_concurrent_streams do
:infinite ->
Expand All @@ -214,17 +199,17 @@ defmodule Kadabra.Connection do
end

def recv(%Frame.Settings{ack: false, settings: settings}, state) do
%{flow_control: flow, ref: ref} = state
%{flow_control: flow, config: config} = state
old_settings = flow.settings
flow = Connection.FlowControl.update_settings(flow, settings)

notify_settings_change(ref, old_settings, flow)
notify_settings_change(config.ref, old_settings, flow)

pid = Hpack.via_tuple(ref, :encoder)
pid = Hpack.via_tuple(config.ref, :encoder)
Hpack.update_max_table_size(pid, settings.max_header_list_size)

bin = Frame.Settings.ack() |> Encodable.to_bin()
Socket.send(state.socket, bin)
Socket.send(config.socket, bin)

to_ask = settings.max_concurrent_streams - flow.active_stream_count
GenStage.ask(state.queue, to_ask)
Expand All @@ -233,7 +218,7 @@ defmodule Kadabra.Connection do
end

def recv(%Frame.Settings{ack: true}, state) do
send_huge_window_update(state.socket)
send_huge_window_update(state.config.socket)
{:noreply, [], state}
end

Expand Down Expand Up @@ -280,8 +265,8 @@ defmodule Kadabra.Connection do
defp do_send_headers(request, %{flow_control: flow} = state) do
flow =
flow
|> Connection.FlowControl.add(request)
|> Connection.FlowControl.process(state)
|> FlowControl.add(request)
|> FlowControl.process(state.config)

%{state | flow_control: flow}
end
Expand All @@ -291,23 +276,20 @@ defmodule Kadabra.Connection do
Logger.error("Got GOAWAY, #{error}, Last Stream: #{id}, Rest: #{b}")
end

def handle_info({:finished, response}, state) do
%{client: pid, flow_control: flow} = state
send(pid, {:end_stream, response})

def handle_info({:finished, stream_id}, %{flow_control: flow} = state) do
flow =
flow
|> Connection.FlowControl.decrement_active_stream_count()
|> Connection.FlowControl.remove_active(response.id)
|> Connection.FlowControl.process(state)
|> FlowControl.decrement_active_stream_count()
|> FlowControl.remove_active(stream_id)
|> FlowControl.process(state.config)

GenStage.ask(state.queue, 1)

{:noreply, [], %{state | flow_control: flow}}
end

def handle_info({:push_promise, stream}, %{client: pid} = state) do
send(pid, {:push_promise, stream})
def handle_info({:push_promise, stream}, %{config: config} = state) do
send(config.client, {:push_promise, stream})
{:noreply, [], state}
end

Expand All @@ -328,7 +310,7 @@ defmodule Kadabra.Connection do
handle_disconnect(state)
end

defp do_recv_bin(bin, %{socket: socket} = state) do
defp do_recv_bin(bin, %{config: %{socket: socket}} = state) do
bin = state.buffer <> bin

case parse_bin(socket, bin, state) do
Expand Down Expand Up @@ -360,26 +342,26 @@ defmodule Kadabra.Connection do
state
end

def process(%Data{stream_id: stream_id} = frame, state) do
send_window_update(state.socket, frame)
def process(%Data{stream_id: stream_id} = frame, %{config: config} = state) do
send_window_update(config.socket, frame)

state.ref
config.ref
|> Stream.via_tuple(stream_id)
|> Stream.cast_recv(frame)

state
end

def process(%Headers{stream_id: stream_id} = frame, state) do
state.ref
def process(%Headers{stream_id: stream_id} = frame, %{config: config} = state) do
config.ref
|> Stream.via_tuple(stream_id)
|> Stream.call_recv(frame)

state
end

def process(%RstStream{} = frame, state) do
pid = Stream.via_tuple(state.ref, frame.stream_id)
def process(%RstStream{} = frame, %{config: config} = state) do
pid = Stream.via_tuple(config.ref, frame.stream_id)
Stream.cast_recv(pid, frame)
state
end
Expand All @@ -391,10 +373,12 @@ defmodule Kadabra.Connection do
end

def process(%PushPromise{stream_id: stream_id} = frame, state) do
{:ok, pid} = StreamSupervisor.start_stream(state, stream_id)
%{config: config, flow_control: flow_control} = state
{:ok, pid} = StreamSupervisor.start_stream(config, flow_control, stream_id)

Stream.call_recv(pid, frame)

flow = Connection.FlowControl.add_active(state.flow_control, stream_id)
flow = Connection.FlowControl.add_active(flow_control, stream_id)

%{state | flow_control: flow}
end
Expand All @@ -416,13 +400,13 @@ defmodule Kadabra.Connection do
end

def process(%WindowUpdate{stream_id: stream_id} = frame, state) do
pid = Stream.via_tuple(state.ref, stream_id)
pid = Stream.via_tuple(state.config.ref, stream_id)
Stream.cast_recv(pid, frame)
state
end

def process(%Continuation{stream_id: stream_id} = frame, state) do
pid = Stream.via_tuple(state.ref, stream_id)
pid = Stream.via_tuple(state.config.ref, stream_id)
Stream.call_recv(pid, frame)
state
end
Expand Down Expand Up @@ -454,9 +438,9 @@ defmodule Kadabra.Connection do
Socket.send(socket, bin)
end

def handle_disconnect(state) do
send(state.client, {:closed, state.supervisor})
Task.start(fn -> Kadabra.Supervisor.stop(state.supervisor) end)
def handle_disconnect(%{config: config} = state) do
send(config.client, {:closed, config.supervisor})
Task.start(fn -> Kadabra.Supervisor.stop(config.supervisor) end)

{:stop, :normal, state}
end
Expand Down
33 changes: 19 additions & 14 deletions lib/connection/flow_control.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Kadabra.Connection.FlowControl do
window: 65_535,
settings: %Kadabra.Connection.Settings{}

alias Kadabra.{Connection, StreamSupervisor}
alias Kadabra.{Config, Connection, StreamSupervisor}

@type t :: %__MODULE__{
queue: :queue.queue(),
Expand Down Expand Up @@ -139,21 +139,26 @@ defmodule Kadabra.Connection.FlowControl do
%{flow_control | queue: queue}
end

@spec process(t, Connection.t()) :: t
def process(%{queue: queue} = flow, conn) do
@spec process(t, Config.t()) :: t
def process(%{queue: queue} = flow, config) do
with {{:value, request}, queue} <- :queue.out(queue),
{:can_send, true} <- {:can_send, can_send?(flow)} do
{:ok, pid} = StreamSupervisor.start_stream(conn)

size = byte_size(request.body || <<>>)
:gen_statem.call(pid, {:send_headers, request})

flow
|> Map.put(:queue, queue)
|> decrement_window(size)
|> add_active(flow.stream_id)
|> increment_active_stream_count()
|> increment_stream_id()
case StreamSupervisor.start_stream(config, flow) do
{:ok, pid} ->
size = byte_size(request.body || <<>>)
:gen_statem.call(pid, {:send_headers, request})

flow
|> Map.put(:queue, queue)
|> decrement_window(size)
|> add_active(flow.stream_id)
|> increment_active_stream_count()
|> increment_stream_id()

other ->
raise "something happened #{inspect(other)}"
flow
end
else
{:empty, _queue} -> flow
{:can_send, false} -> flow
Expand Down
6 changes: 3 additions & 3 deletions lib/kadabra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,16 @@ defmodule Kadabra do
{1, 200, "SAMPLE ECHO REQUEST"}
"""
@spec request(pid, Request.t() | [Request.t()] | request_opts) :: no_return
def request(pid, %Kadabra.Request{} = request) do
def request(pid, %Request{} = request) do
ConnectionQueue.queue_request(pid, request)
end

def request(pid, [%Kadabra.Request{} | _rest] = requests) do
def request(pid, [%Request{} | _rest] = requests) do
ConnectionQueue.queue_request(pid, requests)
end

def request(pid, opts) when is_list(opts) do
request = Kadabra.Request.new(opts)
request = Request.new(opts)
ConnectionQueue.queue_request(pid, request)
end

Expand Down
Loading

0 comments on commit 3de4248

Please sign in to comment.