Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement get active validators #12

Merged
merged 7 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ MONGO_DB_NAME=
MONGO_DB_API_PORT=
API_PORT=
LOG_LEVEL=
BEACON_NODE_URL=
BYPASS_VALIDATORS_FILTERING=
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
MONGO_DB_NAME: "${MONGO_DB_NAME}"
API_PORT: "${API_PORT}"
LOG_LEVEL: "${LOG_LEVEL}"
BYPASS_VALIDATORS_FILTERING: "${BYPASS_VALIDATORS_FILTERING}"
depends_on:
- mongo
container_name: listener
Expand Down
2 changes: 2 additions & 0 deletions listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func main() {
s := api.NewApi(
config.Port,
config.MongoDBURI,
config.BeaconNodeURLs,
config.BypassValidatorsFiltering,
)

s.Start()
Expand Down
18 changes: 11 additions & 7 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
)

type httpApi struct {
server *http.Server
port string
dbUri string
server *http.Server
port string
dbUri string
beaconNodeUrls map[string]string
bypassValidatorFiltering bool
}

// create a new api instance
func NewApi(port string, mongoDbUri string) *httpApi {
func NewApi(port string, mongoDbUri string, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *httpApi {
return &httpApi{
port: port,
dbUri: mongoDbUri,
port: port,
dbUri: mongoDbUri,
beaconNodeUrls: beaconNodeUrls,
bypassValidatorFiltering: bypassValidatorFiltering,
}
}

Expand Down Expand Up @@ -47,7 +51,7 @@ func (s *httpApi) Start() {
// setup the http api
s.server = &http.Server{
Addr: ":" + s.port,
Handler: routes.SetupRouter(dbCollection),
Handler: routes.SetupRouter(dbCollection, s.beaconNodeUrls, s.bypassValidatorFiltering),
}

// start the api
Expand Down
60 changes: 34 additions & 26 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type signatureRequest struct {
Payload string `json:"payload"`
Signature string `json:"signature"`
Network string `json:"network"`
Label string `json:"label"`
Tag string `json:"tag"`
}

// decodeAndValidateRequests decodes and validates incoming HTTP requests.
Expand All @@ -31,7 +31,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded

var validRequests []types.SignatureRequestDecoded
for _, req := range requests {
if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" {
if req.Network == "" || req.Tag == "" || req.Signature == "" || req.Payload == "" {
logger.Debug("Skipping invalid signature from request, missing fields")
continue
}
Expand All @@ -55,7 +55,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded
Payload: req.Payload,
Signature: req.Signature,
Network: req.Network,
Label: req.Label,
Tag: req.Tag,
})
} else {
logger.Debug("Skipping invalid signature from request, invalid payload format")
Expand All @@ -65,9 +65,7 @@ func decodeAndValidateRequests(r *http.Request) ([]types.SignatureRequestDecoded
return validRequests, nil
}

func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection, wg *sync.WaitGroup) {
defer wg.Done()

func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection *mongo.Collection) {
isValidSignature, err := validation.IsValidSignature(req)
if err != nil {
logger.Error("Failed to validate signature: " + err.Error())
Expand All @@ -85,7 +83,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection
"pubkey": req.DecodedPayload.Pubkey,
"signature": req.Signature,
"network": req.Network,
"label": req.Label,
"tag": req.Tag,
})
if err != nil {
logger.Error("Failed to insert signature into MongoDB: " + err.Error())
Expand All @@ -99,7 +97,7 @@ func validateAndInsertSignature(req types.SignatureRequestDecoded, dbCollection
// 1. Decode and validate
// 2. Get active validators
// 3. Validate signature and insert into MongoDB
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) {
func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) {
logger.Debug("Received new POST '/newSignature' request")

// Decode and validate incoming requests
Expand All @@ -111,31 +109,41 @@ func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mong
}
// Respond with an error if no valid requests were found
if len(validRequests) == 0 {
logger.Error("No valid requests")
respondError(w, http.StatusBadRequest, "No valid requests")
return
}

// Get active validators
requestsWithActiveValidators, err := validation.GetActiveValidators(validRequests)
if err != nil {
respondError(w, http.StatusInternalServerError, "Failed to validate active validators")
return
}
// Respond with an error if no active validators were found
if len(requestsWithActiveValidators) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in request")
return
}

var wg sync.WaitGroup
// Insert into MongoDB if signature is valid
for _, req := range requestsWithActiveValidators {
// create a goroutine for each request
appendMutex := new(sync.Mutex) // Mutex for appending to the slice
dbMutex := new(sync.Mutex) // Mutex for database operations
allValidatedRequests := []types.SignatureRequestDecoded{} // This will collect all valid requests

// Iterate over all beacon nodes and get active validators
for _, url := range beaconNodeUrls {
wg.Add(1)
go validateAndInsertSignature(req, dbCollection, &wg)
go func(url string) {
defer wg.Done()
activeValidators := validation.GetActiveValidators(validRequests, url, bypassValidatorFiltering)
if len(activeValidators) == 0 {
return
}

appendMutex.Lock()
allValidatedRequests = append(allValidatedRequests, activeValidators...) // Only one goroutine can append to the slice at a time
appendMutex.Unlock()

for _, req := range activeValidators {
dbMutex.Lock()
validateAndInsertSignature(req, dbCollection) // Do we really need to lock the db insertions?
dbMutex.Unlock()
}
}(url) // Pass the URL to the goroutine
}
// Wait for all goroutines to finish

wg.Wait()
if len(allValidatedRequests) == 0 {
respondError(w, http.StatusInternalServerError, "No active validators found in any network")
return
}
respondOK(w, "Finished processing signatures")
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol
},
{
"$group": bson.M{
"_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"},
"_id": bson.M{"pubkey": "$pubkey", "network": "$network", "tag": "$tag"},
"signatures": bson.M{
"$push": bson.M{
"signature": "$signature",
Expand All @@ -64,7 +64,7 @@ func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCol
"_id": 0,
"pubkey": "$_id.pubkey",
"network": "$_id.network",
"label": "$_id.label",
"tag": "$_id.tag",
"signatures": 1,
},
},
Expand Down
4 changes: 2 additions & 2 deletions listener/internal/api/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"go.mongodb.org/mongo-driver/mongo"
)

func SetupRouter(dbCollection *mongo.Collection) *mux.Router {
func SetupRouter(dbCollection *mongo.Collection, beaconNodeUrls map[string]string, bypassValidatorFiltering bool) *mux.Router {
r := mux.NewRouter()

// Define routes
r.HandleFunc("/", handlers.GetHealthCheck).Methods(http.MethodGet)
// closure function to inject dbCollection into the handler
r.HandleFunc("/newSignature", func(w http.ResponseWriter, r *http.Request) {
handlers.PostNewSignature(w, r, dbCollection)
handlers.PostNewSignature(w, r, dbCollection, beaconNodeUrls, bypassValidatorFiltering)
}).Methods(http.MethodPost)
r.HandleFunc("/signaturesByValidator", func(w http.ResponseWriter, r *http.Request) {
handlers.PostSignaturesByValidator(w, r, dbCollection)
Expand Down
25 changes: 24 additions & 1 deletion listener/internal/api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,34 @@ type SignatureRequestDecoded struct {
Payload string `json:"payload"`
Signature string `json:"signature"`
Network string `json:"network"`
Label string `json:"label"`
Tag string `json:"tag"`
}

type DecodedPayload struct {
Platform string `json:"platform"`
Timestamp string `json:"timestamp"`
Pubkey string `json:"pubkey"`
}

type ActiveValidator struct {
Pubkey string `json:"pubkey"`
WithdrawalCredentials string `json:"withdrawal_credentials"`
EffectiveBalance string `json:"effective_balance"`
Slashed bool `json:"slashed"`
ActivationEligibilityEpoch string `json:"activation_eligibility_epoch"`
ActivationEpoch string `json:"activation_epoch"`
ExitEpoch string `json:"exit_epoch"`
WithdrawableEpoch string `json:"withdrawable_epoch"`
}

// https://ethereum.github.io/beacon-APIs/#/Beacon /eth/v1/beacon/states/{state_id}/validators
type ActiveValidatorsApiResponse struct {
ExecutionOptimistic bool `json:"execution_optimistic"`
Finalized bool `json:"finalized"`
Data []struct {
Index string `json:"index"`
Balance string `json:"balance"`
Status string `json:"status"`
Validator ActiveValidator `json:"validator"`
} `json:"data"`
}
92 changes: 85 additions & 7 deletions listener/internal/api/validation/GetActiveValidators.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,90 @@
package validation

import "github.com/dappnode/validator-monitoring/listener/internal/api/types"
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"

// ValidatePubkeysWithConsensusClient checks if the given pubkeys from the requests are from active validators
// or not by making SINGLE API call to the consensus client. It returns an array of the active validators pubkeys.
func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded) ([]types.SignatureRequestDecoded, error) {
requestsActiveValidators := requestsDecoded
// make api call: GET /eth/v1/beacon/states/{state_id}/validators?id=validator_pubkey1,validator_pubkey2,validator_pubkey3
"github.com/dappnode/validator-monitoring/listener/internal/api/types"
)

return requestsActiveValidators, nil
// GetActiveValidators checks the active status of validators from a specific beacon node.
// If bypass is true, it simply returns all decoded requests.
func GetActiveValidators(requestsDecoded []types.SignatureRequestDecoded, beaconNodeUrl string, bypass bool) []types.SignatureRequestDecoded {

if len(requestsDecoded) == 0 {
fmt.Println("no requests to process")
return nil
}

ids := make([]string, 0, len(requestsDecoded))
for _, req := range requestsDecoded {
ids = append(ids, req.DecodedPayload.Pubkey)
}

if len(ids) == 0 {
fmt.Println("no valid public keys for network ", beaconNodeUrl, " to query")
return nil
}

// Serialize the request body to JSON
// See https://ethereum.github.io/beacon-APIs/#/Beacon/postStateValidators
jsonData, err := json.Marshal(struct {
Ids []string `json:"ids"`
Statuses []string `json:"statuses"`
}{
Ids: ids,
Statuses: []string{"active_ongoing"}, // Only interested in currently active validators
})
if err != nil {
fmt.Printf("error marshaling request data: %v\n", err)
return nil
}

// Create HTTP client with timeout
client := &http.Client{Timeout: 50 * time.Second}
apiUrl := fmt.Sprintf("%s/eth/v1/beacon/states/head/validators", beaconNodeUrl)

// Make API call
resp, err := client.Post(apiUrl, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
fmt.Printf("error making API call to %s: %v\n", apiUrl, err)
return nil
}
defer resp.Body.Close()

// Check the HTTP response status before reading the body
if resp.StatusCode != http.StatusOK {
fmt.Printf("unexpected response status: %d\n", resp.StatusCode)
return nil
}

// Decode the API response directly into the ApiResponse struct
var apiResponse types.ActiveValidatorsApiResponse
if err := json.NewDecoder(resp.Body).Decode(&apiResponse); err != nil {
fmt.Printf("error decoding response data: %v\n", err)
return nil
}

// Use a map to quickly lookup active validators
activeValidatorMap := make(map[string]bool)
for _, validator := range apiResponse.Data {
activeValidatorMap[validator.Validator.Pubkey] = true
}

// Filter the list of decoded requests to include only those that are active
var activeValidators []types.SignatureRequestDecoded
for _, req := range requestsDecoded {
if _, isActive := activeValidatorMap[req.DecodedPayload.Pubkey]; isActive {
activeValidators = append(activeValidators, req)
}
}

if bypass {
return requestsDecoded // do not return the filtered list
} else {
return activeValidators // return the filtered list (default behaviour)
}
}
44 changes: 44 additions & 0 deletions listener/internal/api/validation/GetActiveValidators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package validation

import (
"testing"

"github.com/dappnode/validator-monitoring/listener/internal/api/types"
)

func TestGetActiveValidators(t *testing.T) {
// Setup the input data
beaconNodeUrls := map[string]string{
"holesky": "https://holeskyvals.53650f79ab75c6ff.dyndns.dappnode.io",
}

requestsDecoded := []types.SignatureRequestDecoded{
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xa685beb5a1f317f5a01ecd6dade42113aad945b2ab53fb1b356334ab441323e538feadd2889894b17f8fa2babe1989ca",
},
},
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xab31efdd97f32087e96d3262f6fb84a4480411d391689be0dfc931fd8a5c16c3f51f10b127040b1cb65eb955f2b78a63",
},
},
{
Network: "holesky",
DecodedPayload: types.DecodedPayload{
Pubkey: "0xa24a030d7d8ca3c5e1f5824760d0f4157a7a89bcca6414377cca97e6e63445bef0e1b63761ee35a0fc46bb317e31b34b",
},
},
}

// Call the function. "bypass" is set to false, so the function will do expected behaviour and filter out inactive validators
result := GetActiveValidators(requestsDecoded, beaconNodeUrls["holesky"], false)

// You may need to mock the server's response or adjust the expected values here according to your actual setup
expectedNumValidators := 3 // This should match the number of mock validators that are "active"
if len(result) != expectedNumValidators {
t.Errorf("Expected %d active validators, got %d", expectedNumValidators, len(result))
}
}
Loading
Loading