Skip to content

Commit

Permalink
feat: Epoch Notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
joanestebanr committed Oct 31, 2024
1 parent 2a76deb commit cda1666
Show file tree
Hide file tree
Showing 27 changed files with 2,886 additions and 35 deletions.
30 changes: 29 additions & 1 deletion agglayer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ import (

const errCodeAgglayerRateLimitExceeded int = -10007

var ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
var (
ErrAgglayerRateLimitExceeded = fmt.Errorf("agglayer rate limit exceeded")
jSONRPCCall = rpc.JSONRPCCall
)

type AggLayerClientGetClockConfiguration interface {
GetClockConfiguration() (*ClockConfiguration, error)
}

// AgglayerClientInterface is the interface that defines the methods that the AggLayerClient will implement
type AgglayerClientInterface interface {
SendTx(signedTx SignedTx) (common.Hash, error)
WaitTxToBeMined(hash common.Hash, ctx context.Context) error
SendCertificate(certificate *SignedCertificate) (common.Hash, error)
GetCertificateHeader(certificateHash common.Hash) (*CertificateHeader, error)
AggLayerClientGetClockConfiguration
}

// AggLayerClient is the client that will be used to interact with the AggLayer
Expand Down Expand Up @@ -128,3 +136,23 @@ func (c *AggLayerClient) GetCertificateHeader(certificateHash common.Hash) (*Cer

return result, nil
}

// GetClockConfiguration returns the clock configuration of AggLayer
func (c *AggLayerClient) GetClockConfiguration() (*ClockConfiguration, error) {
response, err := jSONRPCCall(c.url, "interop_getClockConfiguration")
if err != nil {
return nil, err
}

if response.Error != nil {
return nil, fmt.Errorf("GetClockConfiguration code=%d msg=%s", response.Error.Code, response.Error.Message)
}

var result *ClockConfiguration
err = json.Unmarshal(response.Result, &result)
if err != nil {
return nil, err
}

return result, nil
}
67 changes: 67 additions & 0 deletions agglayer/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package agglayer

import (
"fmt"
"testing"

"github.com/0xPolygon/cdk-rpc/rpc"
"github.com/stretchr/testify/require"
)

const (
testURL = "http://localhost:8080"
)

func TestGetClockConfigurationResponseWithError(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Error: &rpc.ErrorObject{},
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetClockConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetClockConfigurationResponseBadJson(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetClockConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetClockConfigurationErrorResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)

jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return rpc.Response{}, fmt.Errorf("unittest error")
}
clockConfig, err := sut.GetClockConfiguration()
require.Nil(t, clockConfig)
require.Error(t, err)
}

func TestGetClockConfigurationOkResponse(t *testing.T) {
sut := NewAggLayerClient(testURL)
response := rpc.Response{
Result: []byte(`{"epoch_duration": 1, "genesis_block": 1}`),
}
jSONRPCCall = func(url, method string, params ...interface{}) (rpc.Response, error) {
return response, nil
}
clockConfig, err := sut.GetClockConfiguration()
require.NotNil(t, clockConfig)
require.NoError(t, err)
require.Equal(t, ClockConfiguration{
EpochDuration: 1,
GenesisBlock: 1,
}, *clockConfig)
}
32 changes: 31 additions & 1 deletion agglayer/mock_agglayer_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions agglayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,14 @@ func (c CertificateHeader) String() string {
return fmt.Sprintf("Height: %d, CertificateID: %s, NewLocalExitRoot: %s",
c.Height, c.CertificateID.String(), c.NewLocalExitRoot.String())
}

// ClockConfiguration represents the configuration of the epoch clock
// returned by the interop_getClockConfiguration RPC call
type ClockConfiguration struct {
EpochDuration uint64 `json:"epoch_duration"`
GenesisBlock uint64 `json:"genesis_block"`
}

