From 86889ba8b248bef32d17dbc5314f3c1fe00caf4b Mon Sep 17 00:00:00 2001 From: Yawning Angel Date: Fri, 27 May 2022 11:50:50 +0000 Subject: [PATCH 1/2] feat: add archive web3 gw support --- archive/client.go | 179 +++++++++++++++++++++++++++++++++++++++++++++ conf/config.go | 9 +++ main.go | 11 ++- rpc/apis.go | 4 +- rpc/eth/api.go | 53 +++++++++++--- tests/rpc/utils.go | 2 +- 6 files changed, 245 insertions(+), 13 deletions(-) create mode 100644 archive/client.go diff --git a/archive/client.go b/archive/client.go new file mode 100644 index 00000000..569f4ec1 --- /dev/null +++ b/archive/client.go @@ -0,0 +1,179 @@ +// Package archive implements an archive node client. +package archive + +import ( + "context" + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/ethclient" + + "github.com/oasisprotocol/emerald-web3-gateway/rpc/utils" +) + +// Client is an archive node client backed by web, implementing a limited +// subset of rpc/eth.API, that is sufficient to support historical queries. +// +// All of the parameters that are `ethrpc.BlockNumberOrHash` just assume +// that the caller will handle converting to a block number, because they +// need to anyway, and historical estimate gas calls are not supported. +type Client struct { + inner *ethclient.Client + latestBlock uint64 +} + +func (c *Client) LatestBlock() uint64 { + return c.latestBlock +} + +func (c *Client) GetStorageAt( + ctx context.Context, + address common.Address, + position hexutil.Big, + blockNr uint64, +) (hexutil.Big, error) { + storageBytes, err := c.inner.StorageAt( + ctx, + address, + common.BigToHash((*big.Int)(&position)), + new(big.Int).SetUint64(blockNr), + ) + if err != nil { + return hexutil.Big{}, fmt.Errorf("archive: failed to query storage: %w", err) + } + + // Oh for fuck's sake. + var storageBig big.Int + storageBig.SetBytes(storageBytes) + return hexutil.Big(storageBig), nil +} + +func (c *Client) GetBalance( + ctx context.Context, + address common.Address, + blockNr uint64, +) (*hexutil.Big, error) { + balance, err := c.inner.BalanceAt( + ctx, + address, + new(big.Int).SetUint64(blockNr), + ) + if err != nil { + return nil, fmt.Errorf("archive: failed to query balance: %w", err) + } + + return (*hexutil.Big)(balance), nil +} + +func (c *Client) GetTransactionCount( + ctx context.Context, + address common.Address, + blockNr uint64, +) (*hexutil.Uint64, error) { + nonce, err := c.inner.NonceAt( + ctx, + address, + new(big.Int).SetUint64(blockNr), + ) + if err != nil { + return nil, fmt.Errorf("archive: failed to query nonce: %w", err) + } + + return (*hexutil.Uint64)(&nonce), nil +} + +func (c *Client) GetCode( + ctx context.Context, + address common.Address, + blockNr uint64, +) (hexutil.Bytes, error) { + code, err := c.inner.CodeAt( + ctx, + address, + new(big.Int).SetUint64(blockNr), + ) + if err != nil { + return nil, fmt.Errorf("archive: failed to query code: %w", err) + } + + return hexutil.Bytes(code), nil +} + +func (c *Client) Call( + ctx context.Context, + args utils.TransactionArgs, + blockNr uint64, +) (hexutil.Bytes, error) { + // You have got to be fucking shitting me, what in the actual fuck. + + if args.From == nil { + return nil, fmt.Errorf("archive: no `from` in call") + } + callMsg := ethereum.CallMsg{ + From: *args.From, + To: args.To, + GasPrice: (*big.Int)(args.GasPrice), + GasFeeCap: (*big.Int)(args.MaxFeePerGas), + GasTipCap: (*big.Int)(args.MaxPriorityFeePerGas), + Value: (*big.Int)(args.Value), + // args.Nonce? I guess it can't be that important if there's no field for it. + } + if args.Gas != nil { + callMsg.Gas = uint64(*args.Gas) + } + if args.Data != nil { + callMsg.Data = []byte(*args.Data) + } + if args.Input != nil { + // Data and Input are the same damn thing, Input is newer. + callMsg.Data = []byte(*args.Input) + } + if args.AccessList != nil { + callMsg.AccessList = *args.AccessList + } + + result, err := c.inner.CallContract( + ctx, + callMsg, + new(big.Int).SetUint64(blockNr), + ) + if err != nil { + return nil, fmt.Errorf("archive: failed to call contract: %w", err) + } + + return hexutil.Bytes(result), nil +} + +func (c *Client) Close() { + c.inner.Close() + c.inner = nil +} + +func New( + ctx context.Context, + uri string, + heightMax uint64, +) (*Client, error) { + c, err := ethclient.DialContext(ctx, uri) + if err != nil { + return nil, fmt.Errorf("archive: failed to dial archival web3 node: %w", err) + } + + var latestBlock uint64 + switch heightMax { + case 0: + if latestBlock, err = c.BlockNumber(ctx); err != nil { + return nil, fmt.Errorf("archive: failed to query block number: %w", err) + } + default: + latestBlock = heightMax + } + + return &Client{ + inner: c, + latestBlock: latestBlock, + }, nil +} diff --git a/conf/config.go b/conf/config.go index c74f2301..3b6e0708 100644 --- a/conf/config.go +++ b/conf/config.go @@ -28,6 +28,15 @@ type Config struct { Cache *CacheConfig `koanf:"cache"` Database *DatabaseConfig `koanf:"database"` Gateway *GatewayConfig `koanf:"gateway"` + + // ArchiveURI is the URI of an archival web3 gateway instance + // for servicing historical queries. + ArchiveURI string `koanf:"archive_uri"` + // ArchiveHeightMax is the maximum height (inclusive) to query the + // archvie node (ArchiveURI). If the archive node is configured + // with it's own SQL database instance, this parameter should not + // be needed. + ArchiveHeightMax uint64 `koanf:"archive_height_max"` } // Validate performs config validation. diff --git a/main.go b/main.go index b1721fbd..4a957fe0 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/oasisprotocol/emerald-web3-gateway/archive" "github.com/oasisprotocol/emerald-web3-gateway/conf" "github.com/oasisprotocol/emerald-web3-gateway/db/migrations" "github.com/oasisprotocol/emerald-web3-gateway/filters" @@ -245,7 +246,15 @@ func runRoot() error { return err } - w3.RegisterAPIs(rpc.GetRPCAPIs(ctx, rc, backend, gasPriceOracle, cfg.Gateway, es)) + var archiveClient *archive.Client + if cfg.ArchiveURI != "" { + if archiveClient, err = archive.New(ctx, cfg.ArchiveURI, cfg.ArchiveHeightMax); err != nil { + logger.Error("failed to create archive client", err) + return err + } + } + + w3.RegisterAPIs(rpc.GetRPCAPIs(ctx, rc, archiveClient, backend, gasPriceOracle, cfg.Gateway, es)) w3.RegisterHealthChecks([]server.HealthCheck{indx}) svr := server.Server{ diff --git a/rpc/apis.go b/rpc/apis.go index 0f5d88d7..05a1c6d0 100644 --- a/rpc/apis.go +++ b/rpc/apis.go @@ -7,6 +7,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-sdk/client-sdk/go/client" + "github.com/oasisprotocol/emerald-web3-gateway/archive" "github.com/oasisprotocol/emerald-web3-gateway/conf" eventFilters "github.com/oasisprotocol/emerald-web3-gateway/filters" "github.com/oasisprotocol/emerald-web3-gateway/gas" @@ -23,6 +24,7 @@ import ( func GetRPCAPIs( ctx context.Context, client client.RuntimeClient, + archiveClient *archive.Client, backend indexer.Backend, gasPriceOracle gas.Backend, config *conf.GatewayConfig, @@ -31,7 +33,7 @@ func GetRPCAPIs( var apis []ethRpc.API web3Service := web3.NewPublicAPI() - ethService := eth.NewPublicAPI(client, logging.GetLogger("eth_rpc"), config.ChainID, backend, gasPriceOracle, config.MethodLimits) + ethService := eth.NewPublicAPI(client, archiveClient, logging.GetLogger("eth_rpc"), config.ChainID, backend, gasPriceOracle, config.MethodLimits) netService := net.NewPublicAPI(config.ChainID) txpoolService := txpool.NewPublicAPI() filtersService := filters.NewPublicAPI(client, logging.GetLogger("eth_filters"), backend, eventSystem) diff --git a/rpc/eth/api.go b/rpc/eth/api.go index 0ec225dc..e4243952 100644 --- a/rpc/eth/api.go +++ b/rpc/eth/api.go @@ -25,6 +25,7 @@ import ( "github.com/oasisprotocol/oasis-sdk/client-sdk/go/modules/evm" "github.com/oasisprotocol/oasis-sdk/client-sdk/go/types" + "github.com/oasisprotocol/emerald-web3-gateway/archive" "github.com/oasisprotocol/emerald-web3-gateway/conf" "github.com/oasisprotocol/emerald-web3-gateway/gas" "github.com/oasisprotocol/emerald-web3-gateway/indexer" @@ -108,6 +109,7 @@ type API interface { type publicAPI struct { client client.RuntimeClient + archiveClient *archive.Client backend indexer.Backend gasPriceOracle gas.Backend chainID uint32 @@ -118,6 +120,7 @@ type publicAPI struct { // NewPublicAPI creates an instance of the public ETH Web3 API. func NewPublicAPI( client client.RuntimeClient, + archiveClient *archive.Client, logger *logging.Logger, chainID uint32, backend indexer.Backend, @@ -126,6 +129,7 @@ func NewPublicAPI( ) API { return &publicAPI{ client: client, + archiveClient: archiveClient, chainID: chainID, Logger: logger, backend: backend, @@ -149,6 +153,15 @@ func handleStorageError(logger *logging.Logger, err error) error { return ErrInternalError } +func (api *publicAPI) shouldQueryArchive(n uint64) bool { + // If there is no archive node configured, return false. + if api.archiveClient == nil { + return false + } + + return n <= api.archiveClient.LatestBlock() +} + // roundParamFromBlockNum converts special BlockNumber values to the corresponding special round numbers. func (api *publicAPI) roundParamFromBlockNum(ctx context.Context, logger *logging.Logger, blockNum ethrpc.BlockNumber) (uint64, error) { switch blockNum { @@ -226,6 +239,10 @@ func (api *publicAPI) GetStorageAt(ctx context.Context, address common.Address, if err != nil { return hexutil.Big{}, err } + if api.shouldQueryArchive(round) { + return api.archiveClient.GetStorageAt(ctx, address, position, round) + } + // EVM module takes index as H256, which needs leading zeros. position256 := make([]byte, 32) // Unmarshalling to hexutil.Big rejects overlong inputs. Verify in `TestRejectOverlong`. @@ -247,11 +264,15 @@ func (api *publicAPI) GetBalance(ctx context.Context, address common.Address, bl logger := api.Logger.With("method", "eth_getBalance", "address", address, "block_or_hash", blockNrOrHash) logger.Debug("request") - ethmod := evm.NewV1(api.client) round, err := api.getBlockRound(ctx, logger, blockNrOrHash) if err != nil { return nil, err } + if api.shouldQueryArchive(round) { + return api.archiveClient.GetBalance(ctx, address, round) + } + + ethmod := evm.NewV1(api.client) res, err := ethmod.Balance(ctx, round, address[:]) if err != nil { logger.Error("ethmod.Balance failed", "round", round, "err", err) @@ -291,13 +312,17 @@ func (api *publicAPI) GetTransactionCount(ctx context.Context, ethAddr common.Ad logger := api.Logger.With("method", "eth_getBlockTransactionCount", "address", ethAddr, "block_or_hash", blockNrOrHash) logger.Debug("request") - accountsMod := accounts.NewV1(api.client) - accountsAddr := types.NewAddressRaw(types.AddressV0Secp256k1EthContext, ethAddr[:]) - round, err := api.getBlockRound(ctx, logger, blockNrOrHash) if err != nil { return nil, err } + if api.shouldQueryArchive(round) { + return api.archiveClient.GetTransactionCount(ctx, ethAddr, round) + } + + accountsMod := accounts.NewV1(api.client) + accountsAddr := types.NewAddressRaw(types.AddressV0Secp256k1EthContext, ethAddr[:]) + nonce, err := accountsMod.Nonce(ctx, round, accountsAddr) if err != nil { logger.Error("accounts.Nonce failed", "err", err) @@ -311,11 +336,15 @@ func (api *publicAPI) GetCode(ctx context.Context, address common.Address, block logger := api.Logger.With("method", "eth_getCode", "address", address, "block_or_hash", blockNrOrHash) logger.Debug("request") - ethmod := evm.NewV1(api.client) round, err := api.getBlockRound(ctx, logger, blockNrOrHash) if err != nil { return nil, err } + if api.shouldQueryArchive(round) { + return api.archiveClient.GetCode(ctx, address, round) + } + + ethmod := evm.NewV1(api.client) res, err := ethmod.Code(ctx, round, address[:]) if err != nil { logger.Error("ethmod.Code failed", "err", err) @@ -361,6 +390,15 @@ func (api *publicAPI) NewRevertError(revertErr error) *RevertError { func (api *publicAPI) Call(ctx context.Context, args utils.TransactionArgs, blockNrOrHash ethrpc.BlockNumberOrHash, _ *utils.StateOverride) (hexutil.Bytes, error) { logger := api.Logger.With("method", "eth_call", "block_or_hash", blockNrOrHash) logger.Debug("request", "args", args) + + round, err := api.getBlockRound(ctx, logger, blockNrOrHash) + if err != nil { + return nil, err + } + if api.shouldQueryArchive(round) { + return api.archiveClient.Call(ctx, args, round) + } + var ( amount = []byte{0} input = []byte{} @@ -370,11 +408,6 @@ func (api *publicAPI) Call(ctx context.Context, args utils.TransactionArgs, bloc gas uint64 = 30_000_000 ) - round, err := api.getBlockRound(ctx, logger, blockNrOrHash) - if err != nil { - return nil, err - } - if args.To == nil { return []byte{}, errors.New("to address not specified") } diff --git a/tests/rpc/utils.go b/tests/rpc/utils.go index aa386873..460536a7 100644 --- a/tests/rpc/utils.go +++ b/tests/rpc/utils.go @@ -178,7 +178,7 @@ func Setup() error { return fmt.Errorf("setup: failed starting gas price oracle: %w", err) } - w3.RegisterAPIs(rpc.GetRPCAPIs(context.Background(), rc, backend, gasPriceOracle, tests.TestsConfig.Gateway, es)) + w3.RegisterAPIs(rpc.GetRPCAPIs(context.Background(), rc, nil, backend, gasPriceOracle, tests.TestsConfig.Gateway, es)) w3.RegisterHealthChecks([]server.HealthCheck{indx}) if err = w3.Start(); err != nil { From b66b4b0186414e842dc4cc3173ff19ade6a2f175 Mon Sep 17 00:00:00 2001 From: Yawning Angel Date: Tue, 31 May 2022 11:03:01 +0000 Subject: [PATCH 2/2] feat: Add a way to disable the indexer This is only for archive support at the moment, and configuring a node with the indexer disabled will also prevent it from writing to the db. --- conf/config.go | 3 ++- indexer/indexer.go | 29 +++++++++++++++++++++++++---- main.go | 13 +++++++++---- storage/psql/psql.go | 20 +++++++++++++++++++- storage/psql/psql_test.go | 2 +- tests/rpc/utils.go | 4 ++-- 6 files changed, 58 insertions(+), 13 deletions(-) diff --git a/conf/config.go b/conf/config.go index 3b6e0708..0b2afcc1 100644 --- a/conf/config.go +++ b/conf/config.go @@ -22,7 +22,8 @@ type Config struct { // blocks that the node doesn't have data for, such as by skipping them in checkpoint sync. // For sensible reasons, indexing may actually start at an even later block, such as if // this block is already indexed or the node indicates that it doesn't have this block. - IndexingStart uint64 `koanf:"indexing_start"` + IndexingStart uint64 `koanf:"indexing_start"` + IndexingDisable bool `koanf:"indexing_disable"` Log *LogConfig `koanf:"log"` Cache *CacheConfig `koanf:"cache"` diff --git a/indexer/indexer.go b/indexer/indexer.go index 1884ec49..cabc109d 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -43,10 +43,11 @@ var ErrNotHealthy = errors.New("not healthy") type Service struct { service.BaseBackgroundService - runtimeID common.Namespace - enablePruning bool - pruningStep uint64 - indexingStart uint64 + runtimeID common.Namespace + enablePruning bool + pruningStep uint64 + indexingStart uint64 + indexingDisable bool backend Backend client client.RuntimeClient @@ -303,6 +304,14 @@ func (s *Service) indexingWorker() { // Start starts service. func (s *Service) Start() { + // TODO/NotYawning: Non-archive nodes that have the indexer disabled + // likey want to use a different notion of healthy, and probably also + // want to start a worker that monitors the database for changes. + if s.indexingDisable { + s.updateHealth(true) + return + } + go s.indexingWorker() go s.healthWorker() @@ -339,8 +348,20 @@ func New( enablePruning: cfg.EnablePruning, pruningStep: cfg.PruningStep, indexingStart: cfg.IndexingStart, + indexingDisable: cfg.IndexingDisable, } s.Logger = s.Logger.With("runtime_id", s.runtimeID.String()) + // TODO/NotYawning: Non-archive nodes probably want to do something + // different here. + if s.indexingDisable { + if _, err := s.backend.QueryLastIndexedRound(ctx); err != nil { + s.Logger.Error("indexer disabled and no rounds indexed, this will never work", + "err", err, + ) + return nil, nil, err + } + } + return s, cachingBackend, nil } diff --git a/main.go b/main.go index 4a957fe0..08c2d512 100644 --- a/main.go +++ b/main.go @@ -114,7 +114,7 @@ func truncateExec(cmd *cobra.Command, args []string) error { } // Initialize db. - db, err := psql.InitDB(ctx, cfg.Database, true) + db, err := psql.InitDB(ctx, cfg.Database, true, false) if err != nil { logger.Error("failed to initialize db", "err", err) return err @@ -145,7 +145,7 @@ func migrateExec(cmd *cobra.Command, args []string) error { logger := logging.GetLogger("migrate-db") // Initialize db. - db, err := psql.InitDB(ctx, cfg.Database, true) + db, err := psql.InitDB(ctx, cfg.Database, true, false) if err != nil { logger.Error("failed to initialize db", "err", err) return err @@ -192,8 +192,13 @@ func runRoot() error { // Create the runtime client with account module query helpers. rc := client.New(conn, runtimeID) + // For now, "disable" write access to the DB in a kind of kludgy way + // if the indexer is disabled. Yes this means that no migrations + // can be done. Deal with it. + dbReadOnly := cfg.IndexingDisable + // Initialize db for migrations (higher timeouts). - db, err := psql.InitDB(ctx, cfg.Database, true) + db, err := psql.InitDB(ctx, cfg.Database, true, dbReadOnly) if err != nil { logger.Error("failed to initialize db", "err", err) return err @@ -208,7 +213,7 @@ func runRoot() error { // Initialize db again, now with configured timeouts. var storage storage.Storage - storage, err = psql.InitDB(ctx, cfg.Database, false) + storage, err = psql.InitDB(ctx, cfg.Database, false, dbReadOnly) if err != nil { logger.Error("failed to initialize db", "err", err) return err diff --git a/storage/psql/psql.go b/storage/psql/psql.go index 8160a08a..ab8ebf80 100644 --- a/storage/psql/psql.go +++ b/storage/psql/psql.go @@ -30,7 +30,12 @@ type PostDB struct { } // InitDB creates postgresql db instance. -func InitDB(ctx context.Context, cfg *conf.DatabaseConfig, longTimeouts bool) (*PostDB, error) { +func InitDB( + ctx context.Context, + cfg *conf.DatabaseConfig, + longTimeouts bool, + readOnly bool, +) (*PostDB, error) { if cfg == nil { return nil, errors.New("nil configuration") } @@ -63,6 +68,19 @@ func InitDB(ctx context.Context, cfg *conf.DatabaseConfig, longTimeouts bool) (* } } + // Set "read-only" mode by setting the default status of new + // transactions. + // + // Note: This still allows txes to alter temporary tables, and is + // advisory rather than something that is securely enforced. + if readOnly { + opts = append(opts, pgdriver.WithConnParams( + map[string]interface{}{ + "default_transaction_read_only": "on", + }, + )) + } + pgConn := pgdriver.NewConnector(opts...) sqlDB := sql.OpenDB(pgConn) maxOpenConns := cfg.MaxOpenConns diff --git a/storage/psql/psql_test.go b/storage/psql/psql_test.go index 044cfb5a..27b74be3 100644 --- a/storage/psql/psql_test.go +++ b/storage/psql/psql_test.go @@ -21,7 +21,7 @@ func TestMain(m *testing.M) { var err error ctx := context.Background() tests.MustInitConfig() - db, err = InitDB(ctx, tests.TestsConfig.Database, false) + db, err = InitDB(ctx, tests.TestsConfig.Database, false, false) if err != nil { log.Println(`It seems database failed to initialize. Do you have PostgreSQL running? If not, you can run docker run -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres -p 5432:5432 -d postgres`) diff --git a/tests/rpc/utils.go b/tests/rpc/utils.go index 460536a7..d94950b2 100644 --- a/tests/rpc/utils.go +++ b/tests/rpc/utils.go @@ -130,7 +130,7 @@ func Setup() error { // Initialize db. ctx := context.Background() - db, err = psql.InitDB(ctx, tests.TestsConfig.Database, true) + db, err = psql.InitDB(ctx, tests.TestsConfig.Database, true, false) if err != nil { return fmt.Errorf("failed to initialize DB: %w", err) } @@ -143,7 +143,7 @@ func Setup() error { // Initialize db again, now with configured timeouts. var storage storage.Storage - storage, err = psql.InitDB(ctx, tests.TestsConfig.Database, false) + storage, err = psql.InitDB(ctx, tests.TestsConfig.Database, false, false) if err != nil { return err }