Skip to content

Commit

Permalink
feat: allow delaying transactions in ResponsePrepareProposal (#717)
Browse files Browse the repository at this point in the history
* feat: allow prepare proposal to delay removed tx

* feat: add DELAYED tx action

* test: delayed txs

* test: e2e support for tx delays and max block size

* test(e2e): limit max block on dashcore and rotate nets to 1 MB

* chore(types): marshal Misbehaviors

* test(e2e): TestApp_TxTooBig
  • Loading branch information
lklimek authored Dec 18, 2023
1 parent e32466a commit 96de171
Show file tree
Hide file tree
Showing 19 changed files with 632 additions and 284 deletions.
2 changes: 1 addition & 1 deletion abci/cmd/abci-cli/abci-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func cmdPrepareProposal(cmd *cobra.Command, args []string) error {
existingTx := inTxArray(txsBytesArray, tx.Tx)
if tx.Action == types.TxRecord_UNKNOWN ||
(existingTx && tx.Action == types.TxRecord_ADDED) ||
(!existingTx && (tx.Action == types.TxRecord_UNMODIFIED || tx.Action == types.TxRecord_REMOVED)) {
(!existingTx && (tx.Action == types.TxRecord_UNMODIFIED || tx.Action == types.TxRecord_REMOVED || tx.Action == types.TxRecord_DELAYED)) {
resps = append(resps, response{
Code: codeBad,
Log: "Failed. Tx: " + string(tx.GetTx()) + " action: " + tx.Action.String(),
Expand Down
98 changes: 77 additions & 21 deletions abci/example/kvstore/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,53 @@ type VerifyTxFunc func(tx types.Tx, typ abci.CheckTxType) (abci.ResponseCheckTx,
// ExecTxFunc executes the transaction against some state
type ExecTxFunc func(tx types.Tx, roundState State) (abci.ExecTxResult, error)

// Helper struct that controls size of added transactions
type TxRecords struct {
Size int64
Limit int64
Txs []*abci.TxRecord
}

// Add new transaction if it fits the limit.
//
// Returns action that was taken, as some transactions may be delayed despite provided `tx.Action`.
// Errors when newly added transaction does not fit the limit.
func (txr *TxRecords) Add(tx *abci.TxRecord) (abci.TxRecord_TxAction, error) {
txSize := int64(len(tx.Tx))
switch tx.Action {
case abci.TxRecord_ADDED:
if txr.Size+txSize > txr.Limit {
return abci.TxRecord_UNKNOWN, errors.New("new transaction cannot be added: over limit")
}
// add to txs
txr.Txs = append(txr.Txs, tx)
txr.Size += txSize
return tx.Action, nil
case abci.TxRecord_UNMODIFIED:
{
if txr.Size+txSize > txr.Limit {
// over limit, delaying
delay := abci.TxRecord{Tx: tx.Tx, Action: abci.TxRecord_DELAYED}
return txr.Add(&delay)
}

// add to txs
txr.Txs = append(txr.Txs, tx)
txr.Size += txSize
return tx.Action, nil
}

case abci.TxRecord_REMOVED, abci.TxRecord_DELAYED:
// remove from txs, not counted in size
txr.Txs = append(txr.Txs, tx)
return tx.Action, nil
default:
panic(fmt.Sprintf("unknown tx action: %v", tx.Action))
}
}

func prepareTxs(req abci.RequestPrepareProposal) ([]*abci.TxRecord, error) {
return substPrepareTx(req.Txs, req.MaxTxBytes), nil
return substPrepareTx(req.Txs, req.MaxTxBytes)
}

func txRecords2Txs(txRecords []*abci.TxRecord) types.Txs {
Expand Down Expand Up @@ -83,34 +128,45 @@ func execPrepareTx(tx []byte) (abci.ExecTxResult, error) {
// proposal for transactions with the prefix stripped.
// It marks all of the original transactions as 'REMOVED' so that
// Tendermint will remove them from its mempool.
func substPrepareTx(txs [][]byte, maxTxBytes int64) []*abci.TxRecord {
trs := make([]*abci.TxRecord, 0, len(txs))
var removed []*abci.TxRecord
var totalBytes int64
func substPrepareTx(txs [][]byte, maxTxBytes int64) ([]*abci.TxRecord, error) {
trs := TxRecords{
Size: 0,
Limit: maxTxBytes,
Txs: make([]*abci.TxRecord, 0, len(txs)+1),
}

for _, tx := range txs {
action := abci.TxRecord_UNMODIFIED

// As a special logic of this app, we replace tx with the prefix 'prepare' with one without the prefix.
// We need to preserve ordering of the transactions.
if isPrepareTx(tx) {
// replace tx and add it as REMOVED
removed = append(removed, &abci.TxRecord{
Tx: tx,
Action: abci.TxRecord_REMOVED,
})
totalBytes -= int64(len(tx))

tx = bytes.TrimPrefix(tx, []byte(PreparePrefix))
action = abci.TxRecord_ADDED
// add new tx in place of the old one
record := abci.TxRecord{
Tx: bytes.TrimPrefix(tx, []byte(PreparePrefix)),
Action: abci.TxRecord_ADDED,
}
if _, err := trs.Add(&record); err != nil {
// cannot add new tx, so we cannot remove old one - just delay it and retry next time
action = abci.TxRecord_DELAYED
} else {
// old one can be removed from the mempool
action = abci.TxRecord_REMOVED
}
}
totalBytes += int64(len(tx))
if totalBytes > maxTxBytes {
break
}
trs = append(trs, &abci.TxRecord{

// Now we add the transaction to the list of transactions
transaction := &abci.TxRecord{
Tx: tx,
Action: action,
})
}
if _, err := trs.Add(transaction); err != nil {
// this should definitely not fail, as we don't add anything new
return nil, err
}
}

return append(trs, removed...)
return trs.Txs, nil
}

const PreparePrefix = "prepare"
Expand Down
59 changes: 59 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/gogo/protobuf/jsonpb"
"github.com/rs/zerolog"

"github.com/dashpay/tenderdash/crypto"
cryptoenc "github.com/dashpay/tenderdash/crypto/encoding"
Expand Down Expand Up @@ -276,6 +277,64 @@ func (m *ResponsePrepareProposal) Validate() error {
return nil
}

type Misbehaviors []Misbehavior

func (m Misbehaviors) MarshalZerologArray(e *zerolog.Array) {
for v := range m {
e.Interface(v)
}
}

type Txs [][]byte

func (b Txs) MarshalZerologArray(a *zerolog.Array) {
for _, bs := range b {
a.Hex(crypto.Checksum(bs)[:8])
}
}

func (txr *TxRecord) MarshalZerologObject(e *zerolog.Event) {
e.Str("action", txr.Action.String())
e.Hex("tx", crypto.Checksum(txr.Tx)[:8])
}

func (r *RequestPrepareProposal) MarshalZerologObject(e *zerolog.Event) {
e.Int64("max_tx_bytes", r.MaxTxBytes)
e.Array("txs", Txs(r.Txs))
e.Interface("last_commit", r.LocalLastCommit)
e.Array("misbehavior", Misbehaviors(r.Misbehavior))
e.Time("proposed_time", r.Time)

e.Int64("height", r.Height)
e.Int32("round", r.Round)

e.Hex("next_validators_hash", r.NextValidatorsHash)
e.Uint32("core_chain_locked_height", r.CoreChainLockedHeight)
e.Hex("proposer_pro_tx_hash", r.ProposerProTxHash)
e.Uint64("proposed_app_version", r.ProposedAppVersion)
e.Str("version", r.Version.String())
e.Hex("quorum_hash", r.QuorumHash)
}

func (r *RequestProcessProposal) MarshalZerologObject(e *zerolog.Event) {
e.Array("txs", Txs(r.Txs))
e.Interface("last_commit", r.ProposedLastCommit.String())
e.Array("misbehavior", Misbehaviors(r.Misbehavior))
e.Time("proposed_time", r.Time)

e.Hex("block_hash", r.Hash)
e.Int64("height", r.Height)
e.Int32("round", r.Round)

e.Hex("next_validators_hash", r.NextValidatorsHash)
e.Uint32("core_chain_locked_height", r.CoreChainLockedHeight)
e.Interface("core_chain_lock_update", r.CoreChainLockUpdate)
e.Hex("proposer_pro_tx_hash", r.ProposerProTxHash)
e.Uint64("proposed_app_version", r.ProposedAppVersion)
e.Str("version", r.Version.String())
e.Hex("quorum_hash", r.QuorumHash)
}

func isValidApphash(apphash tmbytes.HexBytes) bool {
return len(apphash) == crypto.DefaultAppHashSize
}
Expand Down
Loading

0 comments on commit 96de171

Please sign in to comment.