Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Experiment to try to use forgetful blooms to dedup gossip #280

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
{cuttlefish, ".*", {git, "https://github.com/helium/cuttlefish", {branch, "develop"}}},
{inet_ext, ".*", {git, "https://github.com/benoitc/inet_ext", {branch, "master"}}},
{splicer, ".*", {git, "https://github.com/helium/erlang-splicer.git", {branch, "master"}}},
{relcast, ".*", {git, "https://github.com/helium/relcast.git", {branch, "master"}}}
{relcast, ".*", {git, "https://github.com/helium/relcast.git", {branch, "master"}}},
{erbloom, ".*", {git, "https://github.com/Vagabond/erbloom.git", {branch, "adt/forgetful-bloom"}}}
]}.

{erl_opts, [
Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
0},
{<<"ecc_compact">>,{pkg,<<"ecc_compact">>,<<"1.0.4">>},0},
{<<"enacl">>,{pkg,<<"enacl">>,<<"0.17.2">>},0},
{<<"erbloom">>,
{git,"https://github.com/Vagabond/erbloom.git",
{ref,"cdbde894836b72901c245a34b7e9a3428d8db77c"}},
0},
{<<"erl_base58">>,{pkg,<<"erl_base58">>,<<"0.0.1">>},0},
{<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
Expand Down
33 changes: 27 additions & 6 deletions src/group/libp2p_gossip_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
{ connection :: libp2p_connection:connection(),
handler_module :: atom(),
handler_state :: any(),
path :: any()
path :: any(),
global_bloom :: forgetful_bloom:bloom(),
local_bloom :: forgetful_bloom:bloom()
}).

%% API
Expand All @@ -45,17 +47,21 @@ client(Connection, Args) ->
server(Connection, _Path, _TID, Args) ->
libp2p_framed_stream:server(?MODULE, Connection, Args).

init(server, Connection, [Path, HandlerModule, HandlerState]) ->
init(server, Connection, [Path, HandlerModule, HandlerState, Bloom]) ->
lager:debug("initiating server with path ~p", [Path]),
{ok, Session} = libp2p_connection:session(Connection),
%% Catch errors from the handler module in accepting a stream. The
%% most common occurrence is during shutdown of a swarm where
%% ordering of the shutdown will cause the accept below to crash
%% noisily in the logs. This catch avoids that noise
case (catch HandlerModule:accept_stream(HandlerState, Session, self(), Path)) of
ok -> {ok, #state{connection=Connection,
ok ->
{ok, LocalBloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000),
{ok, #state{connection=Connection,
handler_module=HandlerModule,
handler_state=HandlerState,
global_bloom=Bloom,
local_bloom=LocalBloom,
path=Path}};
{error, too_many} ->
{stop, normal};
Expand All @@ -67,21 +73,36 @@ init(server, Connection, [Path, HandlerModule, HandlerState]) ->
[error_logger_lager_h:format_reason(Exit)]),
{stop, normal}
end;
init(client, Connection, [Path, HandlerModule, HandlerState]) ->
init(client, Connection, [Path, HandlerModule, HandlerState, Bloom]) ->
lager:debug("initiating client with path ~p", [Path]),
{ok, LocalBloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000),
{ok, #state{connection=Connection,
handler_module=HandlerModule,
handler_state=HandlerState,
global_bloom=Bloom,
local_bloom=LocalBloom,
path=Path}}.

handle_data(_, Data, State=#state{handler_module=HandlerModule,
handler_state=HandlerState,
local_bloom=LocalBloom,
global_bloom=GlobalBloom,
path=Path}) ->
#libp2p_gossip_frame_pb{key=Key, data=Bin} =
libp2p_gossip_pb:decode_msg(Data, libp2p_gossip_frame_pb),
lager:debug("gossip received for handler ~p and key ~p via path ~p with payload ~p",[HandlerModule, Key, Path, Bin]),
ok = HandlerModule:handle_data(HandlerState, self(), Key,
{Path, apply_path_decode(Path, Bin)}),
Decoded = apply_path_decode(Path, Bin),
%% use 'or' (not 'orelse') here because we must not short-circuit: both the local and the global bloom need to be updated even when the first already contains the entry
case forgetful_bloom:set(LocalBloom, {Key, Decoded}) or forgetful_bloom:set(GlobalBloom, {Key, Decoded}) of
true ->
%% TODO this might mess with ARP
%% either we've already sent this to this peer, they've sent it to us, or
%% someone else has sent it to us. In any case there's no need to handle it.
ok;
false ->
ok = HandlerModule:handle_data(HandlerState, self(), Key,
{Path, Decoded})
end,
{noreply, State}.


