Skip to content

Commit

Permalink
Sync not processed txs
Browse files Browse the repository at this point in the history
  • Loading branch information
kacpersaw committed Dec 18, 2023
1 parent 66d1c43 commit 4dd0569
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 17 deletions.
5 changes: 5 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"errors"
"github.com/spacemeshos/explorer-backend/collector/sql"
"github.com/spacemeshos/explorer-backend/model"
"github.com/spacemeshos/go-spacemesh/common/types"
sql2 "github.com/spacemeshos/go-spacemesh/sql"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
"time"
Expand Down Expand Up @@ -39,6 +42,8 @@ type Listener interface {
LayersInQueue() int
IsLayerInQueue(layer *pb.Layer) bool
GetEpochNumLayers() uint32
GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]model.Transaction, error)
UpdateTransactionState(parent context.Context, id string, state int32) error
}

type Collector struct {
Expand Down
34 changes: 34 additions & 0 deletions collector/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/spacemeshos/explorer-backend/utils"
"github.com/spacemeshos/go-spacemesh/common/types"
"go.mongodb.org/mongo-driver/bson"
"io"
"time"

Expand Down Expand Up @@ -177,3 +178,36 @@ func (c *Collector) syncLayer(lid types.LayerID) error {

return nil
}

func (c *Collector) syncNotProcessedTxs() error {
txs, err := c.listener.GetTransactions(context.TODO(), &bson.D{{Key: "state", Value: 0}})
if err != nil {
return err
}

for _, tx := range txs {
txId, err := utils.StringToBytes(tx.Id)
if err != nil {
return err
}

state, err := c.transactionsClient.TransactionsState(context.TODO(), &pb.TransactionsStateRequest{
TransactionId: []*pb.TransactionId{{Id: txId}},
IncludeTransactions: false,
})
if err != nil {
return err
}

txState := state.TransactionsState[0]

if txState != nil {
err := c.listener.UpdateTransactionState(context.TODO(), tx.Id, int32(txState.State))
if err != nil {
return err
}
}
}

return nil
}
5 changes: 5 additions & 0 deletions collector/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func (c *Collector) syncStatusPump() error {
if err != nil {
fmt.Errorf("syncLayer error: %v", err)
}

err = c.syncNotProcessedTxs()
if err != nil {
fmt.Errorf("syncNotProcessedTxs error: %v", err)
}
}
}

Expand Down
17 changes: 5 additions & 12 deletions collector/txs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package collector_test

import (
"context"
"encoding/json"
"strings"
"testing"

"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"

"github.com/spacemeshos/explorer-backend/model"
)

func TestTransactions(t *testing.T) {
Expand All @@ -18,20 +15,16 @@ func TestTransactions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(generator.Transactions), len(txs))
for _, tx := range txs {
// temporary hack, until storage return data as slice of bson.B not an struct.
txEncoded, err := json.Marshal(tx.Map())
require.NoError(t, err)
var tmpTx model.Transaction
require.NoError(t, json.Unmarshal(txEncoded, &tmpTx))
generatedTx, ok := generator.Transactions[tmpTx.Id]
generatedTx, ok := generator.Transactions[tx.Id]
require.True(t, ok)
tmpTx.Receiver = strings.ToLower(tmpTx.Receiver)
tmpTx.Sender = strings.ToLower(tmpTx.Sender)
tx.Receiver = strings.ToLower(tx.Receiver)
tx.Sender = strings.ToLower(tx.Sender)
generatedTx.Receiver = strings.ToLower(generatedTx.Receiver)
generatedTx.Sender = strings.ToLower(generatedTx.Sender)
generatedTx.PublicKey = "" // we do not encode it to send tx, omit this.
generatedTx.Signature = "" // we generate sign on emulation of pb stream.
tmpTx.Signature = "" // we generate sign on emulation of pb stream.
require.Equal(t, *generatedTx, tmpTx)
tx.Signature = "" // we generate sign on emulation of pb stream.
require.Equal(t, *generatedTx, tx)
}
}
31 changes: 26 additions & 5 deletions storage/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,24 @@ func (s *Storage) IsTransactionExists(parent context.Context, txId string) bool
return count > 0
}

func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]bson.D, error) {
func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]model.Transaction, error) {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
defer cancel()
cursor, err := s.db.Collection("txs").Find(ctx, query, opts...)
if err != nil {
log.Info("GetTransactions: %v", err)
return nil, err
}
var docs interface{} = []bson.D{}
err = cursor.All(ctx, &docs)
var txs []model.Transaction
err = cursor.All(ctx, &txs)
if err != nil {
log.Info("GetTransactions: %v", err)
return nil, err
}
if len(docs.([]bson.D)) == 0 {
if len(txs) == 0 {
return nil, nil
}
return docs.([]bson.D), nil
return txs, nil
}

func (s *Storage) SaveTransaction(parent context.Context, in *model.Transaction) error {
Expand Down Expand Up @@ -305,3 +305,24 @@ func (s *Storage) SaveTransactionResult(parent context.Context, in *model.Transa
}
return err
}

func (s *Storage) UpdateTransactionState(parent context.Context, id string, state int32) error {
ctx, cancel := context.WithTimeout(parent, 5*time.Second)
defer cancel()

tx := bson.D{
{
Key: "$set",
Value: bson.D{
{Key: "state", Value: state},
},
},
}

_, err := s.db.Collection("txs").UpdateOne(ctx,
bson.D{{Key: "id", Value: id}}, tx)
if err != nil {
log.Info("UpdateTransactionState: %v obj: %+v", err, tx)
}
return err
}

0 comments on commit 4dd0569

Please sign in to comment.