diff --git a/listener/cmd/listener/main.go b/listener/cmd/listener/main.go index 2536506..b1766c7 100644 --- a/listener/cmd/listener/main.go +++ b/listener/cmd/listener/main.go @@ -1,11 +1,21 @@ package main import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/herumi/bls-eth-go-binary/bls" + "github.com/robfig/cron" + "github.com/dappnode/validator-monitoring/listener/internal/api" "github.com/dappnode/validator-monitoring/listener/internal/config" + apiCron "github.com/dappnode/validator-monitoring/listener/internal/cron" // Renamed to avoid conflict with the cron/v3 package "github.com/dappnode/validator-monitoring/listener/internal/logger" "github.com/dappnode/validator-monitoring/listener/internal/mongodb" - "github.com/herumi/bls-eth-go-binary/bls" ) func main() { @@ -44,5 +54,37 @@ func main() { config.BypassValidatorsFiltering, ) - s.Start() + // Start the API server in a goroutine. Needs to be in a goroutine to allow for the cron job to run, + // otherwise it blocks the main goroutine + go func() { + s.Start() + }() + + // Set up the cron job + c := cron.New() + + // The cron job runs once a day, see https://github.com/robfig/cron/blob/master/doc.go + // to test it running once a minute, replace "@daily" for "* * * * *" + c.AddFunc("@daily", func() { + // there are 24 * 30 = 720 hours in 30 days + apiCron.RemoveOldSignatures(dbCollection, 720) + }) + c.Start() + + // Set up signal handling for graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan // Block until a signal is received + + // Stop the cron job + c.Stop() + + // Shutdown the HTTP server gracefully with a given timeout + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := s.Shutdown(ctx); err != nil { + logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err)) + } + + logger.Info("Listener stopped gracefully") } diff --git a/listener/go.mod b/listener/go.mod index f00268b..5fa4032 100644 --- a/listener/go.mod +++ b/listener/go.mod @@ -4,12 +4,13 @@ go 1.22.0 require ( github.com/gorilla/mux v1.8.1 + github.com/herumi/bls-eth-go-binary v1.35.0 + github.com/robfig/cron v1.2.0 go.mongodb.org/mongo-driver v1.14.0 ) require ( github.com/golang/snappy v0.0.1 // indirect - github.com/herumi/bls-eth-go-binary v1.35.0 // indirect github.com/klauspost/compress v1.13.6 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/listener/go.sum b/listener/go.sum index dff15cd..1041429 100644 --- a/listener/go.sum +++ b/listener/go.sum @@ -12,6 +12,8 @@ github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= diff --git a/listener/internal/api/api.go b/listener/internal/api/api.go index 8e397c6..b11b3fa 100644 --- a/listener/internal/api/api.go +++ b/listener/internal/api/api.go @@ -1,6 +1,8 @@ package api import ( + "context" + "fmt" "net/http" "github.com/dappnode/validator-monitoring/listener/internal/api/routes" @@ -41,7 +43,30 @@ func (s *httpApi) Start() { Handler: routes.SetupRouter(s.dbCollection, s.beaconNodeUrls, s.bypassValidatorFiltering), } - if err := s.server.ListenAndServe(); err != nil { + // ListenAndServe returns ErrServerClosed to indicate that the server has been shut down when the server is closed gracefully. We need to + // handle this error to avoid treating it as a fatal error. See https://pkg.go.dev/net/http#Server.ListenAndServe + err := s.server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { logger.Fatal("Failed to start server: " + err.Error()) + } else if err == http.ErrServerClosed { + logger.Info("Server closed gracefully") } } + +// Shutdown gracefully shuts down the server without interrupting any active connections +func (s *httpApi) Shutdown(ctx context.Context) error { + if s.server == nil { + logger.Error("Received shutdown request but server is not running, this should never happen") + return nil // Server is not running + } + + // Attempt to gracefully shut down the server + err := s.server.Shutdown(ctx) + if err != nil { + logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err)) + return err + } + + logger.Info("Server has been shut down gracefully") + return nil +} diff --git a/listener/internal/api/handlers/postNewSignature.go b/listener/internal/api/handlers/postNewSignature.go index 94ee60b..0f42d7c 100644 --- a/listener/internal/api/handlers/postNewSignature.go +++ b/listener/internal/api/handlers/postNewSignature.go @@ -89,6 +89,11 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong "entries": bson.M{ "payload": req.Payload, "signature": req.Signature, + "decodedPayload": bson.M{ + "type": req.DecodedPayload.Type, + "platform": req.DecodedPayload.Platform, + "timestamp": req.DecodedPayload.Timestamp, // Needed to filter out old signatures + }, }, }, } diff --git a/listener/internal/cron/removeOldSignatures.go b/listener/internal/cron/removeOldSignatures.go new file mode 100644 index 0000000..05490d3 --- /dev/null +++ b/listener/internal/cron/removeOldSignatures.go @@ -0,0 +1,30 @@ +package cron + +import ( + "context" + "fmt" + "time" + + "github.com/dappnode/validator-monitoring/listener/internal/logger" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +// RemoveOldSignatures deletes signatures older than a specified number of hours from the MongoDB collection +func RemoveOldSignatures(collection *mongo.Collection, hours int) { + logger.Debug(fmt.Sprintf("Removing signatures older than %d hours", hours)) + targetTimeMillis := time.Now().Add(time.Duration(-hours) * time.Hour).UnixMilli() // Calculate time in the past based on hours + filter := bson.M{ + "entries.decodedPayload.timestamp": bson.M{ + "$lt": fmt.Sprintf("%d", targetTimeMillis), // Compare timestamps as strings + }, + } + // DeleteMany returns the number of documents deleted, it is useless for us since we're never + // deleting a document, but an entry on its "entries" array + _, err := collection.DeleteMany(context.Background(), filter) + if err != nil { + logger.Error("Failed to delete old signatures: " + err.Error()) + } else { + logger.Debug("Deleted old signatures") + } +}