diff --git a/collector/collector.go b/collector/collector.go index 30ff9dd..5d3ff8d 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -4,8 +4,11 @@ import ( "context" "errors" "github.com/spacemeshos/explorer-backend/collector/sql" + "github.com/spacemeshos/explorer-backend/model" "github.com/spacemeshos/go-spacemesh/common/types" sql2 "github.com/spacemeshos/go-spacemesh/sql" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" "golang.org/x/sync/errgroup" "google.golang.org/grpc/keepalive" "time" @@ -39,6 +42,8 @@ type Listener interface { LayersInQueue() int IsLayerInQueue(layer *pb.Layer) bool GetEpochNumLayers() uint32 + GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]model.Transaction, error) + UpdateTransactionState(parent context.Context, id string, state int32) error } type Collector struct { diff --git a/collector/mesh.go b/collector/mesh.go index 9f14a57..765a0b2 100644 --- a/collector/mesh.go +++ b/collector/mesh.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/spacemeshos/explorer-backend/utils" "github.com/spacemeshos/go-spacemesh/common/types" + "go.mongodb.org/mongo-driver/bson" "io" "time" @@ -177,3 +178,36 @@ func (c *Collector) syncLayer(lid types.LayerID) error { return nil } + +func (c *Collector) syncNotProcessedTxs() error { + txs, err := c.listener.GetTransactions(context.TODO(), &bson.D{{Key: "state", Value: 0}}) + if err != nil { + return err + } + + for _, tx := range txs { + txId, err := utils.StringToBytes(tx.Id) + if err != nil { + return err + } + + state, err := c.transactionsClient.TransactionsState(context.TODO(), &pb.TransactionsStateRequest{ + TransactionId: []*pb.TransactionId{{Id: txId}}, + IncludeTransactions: false, + }) + if err != nil { + return err + } + + txState := state.TransactionsState[0] + + if txState != nil { + err := c.listener.UpdateTransactionState(context.TODO(), tx.Id, int32(txState.State)) + if err != nil { + return err + } + } + } + + return nil +} diff --git a/collector/node.go b/collector/node.go index 4dbffae..a93a55e 100644 --- a/collector/node.go +++ b/collector/node.go @@ -48,6 +48,11 @@ func (c *Collector) syncStatusPump() error { if err != nil { fmt.Errorf("syncLayer error: %v", err) } + + err = c.syncNotProcessedTxs() + if err != nil { + fmt.Errorf("syncNotProcessedTxs error: %v", err) + } } } diff --git a/collector/txs_test.go b/collector/txs_test.go index dfa6a31..9510070 100644 --- a/collector/txs_test.go +++ b/collector/txs_test.go @@ -2,14 +2,11 @@ package collector_test import ( "context" - "encoding/json" "strings" "testing" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" - - "github.com/spacemeshos/explorer-backend/model" ) func TestTransactions(t *testing.T) { @@ -18,20 +15,16 @@ func TestTransactions(t *testing.T) { require.NoError(t, err) require.Equal(t, len(generator.Transactions), len(txs)) for _, tx := range txs { - // temporary hack, until storage return data as slice of bson.B not an struct. - txEncoded, err := json.Marshal(tx.Map()) require.NoError(t, err) - var tmpTx model.Transaction - require.NoError(t, json.Unmarshal(txEncoded, &tmpTx)) - generatedTx, ok := generator.Transactions[tmpTx.Id] + generatedTx, ok := generator.Transactions[tx.Id] require.True(t, ok) - tmpTx.Receiver = strings.ToLower(tmpTx.Receiver) - tmpTx.Sender = strings.ToLower(tmpTx.Sender) + tx.Receiver = strings.ToLower(tx.Receiver) + tx.Sender = strings.ToLower(tx.Sender) generatedTx.Receiver = strings.ToLower(generatedTx.Receiver) generatedTx.Sender = strings.ToLower(generatedTx.Sender) generatedTx.PublicKey = "" // we do not encode it to send tx, omit this. generatedTx.Signature = "" // we generate sign on emulation of pb stream. - tmpTx.Signature = "" // we generate sign on emulation of pb stream. - require.Equal(t, *generatedTx, tmpTx) + tx.Signature = "" // we generate sign on emulation of pb stream. + require.Equal(t, *generatedTx, tx) } } diff --git a/storage/tx.go b/storage/tx.go index ef479fb..e015954 100644 --- a/storage/tx.go +++ b/storage/tx.go @@ -118,7 +118,7 @@ func (s *Storage) IsTransactionExists(parent context.Context, txId string) bool return count > 0 } -func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]bson.D, error) { +func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts ...*options.FindOptions) ([]model.Transaction, error) { ctx, cancel := context.WithTimeout(parent, 5*time.Second) defer cancel() cursor, err := s.db.Collection("txs").Find(ctx, query, opts...) @@ -126,16 +126,16 @@ func (s *Storage) GetTransactions(parent context.Context, query *bson.D, opts .. log.Info("GetTransactions: %v", err) return nil, err } - var docs interface{} = []bson.D{} - err = cursor.All(ctx, &docs) + var txs []model.Transaction + err = cursor.All(ctx, &txs) if err != nil { log.Info("GetTransactions: %v", err) return nil, err } - if len(docs.([]bson.D)) == 0 { + if len(txs) == 0 { return nil, nil } - return docs.([]bson.D), nil + return txs, nil } func (s *Storage) SaveTransaction(parent context.Context, in *model.Transaction) error { @@ -305,3 +305,24 @@ func (s *Storage) SaveTransactionResult(parent context.Context, in *model.Transa } return err } + +func (s *Storage) UpdateTransactionState(parent context.Context, id string, state int32) error { + ctx, cancel := context.WithTimeout(parent, 5*time.Second) + defer cancel() + + tx := bson.D{ + { + Key: "$set", + Value: bson.D{ + {Key: "state", Value: state}, + }, + }, + } + + _, err := s.db.Collection("txs").UpdateOne(ctx, + bson.D{{Key: "id", Value: id}}, tx) + if err != nil { + log.Info("UpdateTransactionState: %v obj: %+v", err, tx) + } + return err +}