Skip to content

Commit

Permalink
fix: randomly failing broadcaster test
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Atanasov authored and yaboiishere committed Apr 23, 2024
1 parent 0828e4b commit d50d48b
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 134 deletions.
2 changes: 1 addition & 1 deletion lib/ae_mdw/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ defmodule AeMdw.Application do

children = [
AeMdwWeb.Supervisor,
AeMdwWeb.Websocket.Supervisor,
AeMdw.Sync.Supervisor,
AeMdwWeb.Websocket.Supervisor,
AeMdw.APM.Telemetry
]

Expand Down
62 changes: 39 additions & 23 deletions lib/ae_mdw_web/websocket/broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,24 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
}

@spec start_link(any()) :: GenServer.on_start()
def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
def start_link(%{name: name, state: state}),
do: GenServer.start_link(__MODULE__, %{state: state}, name: name)

def start_link(_arg),
do: GenServer.start_link(__MODULE__, %{state: State.mem_state()}, name: __MODULE__)

@impl GenServer
def init(:ok), do: {:ok, :no_state}
def init(%{state: state}), do: {:ok, %{internal_state: state}}

@spec broadcast_key_block(Db.key_block(), version(), source(), count(), count()) :: :ok
def broadcast_key_block(block, version, source, mb_count, txs_count) do
def broadcast_key_block(name \\ __MODULE__, block, version, source, mb_count, txs_count) do
header = :aec_blocks.to_header(block)
{:ok, hash} = :aec_headers.hash_header(header)

if not already_processed?({:key, version, hash, source}) do
if Subscriptions.has_subscribers?(source, version, "KeyBlocks") do
GenServer.cast(
__MODULE__,
name,
{:broadcast_key_block, header, source, version, mb_count, txs_count}
)
end
Expand All @@ -51,7 +55,7 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
end

@spec broadcast_micro_block(Db.micro_block(), source()) :: :ok
def broadcast_micro_block(block, source) do
def broadcast_micro_block(name \\ __MODULE__, block, source) do
header = :aec_blocks.to_header(block)
{:ok, hash} = :aec_headers.hash_header(header)

Expand All @@ -60,7 +64,7 @@ defmodule AeMdwWeb.Websocket.Broadcaster do

if Enum.any?(versions) do
txs_count = length(:aec_blocks.txs(block))
GenServer.cast(__MODULE__, {:broadcast_micro_block, header, source, versions, txs_count})
GenServer.cast(name, {:broadcast_micro_block, header, source, versions, txs_count})
end

set_processed({:micro, hash, source})
Expand All @@ -70,14 +74,14 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
end

@spec broadcast_txs(Db.micro_block(), source()) :: :ok
def broadcast_txs(block, source) do
def broadcast_txs(name \\ __MODULE__, block, source) do
{:ok, hash} = block |> :aec_blocks.to_header() |> :aec_headers.hash_header()

if not already_processed?({:txs, hash, source}) do
versions = Enum.filter([:v1, :v2], &broadcast_transaction?(source, &1))

if Enum.any?(versions) do
GenServer.cast(__MODULE__, {:broadcast_txs, block, source, versions})
GenServer.cast(name, {:broadcast_txs, block, source, versions})
end

set_processed({:txs, hash, source})
Expand All @@ -87,9 +91,12 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
end

@impl GenServer
def handle_cast({:broadcast_key_block, header, source, version, mbs_count, txs_count}, state) do
def handle_cast(
{:broadcast_key_block, header, source, version, mbs_count, txs_count},
%{internal_state: internal_state} = state
) do
_result =
do_broadcast_block(header, source, version, %{
do_broadcast_block(internal_state, header, source, version, %{
micro_blocks_count: mbs_count,
transactions_count: txs_count
})
Expand All @@ -98,15 +105,24 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
end

@impl GenServer
def handle_cast({:broadcast_micro_block, header, source, versions, txs_count}, state) do
Enum.each(versions, &do_broadcast_block(header, source, &1, %{transactions_count: txs_count}))
def handle_cast(
{:broadcast_micro_block, header, source, versions, txs_count},
%{internal_state: internal_state} = state
) do
Enum.each(
versions,
&do_broadcast_block(internal_state, header, source, &1, %{transactions_count: txs_count})
)

{:noreply, state}
end

@impl GenServer
def handle_cast({:broadcast_txs, micro_block, source, versions}, state) do
Enum.each(versions, &do_broadcast_txs(micro_block, source, &1))
def handle_cast(
{:broadcast_txs, micro_block, source, versions},
%{internal_state: internal_state} = state
) do
Enum.each(versions, &do_broadcast_txs(internal_state, micro_block, source, &1))

{:noreply, state}
end
Expand All @@ -119,39 +135,39 @@ defmodule AeMdwWeb.Websocket.Broadcaster do
#
# Private functions
#
defp do_broadcast_block(header, source, version, counters) do
defp do_broadcast_block(internal_state, header, source, version, counters) do
type = :aec_headers.type(header)
channel = Map.fetch!(@block_subs, type)

with {:ok, block} <- serialize_block(header, type, source, version) do
with {:ok, block} <- serialize_block(internal_state, header, type, source, version) do
block
|> Map.merge(counters)
|> encode_message(channel, source)
|> broadcast(channel, source, version)
end
end

defp serialize_block(header, :key, :mdw, :v2) do
defp serialize_block(internal_state, header, :key, :mdw, :v2) do
height = header |> :aec_headers.height() |> Integer.to_string()
Blocks.fetch_key_block(State.mem_state(), height)
Blocks.fetch_key_block(internal_state, height)
end

defp serialize_block(header, :micro, :mdw, :v2) do
defp serialize_block(internal_state, header, :micro, :mdw, :v2) do
{:ok, hash} = :aec_headers.hash_header(header)
Blocks.fetch_micro_block(State.mem_state(), encode(:micro_block_hash, hash))
Blocks.fetch_micro_block(internal_state, encode(:micro_block_hash, hash))
end

defp serialize_block(header, _type, _source, _version) do
defp serialize_block(_internal_state, header, _type, _source, _version) do
prev_block_type = Db.prev_block_type(header)
{:ok, :aec_headers.serialize_for_client(header, prev_block_type)}
end

defp do_broadcast_txs(block, source, version) do
defp do_broadcast_txs(internal_state, block, source, version) do
tx_pids = Subscriptions.subscribers(source, version, "Transactions")

context =
if {source, version} == {:mdw, :v2} do
{:state, State.mem_state()}
{:state, internal_state}
else
{:block, block}
end
Expand Down
Loading

0 comments on commit d50d48b

Please sign in to comment.