feat(op-batcher): altda->ethda failover #13

Open. Wants to merge 2 commits into base branch: feat--multiframe-altda-channel
Changes from all commits
8 changes: 8 additions & 0 deletions op-alt-da/daclient.go
@@ -16,6 +16,11 @@ var ErrNotFound = errors.New("not found")
// ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
var ErrInvalidInput = errors.New("invalid input")

// ErrAltDADown is returned when the alt DA returns a 503 status code.
// It is used to signify that the alt DA is down and the client should failover to the eth DA.
// See https://github.com/ethereum-optimism/specs/issues/434
var ErrAltDADown = errors.New("alt DA is down: failover to eth DA")

// DAClient is an HTTP client to communicate with a DA storage service.
// It creates commitments and retrieves input data + verifies if needed.
type DAClient struct {
@@ -131,6 +136,9 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, error) {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusServiceUnavailable {
return nil, ErrAltDADown
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to store data: %v", resp.StatusCode)
}
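Usage note (not part of the diff): a minimal sketch of how a caller can react to this sentinel error. daPut and postCalldataToEthDA are hypothetical helpers for illustration; the real handling added by this PR lives in op-batcher/batcher/driver.go below.

import (
	"context"
	"errors"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
)

// submitFrame tries alt DA first and fails over to eth DA only on the 503 sentinel.
func submitFrame(ctx context.Context, data []byte) error {
	_, err := daPut(ctx, data) // hypothetical wrapper around the DA client's put path
	if errors.Is(err, altda.ErrAltDADown) {
		// The DA server replied 503: post this channel's frames to eth DA instead.
		return postCalldataToEthDA(ctx, data)
	}
	return err
}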
17 changes: 15 additions & 2 deletions op-alt-da/damock.go
@@ -105,12 +105,16 @@ func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block
}

// FakeDAServer is a fake DA server for e2e tests.
-// It is a small wrapper around DAServer that allows for setting request latencies,
-// to mimic a DA service with slow responses (eg. eigenDA with 10 min batching interval).
+// It is a small wrapper around DAServer that allows for setting:
+//   - request latencies, to mimic a DA service with slow responses
+//     (eg. eigenDA with 10 min batching interval).
+//   - response status codes, to mimic a DA service that is down.
type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
// next failoverCount Put requests will return 503 status code for failover testing
failoverCount uint64
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
@@ -130,6 +134,10 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
if s.failoverCount > 0 {
Review comment (Collaborator): what is the point to decrement failoverCount, then actually handle the put?

Review comment (Collaborator): Is it just to simplify testing?

w.WriteHeader(http.StatusServiceUnavailable)
s.failoverCount--
}
s.DAServer.HandlePut(w, r)
}

@@ -154,6 +162,11 @@ func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.getRequestLatency = latency
}

// SetPutFailoverForNRequests makes the next n Put requests return a 503 status code.
func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) {
	s.failoverCount = n
}
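Usage note (not part of the diff): a sketch of driving this from an e2e test. It assumes the Start/Stop lifecycle on DAServer and the testlog helper exist as in the upstream op-e2e utilities; adjust to the fork's actual wiring.

fakeDA := altda.NewFakeDAServer("127.0.0.1", 0, testlog.Logger(t, log.LevelInfo))
require.NoError(t, fakeDA.Start())
defer func() { _ = fakeDA.Stop() }()

// The next two Put requests return 503; the DAClient surfaces ErrAltDADown,
// so the batcher submits those two channels to eth DA, then returns to alt DA.
fakeDA.SetPutFailoverForNRequests(2)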

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
15 changes: 12 additions & 3 deletions op-batcher/batcher/channel.go
@@ -45,8 +45,9 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
-// in the failed transaction.
-func (c *channel) TxFailed(id string) {
+// in the failed transaction. failoverToEthDA should be set to true when using altDA
+// and altDA is down. This will switch the channel to submit frames to ethDA instead.
+func (c *channel) TxFailed(id string, failoverToEthDA bool) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Note: when the batcher is changed to send multiple frames per tx,
@@ -57,7 +58,15 @@
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
}

if failoverToEthDA {
// We failover to calldata txs because in altda mode the channel and channelManager
// are configured to use a calldataConfigManager, as opposed to DynamicEthChannelConfig
// which can use both calldata and blobs. Failover should happen extremely rarely,
// and is only used while the altDA is down, so we can afford to be inefficient here.
// TODO: figure out how to switch to blobs/auto instead. Might need to make
// batcherService.initChannelConfig function stateless so that we can reuse it.
c.cfg.DaType = DaTypeCalldata
}
c.metr.RecordBatchTxFailed()
}

9 changes: 5 additions & 4 deletions op-batcher/batcher/channel_manager.go
@@ -88,14 +88,15 @@ func (s *channelManager) Clear(l1OriginLastClosedChannel eth.BlockID) {
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
-// in the failed transaction.
-func (s *channelManager) TxFailed(_id txID) {
+// in the failed transaction. failoverToEthDA should be set to true when using altDA
+// and altDA is down. This will switch the channel to submit frames to ethDA instead.
+func (s *channelManager) TxFailed(_id txID, failoverToEthDA bool) {
s.mu.Lock()
defer s.mu.Unlock()
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
-		channel.TxFailed(id)
+		channel.TxFailed(id, failoverToEthDA)
if s.closed && channel.NoneSubmitted() {
s.log.Info("Channel has no submitted transactions, clearing for shutdown", "chID", channel.ID())
s.removePendingChannel(channel)
@@ -510,7 +511,7 @@ func (s *channelManager) Requeue(newCfg ChannelConfig) {
s.metr.RecordL2BlockInPendingQueue(b)
}

-	// Channels which where already being submitted are put back
+	// Channels which were already being submitted are put back
s.channelQueue = newChannelQueue
s.currentChannel = nil
// Setting the defaultCfg will cause new channels
6 changes: 3 additions & 3 deletions op-batcher/batcher/channel_manager_test.go
@@ -216,7 +216,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
require.ErrorIs(err, io.EOF)

// requeue frame
-	m.TxFailed(txdata0.ID())
+	m.TxFailed(txdata0.ID(), false)

txdata1, err := m.TxData(eth.BlockID{})
require.NoError(err)
@@ -423,7 +423,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require.NotEmpty(txdatas)

for _, txdata := range txdatas {
-		m.TxFailed(txdata.ID())
+		m.TxFailed(txdata.ID(), false)
}

// Show that this data will continue to be emitted as long as the transaction
@@ -433,7 +433,7 @@ func ChannelManagerCloseAllTxsFailed(t *testing.T, batchType uint) {
require.ElementsMatch(txdatas, txdatas1, "expected same txdatas on re-attempt")

for _, txdata := range txdatas1 {
-		m.TxFailed(txdata.ID())
+		m.TxFailed(txdata.ID(), false)
}

require.NoError(m.Close(), "Expected to close channel manager gracefully")
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_test.go
@@ -304,13 +304,13 @@ func TestChannelTxFailed(t *testing.T) {

// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
-	m.TxFailed(zeroFrameTxID(0))
+	m.TxFailed(zeroFrameTxID(0), false)
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()])

// Now we still have a pending transaction
// Let's mark it as failed
-	m.TxFailed(expectedChannelID)
+	m.TxFailed(expectedChannelID, false)
require.Empty(t, m.currentChannel.pendingTransactions)
// There should be a frame in the pending channel now
require.Equal(t, 1, m.currentChannel.PendingFrames())
7 changes: 4 additions & 3 deletions op-batcher/batcher/driver.go
@@ -776,15 +776,16 @@ func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) {
}

func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
+	failover := errors.Is(err, altda.ErrAltDADown)
	if err != nil {
-		l.Log.Warn("DA request failed", logFields(id, err)...)
+		l.Log.Warn("DA request failed", append([]interface{}{"failoverToEthDA", failover}, logFields(id, err)...)...)
	}
-	l.state.TxFailed(id)
+	l.state.TxFailed(id, failover)
}

func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.Log.Warn("Transaction failed to send", logFields(id, err)...)
-	l.state.TxFailed(id)
+	l.state.TxFailed(id, false)
}

func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {
26 changes: 26 additions & 0 deletions op-e2e/e2eutils/geth/wait.go
@@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum"
@@ -86,6 +87,31 @@ func WaitForTransaction(hash common.Hash, client *ethclient.Client, timeout time.Duration) (*types.Receipt, error) {
}
}

// WaitForBlockWithTxFromSender waits for a block with a transaction from a specific sender address.
// It starts from the current block and checks the next nBlocks blocks.
func WaitForBlockWithTxFromSender(sender common.Address, client *ethclient.Client, nBlocks uint64) (*types.Block, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
	startBlockNum, err := client.BlockNumber(ctx)
	if err != nil {
		return nil, err
	}
	for blockNum := startBlockNum; blockNum < startBlockNum+nBlocks; blockNum++ {
blockL1, err := WaitForBlock(big.NewInt(0).SetUint64(blockNum), client)
if err != nil {
return nil, err
}
batcherTxCount, err := transactions.TransactionsBySenderCount(blockL1, sender)
if err != nil {
return nil, err
}
if batcherTxCount > 0 {
return blockL1, nil
}
}
return nil, fmt.Errorf("no block with tx from sender %s found in the last %d blocks", sender.Hex(), nBlocks)
}

type waitForBlockOptions struct {
noChangeTimeout time.Duration
absoluteTimeout time.Duration
18 changes: 17 additions & 1 deletion op-e2e/e2eutils/transactions/count.go
@@ -5,7 +5,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

-func TransactionsBySender(block *types.Block, sender common.Address) (int64, error) {
+// TransactionsBySenderCount returns the number of transactions in the block that were sent by the given sender.
+func TransactionsBySenderCount(block *types.Block, sender common.Address) (int64, error) {
txCount := int64(0)
for _, tx := range block.Transactions() {
signer := types.NewCancunSigner(tx.ChainId())
@@ -19,3 +20,18 @@
}
return txCount, nil
}

// TransactionsBySender returns all transactions in the block that were sent by the given sender.
func TransactionsBySender(block *types.Block, sender common.Address) ([]*types.Transaction, error) {
txs := make([]*types.Transaction, 0)
for _, tx := range block.Transactions() {
signer := types.NewCancunSigner(tx.ChainId())
txSender, err := types.Sender(signer, tx)
if err != nil {
return nil, err
}
if txSender == sender {
txs = append(txs, tx)
}
}
return txs, nil
}
2 changes: 1 addition & 1 deletion op-e2e/system/altda/concurrent_test.go
@@ -63,7 +63,7 @@ func TestBatcherConcurrentAltDARequests(t *testing.T) {
require.NoError(t, err, "Waiting for l1 blocks")
// there are possibly other services (proposer/challenger) in the background sending txs
// so we only count the batcher txs
-	batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
+	batcherTxCount, err := transactions.TransactionsBySenderCount(block, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
if batcherTxCount > 1 {
return
74 changes: 74 additions & 0 deletions op-e2e/system/altda/failover_test.go
@@ -0,0 +1,74 @@
package altda

import (
"math/big"
"testing"

op_e2e "github.com/ethereum-optimism/optimism/op-e2e"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-batcher/flags"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/geth"
"github.com/ethereum-optimism/optimism/op-e2e/e2eutils/transactions"
"github.com/ethereum-optimism/optimism/op-e2e/system/e2esys"
"github.com/stretchr/testify/require"
)

// TestBatcher_FailoverToEthDA_FallbackToAltDA tests that the batcher will failover to ethDA
// if the da-server returns 503, and then fallback to altDA once altDA is available again
// (i.e. the da-server doesn't return 503 anymore).

Review comment (Collaborator): we always try altda first for every dispersal, then retry for non-altDA after sufficient retries. Wording seems odd.

func TestBatcher_FailoverToEthDA_FallbackToAltDA(t *testing.T) {
op_e2e.InitParallel(t)

nChannelsFailover := uint64(2)

cfg := e2esys.DefaultSystemConfig(t, e2esys.WithLogLevel(log.LevelCrit))
cfg.DeployConfig.UseAltDA = true
// With these settings, the batcher will post a single commitment per L1 block,
// so it's easy to trigger failover and observe the commitment changing on the next L1 block.
	cfg.BatcherMaxPendingTransactions = 1 // at most one tx in flight, so one commitment lands per L1 block
cfg.BatcherMaxConcurrentDARequest = 1
cfg.BatcherBatchType = 0
// We make channels as small as possible, such that they contain a single commitment.
// This is because failover to ethDA happens on a per-channel basis (each new channel is sent to altDA first).
// Hence, we can quickly observe the failover (to ethda) and fallback (to altda) behavior.
// cfg.BatcherMaxL1TxSizeBytes = 1200
// currently altda commitments can only be sent as calldata
cfg.DataAvailabilityType = flags.CalldataType

sys, err := cfg.Start(t)
require.NoError(t, err, "Error starting up system")
defer sys.Close()
l1Client := sys.NodeClient("l1")

startBlockL1, err := geth.WaitForBlockWithTxFromSender(cfg.DeployConfig.BatchSenderAddress, l1Client, 10)
require.NoError(t, err)

// Simulate altda server returning 503
sys.FakeAltDAServer.SetPutFailoverForNRequests(nChannelsFailover)

countEthDACommitment := uint64(0)

	// Most likely, the sequence of blocks will be: altDA, ethDA, ethDA, altDA, altDA, altDA.
	// The start block already carries an altDA commitment posted before failover was armed;
	// the next two Put requests hit the 503s (failoverCount=2), so those two channels are
	// submitted to ethDA; once the fake server stops returning 503, channels return to altDA.
	// Timing can shift this slightly, hence "most likely".
Review comment (Collaborator): why most likely? And why two ethDA in a row, because we set failoverCount=2, but why altDA in the beginning?

for blockNumL1 := startBlockL1.NumberU64(); blockNumL1 < startBlockL1.NumberU64()+6; blockNumL1++ {
blockL1, err := geth.WaitForBlock(big.NewInt(0).SetUint64(blockNumL1), l1Client)
require.NoError(t, err)
batcherTxs, err := transactions.TransactionsBySender(blockL1, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
require.Equal(t, 1, len(batcherTxs)) // sanity check: ensure BatcherMaxPendingTransactions=1 is working
batcherTx := batcherTxs[0]
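		// First byte of the batcher tx calldata is the derivation version:
		// 1 marks an altDA commitment, 0 (derive.DerivationVersion0) marks
		// frames posted directly to ethDA.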
if batcherTx.Data()[0] == 1 {
t.Log("blockL1", blockNumL1, "batcherTxType", "altda")
} else if batcherTx.Data()[0] == 0 {
t.Log("blockL1", blockNumL1, "batcherTxType", "ethda")
} else {
t.Fatalf("unexpected batcherTxType: %v", batcherTx.Data()[0])
}
if batcherTx.Data()[0] == byte(derive.DerivationVersion0) {
countEthDACommitment++
}
}
require.Equal(t, nChannelsFailover, countEthDACommitment, "Expected %v ethDA commitments, got %v", nChannelsFailover, countEthDACommitment)

}
2 changes: 1 addition & 1 deletion op-e2e/system/da/multi_test.go
@@ -51,7 +51,7 @@ func TestBatcherMultiTx(t *testing.T) {
require.NoError(t, err, "Waiting for l1 blocks")
// there are possibly other services (proposer/challenger) in the background sending txs
// so we only count the batcher txs
-	batcherTxCount, err := transactions.TransactionsBySender(block, cfg.DeployConfig.BatchSenderAddress)
+	batcherTxCount, err := transactions.TransactionsBySenderCount(block, cfg.DeployConfig.BatchSenderAddress)
require.NoError(t, err)
totalBatcherTxsCount += int64(batcherTxCount)
