diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index ce49fc5..a899961 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -23,4 +23,4 @@ jobs: - name: Build run: | cd ./listener - go build -a -installsuffix cgo -o bin/listener ./src + go build -a -installsuffix cgo -o bin/listener ./cmd/listener diff --git a/.gitignore b/.gitignore index bd2c2c0..06f54b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .env -./listener/src/tmp/* \ No newline at end of file +./listener/tmp/* +./listener/bin/* \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 44d8551..7d2a05d 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -9,7 +9,8 @@ services: context: listener dockerfile: Dockerfile.dev volumes: - - ./listener/src:/app/src + - ./listener/cmd:/app/cmd + - ./listener/internal:/app/internal mongo: extends: diff --git a/listener/.air.toml b/listener/.air.toml new file mode 100644 index 0000000..c5cb4df --- /dev/null +++ b/listener/.air.toml @@ -0,0 +1,15 @@ +# Sample .air.toml configuration +root = "." +tmp_dir = "tmp" + +[build] + cmd = "go build -o ./tmp/listener ./cmd/listener/main.go" + bin = "tmp/listener" + full_bin = "tmp/listener" + # Include other configurations as necessary + +[log] + time_format = "15:04:05" + +[restart] + delay = 1000 # Milliseconds delay before restarting on file changes diff --git a/listener/Dockerfile b/listener/Dockerfile index fe1f270..98ef499 100644 --- a/listener/Dockerfile +++ b/listener/Dockerfile @@ -9,14 +9,12 @@ COPY go.mod . COPY go.sum . RUN go mod download -# Copy the source code into the container. -COPY src/ ./src/ - -# Set the Current Working Directory inside the container to the src directory. -WORKDIR /app/src +## The code is in the current dir under internal and cmd Copy them and set the required workdir +COPY internal/ ./internal/ +COPY cmd/ ./cmd/ # Build the application, outputting the executable to /bin directory. -RUN CGO_ENABLED=0 GOOS=linux go build -v -o /bin/listener main.go +RUN CGO_ENABLED=0 GOOS=linux go build -v -o /bin/listener ./cmd/listener/main.go # Use a Docker multi-stage build to create a lean production image. FROM debian:bookworm-slim diff --git a/listener/Dockerfile.dev b/listener/Dockerfile.dev index 2460faf..2d4444f 100644 --- a/listener/Dockerfile.dev +++ b/listener/Dockerfile.dev @@ -13,7 +13,7 @@ RUN go install github.com/cosmtrek/air@latest # Copy the Air configuration file (if you have one) into the container. # If you don't have a custom .air.toml, you can skip this step, # and Air will use its default configuration. -# COPY .air.toml . (Uncomment if you have a custom Air config) +COPY .air.toml . # Copy go module files. COPY go.mod . @@ -26,10 +26,8 @@ RUN go mod download # Expect source code to be mounted at this directory rather than copied # This is the change for development mode. -VOLUME ["/app/src"] - -# Set the Current Working Directory inside the container to the src directory. -WORKDIR /app/src +VOLUME ["/app/cmd"] +VOLUME ["/app/internal"] # Command to run the application using Air for live reloading. -CMD ["air"] \ No newline at end of file +CMD ["air", "-c", ".air.toml"] \ No newline at end of file diff --git a/listener/src/main.go b/listener/cmd/listener/main.go similarity index 61% rename from listener/src/main.go rename to listener/cmd/listener/main.go index 20debde..b9c41b1 100644 --- a/listener/src/main.go +++ b/listener/cmd/listener/main.go @@ -1,9 +1,9 @@ package main import ( - "github.com/dappnode/validator-monitoring/listener/src/api" - "github.com/dappnode/validator-monitoring/listener/src/config" - "github.com/dappnode/validator-monitoring/listener/src/logger" + "github.com/dappnode/validator-monitoring/listener/internal/api" + "github.com/dappnode/validator-monitoring/listener/internal/config" + "github.com/dappnode/validator-monitoring/listener/internal/logger" ) func main() { diff --git a/listener/internal/api/api.go b/listener/internal/api/api.go new file mode 100644 index 0000000..f786192 --- /dev/null +++ b/listener/internal/api/api.go @@ -0,0 +1,57 @@ +package api + +import ( + "net/http" + + "github.com/dappnode/validator-monitoring/listener/internal/api/routes" + "github.com/dappnode/validator-monitoring/listener/internal/logger" + "github.com/dappnode/validator-monitoring/listener/internal/mongodb" +) + +type httpApi struct { + server *http.Server + port string + dbUri string +} + +// create a new api instance +func NewApi(port string, mongoDbUri string) *httpApi { + return &httpApi{ + port: port, + dbUri: mongoDbUri, + } +} + +// start the api +func (s *httpApi) Start() { + // if somehow s.server is not nil, it means the server is already running, this should never happen + if s.server != nil { + logger.Fatal("HTTP server already started") + } + + logger.Info("Server is running on port " + s.port) + var err error + + // connect to the MongoDB server + dbClient, err := mongodb.ConnectMongoDB(s.dbUri) + if err != nil { + logger.Fatal("Failed to connect to MongoDB: " + err.Error()) + } + + // get the collection + dbCollection := dbClient.Database("validatorMonitoring").Collection("signatures") + if dbCollection == nil { + logger.Fatal("Failed to connect to MongoDB collection") + } + + // setup the http api + s.server = &http.Server{ + Addr: ":" + s.port, + Handler: routes.SetupRouter(dbCollection), + } + + // start the api + if err := s.server.ListenAndServe(); err != nil { + logger.Fatal("Failed to start server: " + err.Error()) + } +} diff --git a/listener/internal/api/handlers/getHealthCheck.go b/listener/internal/api/handlers/getHealthCheck.go new file mode 100644 index 0000000..2101a69 --- /dev/null +++ b/listener/internal/api/handlers/getHealthCheck.go @@ -0,0 +1,7 @@ +package handlers + +import "net/http" + +func GetHealthCheck(w http.ResponseWriter, r *http.Request) { + respondOK(w, "Server is running") +} diff --git a/listener/internal/api/handlers/postNewSignature.go b/listener/internal/api/handlers/postNewSignature.go new file mode 100644 index 0000000..1f68078 --- /dev/null +++ b/listener/internal/api/handlers/postNewSignature.go @@ -0,0 +1,100 @@ +package handlers + +import ( + "encoding/base64" + "encoding/json" + "net/http" + + "github.com/dappnode/validator-monitoring/listener/internal/api/types" + "github.com/dappnode/validator-monitoring/listener/internal/api/validation" + "github.com/dappnode/validator-monitoring/listener/internal/logger" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +type signatureRequest struct { + Payload string `json:"payload"` + Signature string `json:"signature"` + Network string `json:"network"` + Label string `json:"label"` +} + +func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) { + logger.Debug("Received new POST '/newSignature' request") + + var sigs []signatureRequest + err := json.NewDecoder(r.Body).Decode(&sigs) + if err != nil { + logger.Error("Failed to decode request body: " + err.Error()) + respondError(w, http.StatusBadRequest, "Invalid request format") + return + } + + var validPubkeys []string // Needed to store valid pubkeys for bulk validation later + + // For each element of the request slice, we validate the element format and decode its payload + for _, req := range sigs { + if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" { + logger.Debug("Skipping invalid signature from request, missing fields") + continue // Skipping invalid elements + } + if !validation.ValidateSignature(req.Signature) { + logger.Debug("Skipping invalid signature from request, invalid signature format: " + req.Signature) + continue // Skipping invalid signatures + } + decodedBytes, err := base64.StdEncoding.DecodeString(req.Payload) + if err != nil { + logger.Error("Failed to decode BASE64 payload from request: " + err.Error()) + continue // Skipping payloads that can't be decoded from BASE64 + } + var decodedPayload types.DecodedPayload + if err := json.Unmarshal(decodedBytes, &decodedPayload); err != nil { + logger.Error("Failed to decode JSON payload from request: " + err.Error()) + continue // Skipping payloads that can't be decoded from JSON + } + // If the payload is valid, we append the pubkey to the validPubkeys slice. Else, we skip it + if decodedPayload.Platform == "dappnode" && decodedPayload.Timestamp != "" && decodedPayload.Pubkey != "" { + validPubkeys = append(validPubkeys, decodedPayload.Pubkey) // Collecting valid pubkeys + } else { + logger.Debug("Skipping invalid signature from request, invalid payload format") + } + } + + // Make a single API call to validate pubkeys in bulk + validatedPubkeys := validation.ValidatePubkeysWithConsensusClient(validPubkeys) + if len(validatedPubkeys) == 0 { + respondError(w, http.StatusInternalServerError, "Failed to validate pubkeys with consensus client") + return + } + + // Now, iterate over the originally valid requests, check if the pubkey was validated, then verify signature and insert into DB + // This means going over the requests again! TODO: find a better way? + for _, req := range sigs { + decodedBytes, _ := base64.StdEncoding.DecodeString(req.Payload) + var decodedPayload types.DecodedPayload + json.Unmarshal(decodedBytes, &decodedPayload) + + // Only try to validate message signatures if the pubkey is validated + if _, exists := validatedPubkeys[decodedPayload.Pubkey]; exists { + // If the pubkey is validated, we can proceed to validate the signature + if validation.ValidateSignatureAgainstPayload(req.Signature, decodedPayload) { + // Insert into MongoDB + _, err := dbCollection.InsertOne(r.Context(), bson.M{ + "platform": decodedPayload.Platform, + "timestamp": decodedPayload.Timestamp, + "pubkey": decodedPayload.Pubkey, + "signature": req.Signature, + "network": req.Network, + "label": req.Label, + }) + if err != nil { + continue // TODO: Log error or handle as needed + } else { + logger.Info("New Signature " + req.Signature + " inserted into MongoDB") + } + } + } + } + + respondOK(w, "Finished processing signatures") +} diff --git a/listener/internal/api/handlers/postSignaturesByValidator.go b/listener/internal/api/handlers/postSignaturesByValidator.go new file mode 100644 index 0000000..2283ed5 --- /dev/null +++ b/listener/internal/api/handlers/postSignaturesByValidator.go @@ -0,0 +1,38 @@ +package handlers + +import ( + "encoding/json" + "net/http" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" +) + +type signaturesRequest struct { + Pubkey string `json:"pubkey"` + Network string `json:"network"` +} + +func PostSignaturesByValidator(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) { + var req signaturesRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + respondError(w, http.StatusBadRequest, "Invalid request body") + return + } + + filter := bson.M{"pubkey": req.Pubkey, "network": req.Network} + cursor, err := dbCollection.Find(r.Context(), filter) + if err != nil { + respondError(w, http.StatusInternalServerError, "Error fetching signatures from MongoDB") + return + } + defer cursor.Close(r.Context()) + + var signatures []bson.M + if err = cursor.All(r.Context(), &signatures); err != nil { + respondError(w, http.StatusInternalServerError, "Error reading signatures from cursor") + return + } + + respondOK(w, signatures) +} diff --git a/listener/internal/api/handlers/postSignaturesByValidatorAggr.go b/listener/internal/api/handlers/postSignaturesByValidatorAggr.go new file mode 100644 index 0000000..37ac9c6 --- /dev/null +++ b/listener/internal/api/handlers/postSignaturesByValidatorAggr.go @@ -0,0 +1,88 @@ +package handlers + +import ( + "encoding/json" + "net/http" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// Endpoint that returns an aggregation of all signatures for a given pubkey and network in this format: +// [ +// +// { +// "label": "example_label", +// "network": "stader", +// "pubkey": "0xb48c495c19082d892f38227bced89f7199f4e9b642bf94c7f2f1ccf29c0e6a6f54d653002513aa7cdb56c88368797ec", +// "signatures": [ +// { +// "platform": "dappnode", +// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74", +// "timestamp": "185921877" +// }, +// { +// "platform": "dappnode", +// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74", +// "timestamp": "185921877" +// } +// ] +// } +// +// ] +func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) { + var req signaturesRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + respondError(w, http.StatusBadRequest, "Invalid request body") + return + } + + // Define the aggregation pipeline + // We should probably add pubkey to each signatures array element, so a 3rd party can easily verify the signature? + pipeline := []bson.M{ + { + "$match": bson.M{ + "pubkey": req.Pubkey, + "network": req.Network, + }, + }, + { + "$group": bson.M{ + "_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"}, + "signatures": bson.M{ + "$push": bson.M{ + "signature": "$signature", + "timestamp": "$timestamp", + "platform": "$platform", + }, + }, + }, + }, + { + "$project": bson.M{ + "_id": 0, + "pubkey": "$_id.pubkey", + "network": "$_id.network", + "label": "$_id.label", + "signatures": 1, + }, + }, + } + + cursor, err := dbCollection.Aggregate(r.Context(), pipeline, options.Aggregate()) + if err != nil { + respondError(w, http.StatusInternalServerError, "Error aggregating signatures from MongoDB") + return + } + defer cursor.Close(r.Context()) + + var results []bson.M + if err := cursor.All(r.Context(), &results); err != nil { + respondError(w, http.StatusInternalServerError, "Error reading aggregation results") + return + } + + // Respond with the aggregation results + respondOK(w, results) +} diff --git a/listener/internal/api/handlers/respondError.go b/listener/internal/api/handlers/respondError.go new file mode 100644 index 0000000..23fb163 --- /dev/null +++ b/listener/internal/api/handlers/respondError.go @@ -0,0 +1,18 @@ +package handlers + +import ( + "encoding/json" + "net/http" +) + +type httpErrorResp struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// TODO: error handling +func respondError(w http.ResponseWriter, code int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + json.NewEncoder(w).Encode(httpErrorResp{Code: code, Message: message}) +} diff --git a/listener/internal/api/handlers/respondOk.go b/listener/internal/api/handlers/respondOk.go new file mode 100644 index 0000000..8b35274 --- /dev/null +++ b/listener/internal/api/handlers/respondOk.go @@ -0,0 +1,13 @@ +package handlers + +import ( + "encoding/json" + "net/http" +) + +// TODO: error handling +func respondOK(w http.ResponseWriter, response any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) +} diff --git a/listener/internal/api/routes/routes.go b/listener/internal/api/routes/routes.go new file mode 100644 index 0000000..835a08a --- /dev/null +++ b/listener/internal/api/routes/routes.go @@ -0,0 +1,31 @@ +package routes + +import ( + "net/http" + + "github.com/dappnode/validator-monitoring/listener/internal/api/handlers" + "github.com/gorilla/mux" + "go.mongodb.org/mongo-driver/mongo" +) + +func SetupRouter(dbCollection *mongo.Collection) *mux.Router { + r := mux.NewRouter() + + // Define routes + r.HandleFunc("/", handlers.GetHealthCheck).Methods(http.MethodGet) + // closure function to inject dbCollection into the handler + r.HandleFunc("/newSignature", func(w http.ResponseWriter, r *http.Request) { + handlers.PostNewSignature(w, r, dbCollection) + }).Methods(http.MethodPost) + r.HandleFunc("/signaturesByValidator", func(w http.ResponseWriter, r *http.Request) { + handlers.PostSignaturesByValidator(w, r, dbCollection) + }).Methods(http.MethodPost) + r.HandleFunc("/signaturesByValidatorAggr", func(w http.ResponseWriter, r *http.Request) { + handlers.PostSignaturesByValidatorAggr(w, r, dbCollection) + }).Methods(http.MethodPost) + + // Middlewares + // r.Use(corsmiddleware())) + + return r +} diff --git a/listener/internal/api/types/types.go b/listener/internal/api/types/types.go new file mode 100644 index 0000000..d51c8a5 --- /dev/null +++ b/listener/internal/api/types/types.go @@ -0,0 +1,7 @@ +package types + +type DecodedPayload struct { + Platform string `json:"platform"` + Timestamp string `json:"timestamp"` + Pubkey string `json:"pubkey"` +} diff --git a/listener/internal/api/validation/validatePubkeysWithConsensusClient.go b/listener/internal/api/validation/validatePubkeysWithConsensusClient.go new file mode 100644 index 0000000..df549dc --- /dev/null +++ b/listener/internal/api/validation/validatePubkeysWithConsensusClient.go @@ -0,0 +1,13 @@ +package validation + +// validatePubkeysWithConsensusClient simulates making a bulk request to a consensus client for validating pubkeys. +// It can return a map of validated pubkeys that exist as validators. +func ValidatePubkeysWithConsensusClient(pubkeys []string) map[string]bool { + validatedPubkeys := make(map[string]bool) + // make api call: GET /eth/v1/beacon/states/{state_id}/validators?id=validator_pubkey1,validator_pubkey2,validator_pubkey3 + + for _, pubkey := range pubkeys { + validatedPubkeys[pubkey] = true // Assuming all given pubkeys are valid + } + return validatedPubkeys +} diff --git a/listener/internal/api/validation/validateSignature.go b/listener/internal/api/validation/validateSignature.go new file mode 100644 index 0000000..fbaa26c --- /dev/null +++ b/listener/internal/api/validation/validateSignature.go @@ -0,0 +1,10 @@ +package validation + +// A valid signature is a 0x prefixed hex string of 194 characters (including the prefix) +func ValidateSignature(signature string) bool { + // validate the signature + if len(signature) != 194 || signature[:2] != "0x" { + return false + } + return true +} diff --git a/listener/internal/api/validation/validateSignatureAgainstPayload.go b/listener/internal/api/validation/validateSignatureAgainstPayload.go new file mode 100644 index 0000000..46b2598 --- /dev/null +++ b/listener/internal/api/validation/validateSignatureAgainstPayload.go @@ -0,0 +1,9 @@ +package validation + +import "github.com/dappnode/validator-monitoring/listener/internal/api/types" + +// Dummy implementation of validateSignatureAgainstPayload +func ValidateSignatureAgainstPayload(signature string, payload types.DecodedPayload) bool { + // signature validation logic here + return true +} diff --git a/listener/src/config/config.go b/listener/internal/config/config.go similarity index 92% rename from listener/src/config/config.go rename to listener/internal/config/config.go index 5abd0fb..597a91e 100644 --- a/listener/src/config/config.go +++ b/listener/internal/config/config.go @@ -3,7 +3,7 @@ package config import ( "os" - "github.com/dappnode/validator-monitoring/listener/src/logger" + "github.com/dappnode/validator-monitoring/listener/internal/logger" ) // Config is the struct that holds the configuration of the application diff --git a/listener/src/logger/logger.go b/listener/internal/logger/logger.go similarity index 100% rename from listener/src/logger/logger.go rename to listener/internal/logger/logger.go diff --git a/listener/src/mongodb/connection.go b/listener/internal/mongodb/connection.go similarity index 95% rename from listener/src/mongodb/connection.go rename to listener/internal/mongodb/connection.go index 2b49514..ed1d739 100644 --- a/listener/src/mongodb/connection.go +++ b/listener/internal/mongodb/connection.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/dappnode/validator-monitoring/listener/src/logger" + "github.com/dappnode/validator-monitoring/listener/internal/logger" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) diff --git a/listener/src/api/api.go b/listener/src/api/api.go deleted file mode 100644 index 619afc2..0000000 --- a/listener/src/api/api.go +++ /dev/null @@ -1,76 +0,0 @@ -package api - -import ( - "net/http" - - "github.com/dappnode/validator-monitoring/listener/src/logger" - "github.com/dappnode/validator-monitoring/listener/src/mongodb" - "github.com/gorilla/mux" - "go.mongodb.org/mongo-driver/mongo" -) - -type httpApi struct { - server *http.Server - port string - dbUri string - dbClient *mongo.Client - dbCollection *mongo.Collection -} - -// create a new api instance -func NewApi(port string, mongoDbUri string) *httpApi { - return &httpApi{ - port: port, - dbUri: mongoDbUri, - } -} - -// start the api -func (s *httpApi) Start() { - // if somehow s.server is not nil, it means the server is already running, this should never happen - if s.server != nil { - logger.Fatal("HTTP server already started") - } - - logger.Info("Server is running on port " + s.port) - var err error - - // connect to the MongoDB server - s.dbClient, err = mongodb.ConnectMongoDB(s.dbUri) - if err != nil { - logger.Fatal("Failed to connect to MongoDB: " + err.Error()) - } - - // get the collection - s.dbCollection = s.dbClient.Database("validatorMonitoring").Collection("signatures") - if s.dbCollection == nil { - logger.Fatal("Failed to connect to MongoDB collection") - } - - // setup the http api - s.server = &http.Server{ - Addr: ":" + s.port, - Handler: s.SetupRouter(), - } - - // start the api - if err := s.server.ListenAndServe(); err != nil { - logger.Fatal("Failed to start server: " + err.Error()) - } -} - -// initialize and setup the router. This is where all the endpoints are defined -// and the middlewares are applied -func (s *httpApi) SetupRouter() *mux.Router { - r := mux.NewRouter() - - // Endpoints - r.HandleFunc("/", s.handleRoot).Methods(http.MethodGet) - r.HandleFunc("/newSignature", s.handleSignature).Methods(http.MethodPost) - r.HandleFunc("/signaturesByValidator", s.handleGetSignatures).Methods(http.MethodPost) - r.HandleFunc("/signaturesByValidatorAggr", s.handleGetSignaturesAggr).Methods(http.MethodPost) - - // Middlewares - // r.Use(corsmiddleware())) - return r -} diff --git a/listener/src/api/handlers.go b/listener/src/api/handlers.go deleted file mode 100644 index 1796e02..0000000 --- a/listener/src/api/handlers.go +++ /dev/null @@ -1,213 +0,0 @@ -package api - -import ( - "encoding/base64" - "encoding/json" - "net/http" - - "github.com/dappnode/validator-monitoring/listener/src/logger" - "github.com/dappnode/validator-monitoring/listener/src/types" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo/options" -) - -func (s *httpApi) handleRoot(w http.ResponseWriter, r *http.Request) { - s.respondOK(w, "Server is running") -} - -func (s *httpApi) handleSignature(w http.ResponseWriter, r *http.Request) { - logger.Debug("Received new POST '/signature' request") - - var sigs []types.SignatureRequest - err := json.NewDecoder(r.Body).Decode(&sigs) - if err != nil { - logger.Error("Failed to decode request body: " + err.Error()) - s.respondError(w, http.StatusBadRequest, "Invalid request format") - return - } - - var validPubkeys []string // Needed to store valid pubkeys for bulk validation later - - // For each element of the request slice, we validate the element format and decode its payload - for _, req := range sigs { - if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" { - logger.Debug("Skipping invalid signature from request, missing fields") - continue // Skipping invalid elements - } - if !validateSignature(req.Signature) { - logger.Debug("Skipping invalid signature from request, invalid signature format: " + req.Signature) - continue // Skipping invalid signatures - } - decodedBytes, err := base64.StdEncoding.DecodeString(req.Payload) - if err != nil { - logger.Error("Failed to decode BASE64 payload from request: " + err.Error()) - continue // Skipping payloads that can't be decoded from BASE64 - } - var decodedPayload types.DecodedPayload - if err := json.Unmarshal(decodedBytes, &decodedPayload); err != nil { - logger.Error("Failed to decode JSON payload from request: " + err.Error()) - continue // Skipping payloads that can't be decoded from JSON - } - // If the payload is valid, we append the pubkey to the validPubkeys slice. Else, we skip it - if decodedPayload.Platform == "dappnode" && decodedPayload.Timestamp != "" && decodedPayload.Pubkey != "" { - validPubkeys = append(validPubkeys, decodedPayload.Pubkey) // Collecting valid pubkeys - } else { - logger.Debug("Skipping invalid signature from request, invalid payload format") - } - } - - // Make a single API call to validate pubkeys in bulk - validatedPubkeys := validatePubkeysWithConsensusClient(validPubkeys) - if len(validatedPubkeys) == 0 { - s.respondError(w, http.StatusInternalServerError, "Failed to validate pubkeys with consensus client") - return - } - - // Now, iterate over the originally valid requests, check if the pubkey was validated, then verify signature and insert into DB - // This means going over the requests again! TODO: find a better way? - for _, req := range sigs { - decodedBytes, _ := base64.StdEncoding.DecodeString(req.Payload) - var decodedPayload types.DecodedPayload - json.Unmarshal(decodedBytes, &decodedPayload) - - // Only try to validate message signatures if the pubkey is validated - if _, exists := validatedPubkeys[decodedPayload.Pubkey]; exists { - // If the pubkey is validated, we can proceed to validate the signature - if validateSignatureAgainstPayload(req.Signature, decodedPayload) { - // Insert into MongoDB - _, err := s.dbCollection.InsertOne(r.Context(), bson.M{ - "platform": decodedPayload.Platform, - "timestamp": decodedPayload.Timestamp, - "pubkey": decodedPayload.Pubkey, - "signature": req.Signature, - "network": req.Network, - "label": req.Label, - }) - if err != nil { - continue // TODO: Log error or handle as needed - } else { - logger.Info("New Signature " + req.Signature + " inserted into MongoDB") - } - } - } - } - - s.respondOK(w, "Finished processing signatures") -} - -func (s *httpApi) handleGetSignatures(w http.ResponseWriter, r *http.Request) { - var req types.SignaturesRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") - return - } - - filter := bson.M{"pubkey": req.Pubkey, "network": req.Network} - cursor, err := s.dbCollection.Find(r.Context(), filter) - if err != nil { - s.respondError(w, http.StatusInternalServerError, "Error fetching signatures from MongoDB") - return - } - defer cursor.Close(r.Context()) - - var signatures []bson.M - if err = cursor.All(r.Context(), &signatures); err != nil { - s.respondError(w, http.StatusInternalServerError, "Error reading signatures from cursor") - return - } - - s.respondOK(w, signatures) -} - -// Endpoint that returns an aggregation of all signatures for a given pubkey and network in this format: -// [ -// -// { -// "label": "example_label", -// "network": "stader", -// "pubkey": "0xb48c495c19082d892f38227bced89f7199f4e9b642bf94c7f2f1ccf29c0e6a6f54d653002513aa7cdb56c88368797ec", -// "signatures": [ -// { -// "platform": "dappnode", -// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74", -// "timestamp": "185921877" -// }, -// { -// "platform": "dappnode", -// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74", -// "timestamp": "185921877" -// } -// ] -// } -// -// ] -func (s *httpApi) handleGetSignaturesAggr(w http.ResponseWriter, r *http.Request) { - var req types.SignaturesRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - s.respondError(w, http.StatusBadRequest, "Invalid request body") - return - } - - // Define the aggregation pipeline - // We should probably add pubkey to each signatures array element, so a 3rd party can easily verify the signature? - pipeline := []bson.M{ - { - "$match": bson.M{ - "pubkey": req.Pubkey, - "network": req.Network, - }, - }, - { - "$group": bson.M{ - "_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"}, - "signatures": bson.M{ - "$push": bson.M{ - "signature": "$signature", - "timestamp": "$timestamp", - "platform": "$platform", - }, - }, - }, - }, - { - "$project": bson.M{ - "_id": 0, - "pubkey": "$_id.pubkey", - "network": "$_id.network", - "label": "$_id.label", - "signatures": 1, - }, - }, - } - - cursor, err := s.dbCollection.Aggregate(r.Context(), pipeline, options.Aggregate()) - if err != nil { - s.respondError(w, http.StatusInternalServerError, "Error aggregating signatures from MongoDB") - return - } - defer cursor.Close(r.Context()) - - var results []bson.M - if err := cursor.All(r.Context(), &results); err != nil { - s.respondError(w, http.StatusInternalServerError, "Error reading aggregation results") - return - } - - // Respond with the aggregation results - s.respondOK(w, results) -} - -// TODO: error handling -func (s *httpApi) respondError(w http.ResponseWriter, code int, message string) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(code) - json.NewEncoder(w).Encode(types.HttpErrorResp{Code: code, Message: message}) -} - -// TODO: error handling -func (s *httpApi) respondOK(w http.ResponseWriter, response any) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(response) - -} diff --git a/listener/src/api/utils.go b/listener/src/api/utils.go deleted file mode 100644 index aa1b7d7..0000000 --- a/listener/src/api/utils.go +++ /dev/null @@ -1,30 +0,0 @@ -package api - -import "github.com/dappnode/validator-monitoring/listener/src/types" - -// A valid signature is a 0x prefixed hex string of 194 characters (including the prefix) -func validateSignature(signature string) bool { - // validate the signature - if len(signature) != 194 || signature[:2] != "0x" { - return false - } - return true -} - -// validatePubkeysWithConsensusClient simulates making a bulk request to a consensus client for validating pubkeys. -// It can return a map of validated pubkeys that exist as validators. -func validatePubkeysWithConsensusClient(pubkeys []string) map[string]bool { - validatedPubkeys := make(map[string]bool) - // make api call: GET /eth/v1/beacon/states/{state_id}/validators?id=validator_pubkey1,validator_pubkey2,validator_pubkey3 - - for _, pubkey := range pubkeys { - validatedPubkeys[pubkey] = true // Assuming all given pubkeys are valid - } - return validatedPubkeys -} - -// Dummy implementation of validateSignatureAgainstPayload -func validateSignatureAgainstPayload(signature string, payload types.DecodedPayload) bool { - // signature validation logic here - return true -} diff --git a/listener/src/types/types.go b/listener/src/types/types.go deleted file mode 100644 index 147bc7d..0000000 --- a/listener/src/types/types.go +++ /dev/null @@ -1,24 +0,0 @@ -package types - -type SignatureRequest struct { - Payload string `json:"payload"` - Signature string `json:"signature"` - Network string `json:"network"` - Label string `json:"label"` -} - -type DecodedPayload struct { - Platform string `json:"platform"` - Timestamp string `json:"timestamp"` - Pubkey string `json:"pubkey"` -} - -type HttpErrorResp struct { - Code int `json:"code"` - Message string `json:"message"` -} - -type SignaturesRequest struct { - Pubkey string `json:"pubkey"` - Network string `json:"network"` -}