Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: time first seen timestamps #419

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ var SensorCmd = &cobra.Command{
if err := removePeerMessages(msgCounter, server.Peers()); err != nil {
log.Error().Err(err).Msg("Failed to clean up peer messages")
}
firstSeenTime := time.Now()
db.WritePeers(context.Background(), server.Peers(), &firstSeenTime)
db.WritePeers(context.Background(), server.Peers(), time.Now())
case peer := <-opts.Peers:
// Lock the peers map before modifying it.
peersMutex.Lock()
Expand Down
12 changes: 6 additions & 6 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ import (
type Database interface {
// WriteBlock will write the both the block and block event to the database
// if ShouldWriteBlocks and ShouldWriteBlockEvents return true, respectively.
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, *time.Time)
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)

// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
// returns true.
WriteBlockHeaders(context.Context, []*types.Header, *time.Time)
WriteBlockHeaders(context.Context, []*types.Header, time.Time)

// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
// returns true.
WriteBlockHashes(context.Context, *enode.Node, []common.Hash, *time.Time)
WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time)

// WriteBlockBodies will write the block bodies if ShouldWriteBlocks returns
// true.
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, *time.Time)
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time)

// WriteTransactions will write the both the transaction and transaction
// event to the database if ShouldWriteTransactions and
// ShouldWriteTransactionEvents return true, respectively.
WriteTransactions(context.Context, *enode.Node, []*types.Transaction, *time.Time)
WriteTransactions(context.Context, *enode.Node, []*types.Transaction, time.Time)

// WritePeers will write the connected peers to the database.
WritePeers(context.Context, []*p2p.Peer, *time.Time)
WritePeers(context.Context, []*p2p.Peer, time.Time)

// HasBlock will return whether the block is in the database. If the database
// client has not been initialized this will always return true.
Expand Down
77 changes: 38 additions & 39 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,23 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
}

// WriteBlock writes the block and the block event to datastore.
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, firstSeenTime *time.Time) {
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
if d.client == nil {
return
}

if d.ShouldWriteBlockEvents() {
d.jobs <- struct{}{}
go func() {
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, firstSeenTime)
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
<-d.jobs
}()
}

if d.ShouldWriteBlocks() {
d.jobs <- struct{}{}
go func() {
d.writeBlock(ctx, block, td, firstSeenTime)
d.writeBlock(ctx, block, td, tfs)
<-d.jobs
}()
}
Expand All @@ -176,15 +176,15 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
// write block events because headers will only be sent to the sensor when
// requested. The block events will be written when the hash is received
// instead.
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, firstSeenTime *time.Time) {
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlocks() {
return
}

for _, h := range headers {
d.jobs <- struct{}{}
go func(header *types.Header) {
d.writeBlockHeader(ctx, header, firstSeenTime) // Pass firstSeenTime
d.writeBlockHeader(ctx, header, tfs)
<-d.jobs
}(h)
}
Expand All @@ -195,41 +195,41 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
// requested. The block events will be written when the hash is received
// instead. It will write the uncles and transactions to datastore if they
// don't already exist.
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, firstSeenTime *time.Time) {
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlocks() {
return
}

d.jobs <- struct{}{}
go func() {
d.writeBlockBody(ctx, body, hash, firstSeenTime) // Pass firstSeenTime
d.writeBlockBody(ctx, body, hash, tfs)
<-d.jobs
}()
}

// WriteBlockHashes will write the block events to datastore.
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, firstSeenTime *time.Time) {
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlockEvents() || len(hashes) == 0 {
return
}

d.jobs <- struct{}{}
go func() {
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, firstSeenTime) // Pass firstSeenTime
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs)
<-d.jobs
}()
}

// WriteTransactions will write the transactions and transaction events to datastore.
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, firstSeenTime *time.Time) {
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
if d.client == nil {
return
}

if d.ShouldWriteTransactions() {
d.jobs <- struct{}{}
go func() {
d.writeTransactions(ctx, txs, firstSeenTime)
d.writeTransactions(ctx, txs, tfs)
<-d.jobs
}()
}
Expand All @@ -242,14 +242,14 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs

d.jobs <- struct{}{}
go func() {
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, firstSeenTime) // Pass firstSeenTime
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs)
<-d.jobs
}()
}
}

// WritePeers writes the connected peers to datastore.
func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, firstSeenTime *time.Time) {
func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.Time) {
if d.client == nil || !d.ShouldWritePeers() {
return
}
Expand All @@ -267,8 +267,8 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, firstSeen
Caps: peer.Info().Caps,
URL: peer.Node().URLv4(),
LastSeenBy: d.sensorID,
TimeLastSeen: *firstSeenTime,
TTL: (*firstSeenTime).Add(d.ttl),
TimeLastSeen: tls,
TTL: tls.Add(d.ttl),
})
}

