From 3e0689461f109dbc4fb137687142cacdf2563eb5 Mon Sep 17 00:00:00 2001 From: minhd-vu Date: Thu, 24 Oct 2024 10:26:15 -0400 Subject: [PATCH 1/7] chore: cleanup --- cmd/p2p/sensor/sensor.go | 8 ++++---- p2p/database/datastore.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 8cb60d79..346f96f9 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -260,7 +260,7 @@ var SensorCmd = &cobra.Command{ signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) peers := make(map[enode.ID]string) - var peersMutex sync.Mutex + var peersMutex sync.RWMutex for _, node := range inputSensorParams.nodes { // Map node URLs to node IDs to avoid duplicates @@ -389,7 +389,7 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) { // handleDNSDiscovery performs DNS-based peer discovery and adds new peers to // the p2p server. It syncs the DNS discovery tree and adds any newly discovered // peers not already in the peers map. -func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersMutex *sync.Mutex) { +func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersMutex *sync.RWMutex) { if len(inputSensorParams.DiscoveryDNS) == 0 { return } @@ -411,8 +411,8 @@ func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersM Msg("Successfully synced DNS discovery tree") // Lock the peers map and server operations - peersMutex.Lock() - defer peersMutex.Unlock() + peersMutex.RLock() + defer peersMutex.RUnlock() // Add DNS-discovered peers for _, node := range tree.Nodes() { diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 7fb86de8..03de3b85 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -107,7 +107,7 @@ type DatastoreTransaction struct { type DatastorePeer struct { Name string - Caps string + Caps []string URL string LastSeenBy string TimeLastSeen time.Time @@ -266,7 +266,7 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) { keys = append(keys, datastore.NameKey(PeersKind, peer.ID().String(), nil)) dsPeers = append(dsPeers, &DatastorePeer{ Name: peer.Fullname(), - Caps: strings.Join(peer.Info().Caps, ","), + Caps: peer.Info().Caps, URL: peer.Node().URLv4(), LastSeenBy: d.sensorID, TimeLastSeen: now, From 6b1264158f9631b8b1c052405d3eddc7f6c00d25 Mon Sep 17 00:00:00 2001 From: minhd-vu Date: Thu, 24 Oct 2024 10:26:33 -0400 Subject: [PATCH 2/7] fix import --- p2p/database/datastore.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 03de3b85..e91dd13c 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/big" - "strings" "time" "cloud.google.com/go/datastore" From e20f4ae557d22973c501b0b690c13e83353a0a32 Mon Sep 17 00:00:00 2001 From: minhd-vu Date: Thu, 24 Oct 2024 10:58:48 -0400 Subject: [PATCH 3/7] fix logic --- cmd/p2p/sensor/sensor.go | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 346f96f9..59c27d13 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -251,26 +251,30 @@ var SensorCmd = &cobra.Command{ sub := server.SubscribeEvents(events) defer sub.Unsubscribe() - ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds - hourlyTicker := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour + ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds. + hourlyTicker := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour. defer ticker.Stop() defer hourlyTicker.Stop() signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) + // peers represents the peer map that is used to write to the nodes.json + // file. This is helpful when restarting the node with the --quickstart flag + // enabled. This map does not represent the peers that are currently + // connected to the sensor. To do that use `server.Peers()` instead. peers := make(map[enode.ID]string) - var peersMutex sync.RWMutex + var peersMutex sync.Mutex for _, node := range inputSensorParams.nodes { - // Map node URLs to node IDs to avoid duplicates + // Map node URLs to node IDs to avoid duplicates. peers[node.ID()] = node.URLv4() } go handleAPI(&server, msgCounter) - // Run DNS discovery immediately at startup - go handleDNSDiscovery(&server, peers, &peersMutex) + // Run DNS discovery immediately at startup. + go handleDNSDiscovery(&server) for { select { @@ -282,9 +286,9 @@ var SensorCmd = &cobra.Command{ db.WritePeers(context.Background(), server.Peers()) case peer := <-opts.Peers: - // Lock the peers map before modifying it + // Lock the peers map before modifying it. peersMutex.Lock() - // Update the peer list and the nodes file + // Update the peer list and the nodes file. if _, ok := peers[peer.ID()]; !ok { peers[peer.ID()] = peer.URLv4() @@ -294,7 +298,7 @@ var SensorCmd = &cobra.Command{ } peersMutex.Unlock() case <-hourlyTicker.C: - go handleDNSDiscovery(&server, peers, &peersMutex) + go handleDNSDiscovery(&server) case <-signals: // This gracefully stops the sensor so that the peers can be written to // the nodes file. @@ -389,7 +393,7 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) { // handleDNSDiscovery performs DNS-based peer discovery and adds new peers to // the p2p server. It syncs the DNS discovery tree and adds any newly discovered // peers not already in the peers map. -func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersMutex *sync.RWMutex) { +func handleDNSDiscovery(server *ethp2p.Server) { if len(inputSensorParams.DiscoveryDNS) == 0 { return } @@ -405,26 +409,28 @@ func handleDNSDiscovery(server *ethp2p.Server, peers map[enode.ID]string, peersM return } - // Log the number of nodes in the tree + // Log the number of nodes in the tree. log.Info(). Int("unique_nodes", len(tree.Nodes())). Msg("Successfully synced DNS discovery tree") - // Lock the peers map and server operations - peersMutex.RLock() - defer peersMutex.RUnlock() + // Create a map of all the currently connected peers. + peers := make(map[enode.ID]struct{}) + for _, peer := range server.Peers() { + peers[peer.ID()] = struct{}{} + } - // Add DNS-discovered peers + // Add DNS-discovered peers. for _, node := range tree.Nodes() { if _, ok := peers[node.ID()]; ok { - continue + continue // Skip the peer if the sensor is already connected to it. } log.Debug(). Str("enode", node.URLv4()). Msg("Discovered new peer through DNS") - // Instruct server to connect to the new peer + // Instruct server to connect to the new peer. server.AddPeer(node) } From 7344c12e4d69997399f4ea87e965d7f780ea0973 Mon Sep 17 00:00:00 2001 From: minhd-vu Date: Thu, 24 Oct 2024 11:29:41 -0400 Subject: [PATCH 4/7] fix: simplify logic --- cmd/p2p/sensor/sensor.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 59c27d13..e9b2b829 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -414,23 +414,15 @@ func handleDNSDiscovery(server *ethp2p.Server) { Int("unique_nodes", len(tree.Nodes())). Msg("Successfully synced DNS discovery tree") - // Create a map of all the currently connected peers. - peers := make(map[enode.ID]struct{}) - for _, peer := range server.Peers() { - peers[peer.ID()] = struct{}{} - } - // Add DNS-discovered peers. for _, node := range tree.Nodes() { - if _, ok := peers[node.ID()]; ok { - continue // Skip the peer if the sensor is already connected to it. - } - log.Debug(). Str("enode", node.URLv4()). - Msg("Discovered new peer through DNS") + Msg("Discovered peer through DNS") - // Instruct server to connect to the new peer. + // Add the peer to the static node set. The server itself handles whether to + // connect to the peer if it's already connected. If a node is part of the + // static peer set, the server will handle reconnecting after disconnects. server.AddPeer(node) } From 7068db51bdbc3e821e1396fc862a9e6ad3846c7e Mon Sep 17 00:00:00 2001 From: dan moore Date: Mon, 28 Oct 2024 11:22:10 +0100 Subject: [PATCH 5/7] add peer count info logs --- cmd/p2p/sensor/sensor.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index e9b2b829..59d04a34 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -272,6 +272,9 @@ var SensorCmd = &cobra.Command{ } go handleAPI(&server, msgCounter) + + // Periodically check the peer count + go checkPeerCount(&server) // Run DNS discovery immediately at startup. go handleDNSDiscovery(&server) @@ -390,6 +393,21 @@ func handleAPI(server *ethp2p.Server, counter *prometheus.CounterVec) { } } +func checkPeerCount(server *ethp2p.Server) { + for { + // Get the current number of peers + peerCount := server.Peers() + + // Log the current peer count + log.Info(). + Int("peer_count", len(peerCount)). + Msg("Current number of peers") + + // Sleep for 30 seconds before checking again + time.Sleep(30 * time.Second) + } +} + // handleDNSDiscovery performs DNS-based peer discovery and adds new peers to // the p2p server. It syncs the DNS discovery tree and adds any newly discovered // peers not already in the peers map. From d8b9491f034e0819106fc95fab17c1e25dcd6b36 Mon Sep 17 00:00:00 2001 From: dan moore Date: Mon, 28 Oct 2024 11:30:21 +0100 Subject: [PATCH 6/7] bump max concurrency --- cmd/p2p/sensor/sensor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 59d04a34..52a60895 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -552,7 +552,7 @@ func init() { log.Error().Err(err).Msg("Failed to mark sensor-id as required persistent flag") } SensorCmd.Flags().IntVarP(&inputSensorParams.MaxPeers, "max-peers", "m", 2000, "Maximum number of peers to connect to") - SensorCmd.Flags().IntVarP(&inputSensorParams.MaxDatabaseConcurrency, "max-db-concurrency", "D", 10000, + SensorCmd.Flags().IntVarP(&inputSensorParams.MaxDatabaseConcurrency, "max-db-concurrency", "D", 20000, `Maximum number of concurrent database operations to perform. Increasing this will result in less chance of missing data (i.e. broken pipes) but can significantly increase memory usage.`) From 140e5fcfe36a1892c3b59fae80f9492e2a600877 Mon Sep 17 00:00:00 2001 From: dan moore Date: Mon, 28 Oct 2024 11:42:18 +0100 Subject: [PATCH 7/7] fix make gen --- doc/polycli_p2p_sensor.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 20ccf3a9..9b75b6c1 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -37,7 +37,7 @@ If no nodes.json file exists, it will be created. -k, --key-file string Private key file (cannot be set with --key) -D, --max-db-concurrency int Maximum number of concurrent database operations to perform. Increasing this will result in less chance of missing data (i.e. broken pipes) but can - significantly increase memory usage. (default 10000) + significantly increase memory usage. (default 20000) -m, --max-peers int Maximum number of peers to connect to (default 2000) --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") -n, --network-id uint Filter discovered nodes by this network ID