Skip to content

Commit

Permalink
Delete handle_buffers_batch callback (#601)
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom authored Sep 26, 2023
1 parent 185b5e5 commit 6afd4b3
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* Fix process leak in starting clocks. [#594](https://github.com/membraneframework/membrane_core/pull/594)
* Add child exit reason to the supervisor exit reason. [#595](https://github.com/membraneframework/membrane_core/pull/595)
* Remove default implementation of `start_/2`, `start_link/2` and `terminate/2` in modules using `Membrane.Pipeline`. [#598](https://github.com/membraneframework/membrane_core/pull/598)
* Remove callback _Membrane.Element.WithInputPads.handle_buffers_batch/4_. [#601](https://github.com/membraneframework/membrane_core/pull/601)

## 0.11.0
* Separate element_name and pad arguments in handle_element_{start, end}_of_stream signature [#219](https://github.com/membraneframework/membrane_core/issues/219)
Expand Down
2 changes: 1 addition & 1 deletion guides/upgrading/v1.0.0-rc0.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Names of the callbacks that are used to process buffers have been unified. This
* _Membrane.Endpoint.handle_write/4_ and _Membrane.Endpoint.handle_write_list/4_
* _Membrane.Sink.handle_write/4_ and _Membrane.Sink.handle_write_list/4_

and they became `c:Membrane.Element.WithInputPads.handle_buffer/4` and `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`, respectively:
and they became `c:Membrane.Element.WithInputPads.handle_buffer/4` and _Membrane.Element.WithInputPads.handle_buffers_batch/4_, respectively:

```diff
@impl true
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/core/element/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ defmodule Membrane.Core.Element.ActionHandler do
when cb in [
:handle_stream_format,
:handle_event,
:handle_buffers_batch,
:handle_buffer,
:handle_end_of_stream
] do
dir =
Expand All @@ -182,7 +182,7 @@ defmodule Membrane.Core.Element.ActionHandler do
action =
case cb do
:handle_event -> {:event, {pad, data}}
:handle_buffers_batch -> {:buffer, {pad, data}}
:handle_buffer -> {:buffer, {pad, data}}
:handle_stream_format -> {:stream_format, {pad, data}}
:handle_end_of_stream -> {:end_of_stream, pad}
end
Expand Down
18 changes: 10 additions & 8 deletions lib/membrane/core/element/buffer_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule Membrane.Core.Element.BufferController do
end

@doc """
Executes `handle_buffers_batch` callback.
Executes `handle_buffer` callback.
"""
@spec exec_buffer_callback(
Pad.ref(),
Expand All @@ -113,12 +113,14 @@ defmodule Membrane.Core.Element.BufferController do
end

defp do_exec_buffer_callback(pad_ref, buffers, state) do
CallbackHandler.exec_and_handle_callback(
:handle_buffers_batch,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffers],
state
)
Enum.reduce(buffers, state, fn buffer, state ->
CallbackHandler.exec_and_handle_callback(
:handle_buffer,
ActionHandler,
%{context: &CallbackContext.from_state/1},
[pad_ref, buffer],
state
)
end)
end
end
11 changes: 4 additions & 7 deletions lib/membrane/element/action.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ defmodule Membrane.Element.Action do
sub-callback executions are finished).
Useful when a long action is to be undertaken, and partial results need to
be returned before entire process finishes (e.g. default implementation of
`c:Membrane.WithInputPads.handle_buffers_batch/4` uses split action to invoke
`c:Membrane.WithInputPads.handle_buffer/4` with each buffer)
be returned before entire process finishes.
"""
@type split :: {:split, {callback_name :: atom, args_list :: [[any]]}}

Expand Down Expand Up @@ -80,7 +78,7 @@ defmodule Membrane.Element.Action do
of data from pad's internal queue, which _sends_ demands automatically when it
runs out of data.
If there is any data available at the pad, the data is passed to
`c:Membrane.WithInputPads.handle_buffers_batch/4` callback. Invoked callback is
`c:Membrane.WithInputPads.handle_buffer/4` callback. Invoked callback is
guaranteed not to receive more data than demanded.
Demand size can be either a non-negative integer, that overrides existing demand,
Expand Down Expand Up @@ -161,13 +159,12 @@ defmodule Membrane.Element.Action do
Allowed only when _all_ below conditions are met:
- element is filter,
- callback is `c:Membrane.Element.WithInputPads.handle_buffers_batch/4`,
`c:Membrane.Element.WithInputPads.handle_buffer/4`,
- callback is `c:Membrane.Element.WithInputPads.handle_buffer/4`,
`c:Membrane.Element.WithInputPads.handle_stream_format/4`,
`c:Membrane.Element.Base.handle_event/4` or `c:Membrane.Element.WithInputPads.handle_end_of_stream/3`
- playback is `playing`
Keep in mind that `c:Membrane.WithInputPads.handle_buffers_batch/4` can only
Keep in mind that `c:Membrane.WithInputPads.handle_buffer/4` can only
forward buffers, `c:Membrane.Element.WithInputPads.handle_stream_format/4` - stream formats.
`c:Membrane.Element.Base.handle_event/4` - events and
`c:Membrane.Element.WithInputPads.handle_end_of_stream/3` - ends of streams.
Expand Down
28 changes: 3 additions & 25 deletions lib/membrane/element/with_input_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,10 @@ defmodule Membrane.Element.WithInputPads do
@doc """
Callback that is called when buffer should be processed by the Element.
By default calls `c:handle_buffer/4` for each buffer.
For pads in pull mode it is called when buffers have been demanded (by returning
For pads in pull mode it is called when buffer have been demanded (by returning
`:demand` action from any callback).
For pads in push mode it is invoked when buffers arrive.
"""
@callback handle_buffers_batch(
pad :: Pad.ref(),
buffers :: list(Buffer.t()),
context :: CallbackContext.t(),
state :: Element.state()
) :: Membrane.Element.Base.callback_return()

@doc """
Callback that is called when buffer should be processed by the Element. In contrast
to `c:handle_buffers_batch/4`, it is passed only a single buffer.
Called by default implementation of `c:handle_buffers_batch/4`.
For pads in push mode it is invoked when buffer arrive.
"""
@callback handle_buffer(
pad :: Pad.ref(),
Expand Down Expand Up @@ -99,16 +84,9 @@ defmodule Membrane.Element.WithInputPads do
@impl true
def handle_end_of_stream(pad, _context, state), do: {[], state}

@impl true
def handle_buffers_batch(pad, buffers, _context, state) do
args_list = buffers |> Enum.map(&[pad, &1])
{[split: {:handle_buffer, args_list}], state}
end

defoverridable handle_stream_format: 4,
handle_start_of_stream: 3,
handle_end_of_stream: 3,
handle_buffers_batch: 4
handle_end_of_stream: 3
end
end
end
2 changes: 1 addition & 1 deletion lib/membrane/element/with_output_pads.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Membrane.Element.WithOutputPads do
In filters, this callback should usually return `:demand` action with
size sufficient for supplying incoming demand. This will result in calling
`c:Membrane.WithInputPads.handle_buffers_batch/4`, which is to supply
`c:Membrane.WithInputPads.handle_buffer/4`, which is to supply
the demand.
If a source or an endpoint is unable to produce enough buffers, or a filter
Expand Down
10 changes: 3 additions & 7 deletions lib/membrane/filter_aggregator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ defmodule Membrane.FilterAggregator do
end

@impl true
def handle_buffers_batch(:input, buffers, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffers}], states)
def handle_buffer(:input, buffer, _ctx, %{states: states}) do
{actions, states} = pipe_downstream([buffer: {:output, buffer}], states)
actions = reject_internal_actions(actions)

{actions, %{states: states}}
Expand Down Expand Up @@ -191,12 +191,8 @@ defmodule Membrane.FilterAggregator do
end
end

defp perform_action({:buffer, {:output, []}}, _module, _context, state) do
{[], state}
end

defp perform_action({:buffer, {:output, buffer}}, module, context, state) do
module.handle_buffers_batch(:input, List.wrap(buffer), context, state)
module.handle_buffer(:input, buffer, context, state)
end

defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do
Expand Down
13 changes: 4 additions & 9 deletions test/membrane/filter_aggregator/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ defmodule Membrane.FilterAggregator.IntegrationTest do
def_output_pad :output, flow_control: :auto, accepted_format: RemoteStream

@impl true
def handle_buffers_batch(:input, buffers, _ctx, state) do
buffers =
buffers
|> Enum.map(fn %Buffer{payload: <<idx, payload::binary>>} ->
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
%Buffer{payload: <<idx, payload::binary>>}
end)

{[buffer: {:output, buffers}], state}
def handle_buffer(:input, %Buffer{payload: <<idx, payload::binary>>}, _ctx, state) do
payload = for <<i <- payload>>, into: <<>>, do: <<i - 1>>
buffer = %Buffer{payload: <<idx, payload::binary>>}
{[buffer: {:output, buffer}], state}
end
end

Expand Down
34 changes: 13 additions & 21 deletions test/membrane/filter_aggregator/unit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,11 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx_b.playback == :playing
end

test "handle_buffers_batch splitting and mapping buffers", ctx do
test_range = 1..10
buffers = test_range |> Enum.map(&%Buffer{payload: <<&1>>})
buffers_count = Enum.count(test_range)
test "handle_buffer mapping buffers", ctx do
buffer = %Buffer{payload: <<1>>}

FilterA
|> expect(:handle_buffers_batch, fn :input, buffers, %{}, %{module: FilterA} = state ->
args_list = buffers |> Enum.map(&[:input, &1])
{[split: {:handle_buffer, args_list}], state}
end)
|> expect(:handle_buffer, buffers_count, fn :input, buffer, %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterA
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload + 1
Expand All @@ -290,7 +284,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

FilterB
|> expect(:handle_buffers_batch, buffers_count, fn :input, [buffer], %{}, state ->
|> expect(:handle_buffer, fn :input, buffer, %{}, state ->
assert state.module == FilterB
assert %Buffer{payload: <<payload>>} = buffer
out_payload = payload * 2
Expand All @@ -299,11 +293,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
end)

assert {actions, %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, buffers, %{}, %{states: ctx.states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: ctx.states})

expected_actions =
test_range
|> Enum.map(&{:buffer, {:output, %Buffer{payload: <<(&1 + 1) * 2>>}}})
expected_actions = [buffer: {:output, %Buffer{payload: <<(1 + 1) * 2>>}}]

assert actions == expected_actions

Expand All @@ -312,8 +304,8 @@ defmodule Membrane.FilterAggregator.UnitTest do
{:b, FilterB, ctx_b, %{module: FilterB, state: state_b}}
] = states

assert state_a == test_range |> Enum.map(&(&1 + 1)) |> Enum.sum()
assert state_b == test_range |> Enum.map(&((&1 + 1) * 2)) |> Enum.sum()
assert state_a == 1 + 1
assert state_b == (1 + 1) * 2

assert ctx_a == ctx.states |> Enum.at(0) |> elem(2)
assert ctx_b == ctx.states |> Enum.at(1) |> elem(2)
Expand All @@ -327,9 +319,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[forward: [buffer]], state}
{[forward: buffer], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -341,9 +333,9 @@ defmodule Membrane.FilterAggregator.UnitTest do
assert ctx.pads.input.start_of_stream? == true
{[], state}
end)
|> expect(:handle_buffers_batch, fn :input, [^buffer], %{} = ctx, state ->
|> expect(:handle_buffer, fn :input, ^buffer, %{} = ctx, state ->
assert ctx.pads.input.start_of_stream? == true
{[buffer: {:output, [buffer]}], state}
{[buffer: {:output, buffer}], state}
end)
|> expect(:handle_end_of_stream, fn :input, %{} = ctx, state ->
assert ctx.pads.input.end_of_stream? == true
Expand All @@ -356,7 +348,7 @@ defmodule Membrane.FilterAggregator.UnitTest do
})

assert {[buffer: {:output, buffers}], %{states: states}} =
FilterAggregator.handle_buffers_batch(:input, [buffer], %{}, %{states: states})
FilterAggregator.handle_buffer(:input, buffer, %{}, %{states: states})

assert List.wrap(buffers) == [buffer]

Expand Down

0 comments on commit 6afd4b3

Please sign in to comment.