Expand Down Expand Up @@ -319,8 +319,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {

// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
// values are converted into strings to prevent a loss of precision.
func (d *Datastore) newDatastoreHeader(header *types.Header, firstSeenTime *time.Time) *DatastoreHeader {

func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader {
return &DatastoreHeader{
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
UncleHash: header.UncleHash.Hex(),
Expand All @@ -338,14 +337,14 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, firstSeenTime *time
MixDigest: header.MixDigest.String(),
Nonce: fmt.Sprint(header.Nonce.Uint64()),
BaseFee: header.BaseFee.String(),
TimeFirstSeen: *firstSeenTime,
TTL: (*firstSeenTime).Add(d.ttl),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
}
}

// newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some
// values are converted into strings to prevent a loss of precision.
func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, firstSeenTime *time.Time) *DatastoreTransaction {
func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time) *DatastoreTransaction {
v, r, s := tx.RawSignatureValues()
var from, to string

Expand All @@ -372,13 +371,13 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, firstSeenTime
R: r.String(),
S: s.String(),
Time: tx.Time(),
TimeFirstSeen: *firstSeenTime,
TTL: (*firstSeenTime).Add(d.ttl),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
Type: int16(tx.Type()),
}
}

func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int, firstSeenTime *time.Time) {
func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int, tfs time.Time) {
key := datastore.NameKey(BlocksKind, block.Hash().Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -391,7 +390,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.

if dsBlock.DatastoreHeader == nil {
shouldWrite = true
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), firstSeenTime)
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs)
}

if len(dsBlock.TotalDifficulty) == 0 {
Expand All @@ -402,7 +401,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, block.Transactions(), firstSeenTime)
d.writeTransactions(ctx, block.Transactions(), tfs)
}

dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions()))
Expand All @@ -415,7 +414,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
shouldWrite = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle, firstSeenTime)
d.writeBlockHeader(ctx, uncle, tfs)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand All @@ -435,15 +434,15 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.

// writeEvent writes either a block or transaction event to datastore depending
// on the provided eventKind and hashKind.
func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, firstSeenTime *time.Time) {
func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
key := datastore.IncompleteKey(eventKind, nil)

event := DatastoreEvent{
SensorId: d.sensorID,
PeerId: peer.URLv4(),
Hash: datastore.NameKey(hashKind, hash.Hex(), nil),
Time: *firstSeenTime, // Use the firstSeenTime here
TTL: (*firstSeenTime).Add(d.ttl), // Use firstSeenTime for TTL as well
Time: tfs,
TTL: tfs.Add(d.ttl),
}
if _, err := d.client.Put(context.Background(), key, &event); err != nil {
log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
Expand All @@ -453,7 +452,7 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H
// writeEvents writes either block or transaction events to datastore depending
// on the provided eventKind and hashKind. This is similar to writeEvent but
// batches the request.
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, firstSeenTime *time.Time) {
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) {
keys := make([]*datastore.Key, 0, len(hashes))
events := make([]*DatastoreEvent, 0, len(hashes))

Expand All @@ -464,8 +463,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind
SensorId: d.sensorID,
PeerId: peer.URLv4(),
Hash: datastore.NameKey(hashKind, hash.Hex(), nil),
Time: *firstSeenTime, // Use the firstSeenTime here
TTL: (*firstSeenTime).Add(d.ttl), // Use firstSeenTime for TTL as well
Time: tfs,
TTL: tfs.Add(d.ttl),
}
events = append(events, &event)
}
Expand All @@ -477,7 +476,7 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind

// writeBlockHeader will write the block header to datastore if it doesn't
// exist.
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, firstSeenTime *time.Time) {
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) {
key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -486,7 +485,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
return nil
}

block.DatastoreHeader = d.newDatastoreHeader(header, firstSeenTime)
block.DatastoreHeader = d.newDatastoreHeader(header, tfs)
_, err := tx.Put(key, &block)
return err
}, datastore.MaxAttempts(MaxAttempts))
Expand All @@ -496,7 +495,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
}
}

func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, firstSeenTime *time.Time) {
func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
key := datastore.NameKey(BlocksKind, hash.Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -510,7 +509,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
if block.Transactions == nil && len(body.Transactions) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, body.Transactions, firstSeenTime)
d.writeTransactions(ctx, body.Transactions, tfs)
}

block.Transactions = make([]*datastore.Key, 0, len(body.Transactions))
Expand All @@ -523,7 +522,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
shouldWrite = true
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle, firstSeenTime)
d.writeBlockHeader(ctx, uncle, tfs)
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand All @@ -543,13 +542,13 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has

// writeTransactions will write the transactions to datastore and return the
// transaction hashes.
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, firstSeenTime *time.Time) {
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
keys := make([]*datastore.Key, 0, len(txs))
transactions := make([]*DatastoreTransaction, 0, len(txs))

for _, tx := range txs {
keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
transactions = append(transactions, d.newDatastoreTransaction(tx, firstSeenTime))
transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
}

if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
Expand Down
Loading
Loading