func (c ClockConfiguration) String() string {
return fmt.Sprintf("EpochDuration: %d, GenesisBlock: %d", c.EpochDuration, c.GenesisBlock)
}
23 changes: 13 additions & 10 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db"
aggsendertypes "github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/bridgesync"
cdkcommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/l1infotreesync"
Expand All @@ -31,10 +31,11 @@ var (

// AggSender is a component that will send certificates to the aggLayer
type AggSender struct {
log aggsendertypes.Logger
log types.Logger

l2Syncer aggsendertypes.L2BridgeSyncer
l1infoTreeSyncer aggsendertypes.L1InfoTreeSyncer
l2Syncer types.L2BridgeSyncer
l1infoTreeSyncer types.L1InfoTreeSyncer
epochNotifier types.EpochNotifier

storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface
Expand All @@ -51,7 +52,8 @@ func New(
cfg Config,
aggLayerClient agglayer.AgglayerClientInterface,
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer *bridgesync.BridgeSync) (*AggSender, error) {
l2Syncer *bridgesync.BridgeSync,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
if err != nil {
return nil, err
Expand All @@ -70,6 +72,7 @@ func New(
aggLayerClient: aggLayerClient,
l1infoTreeSyncer: l1InfoTreeSyncer,
sequencerKey: sequencerPrivateKey,
epochNotifier: epochNotifier,
}, nil
}

Expand All @@ -81,11 +84,11 @@ func (a *AggSender) Start(ctx context.Context) {

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
ticker := time.NewTicker(a.cfg.BlockGetInterval.Duration)

chEpoch := a.epochNotifier.Subscribe("aggsender")
for {
select {
case <-ticker.C:
case epoch := <-chEpoch:
a.log.Infof("Epoch %d received", epoch.Epoch)
if err := a.sendCertificate(ctx); err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -171,7 +174,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) error {
}
log.Infof("certificate send: Height: %d hash: %s", signedCertificate.Height, certificateHash.String())

if err := a.storage.SaveLastSentCertificate(ctx, aggsendertypes.CertificateInfo{
if err := a.storage.SaveLastSentCertificate(ctx, types.CertificateInfo{
Height: certificate.Height,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
Expand Down Expand Up @@ -209,7 +212,7 @@ func (a *AggSender) saveCertificateToFile(signedCertificate *agglayer.SignedCert
func (a *AggSender) buildCertificate(ctx context.Context,
bridges []bridgesync.Bridge,
claims []bridgesync.Claim,
lastSentCertificateInfo aggsendertypes.CertificateInfo) (*agglayer.Certificate, error) {
lastSentCertificateInfo types.CertificateInfo) (*agglayer.Certificate, error) {
if len(bridges) == 0 && len(claims) == 0 {
return nil, errNoBridgesAndClaims
}
Expand Down
36 changes: 22 additions & 14 deletions aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,19 @@ import (

func TestExploratoryGetCertificateHeader(t *testing.T) {
t.Skip("This test is exploratory and should be skipped")
aggLayerClient := agglayer.NewAggLayerClient("http://localhost:32795")
aggLayerClient := agglayer.NewAggLayerClient("http://localhost:32796")
certificateID := common.HexToHash("0xf153e75e24591432ac5deafaeaafba3fec0fd851261c86051b9c0d540b38c369")
certificateHeader, err := aggLayerClient.GetCertificateHeader(certificateID)
require.NoError(t, err)
fmt.Print(certificateHeader)
}
func TestExploratoryGetClockConfiguration(t *testing.T) {
t.Skip("This test is exploratory and should be skipped")
aggLayerClient := agglayer.NewAggLayerClient("http://localhost:32796")
clockConfig, err := aggLayerClient.GetClockConfiguration()
require.NoError(t, err)
fmt.Print(clockConfig)
}

func TestConvertClaimToImportedBridgeExit(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -251,7 +258,8 @@ func TestGetImportedBridgeExits(t *testing.T) {
t.Parallel()

mockProof := generateTestProof(t)
mockL1InfoTreeSyncer := mocks.NewL1InfoTreeSyncerMock(t)

mockL1InfoTreeSyncer := mocks.NewL1InfoTreeSyncer(t)
mockL1InfoTreeSyncer.On("GetInfoByGlobalExitRoot", mock.Anything).Return(&l1infotreesync.L1InfoTreeLeaf{
L1InfoTreeIndex: 1,
Timestamp: 123456789,
Expand Down Expand Up @@ -484,8 +492,8 @@ func TestGetImportedBridgeExits(t *testing.T) {
}

func TestBuildCertificate(t *testing.T) {
mockL2BridgeSyncer := mocks.NewL2BridgeSyncerMock(t)
mockL1InfoTreeSyncer := mocks.NewL1InfoTreeSyncerMock(t)
mockL2BridgeSyncer := mocks.NewL2BridgeSyncer(t)
mockL1InfoTreeSyncer := mocks.NewL1InfoTreeSyncer(t)
mockProof := generateTestProof(t)

tests := []struct {
Expand Down Expand Up @@ -796,9 +804,9 @@ func TestCheckIfCertificatesAreSettled(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

mockStorage := mocks.NewAggSenderStorageMock(t)
mockStorage := mocks.NewAggSenderStorage(t)
mockAggLayerClient := agglayer.NewAgglayerClientMock(t)
mockLogger := mocks.NewLoggerMock(t)
mockLogger := mocks.NewLogger(t)

mockStorage.On("GetCertificatesByStatus", mock.Anything, []agglayer.CertificateStatus{agglayer.Pending}).Return(tt.pendingCertificates, tt.getFromDBError)
for certID, header := range tt.certificateHeaders {
Expand Down Expand Up @@ -873,23 +881,23 @@ func TestSendCertificate(t *testing.T) {
expectedError string
}

setupTest := func(cfg testCfg) (*AggSender, *mocks.AggSenderStorageMock, *mocks.L2BridgeSyncerMock,
*agglayer.AgglayerClientMock, *mocks.L1InfoTreeSyncerMock) {
setupTest := func(cfg testCfg) (*AggSender, *mocks.AggSenderStorage, *mocks.L2BridgeSyncer,
*agglayer.AgglayerClientMock, *mocks.L1InfoTreeSyncer) {
var (
aggsender = &AggSender{
log: log.WithFields("aggsender", 1),
cfg: Config{},
sequencerKey: cfg.sequencerKey,
}
mockStorage *mocks.AggSenderStorageMock
mockL2Syncer *mocks.L2BridgeSyncerMock
mockStorage *mocks.AggSenderStorage
mockL2Syncer *mocks.L2BridgeSyncer
mockAggLayerClient *agglayer.AgglayerClientMock
mockL1InfoTreeSyncer *mocks.L1InfoTreeSyncerMock
mockL1InfoTreeSyncer *mocks.L1InfoTreeSyncer
)

if cfg.shouldSendCertificate != nil || cfg.getLastSentCertificate != nil ||
cfg.saveLastSentCertificate != nil {
mockStorage = mocks.NewAggSenderStorageMock(t)
mockStorage = mocks.NewAggSenderStorage(t)
mockStorage.On("GetCertificatesByStatus", mock.Anything, []agglayer.CertificateStatus{agglayer.Pending}).
Return(cfg.shouldSendCertificate...).Once()

Expand All @@ -906,7 +914,7 @@ func TestSendCertificate(t *testing.T) {

if cfg.lastL2BlockProcessed != nil || cfg.originNetwork != nil ||
cfg.getBridges != nil || cfg.getClaims != nil || cfg.getInfoByGlobalExitRoot != nil {
mockL2Syncer = mocks.NewL2BridgeSyncerMock(t)
mockL2Syncer = mocks.NewL2BridgeSyncer(t)

mockL2Syncer.On("GetLastProcessedBlock", mock.Anything).Return(cfg.lastL2BlockProcessed...).Once()

Expand Down Expand Up @@ -938,7 +946,7 @@ func TestSendCertificate(t *testing.T) {

if cfg.getInfoByGlobalExitRoot != nil ||
cfg.getL1InfoTreeRootByIndex != nil || cfg.getL1InfoTreeMerkleProofFromIndexToRoot != nil {
mockL1InfoTreeSyncer = mocks.NewL1InfoTreeSyncerMock(t)
mockL1InfoTreeSyncer = mocks.NewL1InfoTreeSyncer(t)
mockL1InfoTreeSyncer.On("GetInfoByGlobalExitRoot", mock.Anything).Return(cfg.getInfoByGlobalExitRoot...).Once()

if cfg.getL1InfoTreeRootByIndex != nil {
Expand Down
Loading

0 comments on commit cda1666

Please sign in to comment.