Skip to content

Commit

Permalink
Merge pull request #1902 from lavanet/PRT-remove-fallback-of-solana-slot
Browse files Browse the repository at this point in the history
fix: PRT -  Fix a fallback bug in Solana chain tracker
  • Loading branch information
ranlavanet authored Jan 12, 2025
2 parents bac5c8e + 039f9f4 commit d3723bb
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 20 deletions.
4 changes: 2 additions & 2 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (cf *ChainFetcher) ChainFetcherMetadata() []pairingtypes.Metadata {
}

func (cf *ChainFetcher) CustomMessage(ctx context.Context, path string, data []byte, connectionType string, apiName string) ([]byte, error) {
utils.LavaFormatDebug("Sending CustomMessage", utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "data", Value: data}, utils.Attribute{Key: "connectionType", Value: connectionType}, utils.Attribute{Key: "apiName", Value: apiName})
utils.LavaFormatTrace("Sending CustomMessage", utils.Attribute{Key: "path", Value: path}, utils.Attribute{Key: "data", Value: data}, utils.Attribute{Key: "connectionType", Value: connectionType}, utils.Attribute{Key: "apiName", Value: apiName})
craftData := &CraftData{Path: path, Data: data, ConnectionType: connectionType}
parsing := &spectypes.ParseDirective{
ApiName: apiName,
Expand All @@ -323,7 +323,7 @@ func (cf *ChainFetcher) CustomMessage(ctx context.Context, path string, data []b
return nil, err
}
reply, _, _, _, _, err := cf.chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil)
utils.LavaFormatDebug("CustomMessage", utils.Attribute{Key: "reply", Value: reply})
utils.LavaFormatTrace("CustomMessage", utils.Attribute{Key: "reply", Value: reply})
if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions protocol/chaintracker/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,12 +654,17 @@ func newCustomChainTracker(chainFetcher ChainFetcher, config ChainTrackerConfig)
// By applying a name SVM for example
case "SOLANA", "SOLANAT", "KOII", "KOIIT":
utils.LavaFormatInfo("using SVMChainTracker", utils.Attribute{Key: "chainID", Value: config.ChainId})
cache, err := ristretto.NewCache(&ristretto.Config[int64, int64]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
slotCache, err := ristretto.NewCache(&ristretto.Config[int64, int64]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
if err != nil {
utils.LavaFormatFatal("could not create cache", err)
}
hashCache, err := ristretto.NewCache(&ristretto.Config[int64, string]{NumCounters: CacheNumCounters, MaxCost: CacheMaxCost, BufferItems: 64, IgnoreInternalCost: true})
if err != nil {
utils.LavaFormatFatal("could not create cache", err)
}
chainTracker.iChainFetcherWrapper = &SVMChainTracker{
cache: cache,
slotCache: slotCache,
hashCache: hashCache,
dataFetcher: chainTracker,
chainFetcher: chainFetcher,
}
Expand Down
75 changes: 59 additions & 16 deletions protocol/chaintracker/svm_chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,21 @@ import (
"context"
"encoding/json"
fmt "fmt"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto/v2"
"github.com/lavanet/lava/v4/utils"
)

const (
CacheMaxCost = 100000 // each item cost would be 1
CacheNumCounters = 100000 // expect 100000 items
latestBlockRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}"
CacheMaxCost = 100000 // each item cost would be 1
CacheNumCounters = 100000 // expect 100000 items
latestBlockRequest = "{\"jsonrpc\":\"2.0\",\"method\":\"getLatestBlockhash\",\"params\":[{\"commitment\":\"finalized\"}],\"id\":1}"
slotCacheTTL = time.Hour * 4
hashCacheTTL = time.Hour * 1
getSlotFromCacheMaxRetries = 5
getSlotFromCacheSleepDuration = time.Millisecond * 50
)

type IChainFetcherWrapper interface {
Expand All @@ -29,7 +34,9 @@ type IChainTrackerDataFetcher interface {
type SVMChainTracker struct {
dataFetcher IChainTrackerDataFetcher
chainFetcher ChainFetcher
cache *ristretto.Cache[int64, int64] // cache for block to slot. (a few slots can point the same block, but we don't really care about that so overwrite is ok)
slotCache *ristretto.Cache[int64, int64] // cache for block to slot. (a few slots can point the same block, but we don't really care about that so overwrite is ok)
hashCache *ristretto.Cache[int64, string] // cache for block to hash.
seenBlock int64
}

type SVMLatestBlockResponse struct {
Expand All @@ -38,7 +45,8 @@ type SVMLatestBlockResponse struct {
Slot int64 `json:"slot"`
} `json:"context"`
Value struct {
LastValidBlockHeight int64 `json:"lastValidBlockHeight"`
LastValidBlockHeight int64 `json:"lastValidBlockHeight"`
BlockHash string `json:"blockhash"`
} `json:"value"`
} `json:"result"`
}
Expand All @@ -48,42 +56,77 @@ func (cs *SVMChainTracker) fetchLatestBlockNumInner(ctx context.Context) (int64,
if err != nil {
return 0, err
}

var response SVMLatestBlockResponse
if err := json.Unmarshal(latestBlockResponse, &response); err != nil {
return 0, fmt.Errorf("failed to unmarshal response: %v", err)
}

blockNum := response.Result.Value.LastValidBlockHeight
slot := response.Result.Context.Slot
cs.cache.SetWithTTL(blockNum, slot, 1, time.Hour*24)
utils.LavaFormatDebug("[SVMChainTracker] fetching latest block num", utils.LogAttr("slot", slot), utils.LogAttr("block_num", blockNum))
blockHash := response.Result.Value.BlockHash

atomic.StoreInt64(&cs.seenBlock, blockNum)
cs.slotCache.SetWithTTL(blockNum, slot, 1, slotCacheTTL)
cs.hashCache.SetWithTTL(blockNum, blockHash, 1, hashCacheTTL)

utils.LavaFormatTrace("[SVMChainTracker] fetching latest block num",
utils.LogAttr("slot", slot),
utils.LogAttr("block_num", blockNum),
utils.LogAttr("block_hash", blockHash),
)

return blockNum, nil
}

func (cs *SVMChainTracker) FetchLatestBlockNum(ctx context.Context) (int64, error) {
latestBlockNum, err := cs.fetchLatestBlockNumInner(ctx)
if err != nil {
utils.LavaFormatWarning("[SVMChainTracker] failed to get latest block num, getting from chain fetcher", err)
return cs.chainFetcher.FetchLatestBlockNum(ctx)
return 0, utils.LavaFormatWarning("[SVMChainTracker] failed to get latest block num, getting from chain fetcher", err,
utils.LogAttr("block_num", latestBlockNum),
utils.LogAttr("latest_block", cs.dataFetcher.GetAtomicLatestBlockNum()),
utils.LogAttr("server_memory", cs.dataFetcher.GetServerBlockMemory()))
}
utils.LavaFormatDebug("[SVMChainTracker] fetched latest block num", utils.LogAttr("block_num", latestBlockNum))
utils.LavaFormatTrace("[SVMChainTracker] fetched latest block num", utils.LogAttr("block_num", latestBlockNum))
return latestBlockNum, nil
}

func (cs *SVMChainTracker) FetchBlockHashByNum(ctx context.Context, blockNum int64) (string, error) {
if blockNum < cs.dataFetcher.GetAtomicLatestBlockNum()-int64(cs.dataFetcher.GetServerBlockMemory()) {
return "", ErrorFailedToFetchTooEarlyBlock.Wrapf("requested Block: %d, latest block: %d, server memory %d", blockNum, cs.dataFetcher.GetAtomicLatestBlockNum(), cs.dataFetcher.GetServerBlockMemory())
}
blockHash, ok := cs.hashCache.Get(blockNum)
if ok {
utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum found block hash in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", blockHash))
return blockHash, nil
}

// In SVM, the block hash is fetched by slot instead of block.
// We need to get the slot which is related to this block number.
slot, ok := cs.cache.Get(blockNum)
if !ok {
utils.LavaFormatError("slot not found in cache, falling back to direct block fetch - This error can happen on bootstrap and should resolve by itself, if persists please let the dev team know", ErrorFailedToFetchTooEarlyBlock, utils.LogAttr("block", blockNum), utils.LogAttr("latest_block", cs.dataFetcher.GetAtomicLatestBlockNum()), utils.LogAttr("server_memory", cs.dataFetcher.GetServerBlockMemory()))
return cs.chainFetcher.FetchBlockHashByNum(ctx, blockNum)
slot, err := cs.tryGetSlotFromCache(blockNum)
if err != nil {
return "", err
}
utils.LavaFormatDebug("[SVMChainTracker] FetchBlockHashByNum found slot in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("slot", slot))

utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum found slot in cache", utils.LogAttr("block_num", blockNum), utils.LogAttr("slot", slot))
hash, err := cs.chainFetcher.FetchBlockHashByNum(ctx, slot)
if err == nil {
utils.LavaFormatDebug("[SVMChainTracker] FetchBlockHashByNum succeeded", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", hash), utils.LogAttr("slot", slot))
utils.LavaFormatTrace("[SVMChainTracker] FetchBlockHashByNum succeeded", utils.LogAttr("block_num", blockNum), utils.LogAttr("hash", hash), utils.LogAttr("slot", slot))
}
return hash, err
}

func (cs *SVMChainTracker) tryGetSlotFromCache(blockNum int64) (int64, error) {
if blockNum <= atomic.LoadInt64(&cs.seenBlock) {
for i := 0; i < getSlotFromCacheMaxRetries; i++ {
slot, ok := cs.slotCache.Get(blockNum)
if ok {
return slot, nil
}
time.Sleep(getSlotFromCacheSleepDuration)
}
}

return 0, fmt.Errorf("slot not found in cache. This error can happen on bootstrap and should resolve by itself, if persists please let the dev team know. "+
"block: %d, latest_block: %d, server_memory: %d", blockNum, cs.dataFetcher.GetAtomicLatestBlockNum(), cs.dataFetcher.GetServerBlockMemory())
}

0 comments on commit d3723bb

Please sign in to comment.