Skip to content

Commit

Permalink
Code reorganization (#10)
Browse files Browse the repository at this point in the history
* Code reorganization

* renami dir

* update dockerfiles

* add custom air.toml config

* fix typo

* fix typo
  • Loading branch information
pablomendezroyo authored Apr 11, 2024
1 parent 07508b5 commit 9529eea
Show file tree
Hide file tree
Showing 26 changed files with 424 additions and 363 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ jobs:
- name: Build
run: |
cd ./listener
go build -a -installsuffix cgo -o bin/listener ./src
go build -a -installsuffix cgo -o bin/listener ./cmd/listener
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.env
./listener/src/tmp/*
./listener/tmp/*
./listener/bin/*
3 changes: 2 additions & 1 deletion docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ services:
context: listener
dockerfile: Dockerfile.dev
volumes:
- ./listener/src:/app/src
- ./listener/cmd:/app/cmd
- ./listener/internal:/app/internal

mongo:
extends:
Expand Down
15 changes: 15 additions & 0 deletions listener/.air.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Sample .air.toml configuration
root = "."
tmp_dir = "tmp"

[build]
cmd = "go build -o ./tmp/listener ./cmd/listener/main.go"
bin = "tmp/listener"
full_bin = "tmp/listener"
# Include other configurations as necessary

[log]
time_format = "15:04:05"

[restart]
delay = 1000 # Milliseconds delay before restarting on file changes
10 changes: 4 additions & 6 deletions listener/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ COPY go.mod .
COPY go.sum .
RUN go mod download

# Copy the source code into the container.
COPY src/ ./src/

# Set the Current Working Directory inside the container to the src directory.
WORKDIR /app/src
## The code is in the current dir under internal and cmd Copy them and set the required workdir
COPY internal/ ./internal/
COPY cmd/ ./cmd/

# Build the application, outputting the executable to /bin directory.
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /bin/listener main.go
RUN CGO_ENABLED=0 GOOS=linux go build -v -o /bin/listener ./cmd/listener/main.go

# Use a Docker multi-stage build to create a lean production image.
FROM debian:bookworm-slim
Expand Down
10 changes: 4 additions & 6 deletions listener/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN go install github.com/cosmtrek/air@latest
# Copy the Air configuration file (if you have one) into the container.
# If you don't have a custom .air.toml, you can skip this step,
# and Air will use its default configuration.
# COPY .air.toml . (Uncomment if you have a custom Air config)
COPY .air.toml .

# Copy go module files.
COPY go.mod .
Expand All @@ -26,10 +26,8 @@ RUN go mod download

# Expect source code to be mounted at this directory rather than copied
# This is the change for development mode.
VOLUME ["/app/src"]

# Set the Current Working Directory inside the container to the src directory.
WORKDIR /app/src
VOLUME ["/app/cmd"]
VOLUME ["/app/internal"]

# Command to run the application using Air for live reloading.
CMD ["air"]
CMD ["air", "-c", ".air.toml"]
6 changes: 3 additions & 3 deletions listener/src/main.go → listener/cmd/listener/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"github.com/dappnode/validator-monitoring/listener/src/api"
"github.com/dappnode/validator-monitoring/listener/src/config"
"github.com/dappnode/validator-monitoring/listener/src/logger"
"github.com/dappnode/validator-monitoring/listener/internal/api"
"github.com/dappnode/validator-monitoring/listener/internal/config"
"github.com/dappnode/validator-monitoring/listener/internal/logger"
)

func main() {
Expand Down
57 changes: 57 additions & 0 deletions listener/internal/api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/routes"
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"github.com/dappnode/validator-monitoring/listener/internal/mongodb"
)

type httpApi struct {
server *http.Server
port string
dbUri string
}

// create a new api instance
func NewApi(port string, mongoDbUri string) *httpApi {
return &httpApi{
port: port,
dbUri: mongoDbUri,
}
}

// start the api
func (s *httpApi) Start() {
// if somehow s.server is not nil, it means the server is already running, this should never happen
if s.server != nil {
logger.Fatal("HTTP server already started")
}

logger.Info("Server is running on port " + s.port)
var err error

// connect to the MongoDB server
dbClient, err := mongodb.ConnectMongoDB(s.dbUri)
if err != nil {
logger.Fatal("Failed to connect to MongoDB: " + err.Error())
}

// get the collection
dbCollection := dbClient.Database("validatorMonitoring").Collection("signatures")
if dbCollection == nil {
logger.Fatal("Failed to connect to MongoDB collection")
}

// setup the http api
s.server = &http.Server{
Addr: ":" + s.port,
Handler: routes.SetupRouter(dbCollection),
}

// start the api
if err := s.server.ListenAndServe(); err != nil {
logger.Fatal("Failed to start server: " + err.Error())
}
}
7 changes: 7 additions & 0 deletions listener/internal/api/handlers/getHealthCheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package handlers

import "net/http"

func GetHealthCheck(w http.ResponseWriter, r *http.Request) {
respondOK(w, "Server is running")
}
100 changes: 100 additions & 0 deletions listener/internal/api/handlers/postNewSignature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package handlers

import (
"encoding/base64"
"encoding/json"
"net/http"

"github.com/dappnode/validator-monitoring/listener/internal/api/types"
"github.com/dappnode/validator-monitoring/listener/internal/api/validation"
"github.com/dappnode/validator-monitoring/listener/internal/logger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type signatureRequest struct {
Payload string `json:"payload"`
Signature string `json:"signature"`
Network string `json:"network"`
Label string `json:"label"`
}

func PostNewSignature(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) {
logger.Debug("Received new POST '/newSignature' request")

var sigs []signatureRequest
err := json.NewDecoder(r.Body).Decode(&sigs)
if err != nil {
logger.Error("Failed to decode request body: " + err.Error())
respondError(w, http.StatusBadRequest, "Invalid request format")
return
}

var validPubkeys []string // Needed to store valid pubkeys for bulk validation later

// For each element of the request slice, we validate the element format and decode its payload
for _, req := range sigs {
if req.Network == "" || req.Label == "" || req.Signature == "" || req.Payload == "" {
logger.Debug("Skipping invalid signature from request, missing fields")
continue // Skipping invalid elements
}
if !validation.ValidateSignature(req.Signature) {
logger.Debug("Skipping invalid signature from request, invalid signature format: " + req.Signature)
continue // Skipping invalid signatures
}
decodedBytes, err := base64.StdEncoding.DecodeString(req.Payload)
if err != nil {
logger.Error("Failed to decode BASE64 payload from request: " + err.Error())
continue // Skipping payloads that can't be decoded from BASE64
}
var decodedPayload types.DecodedPayload
if err := json.Unmarshal(decodedBytes, &decodedPayload); err != nil {
logger.Error("Failed to decode JSON payload from request: " + err.Error())
continue // Skipping payloads that can't be decoded from JSON
}
// If the payload is valid, we append the pubkey to the validPubkeys slice. Else, we skip it
if decodedPayload.Platform == "dappnode" && decodedPayload.Timestamp != "" && decodedPayload.Pubkey != "" {
validPubkeys = append(validPubkeys, decodedPayload.Pubkey) // Collecting valid pubkeys
} else {
logger.Debug("Skipping invalid signature from request, invalid payload format")
}
}

// Make a single API call to validate pubkeys in bulk
validatedPubkeys := validation.ValidatePubkeysWithConsensusClient(validPubkeys)
if len(validatedPubkeys) == 0 {
respondError(w, http.StatusInternalServerError, "Failed to validate pubkeys with consensus client")
return
}

// Now, iterate over the originally valid requests, check if the pubkey was validated, then verify signature and insert into DB
// This means going over the requests again! TODO: find a better way?
for _, req := range sigs {
decodedBytes, _ := base64.StdEncoding.DecodeString(req.Payload)
var decodedPayload types.DecodedPayload
json.Unmarshal(decodedBytes, &decodedPayload)

// Only try to validate message signatures if the pubkey is validated
if _, exists := validatedPubkeys[decodedPayload.Pubkey]; exists {
// If the pubkey is validated, we can proceed to validate the signature
if validation.ValidateSignatureAgainstPayload(req.Signature, decodedPayload) {
// Insert into MongoDB
_, err := dbCollection.InsertOne(r.Context(), bson.M{
"platform": decodedPayload.Platform,
"timestamp": decodedPayload.Timestamp,
"pubkey": decodedPayload.Pubkey,
"signature": req.Signature,
"network": req.Network,
"label": req.Label,
})
if err != nil {
continue // TODO: Log error or handle as needed
} else {
logger.Info("New Signature " + req.Signature + " inserted into MongoDB")
}
}
}
}

respondOK(w, "Finished processing signatures")
}
38 changes: 38 additions & 0 deletions listener/internal/api/handlers/postSignaturesByValidator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package handlers

import (
"encoding/json"
"net/http"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

type signaturesRequest struct {
Pubkey string `json:"pubkey"`
Network string `json:"network"`
}

func PostSignaturesByValidator(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) {
var req signaturesRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
respondError(w, http.StatusBadRequest, "Invalid request body")
return
}

filter := bson.M{"pubkey": req.Pubkey, "network": req.Network}
cursor, err := dbCollection.Find(r.Context(), filter)
if err != nil {
respondError(w, http.StatusInternalServerError, "Error fetching signatures from MongoDB")
return
}
defer cursor.Close(r.Context())

var signatures []bson.M
if err = cursor.All(r.Context(), &signatures); err != nil {
respondError(w, http.StatusInternalServerError, "Error reading signatures from cursor")
return
}

respondOK(w, signatures)
}
88 changes: 88 additions & 0 deletions listener/internal/api/handlers/postSignaturesByValidatorAggr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package handlers

import (
"encoding/json"
"net/http"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// Endpoint that returns an aggregation of all signatures for a given pubkey and network in this format:
// [
//
// {
// "label": "example_label",
// "network": "stader",
// "pubkey": "0xb48c495c19082d892f38227bced89f7199f4e9b642bf94c7f2f1ccf29c0e6a6f54d653002513aa7cdb56c88368797ec",
// "signatures": [
// {
// "platform": "dappnode",
// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74",
// "timestamp": "185921877"
// },
// {
// "platform": "dappnode",
// "signature": "0xa8b00e7746a523346c5165dfa80ffafe52317c6fe6cdcfabd41886f9c8209b829266c5000597142b58dddbcc9c23cfd81315180acf18bb000db50d08293bc539e06a7c751d3d9dec89fb441b3ba6aefdeeff9cfed72fb41171173f22e2993e74",
// "timestamp": "185921877"
// }
// ]
// }
//
// ]
func PostSignaturesByValidatorAggr(w http.ResponseWriter, r *http.Request, dbCollection *mongo.Collection) {
var req signaturesRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
respondError(w, http.StatusBadRequest, "Invalid request body")
return
}

// Define the aggregation pipeline
// We should probably add pubkey to each signatures array element, so a 3rd party can easily verify the signature?
pipeline := []bson.M{
{
"$match": bson.M{
"pubkey": req.Pubkey,
"network": req.Network,
},
},
{
"$group": bson.M{
"_id": bson.M{"pubkey": "$pubkey", "network": "$network", "label": "$label"},
"signatures": bson.M{
"$push": bson.M{
"signature": "$signature",
"timestamp": "$timestamp",
"platform": "$platform",
},
},
},
},
{
"$project": bson.M{
"_id": 0,
"pubkey": "$_id.pubkey",
"network": "$_id.network",
"label": "$_id.label",
"signatures": 1,
},
},
}

cursor, err := dbCollection.Aggregate(r.Context(), pipeline, options.Aggregate())
if err != nil {
respondError(w, http.StatusInternalServerError, "Error aggregating signatures from MongoDB")
return
}
defer cursor.Close(r.Context())

var results []bson.M
if err := cursor.All(r.Context(), &results); err != nil {
respondError(w, http.StatusInternalServerError, "Error reading aggregation results")
return
}

// Respond with the aggregation results
respondOK(w, results)
}
Loading

0 comments on commit 9529eea

Please sign in to comment.