Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cron + graceful api shutdown #43

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/herumi/bls-eth-go-binary/bls"
"github.com/robfig/cron"

"github.com/dappnode/validator-monitoring/listener/internal/api"
"github.com/dappnode/validator-monitoring/listener/internal/config"
apiCron "github.com/dappnode/validator-monitoring/listener/internal/cron" // Renamed to avoid conflict with the cron/v3 package
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"github.com/dappnode/validator-monitoring/listener/internal/mongodb"
"github.com/herumi/bls-eth-go-binary/bls"
)

func main() {
Expand Down Expand Up @@ -44,5 +54,37 @@ func main() {
config.BypassValidatorsFiltering,
)

s.Start()
// Start the API server in a goroutine. Needs to be in a goroutine to allow for the cron job to run,
// otherwise it blocks the main goroutine
go func() {
s.Start()
}()

// Set up the cron job
c := cron.New()

// The cron job runs once a day, see https://github.com/robfig/cron/blob/master/doc.go
// to test it running once a minute, replace "@daily" for "* * * * *"
c.AddFunc("@daily", func() {
// there are 24 * 30 = 720 hours in 30 days
apiCron.RemoveOldSignatures(dbCollection, 720)
})
c.Start()

// Set up signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan // Block until a signal is received

// Stop the cron job
c.Stop()

// Shutdown the HTTP server gracefully with a given timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := s.Shutdown(ctx); err != nil {
logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err))
}

logger.Info("Server shut down")
}
3 changes: 2 additions & 1 deletion listener/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ go 1.22.0

require (
github.com/gorilla/mux v1.8.1
github.com/herumi/bls-eth-go-binary v1.35.0
github.com/robfig/cron v1.2.0
go.mongodb.org/mongo-driver v1.14.0
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/herumi/bls-eth-go-binary v1.35.0 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions listener/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQ
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand Down
20 changes: 20 additions & 0 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package api

import (
"context"
"fmt"
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/routes"
Expand Down Expand Up @@ -45,3 +47,21 @@ func (s *httpApi) Start() {
logger.Fatal("Failed to start server: " + err.Error())
}
}

// Shutdown gracefully shuts down the server without interrupting any active connections
func (s *httpApi) Shutdown(ctx context.Context) error {
if s.server == nil {
logger.Error("Received shutdown request but server is not running, this should never happen")
return nil // Server is not running
}

// Attempt to gracefully shut down the server
err := s.server.Shutdown(ctx)
if err != nil {
logger.Error("Failed to shut down server gracefully: " + fmt.Sprintln(err))
return err
}

logger.Info("Server has been shut down gracefully")
return nil
}
4 changes: 4 additions & 0 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong
"entries": bson.M{
"payload": req.Payload,
"signature": req.Signature,
"decodedPayload": bson.M{
"timestamp": req.DecodedPayload.Timestamp, // Needed to filter out old signatures
// We can add rest of fields if storage is not a problem
},
},
},
}
Expand Down
30 changes: 30 additions & 0 deletions listener/internal/cron/removeOldSignatures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cron

import (
"context"
"fmt"
"time"

"github.com/dappnode/validator-monitoring/listener/internal/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

// RemoveOldSignatures deletes signatures older than a specified number of hours from the MongoDB collection
func RemoveOldSignatures(collection *mongo.Collection, hours int) {
logger.Debug(fmt.Sprintf("Removing signatures older than %d hours", hours))
targetTimeMillis := time.Now().Add(time.Duration(-hours) * time.Hour).UnixMilli() // Calculate time in the past based on hours
filter := bson.M{
"entries.decodedPayload.timestamp": bson.M{
"$lt": fmt.Sprintf("%d", targetTimeMillis), // Compare timestamps as strings
},
}
// DeleteMany returns the number of documents deleted, it is useless for us since we're never
// deleting a document, but an entry on its "entries" array
_, err := collection.DeleteMany(context.Background(), filter)
if err != nil {
logger.Error("Failed to delete old signatures: " + err.Error())
} else {
logger.Debug("Deleted old signatures")
}
}
Loading