Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement cutting links #522

Draft
wants to merge 40 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
46d1df4
Add `handle_child_pad_removed/4` callback in bins and pipelines
FelonEkonom Jan 18, 2023
81cc84d
Update changelog
FelonEkonom Jan 19, 2023
fa27e4b
Fix credo issue
FelonEkonom Jan 19, 2023
df5bd3e
Fix dialyzer issues
FelonEkonom Jan 19, 2023
54f8a4e
Write tests for handle_child_pad_removed
FelonEkonom Jan 24, 2023
835cac6
Send :handle_unlink, even if child is terminating
FelonEkonom Jan 25, 2023
59d8892
Merge remote-tracking branch 'origin/master' into handle-child-pad-re…
FelonEkonom Jan 25, 2023
cf7a11c
Refactor handle_child_pad_removed callback related code
FelonEkonom Jan 25, 2023
818ef92
Stop raising, on unliunking static pad, when component is terminating…
FelonEkonom Jan 25, 2023
7860906
Remove leftovers from handle_child_pad_removed tests
FelonEkonom Jan 25, 2023
dedb30b
Merge remote-tracking branch 'origin/master' into handle-child-pad-re…
FelonEkonom Jan 26, 2023
f346161
Make dependent specs a map
FelonEkonom Jan 26, 2023
270fb9d
Remove link from specs, when child removes it's pad
FelonEkonom Jan 26, 2023
c1fd3bc
Update specs, when parent removes a child
FelonEkonom Jan 27, 2023
b111be3
Fix bug in Testing.Pipeline
FelonEkonom Jan 27, 2023
8b53257
Fix bug in pipeline termination & write tests for killing elements wh…
FelonEkonom Jan 30, 2023
c2112be
Refactor :handle_unlink message
FelonEkonom Jan 30, 2023
999c59f
Implement hanlde_child_pad_removed in Testing.Pipeline
FelonEkonom Jan 31, 2023
f4e2a36
Add handle_crash_group_down callback in Membrane.Bin
FelonEkonom Jan 31, 2023
36a10dd
Fix failing Testing.Pipeline tests
FelonEkonom Jan 31, 2023
12e5ea4
Merge branch 'handle-child-pad-removed-bug-fix' into crash-groups-in-…
FelonEkonom Jan 31, 2023
5624599
Fix typo
FelonEkonom Jan 31, 2023
eb22398
Merge branch 'handle-child-pad-removed-bug-fix' into crash-groups-in-…
FelonEkonom Jan 31, 2023
ca06cd0
Fix bug occuring when bin child dies between link_request and handle_…
FelonEkonom Jan 31, 2023
d0c7d15
Write test for many crash groups in a single spec
FelonEkonom Jan 31, 2023
d7b99c4
Refactor due to mix credo
FelonEkonom Feb 1, 2023
ed165bc
Add more crash groups tests
FelonEkonom Feb 1, 2023
c792393
Update docs
FelonEkonom Feb 1, 2023
94b0cad
Update lib/membrane/testing/assertions.ex
FelonEkonom Feb 2, 2023
89deeff
Implement suggestion from CR
FelonEkonom Feb 2, 2023
5299101
Merge branch 'handle-child-pad-removed-bug-fix' of github.com:membran…
FelonEkonom Feb 2, 2023
a6ee5f2
Remove done todo annotation
FelonEkonom Feb 2, 2023
df960e3
Run format
FelonEkonom Feb 2, 2023
703fa5a
Remove commented lines of code
FelonEkonom Feb 2, 2023
16a6832
Rename function remove_dynamic_pad! to remove_pad
FelonEkonom Feb 2, 2023
f1eb3dc
Rename :remove_link action to :remove_child_pad
FelonEkonom Feb 3, 2023
7751eab
Merge branch 'handle-child-pad-removed-bug-fix' into crash-groups-in-…
FelonEkonom Feb 3, 2023
8893723
Merge branch 'crash-groups-in-bins' into cut-link
FelonEkonom Feb 3, 2023
152d42a
Cut link, if implcit_unlink?: false is passed in input pad options
FelonEkonom Feb 3, 2023
5067470
wip
FelonEkonom Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Changelog

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
* Introduce `:remove_child_pad` action in pipelines and bins.
* Add children groups - a mechanism that allows refering to multiple children with a single identifier.
* Rename `remove_child` action into `remove_children` and allow for removing a children group with a single action.
* Add an ability to spawn anonymous children.
Expand All @@ -15,6 +15,8 @@
* The flow control of the pad is now set with a single `:flow_control` option instead of `:mode` and `:demand_mode` options.
* Remove _t suffix from types [#509](https://github.com/membraneframework/membrane_core/pull/509)
* Implement automatic demands in Membrane Sinks and Endpoints. [#512](https://github.com/membraneframework/membrane_core/pull/512)
* Add `handle_child_pad_removed/4` callback in Bins and Pipelines. [#513](https://github.com/membraneframework/membrane_core/pull/513)
* Introduce support for crash groups in Bins. [#521](https://github.com/membraneframework/membrane_core/pull/521)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
35 changes: 32 additions & 3 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ defmodule Membrane.Bin do
) ::
callback_return

@doc """
Callback invoked when a child removes its pad.

Removing child's pad due to return `t:Membrane.Bin.Action.remove_child_pad()`
from `Membrane.Bin` callbacks does not trigger this callback.
"""
@callback handle_child_pad_removed(
child :: Child.name(),
pad :: Pad.ref(),
context :: CallbackContext.t(),
state :: state
) :: callback_return

@doc """
Callback invoked when a notification comes in from an element.
"""
Expand Down Expand Up @@ -160,6 +173,17 @@ defmodule Membrane.Bin do
state :: state
) :: callback_return

@doc """
Callback invoked when crash of the crash group happens.

Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`.
"""
@callback handle_crash_group_down(
group_name :: Child.group(),
context :: CallbackContext.t(),
state
) :: callback_return

@doc """
A callback invoked when the bin is being removed by its parent.

Expand All @@ -168,8 +192,7 @@ defmodule Membrane.Bin do
@callback handle_terminate_request(
context :: CallbackContext.t(),
state
) ::
callback_return()
) :: callback_return

@optional_callbacks handle_init: 2,
handle_pad_added: 3,
Expand All @@ -183,7 +206,9 @@ defmodule Membrane.Bin do
handle_child_notification: 4,
handle_parent_notification: 3,
handle_tick: 3,
handle_terminate_request: 2
handle_crash_group_down: 3,
handle_terminate_request: 2,
handle_child_pad_removed: 4

@doc PadsSpecs.def_pad_docs(:input, :bin)
defmacro def_input_pad(name, spec) do
Expand Down Expand Up @@ -323,6 +348,9 @@ defmodule Membrane.Bin do
@impl true
def handle_parent_notification(_notification, _ctx, state), do: {[], state}

@impl true
def handle_crash_group_down(_group_name, _ctx, state), do: {[], state}

@impl true
def handle_terminate_request(_ctx, state), do: {[terminate: :normal], state}

Expand All @@ -337,6 +365,7 @@ defmodule Membrane.Bin do
handle_element_end_of_stream: 4,
handle_child_notification: 4,
handle_parent_notification: 3,
handle_crash_group_down: 3,
handle_terminate_request: 2
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/bin/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Membrane.Bin.Action do

Removed link has to have dynamic pads on both ends.
"""
@type remove_link :: {:remove_link, {Child.name(), Pad.ref()}}
@type remove_child_pad :: {:remove_child_pad, {Child.name(), Pad.ref()}}

@typedoc """
Starts a timer that will invoke `c:Membrane.Bin.handle_tick/3` callback
Expand Down Expand Up @@ -134,7 +134,7 @@ defmodule Membrane.Bin.Action do
| notify_parent
| spec
| remove_children
| remove_link
| remove_child_pad
| start_timer
| timer_interval
| stop_timer
Expand Down
7 changes: 6 additions & 1 deletion lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ defmodule Membrane.Bin.CallbackContext do

Field `:pad_options` is present only in `c:Membrane.Bin.handle_pad_added/3`
and `c:Membrane.Bin.handle_pad_removed/3`.

Fields `:members` and `:crash_initiator` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.
"""
@type t :: %{
:clock => Membrane.Clock.t(),
Expand All @@ -18,6 +21,8 @@ defmodule Membrane.Bin.CallbackContext do
:playback => Membrane.Playback.t(),
:resource_guard => Membrane.ResourceGuard.t(),
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:pad_options) => map()
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name()
}
end
15 changes: 8 additions & 7 deletions lib/membrane/children_spec.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ defmodule Membrane.ChildrenSpec do
#### Limitations

At this moment crash groups are only useful for elements with dynamic pads.
Crash groups work only in pipelines and are not supported in bins.
Crash groups work in pipelines and bins as well.

### Log metadata
`:log_metadata` field can be used to set the `Membrane.Logger` metadata for all children in the given children specification.
Expand All @@ -237,9 +237,8 @@ defmodule Membrane.ChildrenSpec do
```
{[
child(:a, A) |> child(:b, B),
{child(:c, C), crash_group:
{:second, :temporary}}
], crash_group_mode: :temporary, group: :first, node: some_node}
{child(:c, C), group: :second, crash_group_mode: :temporary}
], group: :first, crash_group_mode: :temporary, node: some_node}
```

Child `:c` will be spawned in the `:second` crash group, while children `:a` and `:b` will be spawned in the `:first` crash group.
Expand Down Expand Up @@ -541,7 +540,8 @@ defmodule Membrane.ChildrenSpec do
target_queue_size: number | nil,
min_demand_factor: number | nil,
auto_demand_size: number | nil,
throttling_factor: number | nil
throttling_factor: number | nil,
implicit_unlink?: boolean()
) :: builder() | no_return
def via_in(builder, pad, props \\ [])

