Skip to content

Commit

Permalink
[RTC-344] toggleable ll-hls (fishjam-dev#83)
Browse files Browse the repository at this point in the history
* [RTC-344] toggleable ll-hls
* Review fixes pt1
* Review fixes pt2
* lint
* Review fixes pt3
* Add one more pid check to test
* Monitor request handler in test
  • Loading branch information
sgfn authored Sep 15, 2023
1 parent ed317b7 commit 888daad
Show file tree
Hide file tree
Showing 17 changed files with 407 additions and 171 deletions.
6 changes: 1 addition & 5 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ defmodule Jellyfish.Component do

alias Jellyfish.Component.{HLS, RTSP}

@callback metadata() :: map()

@enforce_keys [
:id,
:type,
Expand Down Expand Up @@ -51,9 +49,7 @@ defmodule Jellyfish.Component do

@spec new(component(), map()) :: {:ok, t()} | {:error, term()}
def new(type, options) do
with {:ok, endpoint} <- type.config(options) do
metadata = type.metadata()

with {:ok, %{endpoint: endpoint, metadata: metadata}} <- type.config(options) do
{:ok,
%__MODULE__{
id: UUID.uuid4(),
Expand Down
94 changes: 61 additions & 33 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,86 @@ defmodule Jellyfish.Component.HLS do
"""

@behaviour Jellyfish.Endpoint.Config
@behaviour Jellyfish.Component

alias Jellyfish.Component.HLS.{LLStorage, RequestHandler}
alias Jellyfish.Component.HLS.{LLStorage, RequestHandler, Storage}
alias Jellyfish.Room

alias JellyfishWeb.ApiSpec.Component.HLS.Options

alias Membrane.RTC.Engine.Endpoint.HLS
alias Membrane.RTC.Engine.Endpoint.HLS.{CompositorConfig, HLSConfig, MixerConfig}
alias Membrane.Time

@segment_duration Time.seconds(6)
@partial_segment_duration Time.milliseconds(1_100)

@type metadata :: %{playable: boolean()}
@type metadata :: %{
playable: boolean(),
low_latency: boolean()
}

@impl true
def config(options) do
storage = fn directory -> %LLStorage{directory: directory, room_id: options.room_id} end
RequestHandler.start(options.room_id)

{:ok,
%HLS{
rtc_engine: options.engine_pid,
owner: self(),
output_directory: output_dir(options.room_id),
mixer_config: %MixerConfig{
video: %CompositorConfig{
stream_format: %Membrane.RawVideo{
width: 1920,
height: 1080,
pixel_format: :I420,
framerate: {24, 1},
aligned: true
}
with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, Options.schema()) do
low_latency? = valid_opts.lowLatency
hls_config = create_hls_config(options.room_id, low_latency?: low_latency?)

{:ok,
%{
endpoint: %HLS{
rtc_engine: options.engine_pid,
owner: self(),
output_directory: output_dir(options.room_id),
mixer_config: %MixerConfig{
video: %CompositorConfig{
stream_format: %Membrane.RawVideo{
width: 1920,
height: 1080,
pixel_format: :I420,
framerate: {24, 1},
aligned: true
}
}
},
hls_config: hls_config
},
metadata: %{
playable: false,
low_latency: low_latency?
}
},
hls_config: %HLSConfig{
hls_mode: :muxed_av,
mode: :live,
target_window_duration: :infinity,
segment_duration: @segment_duration,
partial_segment_duration: @partial_segment_duration,
storage: storage
}
}}
}}
else
{:error, _reason} = error -> error
end
end

@impl true
def metadata(), do: %{playable: false}

@spec output_dir(Room.id()) :: String.t()
def output_dir(room_id) do
base_path = Application.fetch_env!(:jellyfish, :output_base_path)
Path.join([base_path, "hls_output", "#{room_id}"])
end

defp create_hls_config(room_id, low_latency?: low_latency?) do
partial_duration = if low_latency?, do: @partial_segment_duration, else: nil
hls_storage = setup_hls_storage(room_id, low_latency?: low_latency?)

%HLSConfig{
hls_mode: :muxed_av,
mode: :live,
target_window_duration: :infinity,
segment_duration: @segment_duration,
partial_segment_duration: partial_duration,
storage: hls_storage
}
end

defp setup_hls_storage(room_id, low_latency?: true) do
RequestHandler.start(room_id)

fn directory -> %LLStorage{directory: directory, room_id: room_id} end
end

defp setup_hls_storage(_room_id, low_latency?: false) do
fn directory -> %Storage{directory: directory} end
end
end
12 changes: 4 additions & 8 deletions lib/jellyfish/component/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@ defmodule Jellyfish.Component.RTSP do
"""

@behaviour Jellyfish.Endpoint.Config
@behaviour Jellyfish.Component

alias Membrane.RTC.Engine.Endpoint.RTSP

alias JellyfishWeb.ApiSpec
alias JellyfishWeb.ApiSpec.Component.RTSP.Options

@type metadata :: %{}

@impl true
def config(%{engine_pid: engine} = options) do
options = Map.drop(options, [:engine_pid, :room_id])

with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, ApiSpec.Component.RTSP.schema()) do
component_spec =
with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, Options.schema()) do
endpoint_spec =
Map.from_struct(valid_opts)
# OpenApiSpex will remove invalid options, so the following conversion, while ugly, is memory-safe
|> Map.new(fn {k, v} ->
Expand All @@ -27,12 +26,9 @@ defmodule Jellyfish.Component.RTSP do
|> Map.put(:max_reconnect_attempts, :infinity)
|> then(&struct(RTSP, &1))

{:ok, component_spec}
{:ok, %{endpoint: endpoint_spec, metadata: %{}}}
else
{:error, _reason} = error -> error
end
end

@impl true
def metadata(), do: %{}
end
8 changes: 7 additions & 1 deletion lib/jellyfish/endpoint/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,11 @@ defmodule Jellyfish.Endpoint.Config do
An interface for RTC Engine endpoint configuration.
"""

@callback config(map()) :: {:ok, Membrane.ChildrenSpec.child_definition()} | {:error, term()}
@callback config(map()) ::
{:ok,
%{
:endpoint => Membrane.ChildrenSpec.child_definition(),
optional(:metadata) => term()
}}
| {:error, term()}
end
2 changes: 1 addition & 1 deletion lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Jellyfish.Peer do
id = UUID.uuid4()
options = Map.put(options, :peer_id, id)

with {:ok, endpoint} <- type.config(options) do
with {:ok, %{endpoint: endpoint}} <- type.config(options) do
{:ok,
%__MODULE__{
id: id,
Expand Down
41 changes: 22 additions & 19 deletions lib/jellyfish/peer/webrtc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ defmodule Jellyfish.Peer.WebRTC do
alias Membrane.WebRTC.Extension.{Mid, RepairedRid, Rid, TWCC, VAD}
alias Membrane.WebRTC.Track.Encoding

alias JellyfishWeb.ApiSpec

@impl true
def config(options) do
if not Application.get_env(:jellyfish, :webrtc_used),
Expand All @@ -18,8 +20,7 @@ defmodule Jellyfish.Peer.WebRTC do
"WebRTC peers can be used only if WEBRTC_USED environmental variable is not set to \"false\""
)

with {:ok, valid_opts} <-
OpenApiSpex.cast_value(options, JellyfishWeb.ApiSpec.Peer.WebRTC.schema()) do
with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, ApiSpec.Peer.WebRTC.schema()) do
handshake_options = [
client_mode: false,
dtls_srtp: true
Expand All @@ -42,23 +43,25 @@ defmodule Jellyfish.Peer.WebRTC do
end

{:ok,
%WebRTC{
rtc_engine: options.engine_pid,
ice_name: options.peer_id,
owner: self(),
integrated_turn_options: network_options[:integrated_turn_options],
integrated_turn_domain: network_options[:integrated_turn_domain],
handshake_opts: handshake_options,
filter_codecs: filter_codecs,
log_metadata: [peer_id: options.peer_id],
trace_context: nil,
extensions: %{opus: Membrane.RTP.VAD},
webrtc_extensions: webrtc_extensions,
simulcast_config: %SimulcastConfig{
enabled: simulcast?,
initial_target_variant: fn _track -> :medium end
},
telemetry_label: [room_id: options.room_id]
%{
endpoint: %WebRTC{
rtc_engine: options.engine_pid,
ice_name: options.peer_id,
owner: self(),
integrated_turn_options: network_options[:integrated_turn_options],
integrated_turn_domain: network_options[:integrated_turn_domain],
handshake_opts: handshake_options,
filter_codecs: filter_codecs,
log_metadata: [peer_id: options.peer_id],
trace_context: nil,
extensions: %{opus: Membrane.RTP.VAD},
webrtc_extensions: webrtc_extensions,
simulcast_config: %SimulcastConfig{
enabled: simulcast?,
initial_target_variant: fn _track -> :medium end
},
telemetry_label: [room_id: options.room_id]
}
}}
else
{:error, _reason} = error -> error
Expand Down
13 changes: 7 additions & 6 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ defmodule Jellyfish.Room do

@impl true
def handle_call({:add_component, component_type, options}, _from, state) do
options = if is_nil(options), do: %{}, else: options

options =
Map.merge(
%{engine_pid: state.engine_pid, room_id: state.id},
Expand Down Expand Up @@ -266,7 +264,7 @@ defmodule Jellyfish.Room do

Logger.info("Removed component #{inspect(component_id)}")

if component.type == Component.HLS, do: remove_hls_processes(state.id)
if component.type == Component.HLS, do: remove_hls_processes(state.id, component.metadata)

{:ok, state}
else
Expand Down Expand Up @@ -323,8 +321,8 @@ defmodule Jellyfish.Room do
else
Event.broadcast(:server_notification, {:component_crashed, state.id, endpoint_id})

%{type: type} = Map.get(state.components, endpoint_id)
if type == Component.HLS, do: remove_hls_processes(state.id)
component = Map.get(state.components, endpoint_id)
if component.type == Component.HLS, do: remove_hls_processes(state.id, component.metadata)
end

{:noreply, state}
Expand Down Expand Up @@ -404,7 +402,10 @@ defmodule Jellyfish.Room do
}
end

defp remove_hls_processes(room_id), do: Component.HLS.RequestHandler.stop(room_id)
defp remove_hls_processes(room_id, %{low_latency: true}),
do: Component.HLS.RequestHandler.stop(room_id)

defp remove_hls_processes(_room_id, _metadata), do: nil

defp registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}}

Expand Down
15 changes: 15 additions & 0 deletions lib/jellyfish/utils.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Jellyfish.Utils do
@moduledoc false

@doc "Convert all keys in map from snake_case to camelCase"
@spec camel_case_keys(%{atom() => term()}) :: %{String.t() => term()}
def camel_case_keys(map) do
Map.new(map, fn {k, v} -> {snake_case_to_camel_case(k), v} end)
end

defp snake_case_to_camel_case(atom) do
[first | rest] = "#{atom}" |> String.split("_")
rest = rest |> Enum.map(&String.capitalize/1)
Enum.join([first | rest])
end
end
41 changes: 15 additions & 26 deletions lib/jellyfish_web/api_spec/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ defmodule JellyfishWeb.ApiSpec.Component do
@moduledoc false

require OpenApiSpex
alias OpenApiSpex.Schema

alias JellyfishWeb.ApiSpec.Component.RTSP
alias JellyfishWeb.ApiSpec.Component.{HLS, RTSP}

defmodule Type do
@moduledoc false
Expand All @@ -29,36 +28,26 @@ defmodule JellyfishWeb.ApiSpec.Component do
description: "Component-specific options",
type: :object,
oneOf: [
RTSP
],
nullable: true
})
end

defmodule Metadata do
@moduledoc false

require OpenApiSpex

OpenApiSpex.schema(%{
title: "ComponentMetadata",
description: "Component-specific metadata",
type: :object,
properties: %{
playable: %Schema{type: :boolean}
}
HLS.Options,
RTSP.Options
]
})
end

OpenApiSpex.schema(%{
title: "Component",
description: "Describes component",
type: :object,
properties: %{
id: %Schema{type: :string, description: "Assigned component id", example: "component-1"},
type: Type,
metadata: Metadata
},
required: [:id, :type, :metadata]
oneOf: [
HLS,
RTSP
],
discriminator: %OpenApiSpex.Discriminator{
propertyName: "type",
mapping: %{
"hls" => HLS,
"rtsp" => RTSP
}
}
})
end
Loading

0 comments on commit 888daad

Please sign in to comment.