diff --git a/rebar.config b/rebar.config index fa46d420..aef0e496 100644 --- a/rebar.config +++ b/rebar.config @@ -31,7 +31,8 @@ {cuttlefish, ".*", {git, "https://github.com/helium/cuttlefish", {branch, "develop"}}}, {inet_ext, ".*", {git, "https://github.com/benoitc/inet_ext", {branch, "master"}}}, {splicer, ".*", {git, "https://github.com/helium/erlang-splicer.git", {branch, "master"}}}, - {relcast, ".*", {git, "https://github.com/helium/relcast.git", {branch, "master"}}} + {relcast, ".*", {git, "https://github.com/helium/relcast.git", {branch, "master"}}}, + {erbloom, ".*", {git, "https://github.com/Vagabond/erbloom.git", {branch, "adt/forgetful-bloom"}}} ]}. {erl_opts, [ diff --git a/rebar.lock b/rebar.lock index 94425334..19aaf819 100644 --- a/rebar.lock +++ b/rebar.lock @@ -7,6 +7,10 @@ 0}, {<<"ecc_compact">>,{pkg,<<"ecc_compact">>,<<"1.0.4">>},0}, {<<"enacl">>,{pkg,<<"enacl">>,<<"0.17.2">>},0}, + {<<"erbloom">>, + {git,"https://github.com/Vagabond/erbloom.git", + {ref,"cdbde894836b72901c245a34b7e9a3428d8db77c"}}, + 0}, {<<"erl_base58">>,{pkg,<<"erl_base58">>,<<"0.0.1">>},0}, {<<"getopt">>,{pkg,<<"getopt">>,<<"1.0.1">>},1}, {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, diff --git a/src/group/libp2p_gossip_stream.erl b/src/group/libp2p_gossip_stream.erl index 22701f3f..0f7c7d7b 100644 --- a/src/group/libp2p_gossip_stream.erl +++ b/src/group/libp2p_gossip_stream.erl @@ -21,7 +21,9 @@ { connection :: libp2p_connection:connection(), handler_module :: atom(), handler_state :: any(), - path :: any() + path :: any(), + global_bloom :: forgetful_bloom:bloom(), + local_bloom :: forgetful_bloom:bloom() }). %% API @@ -45,7 +47,7 @@ client(Connection, Args) -> server(Connection, _Path, _TID, Args) -> libp2p_framed_stream:server(?MODULE, Connection, Args). -init(server, Connection, [Path, HandlerModule, HandlerState]) -> +init(server, Connection, [Path, HandlerModule, HandlerState, Bloom]) -> lager:debug("initiating server with path ~p", [Path]), {ok, Session} = libp2p_connection:session(Connection), %% Catch errors from the handler module in accepting a stream. The @@ -53,9 +55,13 @@ init(server, Connection, [Path, HandlerModule, HandlerState]) -> %% ordering of the shutdown will cause the accept below to crash %% noisily in the logs. This catch avoids that noise case (catch HandlerModule:accept_stream(HandlerState, Session, self(), Path)) of - ok -> {ok, #state{connection=Connection, + ok -> + {ok, LocalBloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000), + {ok, #state{connection=Connection, handler_module=HandlerModule, handler_state=HandlerState, + global_bloom=Bloom, + local_bloom=LocalBloom, path=Path}}; {error, too_many} -> {stop, normal}; @@ -67,21 +73,36 @@ init(server, Connection, [Path, HandlerModule, HandlerState]) -> [error_logger_lager_h:format_reason(Exit)]), {stop, normal} end; -init(client, Connection, [Path, HandlerModule, HandlerState]) -> +init(client, Connection, [Path, HandlerModule, HandlerState, Bloom]) -> lager:debug("initiating client with path ~p", [Path]), + {ok, LocalBloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000), {ok, #state{connection=Connection, handler_module=HandlerModule, handler_state=HandlerState, + global_bloom=Bloom, + local_bloom=LocalBloom, path=Path}}. handle_data(_, Data, State=#state{handler_module=HandlerModule, handler_state=HandlerState, + local_bloom=LocalBloom, + global_bloom=GlobalBloom, path=Path}) -> #libp2p_gossip_frame_pb{key=Key, data=Bin} = libp2p_gossip_pb:decode_msg(Data, libp2p_gossip_frame_pb), lager:debug("gossip received for handler ~p and key ~p via path ~p with payload ~p",[HandlerModule, Key, Path, Bin]), - ok = HandlerModule:handle_data(HandlerState, self(), Key, - {Path, apply_path_decode(Path, Bin)}), + Decoded = apply_path_decode(Path, Bin), + %% use or here because we don't want to short circuit + case forgetful_bloom:set(LocalBloom, {Key, Decoded}) or forgetful_bloom:set(GlobalBloom, {Key, Decoded}) of + true -> + %% TODO this might mess with ARP + %% either we've already sent this to this peer, they've sent it to us, or + %% someone else has sent it to us. In any case there's no need to handle it. + ok; + false -> + ok = HandlerModule:handle_data(HandlerState, self(), Key, + {Path, Decoded}) + end, {noreply, State}. diff --git a/src/group/libp2p_group_gossip_server.erl b/src/group/libp2p_group_gossip_server.erl index a114177f..5d13db56 100644 --- a/src/group/libp2p_group_gossip_server.erl +++ b/src/group/libp2p_group_gossip_server.erl @@ -30,7 +30,8 @@ handlers=#{} :: #{string() => libp2p_group_gossip:handler()}, drop_timeout :: pos_integer(), drop_timer :: reference(), - supported_paths :: [string()] + supported_paths :: [string()], + seen_messages :: forgetful_bloom:bloom() }). -define(DEFAULT_PEERBOOK_CONNECTIONS, 5). @@ -88,11 +89,13 @@ init([Sup, TID]) -> lager:debug("Supported gossip paths: ~p:", [SupportedPaths]), self() ! start_workers, + {ok, Bloom} = forgetful_bloom:new_for_fp_rate(1000, 1.0e-7, 3, 1000), {ok, update_metadata(#state{sup=Sup, tid=TID, seed_nodes=SeedNodes, max_inbound_connections=InboundCount, peerbook_connections=PeerBookCount, seednode_connections=SeedNodeCount, + seen_messages=Bloom, drop_timeout=DropTimeOut, drop_timer=schedule_drop_timer(DropTimeOut), supported_paths=SupportedPaths})}. @@ -185,8 +188,10 @@ handle_cast({send, Key, Fun}, State=#state{}) when is_function(Fun, 0) -> end, Pids), {noreply, State}; -handle_cast({send, Key, Data}, State=#state{}) -> +handle_cast({send, Key, Data}, State=#state{seen_messages=Bloom}) -> {_, Pids} = lists:unzip(connections(all, State)), + %% track we've seen this now + forgetful_bloom:set(Bloom, {Key, Data}), lager:debug("sending data via connection pids: ~p",[Pids]), lists:foreach(fun(Pid) -> libp2p_group_worker:send(Pid, Key, Data, true) @@ -224,13 +229,14 @@ handle_cast(Msg, State) -> handle_info(start_workers, State=#state{tid=TID, seednode_connections=SeedCount, peerbook_connections=PeerCount, + seen_messages=Bloom, supported_paths = SupportedPaths}) -> PeerBookWorkers = [start_worker(peerbook, State) || _ <- lists:seq(1, PeerCount)], SeedWorkers = [start_worker(seed, State) || _ <- lists:seq(1, SeedCount)], GossipAddFun = fun(Path) -> libp2p_swarm:add_stream_handler(TID, Path, - {libp2p_gossip_stream, server, [Path, ?MODULE, self()]}) + {libp2p_gossip_stream, server, [Path, ?MODULE, self(), Bloom]}) end, lists:foreach(GossipAddFun, SupportedPaths), {noreply, State#state{workers=SeedWorkers ++ PeerBookWorkers}}; @@ -306,7 +312,7 @@ connections(Kind, #state{workers=Workers}) -> Acc end, [], Workers). -assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, supported_paths = SupportedPaths}) -> +assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, seen_messages=Bloom, supported_paths = SupportedPaths}) -> case length(TargetAddrs) of 0 -> %% the ref is stable across restarts, so use that as the lookup key @@ -314,7 +320,7 @@ assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, s Worker=#worker{kind=seed, target=SelectedAddr, pid=StoredWorkerPid} when SelectedAddr /= undefined -> %% don't give up on the seed nodes in case we're entirely offline %% we need at least one connection to bootstrap the swarm - ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self()]}}, + ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self(), Bloom]}}, libp2p_group_worker:assign_target(WorkerPid, {SelectedAddr, ClientSpec}), %% check if this worker got restarted case WorkerPid /= StoredWorkerPid of @@ -330,7 +336,7 @@ assign_target(WorkerPid, WorkerRef, TargetAddrs, State=#state{workers=Workers, s end; _ -> SelectedAddr = mk_multiaddr(lists:nth(rand:uniform(length(TargetAddrs)), TargetAddrs)), - ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self()]}}, + ClientSpec = {SupportedPaths, {libp2p_gossip_stream, [?MODULE, self(), Bloom]}}, libp2p_group_worker:assign_target(WorkerPid, {SelectedAddr, ClientSpec}), %% the ref is stable across restarts, so use that as the lookup key case lookup_worker(WorkerRef, #worker.ref, State) of diff --git a/src/libp2p.app.src b/src/libp2p.app.src index bfb095d6..7e9946d1 100644 --- a/src/libp2p.app.src +++ b/src/libp2p.app.src @@ -33,7 +33,8 @@ backoff, relcast, libp2p_crypto, - throttle + throttle, + erbloom ]}, {env,[]},