Expand All @@ -566,7 +566,8 @@ defmodule Membrane.ChildrenSpec do
min_demand_factor: [default: nil],
auto_demand_size: [default: nil],
toilet_capacity: [default: nil],
throttling_factor: [default: 1]
throttling_factor: [default: 1],
implicit_unlink?: [default: true]
)
|> case do
{:ok, props} ->
Expand All @@ -579,7 +580,7 @@ defmodule Membrane.ChildrenSpec do
if builder.status == :from_pad do
builder
else
via_out(builder, :output)
builder |> via_out(:output)
end
|> then(&%Builder{&1 | status: :to_pad, to_pad: pad, to_pad_props: Enum.into(props, %{})})
end
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ defmodule Membrane.Core.Bin do
{:noreply, state}
end

defp do_handle_info(Message.new(:child_pad_removed, [child, pad]), state) do
state = Parent.ChildLifeController.handle_child_pad_removed(child, pad, state)
{:noreply, state}
end

defp do_handle_info(Message.new(:child_notification, [from, notification]), state) do
state = Parent.LifecycleController.handle_child_notification(from, notification, state)
{:noreply, state}
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/bin/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ defmodule Membrane.Core.Bin.ActionHandler do
end

@impl CallbackHandler
def handle_action({:remove_link, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_link(child_name, pad_ref, state)
def handle_action({:remove_child_pad, {child_name, pad_ref}}, _cb, _params, state) do
Parent.ChildLifeController.handle_remove_child_pad(child_name, pad_ref, state)
end

@impl CallbackHandler
Expand Down
116 changes: 68 additions & 48 deletions lib/membrane/core/bin/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ defmodule Membrane.Core.Bin.PadController do
state
end

@spec remove_pad!(Pad.ref(), State.t()) :: State.t()
def remove_pad!(pad_ref, state) do
cond do
state.terminating? ->
state

Pad.is_dynamic_pad_ref(pad_ref) ->
Message.send(state.parent_pid, :child_pad_removed, [state.name, pad_ref])
PadModel.delete_data!(state, pad_ref)

Pad.is_static_pad_ref(pad_ref) ->
raise Membrane.PadError,
"Tried to unlink bin static pad #{inspect(pad_ref)}. Static pads cannot be unlinked unless bin is terminating"
end
end

@spec handle_linking_timeout(Pad.ref(), State.t()) :: :ok | no_return()
def handle_linking_timeout(pad_ref, state) do
case PadModel.get_data(state, pad_ref) do
Expand Down Expand Up @@ -193,58 +209,61 @@ defmodule Membrane.Core.Bin.PadController do
Core.Bin.State.t()
) :: {Core.Element.PadController.link_call_reply(), Core.Bin.State.t()}
def handle_link(direction, endpoint, other_endpoint, params, state) do
pad_data = PadModel.get_data!(state, endpoint.pad_ref)

Membrane.Logger.debug("Handle link #{inspect(endpoint, pretty: true)}")

%{spec_ref: spec_ref, endpoint: child_endpoint, name: pad_name} = pad_data

pad_props =
Map.merge(endpoint.pad_props, child_endpoint.pad_props, fn key,
external_value,
internal_value ->
if key in [
:target_queue_size,
:min_demand_factor,
:auto_demand_size,
:toilet_capacity,
:throttling_factor
] do
external_value || internal_value
else
internal_value
end
end)

child_endpoint = %{child_endpoint | pad_props: pad_props}

if params.initiator == :sibling do
:ok =
Child.PadController.validate_pad_mode!(
{endpoint.pad_ref, pad_data},
{other_endpoint.pad_ref, params.other_info}
)
end
with {:ok, pad_data} <- PadModel.get_data(state, endpoint.pad_ref) do
%{spec_ref: spec_ref, endpoint: child_endpoint, name: pad_name} = pad_data

pad_props =
Map.merge(endpoint.pad_props, child_endpoint.pad_props, fn key,
external_value,
internal_value ->
if key in [
:target_queue_size,
:min_demand_factor,
:auto_demand_size,
:toilet_capacity,
:throttling_factor
] do
external_value || internal_value
else
internal_value
end
end)

params =
Map.update!(
params,
:stream_format_validation_params,
&[{state.module, pad_name} | &1]
)
child_endpoint = %{child_endpoint | pad_props: pad_props}

reply =
Message.call!(child_endpoint.pid, :handle_link, [
direction,
child_endpoint,
other_endpoint,
params
])

state = PadModel.set_data!(state, endpoint.pad_ref, :linked?, true)
state = PadModel.set_data!(state, endpoint.pad_ref, :endpoint, child_endpoint)
state = ChildLifeController.proceed_spec_startup(spec_ref, state)
{reply, state}
if params.initiator == :sibling do
:ok =
Child.PadController.validate_pad_mode!(
{endpoint.pad_ref, pad_data},
{other_endpoint.pad_ref, params.other_info}
)
end

params =
Map.update!(
params,
:stream_format_validation_params,
&[{state.module, pad_name} | &1]
)

reply =
Message.call!(child_endpoint.pid, :handle_link, [
direction,
child_endpoint,
other_endpoint,
params
])

state = PadModel.set_data!(state, endpoint.pad_ref, :linked?, true)
state = PadModel.set_data!(state, endpoint.pad_ref, :endpoint, child_endpoint)
state = ChildLifeController.proceed_spec_startup(spec_ref, state)
{reply, state}
else
{:error, :unknown_pad} ->
{{:error, {:unknown_pad, state.name, state.module, endpoint.pad_ref}}, state}
end
end

@doc """
Expand All @@ -258,6 +277,7 @@ defmodule Membrane.Core.Bin.PadController do
{pad_data, state} = PadModel.pop_data!(state, pad_ref)

if endpoint do
IO.inspect(endpoint, label: "ENDPOINT")
Message.send(endpoint.pid, :handle_unlink, endpoint.pad_ref)
ChildLifeController.proceed_spec_startup(pad_data.spec_ref, state)
else
Expand Down
4 changes: 3 additions & 1 deletion lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ defmodule Membrane.Core.Element.PadController do
{Endpoint.t(), PadModel.pad_info(), %{toilet: Toilet.t() | nil}}

@type link_call_reply ::
:ok | {:ok, link_call_reply_props} | {:error, {:neighbor_dead, reason :: any}}
:ok
| {:ok, link_call_reply_props}
| {:error, {:neighbor_dead, reason :: any} | :unknown_pad}

@default_auto_demand_size_factor 4000

Expand Down
Loading