Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## 1.2.3
* Deprecate calling a children group `nil` in [#964](https://github.com/membraneframework/membrane_core/pull/964)
* Adds new fields for the handle_child_terminated context: crash_initiator, exit_reason and group_name in [#964](https://github.com/membraneframework/membrane_core/pull/964)
* Makes sure that all handle_child_terminated callbacks for children in crash group are called before handle_crash_group_down of that group in [#962](https://github.com/membraneframework/membrane_core/pull/962)
* Telemetry event's metadata `component_type` now correctly refers to the root type and not the implementing module [#958](https://github.com/membraneframework/membrane_core/pull/958)

Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ defmodule Membrane.Bin do
@doc """
Callback invoked after a child terminates.

Context passed to this callback contains 3 additional fields: `:exit_reason`, `:group_name` and `:crash_initiator`.
Terminated child won't be present in the context of this callback. It is allowed to spawn a new
child with the same name.

By default, it does nothing.
"""
@callback handle_child_terminated(
Expand All @@ -228,7 +228,7 @@ defmodule Membrane.Bin do
@doc """
Callback invoked when crash of the crash group happens.

Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`.
Context passed to this callback contains 3 additional fields: `:members`, `:crash_initiator` and `:crash_reason`.
"""
@callback handle_crash_group_down(
group_name :: Child.group(),
Expand Down
14 changes: 11 additions & 3 deletions lib/membrane/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@ defmodule Membrane.Bin.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Bin.handle_element_end_of_stream/4`.

Fields `:members`, `:crash_initiator` and `crash_reason` and are present only in
Field `:crash_initiator` is only present in `c:Membrane.Bin.handle_child_terminated/3`
and `c:Membrane.Bin.handle_crash_group_down/3`.

Fields `:members` and `:crash_reason` are present only in
`c:Membrane.Bin.handle_crash_group_down/3`.

Fields `:exit_reason` and `:group_name` are present only in
`c:Membrane.Bin.handle_child_terminated/3`.
"""
@type t :: %{
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
Expand All @@ -27,8 +33,10 @@ defmodule Membrane.Bin.CallbackContext do
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:pad_options) => map(),
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:crash_initiator) => Membrane.Child.name() | nil,
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:start_of_stream_received?) => boolean()
optional(:start_of_stream_received?) => boolean(),
optional(:exit_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:group_name) => Membrane.Child.group() | nil
}
end
13 changes: 12 additions & 1 deletion lib/membrane/child.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ defmodule Membrane.Child do

alias Membrane.{Bin, Element}

@typedoc "Any type except for `nil`"
@type non_nil :: any()

@typedoc "Name of the child"
@type name :: Element.name() | Bin.name()
@type group() :: any()

@typedoc """
Specifies the children group name.

Can be any type except for `nil`
"""
@type group() :: non_nil()

@typedoc "Options of the child"
@type options :: Element.options() | Bin.options()

defguard is_child_name?(arg) when is_element_name?(arg) or is_bin_name?(arg)
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/bin/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ defmodule Membrane.Core.Bin.CallbackContext do
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
]
| [start_of_stream_received?: boolean()]
| [
group_name: Membrane.Child.group() | nil,
crash_initiator: Membrane.Child.name() | nil,
exit_reason: :normal | :shutdown | {:shutdown, term()} | term()
]

@spec from_state(Membrane.Core.Bin.State.t(), optional_fields()) ::
Membrane.Bin.CallbackContext.t()
Expand Down
26 changes: 21 additions & 5 deletions lib/membrane/core/parent/child_life_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,11 @@ defmodule Membrane.Core.Parent.ChildLifeController do
with {:ok, crash_group} <- CrashGroupUtils.get_child_crash_group(child_name, state) do
state =
CrashGroupUtils.maybe_detonate_crash_group(child_name, crash_group, reason, state)
|> ChildrenModel.delete_child(child_name)

state = exec_handle_child_terminated(child_name, state)
{:ok, crash_group} = CrashGroupUtils.get_child_crash_group(child_name, state)
state = ChildrenModel.delete_child(state, child_name)

state = exec_handle_child_terminated(child_name, reason, crash_group, state)

state =
CrashGroupUtils.handle_crash_group_member_death(child_name, crash_group, reason, state)
Expand All @@ -741,7 +743,7 @@ defmodule Membrane.Core.Parent.ChildLifeController do
else
:error when reason == :normal ->
state = ChildrenModel.delete_child(state, child_name)
state = exec_handle_child_terminated(child_name, state)
state = exec_handle_child_terminated(child_name, reason, state)
{:ok, state}

:error when reason == {:shutdown, :membrane_crash_group_kill} ->
Expand Down Expand Up @@ -787,11 +789,25 @@ defmodule Membrane.Core.Parent.ChildLifeController do
related_specs |> Map.keys() |> Enum.reduce(state, &proceed_spec_startup/2)
end

defp exec_handle_child_terminated(child_name, state) do
defp exec_handle_child_terminated(child_name, reason, group \\ nil, state) do
{crash_initiator, group_name} =
if group do
{group.crash_initiator, group.name}
else
{nil, nil}
end

context_generator =
&Component.context_from_state(&1,
exit_reason: reason,
crash_initiator: crash_initiator,
group_name: group_name
)

CallbackHandler.exec_and_handle_callback(
:handle_child_terminated,
Component.action_handler(state),
%{context: &Component.context_from_state/1},
%{context: context_generator},
[child_name],
state
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
alias Membrane.Core.{CallbackHandler, Component, Parent}
alias Membrane.Core.Parent.{ChildLifeController, ChildrenModel, CrashGroup}
alias Membrane.Core.Parent.ChildLifeController.LinkUtils
require Membrane.Logger

@spec add_crash_group(
Child.group(),
Expand All @@ -15,6 +16,13 @@ defmodule Membrane.Core.Parent.ChildLifeController.CrashGroupUtils do
) :: Parent.state()
def add_crash_group(group_name, mode, children, state)
when not is_map_key(state.crash_groups, group_name) do
if group_name == nil do
Membrane.Logger.warning("""
Calling a children group `nil` is depracated, please use some other value
for `:group` when you specify the children group in `:spec` action options.
""")
end

put_in(
state.crash_groups[group_name],
%CrashGroup{
Expand Down
5 changes: 5 additions & 0 deletions lib/membrane/core/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ defmodule Membrane.Core.Pipeline.CallbackContext do
crash_reason: :normal | :shutdown | {:shutdown, term()} | term()
]
| [start_of_stream_received?: boolean()]
| [
group_name: Membrane.Child.group() | nil,
crash_initiator: Membrane.Child.name() | nil,
exit_reason: :normal | :shutdown | {:shutdown, term()} | term()
]

@spec from_state(Membrane.Core.Pipeline.State.t(), optional_fields()) ::
Membrane.Pipeline.CallbackContext.t()
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ defmodule Membrane.Pipeline do
@doc """
Callback invoked after a child terminates.

Context passed to this callback contains 3 additional fields: `:exit_reason`, `:group_name` and `:crash_initiator`.
Terminated child won't be present in the context of this callback. It is allowed to spawn a new child
with the same name.

By default, it does nothing.
"""
@callback handle_child_terminated(
Expand All @@ -271,7 +271,7 @@ defmodule Membrane.Pipeline do
the crash group are already dead.

You can use this callback to respawn the children from the failed crashed crash group.
Context passed to this callback contains 2 additional fields: `:members` and `:crash_initiator`.
Context passed to this callback contains 3 additional fields: `:members`, `:crash_initiator` and `:crash_reason`.
By default, it does nothing.
"""
@callback handle_crash_group_down(
Expand Down
14 changes: 11 additions & 3 deletions lib/membrane/pipeline/callback_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,14 @@ defmodule Membrane.Pipeline.CallbackContext do
Field `:start_of_stream_received?` is present only in
`c:Membrane.Pipeline.handle_element_end_of_stream/4`.

Fields `:members`, `:crash_initiator` and `:crash_reason` are present only in
Field `:crash_initiator` is only present in `c:Membrane.Pipeline.handle_child_terminated/3`
and `c:Membrane.Pipeline.handle_crash_group_down/3`.

Fields `:members` and `:crash_reason` are present only in
`c:Membrane.Pipeline.handle_crash_group_down/3`.

Fields `:exit_reason` and `:group_name` are present only in
`c:Membrane.Pipeline.handle_child_terminated/3`.
"""
@type t :: %{
:children => %{Membrane.Child.name() => Membrane.ChildEntry.t()},
Expand All @@ -23,8 +29,10 @@ defmodule Membrane.Pipeline.CallbackContext do
:utility_supervisor => Membrane.UtilitySupervisor.t(),
optional(:from) => [GenServer.from()],
optional(:members) => [Membrane.Child.name()],
optional(:crash_initiator) => Membrane.Child.name(),
optional(:crash_initiator) => Membrane.Child.name() | nil,
optional(:crash_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:start_of_stream_received?) => boolean()
optional(:start_of_stream_received?) => boolean(),
optional(:exit_reason) => :normal | :shutdown | {:shutdown, term()} | term(),
optional(:group_name) => Membrane.Child.group() | nil
}
end
18 changes: 15 additions & 3 deletions test/membrane/integration/callbacks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,32 @@ defmodule Membrane.Integration.CallbacksTest do
end

@impl true
def handle_child_terminated(child, _ctx, state) do
def handle_child_terminated(:filter2, ctx, state) do
assert ctx.crash_initiator == :filter2
assert ctx.group_name == :crash_group
assert match?({%RuntimeError{message: "Raising"}, _stacktrace}, ctx.exit_reason)
state = %{crash_group_children: MapSet.delete(state.crash_group_children, :filter2)}
{[], state}
end

@impl true
def handle_child_terminated(child, ctx, state) do
assert ctx.exit_reason == {:shutdown, :membrane_crash_group_kill}
assert ctx.crash_initiator == :filter2
assert ctx.group_name == :crash_group
state = %{crash_group_children: MapSet.delete(state.crash_group_children, child)}
{[], state}
end

@impl true
def handle_crash_group_down(_group_id, _ctx, state) do
assert MapSet.size(state.crash_group_children) == 0
{[terminate: :shutdown], state}
{[terminate: :normal], state}
end
end

test "handle_child_terminated and handle_crash_group_down in proper order" do
pipeline = Testing.Pipeline.start_supervised!(module: CallbacksOrderAssertingPipeline)
pipeline = Testing.Pipeline.start_link_supervised!(module: CallbacksOrderAssertingPipeline)
Process.monitor(pipeline)

receive do
Expand Down
Loading