diff --git a/.env.example b/.env.example index b15c9bb..1def49e 100644 --- a/.env.example +++ b/.env.example @@ -4,3 +4,5 @@ MONGO_DB_NAME= MONGO_DB_API_PORT= API_PORT= LOG_LEVEL= +BEACON_NODE_URL= +BYPASS_VALIDATORS_FILTERING= \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d19bdae..b767bca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,7 @@ services: MONGO_DB_NAME: "${MONGO_DB_NAME}" API_PORT: "${API_PORT}" LOG_LEVEL: "${LOG_LEVEL}" + BYPASS_VALIDATORS_FILTERING: "${BYPASS_VALIDATORS_FILTERING}" depends_on: - mongo container_name: listener diff --git a/listener/cmd/listener/main.go b/listener/cmd/listener/main.go index b9c41b1..0fd4b7d 100644 --- a/listener/cmd/listener/main.go +++ b/listener/cmd/listener/main.go @@ -18,6 +18,8 @@ func main() { s := api.NewApi( config.Port, config.MongoDBURI, + config.BeaconNodeURLs, + config.BypassValidatorsFiltering, ) s.Start() diff --git a/listener/internal/api/api.go b/listener/internal/api/api.go index f786192..de0ab52 100644 --- a/listener/internal/api/api.go +++ b/listener/internal/api/api.go @@ -9,16 +9,20 @@ import ( ) type httpApi struct { - server *http.Server - port string - dbUri string + server *http.Server + port string + dbUri string + beaconNodeUrls map[string]string + bypassValidatorFiltering bool } // create a new api instance -func NewApi(port string, mongoDbUri string) *httpApi { +func NewApi(port string, mongoDbUri string, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *httpApi { return &httpApi{ - port: port, - dbUri: mongoDbUri, + port: port, + dbUri: mongoDbUri, + beaconNodeUrls: beaconNodeUrls, + bypassValidatorFiltering: bypassValidatorFiltering, } } @@ -47,7 +51,7 @@ func (s *httpApi) Start() { // setup the http api s.server = &http.Server{ Addr: ":" + s.port, - Handler: routes.SetupRouter(dbCollection), + Handler: routes.SetupRouter(dbCollection, s.beaconNodeUrls, s.bypassValidatorFiltering), } // start the api diff --git a/listener/internal/api/handlers/postNewSignature.go b/listener/internal/api/handlers/postNewSignature.go index 34fe958..af170e7 100644 --- a/listener/internal/api/handlers/postNewSignature.go +++ b/listener/internal/api/handlers/postNewSignature.go @@ -18,7 +18,7 @@ type signatureRequest struct { Payload string `json:"payload"` Signature string `json:"signature"` Network string `json:"network"` - Label string `json:"label"` + Tag string `json:"tag"` } // decodeAndValidateRequests decodes and validates incoming HTTP requests. @@ -31,7 +31,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded var validRequests []types.SignatureRequestDecoded for _, req := range requests { - if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" { + if req.Network == "" || req.Tag == "" || req.Signature == "" || req.Payload == "" { logger.Debug("Skipping invalid signature from request, missing fields") continue } @@ -55,7 +55,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded Payload: req.Payload, Signature: req.Signature, Network: req.Network, - Label: req.Label, + Tag: req.Tag, }) } else { logger.Debug("Skipping invalid signature from request, invalid payload format") @@ -65,9 +65,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded return validRequests, nil } -func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection, wg *sync.WaitGroup) { - defer wg.Done() - +func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection) { isValidSignature, err := validation.IsValidSignature(req) if err != nil { logger.Error("Failed to validate signature: " + err.Error()) @@ -85,7 +83,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection "pubkey": req.DecodedPayload.Pubkey, "signature": req.Signature, "network": req.Network, - "label": req.Label, + "tag": req.Tag, }) if err != nil { logger.Error("Failed to insert signature into MongoDB: " + err.Error()) @@ -99,7 +97,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection // 1. Decode and validate // 2. Get active validators // 3. Validate signature and insert into MongoDB -func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) { +func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) { logger.Debug("Received new POST '/newSignature' request") // Decode and validate incoming requests @@ -111,31 +109,41 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong } // Respond with an error if no valid requests were found if len(validRequests) == 0 { - logger.Error("No valid requests") respondError(w, http.StatusBadRequest, "No valid requests") return } - // Get active validators - requestsWithActiveValidators, err := validation.GetActiveValidators(validRequests) - if err != nil { - respondError(w, http.StatusInternalServerError, "Failed to validate active validators") - return - } - // Respond with an error if no active validators were found - if len(requestsWithActiveValidators) == 0 { - respondError(w, http.StatusInternalServerError, "No active validators found in request") - return - } - var wg sync.WaitGroup - // Insert into MongoDB if signature is valid - for _, req := range requestsWithActiveValidators { - // create a goroutine for each request + appendMutex := new(sync.Mutex) // Mutex for appending to the slice + dbMutex := new(sync.Mutex) // Mutex for database operations + allValidatedRequests := []types.SignatureRequestDecoded{} // This will collect all valid requests + + // Iterate over all beacon nodes and get active validators + for _, url := range beaconNodeUrls { wg.Add(1) - go validateAndInsertSignature(req, dbCollection, &wg) + go func(url string) { + defer wg.Done() + activeValidators := validation.GetActiveValidators(validRequests, url, bypassValidatorFiltering) + if len(activeValidators) == 0 { + return + } + + appendMutex.Lock() + allValidatedRequests = append(allValidatedRequests, activeValidators...) // Only one goroutine can append to the slice at a time + appendMutex.Unlock() + + for _, req := range activeValidators { + dbMutex.Lock() + validateAndInsertSignature(req, dbCollection) // Do we really need to lock the db insertions? + dbMutex.Unlock() + } + }(url) // Pass the URL to the goroutine } - // Wait for all goroutines to finish + wg.Wait() + if len(allValidatedRequests) == 0 { + respondError(w, http.StatusInternalServerError, "No active validators found in any network") + return + } respondOK(w, "Finished processing signatures") } diff --git a/listener/internal/api/handlers/postSignaturesByValidatorAggr.go b/listener/internal/api/handlers/postSignaturesByValidatorAggr.go index 37ac9c6..01045f1 100644 --- a/listener/internal/api/handlers/postSignaturesByValidatorAggr.go +++ b/listener/internal/api/handlers/postSignaturesByValidatorAggr.go @@ -49,7 +49,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol }, { "$group": bson.M{ - "_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"}, + "_id": bson.M{"pubkey": "$pubkey", "network": "$network", "tag": "$tag"}, "signatures": bson.M{ "$push": bson.M{ "signature": "$signature", @@ -64,7 +64,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol "_id": 0, "pubkey": "$_id.pubkey", "network": "$_id.network", - "label": "$_id.label", + "tag": "$_id.tag", "signatures": 1, }, }, diff --git a/listener/internal/api/routes/routes.go b/listener/internal/api/routes/routes.go index 835a08a..5735c3e 100644 --- a/listener/internal/api/routes/routes.go +++ b/listener/internal/api/routes/routes.go @@ -8,14 +8,14 @@ import ( "go.mongodb.org/mongo-driver/mongo" ) -func SetupRouter(dbCollection *mongo.Collection) *mux.Router { +func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *mux.Router { r := mux.NewRouter() // Define routes r.HandleFunc("/", handlers.GetHealthCheck).Methods(http.MethodGet) // closure function to inject dbCollection into the handler r.HandleFunc("/newSignature", func(w http.ResponseWriter, r *http.Request) { - handlers.PostNewSignature(w, r, dbCollection) + handlers.PostNewSignature(w, r, dbCollection, beaconNodeUrls, bypassValidatorFiltering) }).Methods(http.MethodPost) r.HandleFunc("/signaturesByValidator", func(w http.ResponseWriter, r *http.Request) { handlers.PostSignaturesByValidator(w, r, dbCollection) diff --git a/listener/internal/api/types/types.go b/listener/internal/api/types/types.go index e20d5d7..b924e0e 100644 --- a/listener/internal/api/types/types.go +++ b/listener/internal/api/types/types.go @@ -5,7 +5,7 @@ type SignatureRequestDecoded struct { Payload string `json:"payload"` Signature string `json:"signature"` Network string `json:"network"` - Label string `json:"label"` + Tag string `json:"tag"` } type DecodedPayload struct { @@ -13,3 +13,26 @@ type DecodedPayload struct { Timestamp string `json:"timestamp"` Pubkey string `json:"pubkey"` } + +type ActiveValidator struct { + Pubkey string `json:"pubkey"` + WithdrawalCredentials string `json:"withdrawal_credentials"` + EffectiveBalance string `json:"effective_balance"` + Slashed bool `json:"slashed"` + ActivationEligibilityEpoch string `json:"activation_eligibility_epoch"` + ActivationEpoch string `json:"activation_epoch"` + ExitEpoch string `json:"exit_epoch"` + WithdrawableEpoch string `json:"withdrawable_epoch"` +} + +// https://ethereum.github.io/beacon-APIs/#/Beacon /eth/v1/beacon/states/{state_id}/validators +type ActiveValidatorsApiResponse struct { + ExecutionOptimistic bool `json:"execution_optimistic"` + Finalized bool `json:"finalized"` + Data []struct { + Index string `json:"index"` + Balance string `json:"balance"` + Status string `json:"status"` + Validator ActiveValidator `json:"validator"` + } `json:"data"` +} diff --git a/listener/internal/api/validation/GetActiveValidators.go b/listener/internal/api/validation/GetActiveValidators.go index e4854bf..c374424 100644 --- a/listener/internal/api/validation/GetActiveValidators.go +++ b/listener/internal/api/validation/GetActiveValidators.go @@ -1,12 +1,90 @@ package validation -import "github.com/dappnode/validator-monitoring/listener/internal/api/types" +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "time" -// ValidatePubkeysWithConsensusClient checks if the given pubkeys from the requests are from active validators -// or not by making SINGLE API call to the consensus client. It returns an array of the active validators pubkeys. -func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded) ([]types.SignatureRequestDecoded, error) { - requestsActiveValidators := requestsDecoded - // make api call: GET /eth/v1/beacon/states/{state_id}/validators?id=validator_pubkey1,validator_pubkey2,validator_pubkey3 + "github.com/dappnode/validator-monitoring/listener/internal/api/types" +) - return requestsActiveValidators, nil +// GetActiveValidators checks the active status of validators from a specific beacon node. +// If bypass is true, it simply returns all decoded requests. +func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded, beaconNodeUrl string, bypass bool) []types.SignatureRequestDecoded { + + if len(requestsDecoded) == 0 { + fmt.Println("no requests to process") + return nil + } + + ids := make([]string, 0, len(requestsDecoded)) + for _, req := range requestsDecoded { + ids = append(ids, req.DecodedPayload.Pubkey) + } + + if len(ids) == 0 { + fmt.Println("no valid public keys for network ", beaconNodeUrl, " to query") + return nil + } + + // Serialize the request body to JSON + // See https://ethereum.github.io/beacon-APIs/#/Beacon/postStateValidators + jsonData, err := json.Marshal(struct { + Ids []string `json:"ids"` + Statuses []string `json:"statuses"` + }{ + Ids: ids, + Statuses: []string{"active_ongoing"}, // Only interested in currently active validators + }) + if err != nil { + fmt.Printf("error marshaling request data: %v\n", err) + return nil + } + + // Create HTTP client with timeout + client := &http.Client{Timeout: 50 * time.Second} + apiUrl := fmt.Sprintf("%s/eth/v1/beacon/states/head/validators", beaconNodeUrl) + + // Make API call + resp, err := client.Post(apiUrl, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + fmt.Printf("error making API call to %s: %v\n", apiUrl, err) + return nil + } + defer resp.Body.Close() + + // Check the HTTP response status before reading the body + if resp.StatusCode != http.StatusOK { + fmt.Printf("unexpected response status: %d\n", resp.StatusCode) + return nil + } + + // Decode the API response directly into the ApiResponse struct + var apiResponse types.ActiveValidatorsApiResponse + if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil { + fmt.Printf("error decoding response data: %v\n", err) + return nil + } + + // Use a map to quickly lookup active validators + activeValidatorMap := make(map[string]bool) + for _, validator := range apiResponse.Data { + activeValidatorMap[validator.Validator.Pubkey] = true + } + + // Filter the list of decoded requests to include only those that are active + var activeValidators []types.SignatureRequestDecoded + for _, req := range requestsDecoded { + if _, isActive := activeValidatorMap[req.DecodedPayload.Pubkey]; isActive { + activeValidators = append(activeValidators, req) + } + } + + if bypass { + return requestsDecoded // do not return the filtered list + } else { + return activeValidators // return the filtered list (default behaviour) + } } diff --git a/listener/internal/api/validation/GetActiveValidators_test.go b/listener/internal/api/validation/GetActiveValidators_test.go new file mode 100644 index 0000000..9520a96 --- /dev/null +++ b/listener/internal/api/validation/GetActiveValidators_test.go @@ -0,0 +1,44 @@ +package validation + +import ( + "testing" + + "github.com/dappnode/validator-monitoring/listener/internal/api/types" +) + +func TestGetActiveValidators(t *testing.T) { + // Setup the input data + beaconNodeUrls := map[string]string{ + "holesky": "https://holeskyvals.53650f79ab75c6ff.dyndns.dappnode.io", + } + + requestsDecoded := []types.SignatureRequestDecoded{ + { + Network: "holesky", + DecodedPayload: types.DecodedPayload{ + Pubkey: "0xa685beb5a1f317f5a01ecd6dade42113aad945b2ab53fb1b356334ab441323e538feadd2889894b17f8fa2babe1989ca", + }, + }, + { + Network: "holesky", + DecodedPayload: types.DecodedPayload{ + Pubkey: "0xab31efdd97f32087e96d3262f6fb84a4480411d391689be0dfc931fd8a5c16c3f51f10b127040b1cb65eb955f2b78a63", + }, + }, + { + Network: "holesky", + DecodedPayload: types.DecodedPayload{ + Pubkey: "0xa24a030d7d8ca3c5e1f5824760d0f4157a7a89bcca6414377cca97e6e63445bef0e1b63761ee35a0fc46bb317e31b34b", + }, + }, + } + + // Call the function. "bypass" is set to false, so the function will do expected behaviour and filter out inactive validators + result := GetActiveValidators(requestsDecoded, beaconNodeUrls["holesky"], false) + + // You may need to mock the server's response or adjust the expected values here according to your actual setup + expectedNumValidators := 3 // This should match the number of mock validators that are "active" + if len(result) != expectedNumValidators { + t.Errorf("Expected %d active validators, got %d", expectedNumValidators, len(result)) + } +} diff --git a/listener/internal/api/validation/IsValidSignature_test.go b/listener/internal/api/validation/IsValidSignature_test.go index 0772b02..b616670 100644 --- a/listener/internal/api/validation/IsValidSignature_test.go +++ b/listener/internal/api/validation/IsValidSignature_test.go @@ -42,7 +42,7 @@ func TestIsValidSignature(t *testing.T) { Payload: base64.StdEncoding.EncodeToString(messageBytes), Signature: signature.SerializeToHexStr(), Network: "mainnet", - Label: "solo", + Tag: "solo", } // Validate the signature @@ -84,7 +84,7 @@ func TestIsValidSignatureError(t *testing.T) { Payload: base64.StdEncoding.EncodeToString(payloadBytes), Signature: "clearlyInvalidSignature", // Intentionally invalid Network: "mainnet", - Label: "solo", + Tag: "solo", } // Validate the signature diff --git a/listener/internal/config/config.go b/listener/internal/config/config.go index 597a91e..574b47f 100644 --- a/listener/internal/config/config.go +++ b/listener/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( "os" + "strings" "github.com/dappnode/validator-monitoring/listener/internal/logger" ) @@ -14,8 +15,13 @@ type Config struct { MongoDBURI string // LogLevel is the level of logging LogLevel string + // BypassValidatorsFiltering is a boolean that indicates if the validators filtering should be bypassed + BypassValidatorsFiltering bool + // BeaconNodeURLs is the URLs of the beacon nodes for different networks + BeaconNodeURLs map[string]string } +// TODO: read bypass boolean env func LoadConfig() (*Config, error) { mongoDBURI := os.Getenv("MONGO_DB_URI") @@ -34,9 +40,52 @@ func LoadConfig() (*Config, error) { logger.Fatal("API_PORT is not set") } + // Load bypassValidatorsFiltering boolean from env. Defaults to false unless explicitly set to "true". + bypassValidatorsFilteringStr := os.Getenv("BYPASS_VALIDATORS_FILTERING") + if bypassValidatorsFilteringStr == "" { + logger.Info("BYPASS_VALIDATORS_FILTERING is not set, using default false") + bypassValidatorsFilteringStr = "false" + } + bypassValidatorsFiltering := strings.ToLower(bypassValidatorsFilteringStr) == "true" + + beaconNodeURL := os.Getenv("BEACON_NODE_URL") + if beaconNodeURL == "" { + logger.Fatal("BEACON_NODE_URL is not set") + } + + // beacon node urls per network + + beaconMainnet := os.Getenv("BEACON_NODE_URL_MAINNET") + if beaconMainnet == "" { + logger.Fatal("BEACON_NODE_URL_MAINNET is not set") + } + beaconHolesky := os.Getenv("BEACON_NODE_URL_HOLESKY") + if beaconHolesky == "" { + logger.Fatal("BEACON_NODE_URL_HOLESKY is not set") + } + + beaconGnosis := os.Getenv("BEACON_NODE_URL_GNOSIS") + if beaconGnosis == "" { + logger.Fatal("BEACON_NODE_URL_GNOSIS is not set") + } + + beaconLukso := os.Getenv("BEACON_NODE_URL_LUKSO") + if beaconLukso == "" { + logger.Fatal("BEACON_NODE_URL_LUKSO is not set") + } + + beaconNodeURLs := map[string]string{ + "mainnet": beaconMainnet, + "holesky": beaconHolesky, + "gnosis": beaconGnosis, + "lukso": beaconLukso, + } + return &Config{ - Port: apiPort, - MongoDBURI: mongoDBURI, - LogLevel: logLevel, + Port: apiPort, + MongoDBURI: mongoDBURI, + LogLevel: logLevel, + BypassValidatorsFiltering: bypassValidatorsFiltering, + BeaconNodeURLs: beaconNodeURLs, }, nil }