From 96c29dd3798fb75608580f79d847ef5acaf29182 Mon Sep 17 00:00:00 2001 From: Charlie Chen Date: Fri, 22 Dec 2023 16:24:43 -0600 Subject: [PATCH] re-apply v10 fixes and wait for tx receipt before reporting outTx tracker --- zetaclient/evm_client.go | 58 +++++++++++++++++++++-------------- zetaclient/evm_signer.go | 66 +++++++++++++++++++++++++++++++++------- zetaclient/query.go | 2 +- 3 files changed, 92 insertions(+), 34 deletions(-) diff --git a/zetaclient/evm_client.go b/zetaclient/evm_client.go index a7c93b8558..6830a99c6e 100644 --- a/zetaclient/evm_client.go +++ b/zetaclient/evm_client.go @@ -28,6 +28,7 @@ import ( "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/erc20custody.sol" "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol" "github.com/zeta-chain/zetacore/common" + crosschainkeeper "github.com/zeta-chain/zetacore/x/crosschain/keeper" "github.com/zeta-chain/zetacore/x/crosschain/types" observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/config" @@ -559,6 +560,13 @@ var lowestOutTxNonceToObserve = map[int64]uint64{ // FIXME: there's a chance that a txhash in OutTxChan may not deliver when Stop() is called // observeOutTx periodically checks all the txhash in potential outbound txs func (ob *EVMChainClient) observeOutTx() { + // The garbage trackers associated with finalized cctxs + finalizedCctxTrackers := map[int64]map[uint64]bool{ + 5: make(map[uint64]bool), // Goerli + 97: make(map[uint64]bool), // BSC testnet + 80001: make(map[uint64]bool), // Mumbai + } + // read env variables if set timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) if err != nil || timeoutNonce <= 0 { @@ -593,6 +601,21 @@ func (ob *EVMChainClient) observeOutTx() { if nonceInt < lowestOutTxNonceToObserve[ob.chain.ChainId] { continue } + if finalizedCctxTrackers[ob.chain.ChainId][nonceInt] { + continue + } + // Skip those problematic trackers whose associated cctxs are no longer pending (Reverted/Outboundminted/Aborted) + cctx, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, nonceInt) + if err != nil || cctx == nil { + ob.logger.ObserveOutTx.Error().Err(err).Msgf("garbage tracker, error GetCctxByNonce for chain %d nonce %d", ob.chain.ChainId, nonceInt) + continue + } + if !crosschainkeeper.IsPending(*cctx) { + finalizedCctxTrackers[ob.chain.ChainId][nonceInt] = true + ob.logger.ObserveOutTx.Info().Msgf("garbage tracker chain %s nonce %d is not pending", ob.chain.String(), nonceInt) + continue + } + ob.Mu.Lock() _, found := ob.outTXConfirmedReceipts[ob.GetTxID(nonceInt)] ob.Mu.Unlock() @@ -645,11 +668,11 @@ func (ob *EVMChainClient) queryTxByHash(txHash string, nonce uint64) (*ethtypes. receipt, err := ob.evmClient.TransactionReceipt(ctxt, ethcommon.HexToHash(txHash)) if err != nil { if err != ethereum.NotFound { - logger.Warn().Err(err).Msgf("TransactionReceipt/TransactionByHash error, txHash %s", txHash) + logger.Warn().Err(err).Msgf("queryTxByHash: TransactionReceipt/TransactionByHash error, txHash %s", txHash) } return nil, nil, err } - transaction, _, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash)) + transaction, isPending, err := ob.evmClient.TransactionByHash(ctxt, ethcommon.HexToHash(txHash)) if err != nil { return nil, nil, err } @@ -657,15 +680,20 @@ func (ob *EVMChainClient) queryTxByHash(txHash string, nonce uint64) (*ethtypes. return nil, nil, fmt.Errorf("queryTxByHash: txHash %s nonce mismatch: wanted %d, got tx nonce %d", txHash, nonce, transaction.Nonce()) } confHeight := receipt.BlockNumber.Uint64() + ob.GetCoreParams().ConfirmationCount - if confHeight < 0 || confHeight >= math.MaxInt64 { - return nil, nil, fmt.Errorf("confHeight is out of range") + if confHeight >= math.MaxInt64 { + return nil, nil, fmt.Errorf("queryTxByHash: confHeight is out of range") } // #nosec G701 checked in range if int64(confHeight) > ob.GetLastBlockHeight() { - log.Warn().Msgf("included but not confirmed: receipt block %d, current block %d", receipt.BlockNumber, ob.GetLastBlockHeight()) + log.Warn().Msgf("queryTxByHash: included but not confirmed: receipt block %d, current block %d", receipt.BlockNumber, ob.GetLastBlockHeight()) return nil, nil, fmt.Errorf("included but not confirmed") } + // transaction must NOT be pending + if isPending { + log.Error().Msgf("queryTxByHash: confirmed but still pending: txHash %s nonce %d receipt block %d", txHash, nonce, receipt.BlockNumber) + return nil, nil, fmt.Errorf("confirmed but still pending") + } return receipt, transaction, nil } @@ -921,7 +949,9 @@ func (ob *EVMChainClient) observeInTX() error { // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { - if common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains + if crosschainFlags.BlockHeaderVerificationFlags != nil && + crosschainFlags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && + common.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains err = ob.postBlockHeader(toBlock) if err != nil { ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting block header") @@ -932,22 +962,6 @@ func (ob *EVMChainClient) observeInTX() error { ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error getting block: %d", bn) continue } - headerRLP, err := rlp.EncodeToBytes(block.Header()) - if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error encoding block header: %d", bn) - continue - } - - _, err = ob.zetaClient.PostAddBlockHeader( - ob.chain.ChainId, - block.Hash().Bytes(), - block.Number().Int64(), - common.NewEthereumHeader(headerRLP), - ) - if err != nil { - ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("error posting block header: %d", bn) - continue - } for _, tx := range block.Transactions() { if tx.To() == nil { diff --git a/zetaclient/evm_signer.go b/zetaclient/evm_signer.go index 7ea8b841b6..0e1a0054e9 100644 --- a/zetaclient/evm_signer.go +++ b/zetaclient/evm_signer.go @@ -9,6 +9,7 @@ import ( "math/rand" "strconv" "strings" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi" @@ -36,6 +37,10 @@ type EVMSigner struct { erc20CustodyContractAddress ethcommon.Address logger zerolog.Logger ts *TelemetryServer + + // for outTx tracker reporting + mu *sync.Mutex + outTxHashBeingReported map[string]bool } var _ ChainSigner = &EVMSigner{} @@ -83,7 +88,9 @@ func NewEVMSigner( logger: logger.With(). Str("chain", chain.ChainName.String()). Str("module", "EVMSigner").Logger(), - ts: ts, + ts: ts, + mu: &sync.Mutex{}, + outTxHashBeingReported: make(map[string]bool), }, nil } @@ -569,11 +576,7 @@ func (signer *EVMSigner) TryProcessOutTx( log.Warn().Err(err).Msgf("OutTx Broadcast error") retry, report := HandleBroadcastError(err, strconv.FormatUint(send.GetCurrentOutTxParam().OutboundTxTssNonce, 10), toChain.String(), outTxHash) if report { - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.ReportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) } if !retry { break @@ -582,15 +585,56 @@ func (signer *EVMSigner) TryProcessOutTx( continue } logger.Info().Msgf("Broadcast success: nonce %d to chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(toChain.ChainId, tx.Nonce(), outTxHash, nil, "", -1) - if err != nil { - logger.Err(err).Msgf("Unable to add to tracker on ZetaCore: nonce %d chain %s outTxHash %s", send.GetCurrentOutTxParam().OutboundTxTssNonce, toChain, outTxHash) - } - logger.Info().Msgf("Broadcast to core successful %s", zetaHash) + signer.ReportToOutTxTracker(zetaBridge, toChain.ChainId, tx.Nonce(), outTxHash, logger) break // successful broadcast; no need to retry } + } +} +func (signer *EVMSigner) ReportToOutTxTracker(zetaBridge ZetaCoreBridger, chainID int64, nonce uint64, outTxHash string, logger zerolog.Logger) { + // skip if already being reported + signer.mu.Lock() + defer signer.mu.Unlock() + if _, found := signer.outTxHashBeingReported[outTxHash]; found { + logger.Info().Msgf("ReportToOutTxTracker: outTxHash %s for chain %d nonce %d is being reported", outTxHash, chainID, nonce) + return } + signer.outTxHashBeingReported[outTxHash] = true // mark as being reported + + // report to outTxTracker with goroutine + go func() { + defer func() { + signer.mu.Lock() + delete(signer.outTxHashBeingReported, outTxHash) + signer.mu.Unlock() + }() + + // try fetching tx receipt for 10 minutes + tStart := time.Now() + for { + if time.Since(tStart) > 10*time.Minute { // give up after 10 minutes + logger.Info().Msgf("ReportToOutTxTracker: outTxHash report timeout for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + return + } + receipt, err := signer.client.TransactionReceipt(context.TODO(), ethcommon.HexToHash(outTxHash)) + if err != nil { + logger.Info().Err(err).Msgf("ReportToOutTxTracker: receipt not available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + time.Sleep(10 * time.Second) + continue + } + if receipt != nil { + logger.Info().Msgf("ReportToOutTxTracker: receipt available for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + break + } + } + + // report to outTxTracker + zetaHash, err := zetaBridge.AddTxHashToOutTxTracker(chainID, nonce, outTxHash, nil, "", -1) + if err != nil { + logger.Err(err).Msgf("ReportToOutTxTracker: unable to add to tracker on ZetaCore for chain %d nonce %d outTxHash %s", chainID, nonce, outTxHash) + } + logger.Info().Msgf("ReportToOutTxTracker: reported outTxHash to core successful %s, chain %d nonce %d outTxHash %s", zetaHash, chainID, nonce, outTxHash) + }() } // SignERC20WithdrawTx diff --git a/zetaclient/query.go b/zetaclient/query.go index 76d4573572..6ac9a1c5a1 100644 --- a/zetaclient/query.go +++ b/zetaclient/query.go @@ -332,7 +332,7 @@ func (b *ZetaCoreBridge) GetAllOutTxTrackerByChain(chain common.Chain, order Ord Pagination: &query.PageRequest{ Key: nil, Offset: 0, - Limit: 2000, + Limit: 3000, CountTotal: false, Reverse: false, },