From 2f756d9db89ec5c38160bac57aeafec1e271c85d Mon Sep 17 00:00:00 2001 From: Javed Khan Date: Tue, 2 Jul 2024 16:27:27 -0700 Subject: [PATCH] da: try using blobdata for eth fallback --- op-batcher/batcher/driver.go | 58 +++++++++++++++------- op-batcher/batcher/service.go | 2 +- op-batcher/batcher/test_batch_submitter.go | 2 +- op-celestia/cli.go | 49 ++++++++++++++---- op-celestia/da_client.go | 22 ++++---- op-node/rollup/driver/da.go | 13 ++++- 6 files changed, 107 insertions(+), 39 deletions(-) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index db561994f9ab0..5918a16b61274 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -543,7 +543,7 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh var candidate *txmgr.TxCandidate var err error if isBlockedBlob { - candidate, _ = l.calldataTxCandidate([]byte{}) + candidate = l.calldataTxCandidate([]byte{}) } else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil { panic(err) // this error should not happen } @@ -551,6 +551,20 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh l.queueTx(txData{}, true, candidate, queue, receiptsCh) } +// fallbackTxCandidate creates a fallback tx candidate for the given txdata. +func (l *BatchSubmitter) fallbackTxCandidate(_ context.Context, txdata txData) (*txmgr.TxCandidate, error) { + switch l.DAClient.FallbackMode { + case celestia.FallbackModeBlobData: + return l.blobTxCandidate(txdata) + case celestia.FallbackModeCallData: + return l.calldataTxCandidate(txdata.CallData()), nil + case celestia.FallbackModeDisabled: + return nil, fmt.Errorf("celestia: fallback disabled") + default: + return nil, fmt.Errorf("celestia: unknown fallback mode: %s", l.DAClient.FallbackMode) + } +} + // sendTransaction creates & queues for sending a transaction to the batch inbox address with the given `txData`. // The method will block if the queue's MaxPendingTransactions is exceeded. func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error { @@ -585,10 +599,15 @@ func (l *BatchSubmitter) sendTransaction(ctx context.Context, txdata txData, que // signal plasma commitment tx with TxDataVersion1 data = comm.TxData() } - candidate, err = l.calldataTxCandidate(data) + candidate, err = l.celestiaTxCandidate(data) if err != nil { - l.recordFailedTx(txdata.ID(), err) - return nil + l.Log.Error("celestia: blob submission failed", "err", err) + candidate, err = l.fallbackTxCandidate(ctx, txdata) + if err != nil { + l.Log.Error("celestia: fallback failed", "err", err) + l.recordFailedTx(txdata.ID(), err) + return nil + } } } @@ -624,25 +643,28 @@ func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error }, nil } -func (l *BatchSubmitter) calldataTxCandidate(data []byte) (*txmgr.TxCandidate, error) { +func (l *BatchSubmitter) calldataTxCandidate(data []byte) *txmgr.TxCandidate { l.Log.Info("Building Calldata transaction candidate", "size", len(data)) + return &txmgr.TxCandidate{ + To: &l.RollupConfig.BatchInboxAddress, + TxData: data, + } +} + +func (l *BatchSubmitter) celestiaTxCandidate(data []byte) (*txmgr.TxCandidate, error) { + l.Log.Info("Building Celestia transaction candidate", "size", len(data)) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Duration(l.RollupConfig.BlockTime)*time.Second) ids, err := l.DAClient.Client.Submit(ctx, [][]byte{data}, -1, l.DAClient.Namespace) cancel() - if err == nil && len(ids) == 1 { - l.Log.Info("celestia: blob successfully submitted", "id", hex.EncodeToString(ids[0])) - data = append([]byte{celestia.DerivationVersionCelestia}, ids[0]...) - } else { - if l.DAClient.EthFallbackDisabled { - return nil, fmt.Errorf("celestia: blob submission failed; eth fallback disabled: %w", err) - } - - l.Log.Info("celestia: blob submission failed; falling back to eth", "err", err) + if err != nil { + return nil, err } - return &txmgr.TxCandidate{ - To: &l.RollupConfig.BatchInboxAddress, - TxData: data, - }, nil + if len(ids) != 1 { + return nil, fmt.Errorf("celestia: expected 1 id, got %d", len(ids)) + } + l.Log.Info("celestia: blob successfully submitted", "id", hex.EncodeToString(ids[0])) + data = append([]byte{celestia.DerivationVersionCelestia}, ids[0]...) + return l.calldataTxCandidate(data), nil } func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) { diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index f14b00ed97e8a..650031610e85a 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -373,7 +373,7 @@ func (bs *BatcherService) initPlasmaDA(cfg *CLIConfig) error { } func (bs *BatcherService) initDA(cfg *CLIConfig) error { - client, err := celestia.NewDAClient(cfg.DaConfig.Rpc, cfg.DaConfig.AuthToken, cfg.DaConfig.Namespace, cfg.DaConfig.EthFallbackDisabled) + client, err := celestia.NewDAClient(cfg.DaConfig.Rpc, cfg.DaConfig.AuthToken, cfg.DaConfig.Namespace, cfg.DaConfig.FallbackMode) if err != nil { return err } diff --git a/op-batcher/batcher/test_batch_submitter.go b/op-batcher/batcher/test_batch_submitter.go index b0d03f2793837..8814400f06ca7 100644 --- a/op-batcher/batcher/test_batch_submitter.go +++ b/op-batcher/batcher/test_batch_submitter.go @@ -29,7 +29,7 @@ func (l *TestBatchSubmitter) JamTxPool(ctx context.Context) error { var err error cc := l.state.cfgProvider.ChannelConfig() if cc.UseBlobs { - candidate, _ = l.calldataTxCandidate([]byte{}) + candidate = l.calldataTxCandidate([]byte{}) } else if candidate, err = l.blobTxCandidate(emptyTxData); err != nil { return err } diff --git a/op-celestia/cli.go b/op-celestia/cli.go index bf00f83bb8773..c0ab92606c44d 100644 --- a/op-celestia/cli.go +++ b/op-celestia/cli.go @@ -1,11 +1,22 @@ package celestia import ( + "fmt" + "github.com/urfave/cli/v2" opservice "github.com/ethereum-optimism/optimism/op-service" ) +const ( + // FallbackModeDisabled is the fallback mode disabled + FallbackModeDisabled = "disabled" + // FallbackModeBlobData is the fallback mode blob data + FallbackModeBlobData = "blobdata" + // FallbackModeCallData is the fallback mode call data + FallbackModeCallData = "calldata" +) + const ( // RPCFlagName defines the flag for the rpc url RPCFlagName = "da.rpc" @@ -15,6 +26,8 @@ const ( NamespaceFlagName = "da.namespace" // EthFallbackDisabledFlagName defines the flag for disabling eth fallback EthFallbackDisabledFlagName = "da.eth_fallback_disabled" + // FallbackModeFlagName defines the flag for fallback mode + FallbackModeFlagName = "da.fallback_mode" // NamespaceSize is the size of the hex encoded namespace string NamespaceSize = 58 @@ -43,17 +56,35 @@ func CLIFlags(envPrefix string) []cli.Flag { }, &cli.BoolFlag{ Name: EthFallbackDisabledFlagName, - Usage: "disable eth fallback", + Usage: "disable eth fallback (deprecated, use FallbackModeFlag instead)", EnvVars: opservice.PrefixEnvVar(envPrefix, "DA_ETH_FALLBACK_DISABLED"), + Action: func(c *cli.Context, e bool) error { + if e { + return c.Set(FallbackModeFlagName, FallbackModeDisabled) + } + return nil + }, + }, + &cli.StringFlag{ + Name: FallbackModeFlagName, + Usage: fmt.Sprintf("fallback mode; must be one of: %s, %s or %s", FallbackModeDisabled, FallbackModeBlobData, FallbackModeCallData), + EnvVars: opservice.PrefixEnvVar(envPrefix, "DA_FALLBACK_MODE"), + Value: FallbackModeCallData, + Action: func(c *cli.Context, s string) error { + if s != FallbackModeDisabled && s != FallbackModeBlobData && s != FallbackModeCallData { + return fmt.Errorf("invalid fallback mode: %s; must be one of: %s, %s or %s", s, FallbackModeDisabled, FallbackModeBlobData, FallbackModeCallData) + } + return nil + }, }, } } type CLIConfig struct { - Rpc string - AuthToken string - Namespace string - EthFallbackDisabled bool + Rpc string + AuthToken string + Namespace string + FallbackMode string } func (c CLIConfig) Check() error { @@ -68,9 +99,9 @@ func NewCLIConfig() CLIConfig { func ReadCLIConfig(ctx *cli.Context) CLIConfig { return CLIConfig{ - Rpc: ctx.String(RPCFlagName), - AuthToken: ctx.String(AuthTokenFlagName), - Namespace: ctx.String(NamespaceFlagName), - EthFallbackDisabled: ctx.Bool(EthFallbackDisabledFlagName), + Rpc: ctx.String(RPCFlagName), + AuthToken: ctx.String(AuthTokenFlagName), + Namespace: ctx.String(NamespaceFlagName), + FallbackMode: ctx.String(FallbackModeFlagName), } } diff --git a/op-celestia/da_client.go b/op-celestia/da_client.go index d3d407a10b94a..f5c14fe50377f 100644 --- a/op-celestia/da_client.go +++ b/op-celestia/da_client.go @@ -2,6 +2,7 @@ package celestia import ( "encoding/hex" + "fmt" "time" "github.com/rollkit/go-da" @@ -9,13 +10,13 @@ import ( ) type DAClient struct { - Client da.DA - GetTimeout time.Duration - Namespace da.Namespace - EthFallbackDisabled bool + Client da.DA + GetTimeout time.Duration + Namespace da.Namespace + FallbackMode string } -func NewDAClient(rpc, token, namespace string, ethFallbackDisabled bool) (*DAClient, error) { +func NewDAClient(rpc, token, namespace, fallbackMode string) (*DAClient, error) { client, err := proxy.NewClient(rpc, token) if err != nil { return nil, err @@ -24,10 +25,13 @@ func NewDAClient(rpc, token, namespace string, ethFallbackDisabled bool) (*DACli if err != nil { return nil, err } + if fallbackMode != "disabled" && fallbackMode != "blobdata" && fallbackMode != "calldata" { + return nil, fmt.Errorf("celestia: unknown fallback mode: %s", fallbackMode) + } return &DAClient{ - Client: client, - GetTimeout: time.Minute, - Namespace: ns, - EthFallbackDisabled: ethFallbackDisabled, + Client: client, + GetTimeout: time.Minute, + Namespace: ns, + FallbackMode: fallbackMode, }, nil } diff --git a/op-node/rollup/driver/da.go b/op-node/rollup/driver/da.go index cacef1afd0c07..a8e29000e714c 100644 --- a/op-node/rollup/driver/da.go +++ b/op-node/rollup/driver/da.go @@ -6,7 +6,18 @@ import ( ) func SetDAClient(cfg celestia.CLIConfig) error { - client, err := celestia.NewDAClient(cfg.Rpc, cfg.AuthToken, cfg.Namespace, false) + // NOTE: we always read using blob_data_source.go + // If the transaction has calldata, based on the prefix byte. + // - If the prefix byte is 0xce + // - We interpret the calldata as a celestia reference and fetch + // the data from celestia. + // - Otherwise, we use the calldata fallback mode. + // If the transaction has blobs, we use blobdata fallback mode. + // See dataAndHashesFromTxs and DataFromEVMTransactions + // The read path always operates in the most permissive mode and is + // independent of the fallback mode. + // Therefore the configuration value for FallbackMode passed here does not matter. + client, err := celestia.NewDAClient(cfg.Rpc, cfg.AuthToken, cfg.Namespace, cfg.FallbackMode) if err != nil { return err }