Skip to content

Commit

Permalink
Add cron + graceful api shutdown (#43)
Browse files Browse the repository at this point in the history
* add cron + graceful api shutdown

* pass hours to delete as parameter

* remove duplicate start

* fix server shut down handling+other decoded  types
  • Loading branch information
Marketen authored May 15, 2024
1 parent 24da2da commit bb853e2
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 4 deletions.
46 changes: 44 additions & 2 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/herumi/bls-eth-go-binary/bls"
"github.com/robfig/cron"

"github.com/dappnode/validator-monitoring/listener/internal/api"
"github.com/dappnode/validator-monitoring/listener/internal/config"
apiCron "github.com/dappnode/validator-monitoring/listener/internal/cron" // Renamed to avoid conflict with the cron/v3 package
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"github.com/dappnode/validator-monitoring/listener/internal/mongodb"
"github.com/herumi/bls-eth-go-binary/bls"
)

func main() {
Expand Down Expand Up @@ -44,5 +54,37 @@ func main() {
config.BypassValidatorsFiltering,
)

s.Start()
// Start the API server in a goroutine. Needs to be in a goroutine to allow for the cron job to run,
// otherwise it blocks the main goroutine
go func() {
s.Start()
}()

// Set up the cron job
c := cron.New()

// The cron job runs once a day, see https://github.com/robfig/cron/blob/master/doc.go
// to test it running once a minute, replace "@daily" for "* * * * *"
c.AddFunc("@daily", func() {
// there are 24 * 30 = 720 hours in 30 days
apiCron.RemoveOldSignatures(dbCollection, 720)
})
c.Start()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan // Block until a signal is received

// Stop the cron job
c.Stop()

// Shutdown the HTTP server gracefully with a given timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err))
}

logger.Info("Listener stopped gracefully")
}
3 changes: 2 additions & 1 deletion listener/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ go 1.22.0

require (
github.com/gorilla/mux v1.8.1
github.com/herumi/bls-eth-go-binary v1.35.0
github.com/robfig/cron v1.2.0
go.mongodb.org/mongo-driver v1.14.0
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/herumi/bls-eth-go-binary v1.35.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions listener/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQ
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down
27 changes: 26 additions & 1 deletion listener/internal/api/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"context"
"fmt"
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/routes"
Expand Down Expand Up @@ -41,7 +43,30 @@ func (s *httpApi) Start() {
Handler: routes.SetupRouter(s.dbCollection, s.beaconNodeUrls, s.bypassValidatorFiltering),
}

if err := s.server.ListenAndServe(); err != nil {
// ListenAndServe returns ErrServerClosed to indicate that the server has been shut down when the server is closed gracefully. We need to
// handle this error to avoid treating it as a fatal error. See https://pkg.go.dev/net/http#Server.ListenAndServe
err := s.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Fatal("Failed to start server: " + err.Error())
} else if err == http.ErrServerClosed {
logger.Info("Server closed gracefully")
}
}

// Shutdown gracefully shuts down the server without interrupting any active connections
func (s *httpApi) Shutdown(ctx context.Context) error {
if s.server == nil {
logger.Error("Received shutdown request but server is not running, this should never happen")
return nil // Server is not running
}

// Attempt to gracefully shut down the server
err := s.server.Shutdown(ctx)
if err != nil {
logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err))
return err
}

logger.Info("Server has been shut down gracefully")
return nil
}
5 changes: 5 additions & 0 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong
"entries": bson.M{
"payload": req.Payload,
"signature": req.Signature,
"decodedPayload": bson.M{
"type": req.DecodedPayload.Type,
"platform": req.DecodedPayload.Platform,
"timestamp": req.DecodedPayload.Timestamp, // Needed to filter out old signatures
},
},
},
}
Expand Down
30 changes: 30 additions & 0 deletions listener/internal/cron/removeOldSignatures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cron

import (
"context"
"fmt"
"time"

"github.com/dappnode/validator-monitoring/listener/internal/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

// RemoveOldSignatures deletes signatures older than a specified number of hours from the MongoDB collection
func RemoveOldSignatures(collection *mongo.Collection, hours int) {
logger.Debug(fmt.Sprintf("Removing signatures older than %d hours", hours))
targetTimeMillis := time.Now().Add(time.Duration(-hours) * time.Hour).UnixMilli() // Calculate time in the past based on hours
filter := bson.M{
"entries.decodedPayload.timestamp": bson.M{
"$lt": fmt.Sprintf("%d", targetTimeMillis), // Compare timestamps as strings
},
}
// DeleteMany returns the number of documents deleted, it is useless for us since we're never
// deleting a document, but an entry on its "entries" array
_, err := collection.DeleteMany(context.Background(), filter)
if err != nil {
logger.Error("Failed to delete old signatures: " + err.Error())
} else {
logger.Debug("Deleted old signatures")
}
}

0 comments on commit bb853e2

Please sign in to comment.