Expand Down
18 changes: 12 additions & 6 deletions src/group/libp2p_group_gossip_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
handlers=#{} :: #{string() => libp2p_group_gossip:handler()},
drop_timeout :: pos_integer(),
drop_timer :: reference(),
supported_paths :: [string()]
supported_paths :: [string()],
seen_messages :: forgetful_bloom:bloom()
}).

-define(DEFAULT_PEERBOOK_CONNECTIONS, 5).
Expand Down Expand Up @@ -88,11 +89,13 @@ init([Sup, TID]) ->
lager:debug("Supported gossip paths: ~p:", [SupportedPaths]),

self() ! start_workers,
{ok, Bloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000),
{ok, update_metadata(#state{sup=Sup, tid=TID,
seed_nodes=SeedNodes,
max_inbound_connections=InboundCount,
peerbook_connections=PeerBookCount,
seednode_connections=SeedNodeCount,
seen_messages=Bloom,
drop_timeout=DropTimeOut,
drop_timer=schedule_drop_timer(DropTimeOut),
supported_paths=SupportedPaths})}.
Expand Down Expand Up @@ -185,8 +188,10 @@ handle_cast({send, Key, Fun}, State=#state{}) when is_function(Fun, 0) ->
end, Pids),
{noreply, State};

handle_cast({send, Key, Data}, State=#state{}) ->
handle_cast({send, Key, Data}, State=#state{seen_messages=Bloom}) ->
{_, Pids} = lists:unzip(connections(all, State)),
%% record that we have seen this message before fanning it out to peers
forgetful_bloom:set(Bloom, {Key, Data}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point ( in the gossip server ) we do not know whether the send will be successful. For example a stream may not yet be associated with the worker

lager:debug("sending data via connection pids: ~p",[Pids]),
lists:foreach(fun(Pid) ->
libp2p_group_worker:send(Pid, Key, Data, true)
Expand Down Expand Up @@ -224,13 +229,14 @@ handle_cast(Msg, State) ->

handle_info(start_workers, State=#state{tid=TID, seednode_connections=SeedCount,
peerbook_connections=PeerCount,
seen_messages=Bloom,
supported_paths = SupportedPaths}) ->
PeerBookWorkers = [start_worker(peerbook, State) || _ <- lists:seq(1, PeerCount)],
SeedWorkers = [start_worker(seed, State) || _ <- lists:seq(1, SeedCount)],

GossipAddFun = fun(Path) ->
libp2p_swarm:add_stream_handler(TID, Path,
{libp2p_gossip_stream, server, [Path, ?MODULE, self()]})
{libp2p_gossip_stream, server, [Path, ?MODULE, self(), Bloom]})
end,
lists:foreach(GossipAddFun, SupportedPaths),
{noreply, State#state{workers=SeedWorkers ++ PeerBookWorkers}};
Expand Down Expand Up @@ -306,15 +312,15 @@ connections(Kind, #state{workers=Workers}) ->
Acc
end, [], Workers).

assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, supported_paths = SupportedPaths}) ->
assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, seen_messages=Bloom, supported_paths = SupportedPaths}) ->
case length(TargetAddrs) of
0 ->
%% the ref is stable across restarts, so use that as the lookup key
case lookup_worker(WorkerRef, #worker.ref, State) of
Worker=#worker{kind=seed, target=SelectedAddr, pid=StoredWorkerPid} when SelectedAddr /= undefined ->
%% don't give up on the seed nodes in case we're entirely offline
%% we need at least one connection to bootstrap the swarm
ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self()]}},
ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self(), Bloom]}},
libp2p_group_worker:assign_target(WorkerPid, {SelectedAddr, ClientSpec}),
%% check if this worker got restarted
case WorkerPid /= StoredWorkerPid of
Expand All @@ -330,7 +336,7 @@ assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, s
end;
_ ->
SelectedAddr = mk_multiaddr(lists:nth(rand:uniform(length(TargetAddrs)), TargetAddrs)),
ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self()]}},
ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self(), Bloom]}},
libp2p_group_worker:assign_target(WorkerPid, {SelectedAddr, ClientSpec}),
%% the ref is stable across restarts, so use that as the lookup key
case lookup_worker(WorkerRef, #worker.ref, State) of
Expand Down
3 changes: 2 additions & 1 deletion src/libp2p.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
backoff,
relcast,
libp2p_crypto,
throttle
throttle,
erbloom
]},
{env,[]},

Expand Down