Skip to content

Commit

Permalink
Add Source Dispatcher (#17)
Browse files Browse the repository at this point in the history
* add source dispatcher
* configure CI to run tests
  • Loading branch information
varsill authored May 24, 2024
1 parent 2497b89 commit 5eb91d8
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 157 deletions.
53 changes: 38 additions & 15 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,20 +1,43 @@
version: 2.1

jobs:
build_test:
docker:
- image: membraneframeworklabs/docker_membrane
steps:
- checkout
- run:
command: |
mix deps.get
mix deps.compile
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:./agora_sdk
mix compile --force --warnings-as-errors
orbs:
elixir: membraneframework/elixir@1

workflows:
version: 2
build:
jobs:
- build_test

- elixir/build_test:
# post-steps: # steps to run after steps defined in the job bar
# - elixir/use_build_cache:
# cache-version: 3
# env: test
# regenerate: true
# before-save:
# - run: mix compile --force --warnings-as-errors
filters: &filters
tags:
only: /v.*/
- elixir/test:
pre-steps: # steps to run before steps defined in the job bar
- checkout
- elixir/get_mix_deps
- run:
command: mix compile --force
filters:
<<: *filters
- elixir/lint:
filters:
<<: *filters
docs: false
- elixir/hex_publish:
requires:
- elixir/build_test
- elixir/test
- elixir/lint
context:
- Deployment
filters:
branches:
ignore: /.*/
tags:
only: /v.*/
3 changes: 3 additions & 0 deletions bundlex.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ defmodule Membrane.Agora.BundlexProject do
defp natives(%{architecture: "x86_64", os: "linux"}) do
unless System.get_env("AGORA_SDK_PRESENT") == "true", do: System.shell("./install.sh")

System.shell("ls -la agora_sdk") |> IO.inspect(label: :AGORA_PATH)
System.shell("echo $LD_LIBRARY_PATH") |> IO.inspect(label: :LD_LIBRARY_PATH)

[
sink: [
sources: ["sink.cpp", "connection_observer.cpp"],
Expand Down
2 changes: 1 addition & 1 deletion c_src/membrane_agora_plugin/sink.spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ spec(
sends {:user_joined :: label, id :: string}
sends {:user_left :: label, id :: string}

dirty :cpu, create: 4
dirty :cpu, [:create, :write_video_data, :update_video_stream_format, :write_audio_data, :update_audio_stream_format]
2 changes: 2 additions & 0 deletions c_src/membrane_agora_plugin/source.spec.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ sends {:agora_audio_payload :: label, payload, id :: string}
sends {:agora_video_payload :: label, payload, id :: int}
sends {:user_joined :: label, id :: string}
sends {:user_left :: label, id :: string}

dirty :cpu, [:create]
1 change: 0 additions & 1 deletion install.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#! /bin/bash

if ! test -d ./agora_sdk; then
wget https://download.agora.io/sdk/release/Agora-RTC-x86_64-linux-gnu-v3.8.202.20-20220627_152601-214165.tgz
tar xvf Agora-RTC-x86_64-linux-gnu-v3.8.202.20-20220627_152601-214165.tgz
Expand Down
123 changes: 123 additions & 0 deletions lib/agora/agora_dispatcher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
defmodule Membrane.Agora.Dispatcher do
@moduledoc """
This element allows for dispatching the streams received by the `Membrane.Agora.Source`
based on the `user_id` field from the buffer's metadata.
The `Membrane.Agora.Source` sends a single stream for all the users that are present in the
Agora channel and enriches buffers with the `user_id` field which corresponds
to the ID of the stream sender from the Agora channel. The `#{inspect(__MODULE__)}` element allows
to demultiplex such a single output channel stream into multiple streams for each of the users.
Usage:
* attach the `#{inspect(__MODULE__)}` element to each output pad of the `Membrane.Agora.Source`.
* when the `{:add_pad, <user_id>}` message is received from a given `#{inspect(__MODULE__)}` dispatcher,
attach the output pad with the following name: `{:output, <user_id>}` to this particular dispatcher
"""

use Membrane.Filter

require Logger
require Membrane.Logger

def_input_pad(:input,
accepted_format:
any_of(
%Membrane.H264{alignment: :au},
%Membrane.RawAudio{sample_rate: 44_100, channels: 2, sample_format: :s16le}
)
)

def_output_pad(:output,
accepted_format:
any_of(
%Membrane.H264{alignment: :au},
%Membrane.RawAudio{sample_rate: 44_100, channels: 2, sample_format: :s16le}
),
availability: :on_request
)

def_options queue_before_pad_connected?: [
spec: boolean(),
default: true,
description: """
If true, the buffers will be queued until the corresponding pad, on which they should be sent, is connected.
Otherwise, if there is no corresponding pad available, the buffers are discarded.
Defaults to true.
"""
]

@impl true
def handle_init(_ctx, opts) do
{[],
%{
output_pads: %{},
queue_before_pad_connected?: opts.queue_before_pad_connected?,
stream_format: nil
}}
end

@impl true
def handle_stream_format(:input, format, _ctx, state) do
{[forward: format], %{state | stream_format: format}}
end

@impl true
def handle_buffer(:input, buffer, _ctx, state) do
user_id = buffer.metadata.id

cond do
user_id not in Map.keys(state.output_pads) ->
state = add_pad(state, user_id)
state = buffer_up(state, buffer)
{[notify_parent: {:add_pad, user_id}], state}

state.output_pads[user_id].is_connected ->
{[buffer: {Pad.ref(:output, user_id), buffer}], state}

true ->
state = buffer_up(state, buffer)
{[], state}
end
end

@impl true
def handle_pad_added(Pad.ref(:output, user_id), _ctx, state) do
{notify_child_actions, state} =
if user_id in Map.keys(state.output_pads) do
{[], state}
else
{[], add_pad(state, user_id)}
end

state =
update_in(state, [:output_pads, user_id, :is_connected], fn false ->
true
end)

stream_format_actions =
if state.stream_format,
do: [stream_format: {Pad.ref(:output, user_id), state.stream_format}],
else: []

buffer_actions =
Enum.reverse(state.output_pads[user_id].buffered)
|> Enum.map(&{Pad.ref(:output, user_id), &1})

{notify_child_actions ++ stream_format_actions ++ buffer_actions, state}
end

defp add_pad(state, user_id) do
put_in(state, [:output_pads, user_id], %{buffered: [], is_connected: false})
end

defp buffer_up(%{queue_before_pad_connected?: true} = state, _buffer) do
state
end

defp buffer_up(state, buffer) do
user_id = buffer.metadata.id

update_in(state, [:output_pads, user_id, :buffered], fn buffered ->
[buffer | buffered]
end)
end
end
21 changes: 12 additions & 9 deletions lib/agora/agora_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,23 @@ defmodule Membrane.Agora.Sink do
Native.create(state.app_id, state.token, state.channel_name, state.user_id)
rescue
_e in UndefinedFunctionError ->
raise """
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
"""
reraise(
"""
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
""",
__STACKTRACE__
)

other_error ->
raise other_error
reraise(other_error, __STACKTRACE__)
end

{[], %{state | native_state: native_state}}
end

@impl true
def handle_stream_format(Pad.ref(:video, _), stream_format, _ctx, state) do
def handle_stream_format(Pad.ref(:video, _id), stream_format, _ctx, state) do
{:ok, native_state} =
Native.update_video_stream_format(
stream_format.height,
Expand All @@ -93,7 +96,7 @@ defmodule Membrane.Agora.Sink do
end

@impl true
def handle_stream_format(Pad.ref(:audio, _), stream_format, _ctx, state) do
def handle_stream_format(Pad.ref(:audio, _id), stream_format, _ctx, state) do
{:ok, native_state} =
Native.update_audio_stream_format(
stream_format.sample_rate,
Expand All @@ -106,7 +109,7 @@ defmodule Membrane.Agora.Sink do
end

@impl true
def handle_buffer(Pad.ref(:video, _), buffer, _ctx, state) do
def handle_buffer(Pad.ref(:video, _id), buffer, _ctx, state) do
:ok =
Native.write_video_data(
buffer.payload,
Expand All @@ -118,7 +121,7 @@ defmodule Membrane.Agora.Sink do
end

@impl true
def handle_buffer(Pad.ref(:audio, _), buffer, _ctx, state) do
def handle_buffer(Pad.ref(:audio, _id), buffer, _ctx, state) do
:ok = Native.write_audio_data(buffer.payload, state.native_state)
{[], state}
end
Expand Down
13 changes: 8 additions & 5 deletions lib/agora/agora_source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,16 @@ defmodule Membrane.Agora.Source do
Native.create(state.app_id, state.token, state.channel_name, state.user_id, self())
rescue
_e in UndefinedFunctionError ->
raise """
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
"""
reraise(
"""
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
""",
__STACKTRACE__
)

other_error ->
raise other_error
reraise(other_error, __STACKTRACE__)
end

{[
Expand Down
Loading

0 comments on commit 5eb91d8

Please sign in to comment.