Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create cron to remove signatures with status unknown #49

Merged
merged 7 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func main() {
c.AddFunc("@daily", func() {
// there are 24 * 30 = 720 hours in 30 days
apiCron.RemoveOldSignatures(dbCollection, 720)

})
c.AddFunc("* * * * *", func() {
apiCron.UpdateSignaturesStatus(dbCollection, config.BeaconNodeURLs)
})
c.Start()

Expand Down
5 changes: 3 additions & 2 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/routes"
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"go.mongodb.org/mongo-driver/mongo"
)
Expand All @@ -15,11 +16,11 @@ type httpApi struct {
port string
dbClient *mongo.Client
dbCollection *mongo.Collection
beaconNodeUrls map[string]string
beaconNodeUrls map[types.Network]string
}

// create a new api instance
func NewApi(port string, dbClient *mongo.Client, dbCollection *mongo.Collection, beaconNodeUrls map[string]string) *httpApi {
func NewApi(port string, dbClient *mongo.Client, dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) *httpApi {
return &httpApi{
port: port,
dbClient: dbClient,
Expand Down
113 changes: 64 additions & 49 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,102 +13,117 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

// Posting a new singature consists in the following steps:
// 1. Decode and validate
// 2. Get active validators
// 3. Validate signature and insert into MongoDB
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[string]string) {
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) {
logger.Debug("Received new POST '/newSignature' request")

// Parse request body
var requests []types.SignatureRequest
err := json.NewDecoder(r.Body).Decode(&requests)
if err != nil {
logger.Error("Failed to decode request body: " + err.Error())
respondError(w, http.StatusBadRequest, "Invalid request format")
return
}
// Decode and validate incoming requests
validRequests, err := validation.ValidateAndDecodeRequests(requests)
if err != nil {

// Parse and validate request body
if err := json.NewDecoder(r.Body).Decode(&requests); err != nil {
logger.Error("Failed to decode request body: " + err.Error())
respondError(w, http.StatusBadRequest, "Invalid request format")
return
}
// Respond with an error if no valid requests were found
if len(validRequests) == 0 {

requestsValidatedAndDecoded, err := validation.ValidateAndDecodeRequests(requests)
if err != nil || len(requestsValidatedAndDecoded) == 0 {
logger.Error("Failed to validate and decode requests: " + err.Error())
respondError(w, http.StatusBadRequest, "No valid requests")
return
}

// Get active validators from the network, get the network from the first item in the array
beaconNodeUrl, ok := beaconNodeUrls[validRequests[0].Network]
// Check network validity
network := requestsValidatedAndDecoded[0].Network
beaconNodeUrl, ok := beaconNodeUrls[network]
if !ok {
respondError(w, http.StatusBadRequest, "Invalid network")
return
}

activeValidators, err := validation.GetActiveValidators(validRequests, beaconNodeUrl)
// Get active validators
pubkeys := getPubkeys(requestsValidatedAndDecoded)
validatorsStatusMap, err := validation.GetValidatorsStatus(pubkeys, beaconNodeUrl)
if err != nil {
logger.Error("Failed to get active validators: " + err.Error())
respondError(w, http.StatusInternalServerError, "Failed to get active validators")
respondError(w, http.StatusInternalServerError, "Failed to get active validators: "+err.Error())
return
}
if len(activeValidators) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in network")

// Filter and verify signatures
validSignatures := filterAndVerifySignatures(requestsValidatedAndDecoded, validatorsStatusMap)
if len(validSignatures) == 0 {
respondError(w, http.StatusBadRequest, "No valid signatures")
return
}

// Insert valid signatures into MongoDB
if err := insertSignaturesIntoDB(validSignatures, dbCollection); err != nil {
logger.Error("Failed to insert signatures into MongoDB: " + err.Error())
respondError(w, http.StatusInternalServerError, "Failed to insert signatures into MongoDB")
return
}

validSignatures := []types.SignatureRequestDecodedWithActive{}
for _, req := range activeValidators {
isValidSignature, err := validation.VerifySignature(req)
if err != nil {
logger.Error("Failed to validate signature: " + err.Error())
respondOK(w, "Finished processing signatures")
}

func getPubkeys(requests []types.SignatureRequestDecoded) []string {
pubkeys := make([]string, len(requests))
for i, req := range requests {
pubkeys[i] = req.Pubkey
}
return pubkeys
}

func filterAndVerifySignatures(requests []types.SignatureRequestDecoded, validatorsStatusMap map[string]types.Status) []types.SignatureRequestDecodedWithStatus {
validSignatures := []types.SignatureRequestDecodedWithStatus{}
for _, req := range requests {
status, ok := validatorsStatusMap[req.Pubkey]
if !ok {
logger.Warn("Validator not found: " + req.Pubkey)
continue
}
if !isValidSignature {
logger.Warn("Invalid signature: " + req.Signature)
if status == types.Inactive {
logger.Warn("Inactive validator: " + req.Pubkey)
continue
}
validSignatures = append(validSignatures, req)
}
// Respond with an error if no valid signatures were found
if len(validSignatures) == 0 {
respondError(w, http.StatusBadRequest, "No valid signatures")
return
reqWithStatus := types.SignatureRequestDecodedWithStatus{
SignatureRequestDecoded: req,
Status: status,
}
if isValid, err := validation.VerifySignature(reqWithStatus); err == nil && isValid {
validSignatures = append(validSignatures, reqWithStatus)
} else {
logger.Warn("Invalid signature: " + req.Signature)
}
}
return validSignatures
}

// Iterate over all valid signatures and insert the signature
for _, req := range validSignatures {
func insertSignaturesIntoDB(signatures []types.SignatureRequestDecodedWithStatus, dbCollection *mongo.Collection) error {
for _, req := range signatures {
filter := bson.M{
"pubkey": req.Pubkey,
"tag": req.Tag,
"network": req.Network,
}
update := bson.M{
"$set": bson.M{
"status": req.Status, // Only save the last status
},
"$setOnInsert": bson.M{"status": req.Status}, // do not update status if already exists
"$push": bson.M{
"entries": bson.M{
"payload": req.Payload,
"signature": req.Signature,
"decodedPayload": bson.M{
"type": req.DecodedPayload.Type,
"platform": req.DecodedPayload.Platform,
"timestamp": req.DecodedPayload.Timestamp, // Needed to filter out old signatures
"timestamp": req.DecodedPayload.Timestamp,
},
},
},
}
options := options.Update().SetUpsert(true)
_, err := dbCollection.UpdateOne(context.TODO(), filter, update, options)
if err != nil {
logger.Error("Failed to insert signature into MongoDB: " + err.Error())
continue
if _, err := dbCollection.UpdateOne(context.Background(), filter, update, options); err != nil {
return err
}
logger.Debug("New Signature " + req.Signature + " inserted into MongoDB")
}

respondOK(w, "Finished processing signatures")
return nil
}
3 changes: 2 additions & 1 deletion listener/internal/api/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/handlers"
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
"github.com/gorilla/mux"
"go.mongodb.org/mongo-driver/mongo"
)

func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[string]string) *mux.Router {
func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[types.Network]string) *mux.Router {
r := mux.NewRouter()

// Define routes
Expand Down
51 changes: 38 additions & 13 deletions listener/internal/api/types/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,42 @@
package types

// In sync with brain
type Network string // "mainnet" | "holesky" | "gnosis" | "lukso"

const (
Mainnet Network = "mainnet"
Holesky Network = "holesky"
Gnosis Network = "gnosis"
Lukso Network = "lukso"
)

// In sync with brain
// @see https://github.com/dappnode/StakingBrain/blob/0aaeefa8aec1b21ba2f2882cb444747419a3ff5d/packages/common/src/types/db/types.ts#L27
type Tag string // "obol" | "diva" | "ssv" | "rocketpool" | "stakewise" | "stakehouse" | "solo" | "stader"

const (
Obol Tag = "obol"
Diva Tag = "diva"
Ssv Tag = "ssv"
Rocketpool Tag = "rocketpool"
Stakewise Tag = "stakewise"
Stakehouse Tag = "stakehouse"
Solo Tag = "solo"
Stader Tag = "stader"
)

type SignatureRequest struct {
Payload string `json:"payload"`
Pubkey string `json:"pubkey"`
Signature string `json:"signature"`
Network string `json:"network"`
Tag string `json:"tag"`
Payload string `json:"payload"`
Pubkey string `json:"pubkey"`
Signature string `json:"signature"`
Network Network `json:"network"`
Tag string `json:"tag"`
}

type DecodedPayload struct {
Type string `json:"type"`
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
}

type SignatureRequestDecoded struct {
Expand All @@ -19,16 +50,10 @@ type Status string
const (
Unknown Status = "unknown"
Active Status = "active"
Inactive Status = "inactive"
Inactive Status = "inactive" // means any response from beacon node that is not active
)

type SignatureRequestDecodedWithActive struct {
type SignatureRequestDecodedWithStatus struct {
SignatureRequestDecoded
Status Status `json:"status"` // "unknown" | "active" | "inactive"
}

type DecodedPayload struct {
Type string `json:"type"`
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
}
Loading
Loading