From ac144ecbc59db8c52a01db2577af4e83e8a05b77 Mon Sep 17 00:00:00 2001 From: minhd-vu Date: Wed, 30 Oct 2024 11:00:04 -0400 Subject: [PATCH] fix: timestamp --- cmd/p2p/sensor/sensor.go | 3 +- p2p/database/database.go | 12 +++--- p2p/database/datastore.go | 77 +++++++++++++++++++-------------------- p2p/protocol.go | 26 ++++++------- 4 files changed, 57 insertions(+), 61 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 26926179..e30a893d 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -283,8 +283,7 @@ var SensorCmd = &cobra.Command{ if err := removePeerMessages(msgCounter, server.Peers()); err != nil { log.Error().Err(err).Msg("Failed to clean up peer messages") } - firstSeenTime := time.Now() - db.WritePeers(context.Background(), server.Peers(), &firstSeenTime) + db.WritePeers(context.Background(), server.Peers(), time.Now()) case peer := <-opts.Peers: // Lock the peers map before modifying it. peersMutex.Lock() diff --git a/p2p/database/database.go b/p2p/database/database.go index 8e800a12..1a44db76 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -18,27 +18,27 @@ import ( type Database interface { // WriteBlock will write the both the block and block event to the database // if ShouldWriteBlocks and ShouldWriteBlockEvents return true, respectively. - WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, *time.Time) + WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time) // WriteBlockHeaders will write the block headers if ShouldWriteBlocks // returns true. - WriteBlockHeaders(context.Context, []*types.Header, *time.Time) + WriteBlockHeaders(context.Context, []*types.Header, time.Time) // WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents // returns true. - WriteBlockHashes(context.Context, *enode.Node, []common.Hash, *time.Time) + WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time) // WriteBlockBodies will write the block bodies if ShouldWriteBlocks returns // true. - WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, *time.Time) + WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time) // WriteTransactions will write the both the transaction and transaction // event to the database if ShouldWriteTransactions and // ShouldWriteTransactionEvents return true, respectively. - WriteTransactions(context.Context, *enode.Node, []*types.Transaction, *time.Time) + WriteTransactions(context.Context, *enode.Node, []*types.Transaction, time.Time) // WritePeers will write the connected peers to the database. - WritePeers(context.Context, []*p2p.Peer, *time.Time) + WritePeers(context.Context, []*p2p.Peer, time.Time) // HasBlock will return whether the block is in the database. If the database // client has not been initialized this will always return true. diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 692e72bd..143e069e 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -150,7 +150,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { } // WriteBlock writes the block and the block event to datastore. -func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, firstSeenTime *time.Time) { +func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) { if d.client == nil { return } @@ -158,7 +158,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ if d.ShouldWriteBlockEvents() { d.jobs <- struct{}{} go func() { - d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, firstSeenTime) + d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs) <-d.jobs }() } @@ -166,7 +166,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ if d.ShouldWriteBlocks() { d.jobs <- struct{}{} go func() { - d.writeBlock(ctx, block, td, firstSeenTime) + d.writeBlock(ctx, block, td, tfs) <-d.jobs }() } @@ -176,7 +176,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ // write block events because headers will only be sent to the sensor when // requested. The block events will be written when the hash is received // instead. -func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, firstSeenTime *time.Time) { +func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) { if d.client == nil || !d.ShouldWriteBlocks() { return } @@ -184,7 +184,7 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head for _, h := range headers { d.jobs <- struct{}{} go func(header *types.Header) { - d.writeBlockHeader(ctx, header, firstSeenTime) // Pass firstSeenTime + d.writeBlockHeader(ctx, header, tfs) <-d.jobs }(h) } @@ -195,33 +195,33 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head // requested. The block events will be written when the hash is received // instead. It will write the uncles and transactions to datastore if they // don't already exist. -func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, firstSeenTime *time.Time) { +func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) { if d.client == nil || !d.ShouldWriteBlocks() { return } d.jobs <- struct{}{} go func() { - d.writeBlockBody(ctx, body, hash, firstSeenTime) // Pass firstSeenTime + d.writeBlockBody(ctx, body, hash, tfs) <-d.jobs }() } // WriteBlockHashes will write the block events to datastore. -func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, firstSeenTime *time.Time) { +func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) { if d.client == nil || !d.ShouldWriteBlockEvents() || len(hashes) == 0 { return } d.jobs <- struct{}{} go func() { - d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, firstSeenTime) // Pass firstSeenTime + d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs) <-d.jobs }() } // WriteTransactions will write the transactions and transaction events to datastore. -func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, firstSeenTime *time.Time) { +func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) { if d.client == nil { return } @@ -229,7 +229,7 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs if d.ShouldWriteTransactions() { d.jobs <- struct{}{} go func() { - d.writeTransactions(ctx, txs, firstSeenTime) + d.writeTransactions(ctx, txs, tfs) <-d.jobs }() } @@ -242,14 +242,14 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs d.jobs <- struct{}{} go func() { - d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, firstSeenTime) // Pass firstSeenTime + d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs) <-d.jobs }() } } // WritePeers writes the connected peers to datastore. -func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, firstSeenTime *time.Time) { +func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.Time) { if d.client == nil || !d.ShouldWritePeers() { return } @@ -267,8 +267,8 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, firstSeen Caps: peer.Info().Caps, URL: peer.Node().URLv4(), LastSeenBy: d.sensorID, - TimeLastSeen: *firstSeenTime, - TTL: (*firstSeenTime).Add(d.ttl), + TimeLastSeen: tls, + TTL: tls.Add(d.ttl), }) } @@ -319,8 +319,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool { // newDatastoreHeader creates a DatastoreHeader from a types.Header. Some // values are converted into strings to prevent a loss of precision. -func (d *Datastore) newDatastoreHeader(header *types.Header, firstSeenTime *time.Time) *DatastoreHeader { - +func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader { return &DatastoreHeader{ ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil), UncleHash: header.UncleHash.Hex(), @@ -338,14 +337,14 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, firstSeenTime *time MixDigest: header.MixDigest.String(), Nonce: fmt.Sprint(header.Nonce.Uint64()), BaseFee: header.BaseFee.String(), - TimeFirstSeen: *firstSeenTime, - TTL: (*firstSeenTime).Add(d.ttl), + TimeFirstSeen: tfs, + TTL: tfs.Add(d.ttl), } } // newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some // values are converted into strings to prevent a loss of precision. -func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, firstSeenTime *time.Time) *DatastoreTransaction { +func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time) *DatastoreTransaction { v, r, s := tx.RawSignatureValues() var from, to string @@ -372,13 +371,13 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, firstSeenTime R: r.String(), S: s.String(), Time: tx.Time(), - TimeFirstSeen: *firstSeenTime, - TTL: (*firstSeenTime).Add(d.ttl), + TimeFirstSeen: tfs, + TTL: tfs.Add(d.ttl), Type: int16(tx.Type()), } } -func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int, firstSeenTime *time.Time) { +func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int, tfs time.Time) { key := datastore.NameKey(BlocksKind, block.Hash().Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { @@ -391,7 +390,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. if dsBlock.DatastoreHeader == nil { shouldWrite = true - dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), firstSeenTime) + dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs) } if len(dsBlock.TotalDifficulty) == 0 { @@ -402,7 +401,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { shouldWrite = true if d.shouldWriteTransactions { - d.writeTransactions(ctx, block.Transactions(), firstSeenTime) + d.writeTransactions(ctx, block.Transactions(), tfs) } dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions())) @@ -415,7 +414,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. shouldWrite = true dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) for _, uncle := range block.Uncles() { - d.writeBlockHeader(ctx, uncle, firstSeenTime) + d.writeBlockHeader(ctx, uncle, tfs) dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil)) } } @@ -435,15 +434,15 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // writeEvent writes either a block or transaction event to datastore depending // on the provided eventKind and hashKind. -func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, firstSeenTime *time.Time) { +func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) { key := datastore.IncompleteKey(eventKind, nil) event := DatastoreEvent{ SensorId: d.sensorID, PeerId: peer.URLv4(), Hash: datastore.NameKey(hashKind, hash.Hex(), nil), - Time: *firstSeenTime, // Use the firstSeenTime here - TTL: (*firstSeenTime).Add(d.ttl), // Use firstSeenTime for TTL as well + Time: tfs, + TTL: tfs.Add(d.ttl), } if _, err := d.client.Put(context.Background(), key, &event); err != nil { log.Error().Err(err).Msgf("Failed to write to %v", eventKind) @@ -453,7 +452,7 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H // writeEvents writes either block or transaction events to datastore depending // on the provided eventKind and hashKind. This is similar to writeEvent but // batches the request. -func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, firstSeenTime *time.Time) { +func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) { keys := make([]*datastore.Key, 0, len(hashes)) events := make([]*DatastoreEvent, 0, len(hashes)) @@ -464,8 +463,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind SensorId: d.sensorID, PeerId: peer.URLv4(), Hash: datastore.NameKey(hashKind, hash.Hex(), nil), - Time: *firstSeenTime, // Use the firstSeenTime here - TTL: (*firstSeenTime).Add(d.ttl), // Use firstSeenTime for TTL as well + Time: tfs, + TTL: tfs.Add(d.ttl), } events = append(events, &event) } @@ -477,7 +476,7 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind // writeBlockHeader will write the block header to datastore if it doesn't // exist. -func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, firstSeenTime *time.Time) { +func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) { key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { @@ -486,7 +485,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, return nil } - block.DatastoreHeader = d.newDatastoreHeader(header, firstSeenTime) + block.DatastoreHeader = d.newDatastoreHeader(header, tfs) _, err := tx.Put(key, &block) return err }, datastore.MaxAttempts(MaxAttempts)) @@ -496,7 +495,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, } } -func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, firstSeenTime *time.Time) { +func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) { key := datastore.NameKey(BlocksKind, hash.Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { @@ -510,7 +509,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has if block.Transactions == nil && len(body.Transactions) > 0 { shouldWrite = true if d.shouldWriteTransactions { - d.writeTransactions(ctx, body.Transactions, firstSeenTime) + d.writeTransactions(ctx, body.Transactions, tfs) } block.Transactions = make([]*datastore.Key, 0, len(body.Transactions)) @@ -523,7 +522,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has shouldWrite = true block.Uncles = make([]*datastore.Key, 0, len(body.Uncles)) for _, uncle := range body.Uncles { - d.writeBlockHeader(ctx, uncle, firstSeenTime) + d.writeBlockHeader(ctx, uncle, tfs) block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil)) } } @@ -543,13 +542,13 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has // writeTransactions will write the transactions to datastore and return the // transaction hashes. -func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, firstSeenTime *time.Time) { +func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) { keys := make([]*datastore.Key, 0, len(txs)) transactions := make([]*DatastoreTransaction, 0, len(txs)) for _, tx := range txs { keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil)) - transactions = append(transactions, d.newDatastoreTransaction(tx, firstSeenTime)) + transactions = append(transactions, d.newDatastoreTransaction(tx, tfs)) } if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil { diff --git a/p2p/protocol.go b/p2p/protocol.go index 8f0f714b..551b2559 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -300,7 +300,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { return err } - firstSeenTime := time.Now() // Capture firstSeenTime + tfs := time.Now() c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet))) @@ -312,7 +312,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { } } - c.db.WriteBlockHashes(ctx, c.node, hashes, &firstSeenTime) // Pass firstSeenTime + c.db.WriteBlockHashes(ctx, c.node, hashes, tfs) return nil } @@ -323,11 +323,11 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { return err } - firstSeenTime := time.Now() // Capture firstSeenTime + tfs := time.Now() c.counter.WithLabelValues(txs.Name(), c.node.URLv4(), c.name).Add(float64(len(txs))) - c.db.WriteTransactions(ctx, c.node, txs, &firstSeenTime) // Pass firstSeenTime + c.db.WriteTransactions(ctx, c.node, txs, tfs) return nil } @@ -353,7 +353,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { return err } - firstSeenTime := time.Now() // Capture the timestamp + tfs := time.Now() headers := packet.BlockHeadersRequest c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(headers))) @@ -364,7 +364,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error { } } - c.db.WriteBlockHeaders(ctx, headers, &firstSeenTime) // Pass firstSeenTime + c.db.WriteBlockHeaders(ctx, headers, tfs) return nil } @@ -389,7 +389,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { return err } - firstSeenTime := time.Now() // Capture firstSeenTime + tfs := time.Now() if len(packet.BlockBodiesResponse) == 0 { return nil @@ -413,7 +413,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error { return nil } - c.db.WriteBlockBody(ctx, packet.BlockBodiesResponse[0], *hash, &firstSeenTime) // Pass firstSeenTime + c.db.WriteBlockBody(ctx, packet.BlockBodiesResponse[0], *hash, tfs) return nil } @@ -424,8 +424,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - // Capture the exact time the block was first seen. - firstSeenTime := time.Now() + tfs := time.Now() c.counter.WithLabelValues(block.Name(), c.node.URLv4(), c.name).Inc() @@ -446,8 +445,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - // Pass the firstSeenTime to WriteBlock. - c.db.WriteBlock(ctx, c.node, block.Block, block.TD, &firstSeenTime) + c.db.WriteBlock(ctx, c.node, block.Block, block.TD, tfs) return nil } @@ -501,11 +499,11 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err return err } - firstSeenTime := time.Now() // Capture firstSeenTime + tfs := time.Now() c.counter.WithLabelValues(packet.Name(), c.node.URLv4(), c.name).Add(float64(len(packet.PooledTransactionsResponse))) - c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, &firstSeenTime) // Pass firstSeenTime + c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) return nil }