Skip to content

Commit

Permalink
feat: apply cronos (#208)
Browse files Browse the repository at this point in the history
* Problem: parallel tx execution is not supported (#205)

add basic support in sdk:
- add a TxExecutor baseapp option
- add TxIndex/TxCount/MsgIndex in context

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

fix misspell

fix lint

run gci

fix lint

gci seems not compatible with gofumpt

* Support object store (#206)

generic interface

generic btree

generic cachekv

generic transient store

support ObjStore

changelog

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

object store key

Apply review suggestions

fix merge conflict

fix snapshot

revert dependers

* prefix store support object store (#236)

* Problem: fee collection not compatible with parallel execution (#237)

* Problem: no efficient way to collect fee

Solution:
- support an idea of virtual account in bank module, where the incoming
  coins are accumulated in a per-tx object store first, then accumulate
  and credit to the real account at end blocker.

  it's nesserary to support parallel tx execution, where we try not to
  access shared states.

more efficient sum

support SendCoinsFromModuleToAccountVirtual

fix test

fix test

* fix lint

* fix test

* fix test

* fix test

* fix test

* fix test

* fix mock keeper

* try fix lint

* try fix lint

* reuse code

* try fix linter

* Update x/bank/keeper/send.go

Signed-off-by: yihuang <[email protected]>

* algin panic call

* fix error handling

* try fix lint

* nolintlint generate falst postiive

---------

Signed-off-by: yihuang <[email protected]>

* Problem: MultiStore interface is bloated (#240)

* Problem: MultiStore interface is bloated

Solution:
- Split out specialied methods from it, keeping the MultiStore generic

* Update store/CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

---------

Signed-off-by: yihuang <[email protected]>

* Problem: no API to use the new CoW branch store (#243)

* Support RunAtomic API

* add unit test

* Problem: block gas used not set in context (#252)

Solution:
- fix the way context is updated

* Problem: invalid tx returns negative gas wanted (#253)

Solution:
- add checks

* Problem: signature verification result not cache between incarnations of same tx (cosmos#565)

* Problem: signature verification result not cache between incarnations of
same tx

Closes: cosmos#564

Solution:
- introduce incarnation cache that's shared between incarnations of the
  same tx

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

* Update types/context.go

Signed-off-by: yihuang <[email protected]>

* Update types/context.go

Signed-off-by: yihuang <[email protected]>

* fix nil convert

---------

Signed-off-by: yihuang <[email protected]>

* Problem: gas consumed differs after enabled cache (cosmos#570)

* Problem: gas consumed differs after enabled cache

* fix test

* Revert "Problem: gas consumed differs after enabled cache"

This reverts commit f33944e.

* Revert "Problem: signature verification result not cache between incarnations of same tx (cosmos#565)"

This reverts commit 5a1594f.

* keep interface

* Problem: tx executor can't do dependency analysis (cosmos#744)

* Problem: tx executor can't do dependency analysis

Solution:
- change the api to allow static analysis on tx body

* fix

* changelog

* cleanup

* Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

---------

Signed-off-by: yihuang <[email protected]>

* Problem: can't extend abci listeners in streaming manager (#269)

Solution:
- add API StreamingManager to allow extending the abci listeners, mainly
  for versiondb support

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

cleanup

* chore: apply custom-ed store

* feat: Append send restrictionto Delegate/Undelegate Coins (#202)

* chore: Update go.mod for custom store

* chore: fix lint

* chore: fix lint

* chore: Update go.mod for custom cosmos-sdk/store

* Revert "chore: fix lint"

This reverts commit 8e7af64.

---------

Signed-off-by: yihuang <[email protected]>
Co-authored-by: yihuang <[email protected]>
Co-authored-by: mmsqe <[email protected]>
Co-authored-by: Kyuhyeon Choi <[email protected]>
  • Loading branch information
4 people authored Nov 6, 2024
1 parent c64d101 commit 55e55e6
Show file tree
Hide file tree
Showing 38 changed files with 642 additions and 98 deletions.
107 changes: 73 additions & 34 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
var mode execMode

switch {
case req.Type == abci.CheckTxType_New:
switch req.Type {
case abci.CheckTxType_New:
mode = execModeCheck

case req.Type == abci.CheckTxType_Recheck:
case abci.CheckTxType_Recheck:
mode = execModeReCheck

default:
Expand Down Expand Up @@ -775,48 +775,48 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var (
blockGasUsed uint64
blockGasWanted uint64
)
for _, res := range txResults {
// GasUsed should not be -1 but just in case
if res.GasUsed > 0 {
blockGasUsed += uint64(res.GasUsed)
}
// GasWanted could be -1 if the tx is invalid
if res.GasWanted > 0 {
blockGasWanted += uint64(res.GasWanted)
}
}
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasUsed(blockGasUsed).
WithBlockGasWanted(blockGasWanted),
)

endBlock, err := app.endBlock(ctx)
if err != nil {
return nil, err
}
Expand All @@ -840,6 +840,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], memTx, i, ms, incarnationCache)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if memTx, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, memTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down Expand Up @@ -1188,7 +1227,7 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
// use custom query multi-store if provided
qms := app.qms
if qms == nil {
qms = app.cms.(storetypes.MultiStore)
qms = storetypes.RootMultiStore(app.cms)
}

lastBlockHeight := qms.LatestVersion()
Expand Down
61 changes: 53 additions & 8 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type BaseApp struct {
name string // application name from abci.BlockInfo
db dbm.DB // common DB backend
cms storetypes.CommitMultiStore // Main (uncached) state
qms storetypes.MultiStore // Optional alternative multistore for querying only.
qms storetypes.RootMultiStore // Optional alternative multistore for querying only.
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
grpcQueryRouter *GRPCQueryRouter // router for redirecting gRPC query calls
msgServiceRouter *MsgServiceRouter // router for redirecting Msg service messages
Expand Down Expand Up @@ -194,6 +194,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -302,6 +305,9 @@ func (app *BaseApp) MountStores(keys ...storetypes.StoreKey) {
case *storetypes.MemoryStoreKey:
app.MountStore(key, storetypes.StoreTypeMemory)

case *storetypes.ObjectStoreKey:
app.MountStore(key, storetypes.StoreTypeObject)

default:
panic(fmt.Sprintf("Unrecognized store key type :%T", key))
}
Expand Down Expand Up @@ -341,6 +347,17 @@ func (app *BaseApp) MountMemoryStores(keys map[string]*storetypes.MemoryStoreKey
}
}

// MountObjectStores mounts all transient object stores with the BaseApp's internal
// commit multi-store.
func (app *BaseApp) MountObjectStores(keys map[string]*storetypes.ObjectStoreKey) {
skeys := maps.Keys(keys)
sort.Strings(skeys)
for _, key := range skeys {
memKey := keys[key]
app.MountStore(memKey, storetypes.StoreTypeObject)
}
}

// MountStore mounts a store to the provided key in the BaseApp multistore,
// using the default DB.
func (app *BaseApp) MountStore(key storetypes.StoreKey, typ storetypes.StoreType) {
Expand Down Expand Up @@ -659,13 +676,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes).
WithTxIndex(txIndex).
WithGasMeter(storetypes.NewInfiniteGasMeter())
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -750,7 +768,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, memTx sdk.Tx, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, memTx, txIndex, nil, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, memTx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -763,7 +785,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, memTx, txIndex, txMultiStore, incarnationCache)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -821,12 +843,27 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, nil, -1, nil, nil)
}

func (app *BaseApp) runTxWithMultiStore(
mode execMode,
txBytes []byte,
tx sdk.Tx,
txIndex int,
txMultiStore storetypes.MultiStore,
incarnationCache map[string]any,
) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
ctx = ctx.WithIncarnationCache(incarnationCache)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -868,9 +905,11 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, res
defer consumeBlockGas()
}

tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, err
if tx == nil {
tx, err = app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, err
}
}

msgs := tx.GetMsgs()
Expand Down Expand Up @@ -1010,6 +1049,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down Expand Up @@ -1143,6 +1184,10 @@ func (app *BaseApp) TxEncode(tx sdk.Tx) ([]byte, error) {
return app.txEncoder(tx)
}

func (app *BaseApp) StreamingManager() storetypes.StreamingManager {
return app.streamingManager
}

// Close is called in start cmd to gracefully cleanup resources.
func (app *BaseApp) Close() error {
var errs []error
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, nil, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
12 changes: 11 additions & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -314,7 +319,7 @@ func (app *BaseApp) SetTxEncoder(txEncoder sdk.TxEncoder) {
// SetQueryMultiStore set a alternative MultiStore implementation to support grpc query service.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/13317
func (app *BaseApp) SetQueryMultiStore(ms storetypes.MultiStore) {
func (app *BaseApp) SetQueryMultiStore(ms storetypes.RootMultiStore) {
app.qms = ms
}

Expand Down Expand Up @@ -378,6 +383,11 @@ func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) {
app.disableBlockGasMeter = disableBlockGasMeter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}

// SetMsgServiceRouter sets the MsgServiceRouter of a BaseApp.
func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
app.msgServiceRouter = msgServiceRouter
Expand Down
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
18 changes: 18 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

type TxExecutor func(
ctx context.Context,
block [][]byte,
cms types.MultiStore,
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
5 changes: 5 additions & 0 deletions core/coins/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func FormatCoins(coins []*basev1beta1.Coin, metadata []*bankv1beta1.Metadata) (s
if err != nil {
return "", err
}
// If a coin contains a comma, return an error given that the output
// could be misinterpreted by the user as 2 different coins.
if strings.Contains(formatted[i], ",") {
return "", fmt.Errorf("coin %s contains a comma", formatted[i])
}
}

if len(coins) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ require (
// <temporary replace>
// )

// replace the store module with the b-harvest fork
replace cosmossdk.io/store => github.com/b-harvest/cosmos-sdk/store v0.0.0-20241106072527-a25eb3a65854

// Below are the long-lived replace of the Cosmos SDK
replace (
// use cosmos fork of keyring
Expand Down
Loading

0 comments on commit 55e55e6

Please sign in to comment.