Skip to content

Commit

Permalink
Merge pull request #84 from cardano-foundation/store-cbor-as-file
Browse files Browse the repository at this point in the history
feat: FRED: store tx batch (Cardano L1 transaction) metadata as json and cbor files plus a few tweaks to the format and schema
  • Loading branch information
matiwinnetou authored Nov 11, 2024
2 parents 2dfaab8 + 77afce4 commit 4a36b5f
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"definitions": {
"countryCodePattern": {
"type": "string",
"pattern": "^[A-Z]{2}$"
},
"currencyIdPattern": {
"type": "string",
"pattern": "^ISO_4217:[A-Z]{3}$|^ISO_24165:[A-Z0-9]+(:[A-Z0-9]+)?$"
},
"bigDecimalPattern": {
"type": "string",
"pattern": "^[+-]?[0-9]+(\\.[0-9]+)?([eE][+-]?[0-9]+)?$"
},
"accountingPeriodPattern": {
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}$"
}
},
"properties": {
"metadata": {
"type": "object",
"properties": {
"creation_slot": {
"type": "string",
"pattern": "^[0-9]+$"
"type": "integer",
"minimum": 0
},
"version": {
"type": "string"
Expand All @@ -19,8 +37,7 @@
"type": "object",
"properties": {
"country_code": {
"type": "string",
"pattern": "^[A-Z]{2}$"
"$ref": "#/definitions/countryCodePattern"
},
"name": {
"type": "string"
Expand All @@ -32,8 +49,7 @@
"type": "string"
},
"currency_id": {
"type": "string",
"pattern": "^ISO_4217:[A-Z]{3}$|^ISO_24165:[A-Z0-9]+(:[A-Z0-9]+)?$"
"$ref": "#/definitions/currencyIdPattern"
}
},
"required": ["country_code", "name", "tax_id_number", "id", "currency_id"]
Expand Down Expand Up @@ -77,8 +93,7 @@
"type": "object",
"properties": {
"amount": {
"type": "string",
"pattern": "^[0-9]+(\\.[0-9]{1,2})?$"
"$ref": "#/definitions/bigDecimalPattern"
},
"event": {
"type": "object",
Expand Down Expand Up @@ -129,8 +144,7 @@
"type": "string"
},
"id": {
"type": "string",
"pattern": "^ISO_4217:[A-Z]{3}$|^ISO_24165:[A-Z0-9]+(:[A-Z0-9]+)?$"
"$ref": "#/definitions/currencyIdPattern"
}
},
"required": ["cust_code", "id"]
Expand All @@ -142,8 +156,7 @@
"type": "string"
},
"rate": {
"type": "string",
"pattern": "^0(\\.\\d{1,3})?$|^1(\\.0{1,3})?$"
"$ref": "#/definitions/bigDecimalPattern"
}
}
},
Expand All @@ -166,21 +179,19 @@
"type": "string"
},
"fx_rate": {
"type": "string",
"pattern": "^[0-9]+(\\.[0-9]+)?$"
"$ref": "#/definitions/bigDecimalPattern"
}
},
"required": ["amount", "event", "document", "id", "fx_rate"]
}
},
"accounting_period": {
"type": "string",
"pattern": "^[0-9]{4}-[0-9]{2}$"
"$ref": "#/definitions/accountingPeriodPattern"
}
},
"required": ["date", "number", "batch_id", "id", "type", "items", "accounting_period"]
}
}
},
"required": ["metadata", "org", "txs"]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"metadata": {
"creation_slot": "10278",
"creation_slot": 10278,
"version": "1.0"
},
"org": {
Expand Down Expand Up @@ -48,7 +48,7 @@
}
},
"id": "9c6e241e1136e20e3b5033861156a1e453dce463575ebbebaa813e2af73fa95c",
"fx_rate": "0.91277"
"fx_rate": "1E-2"
}
],
"accounting_period": "2023-10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.bloxbean.cardano.client.api.model.Amount;
import com.bloxbean.cardano.client.backend.api.BackendService;
import com.bloxbean.cardano.client.common.cbor.CborSerializationUtil;
import com.bloxbean.cardano.client.exception.CborSerializationException;
import com.bloxbean.cardano.client.function.helper.SignerProviders;
import com.bloxbean.cardano.client.metadata.Metadata;
import com.bloxbean.cardano.client.metadata.MetadataBuilder;
Expand All @@ -15,7 +16,6 @@
import io.vavr.control.Either;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.cardanofoundation.lob.app.blockchain_common.service_assistance.MetadataChecker;
Expand All @@ -24,6 +24,7 @@
import org.cardanofoundation.lob.app.blockchain_reader.BlockchainReaderPublicApiIF;
import org.zalando.problem.Problem;

import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
Expand All @@ -35,6 +36,7 @@
import java.util.stream.Stream;

import static org.apache.commons.collections4.iterators.PeekingIterator.peekingIterator;
import static org.zalando.problem.Status.INTERNAL_SERVER_ERROR;

@Slf4j
@RequiredArgsConstructor
Expand All @@ -43,17 +45,12 @@ public class L1TransactionCreator {
private static final int CARDANO_MAX_TRANSACTION_SIZE_BYTES = 16384;

private final BackendService backendService;

private final MetadataSerialiser metadataSerialiser;

private final BlockchainReaderPublicApiIF blockchainReaderPublicApi;

private final MetadataChecker jsonSchemaMetadataChecker;

private final Account organiserAccount;

private final int metadataLabel;

private final boolean debugStoreOutputTx;

private String runId;
Expand All @@ -71,14 +68,30 @@ public void init() {

public Either<Problem, Optional<BlockchainTransactions>> pullBlockchainTransaction(String organisationId,
Set<TransactionEntity> txs) {
val chainTipE = blockchainReaderPublicApi.getChainTip();
return blockchainReaderPublicApi.getChainTip()
.flatMap(chainTip -> handleTransactionCreation(organisationId, txs, chainTip.getAbsoluteSlot()));
}

return chainTipE.map(chainTip -> createTransaction(organisationId, txs, chainTip.getAbsoluteSlot()));
private Either<Problem, Optional<BlockchainTransactions>> handleTransactionCreation(String organisationId,
Set<TransactionEntity> transactions,
long creationSlot) {
try {
return createTransaction(organisationId, transactions, creationSlot);
} catch (IOException e) {
log.error("Error creating blockchain transaction: ", e);

return Either.left(Problem.builder()
.withTitle("ERROR_CREATING_TRANSACTION")
.withDetail(STR."Exception encountered: \{e.getMessage()}")
.withStatus(INTERNAL_SERVER_ERROR)
.build());
}
}

private Optional<BlockchainTransactions> createTransaction(String organisationId,
// error or transactions to process or no more transactions to process in case of blockchain transaction creation
private Either<Problem, Optional<BlockchainTransactions>> createTransaction(String organisationId,
Set<TransactionEntity> transactions,
long creationSlot) {
long creationSlot) throws IOException {
log.info("Splitting {} passedTransactions into blockchain passedTransactions", transactions.size());

val transactionsBatch = new LinkedHashSet<TransactionEntity>();
Expand All @@ -88,67 +101,95 @@ private Optional<BlockchainTransactions> createTransaction(String organisationId

transactionsBatch.add(txEntity);

val txBytesE = serialiseTransactionChunk(organisationId, transactionsBatch, creationSlot);
if (txBytesE.isLeft()) {
log.error("Error serialising transaction, abort processing, issue: {}", txBytesE.getLeft().getDetail());
return Optional.empty();
val serializedTransactionsE = serialiseTransactionChunk(organisationId, transactionsBatch, creationSlot);
if (serializedTransactionsE.isLeft()) {
log.error("Error serialising transaction, abort processing, issue: {}", serializedTransactionsE.getLeft().getDetail());

return Either.left(serializedTransactionsE.getLeft());
}

val txBytes = txBytesE.get();
val serializedTransaction = serializedTransactionsE.get();
val txBytes = serializedTransaction.txBytes;

val transactionLinePeek = it.peek();
if (transactionLinePeek == null) { // next one is last element
continue;
}
val newChunkTxBytesE = serialiseTransactionChunk(organisationId, Stream.concat(transactionsBatch.stream(), Stream.of(transactionLinePeek)).collect(Collectors.toSet()), creationSlot);
val newChunkTxBytesE = serialiseTransactionChunk(organisationId, Stream.concat(transactionsBatch.stream(), Stream.of(transactionLinePeek))
.collect(Collectors.toSet()), creationSlot);

if (newChunkTxBytesE.isLeft()) {
log.error("Error serialising transaction, abort processing, issue: {}", newChunkTxBytesE.getLeft().getDetail());
return Optional.empty();

return Either.left(newChunkTxBytesE.getLeft());
}
val newChunkTxBytes = newChunkTxBytesE.get();
val newSerializedTransaction = newChunkTxBytesE.get();
val newChunkTxBytes = newSerializedTransaction.txBytes;

if (newChunkTxBytes.length >= CARDANO_MAX_TRANSACTION_SIZE_BYTES) {
log.info("Blockchain transaction created, id:{}", TransactionUtil.getTxHash(txBytes));

final var remaining = calculateRemainingTransactionLines(transactions, transactionsBatch);
potentiallyStoreTxs(creationSlot, serializedTransaction);

return Optional.of(new BlockchainTransactions(organisationId, transactionsBatch, remaining, creationSlot, txBytes));
val remainingTxs = calculateRemainingTransactions(transactions, transactionsBatch);

return Either.right(Optional.of(new BlockchainTransactions(organisationId, transactionsBatch, remainingTxs, creationSlot, txBytes)));
}
}

// if there are any left overs
// if there are any left overs, meaning that the batch is not full, e.g. just a couple of transactions to serialise
if (!transactionsBatch.isEmpty()) {
log.info("Last batch size: {}", transactionsBatch.size());
log.info("Leftovers batch size: {}", transactionsBatch.size());

val serializedTxE = serialiseTransactionChunk(organisationId, transactionsBatch, creationSlot);

val txBytesE = serialiseTransactionChunk(organisationId, transactionsBatch, creationSlot);
if (serializedTxE.isEmpty()) {
log.error("Error serialising transaction, abort processing, issue: {}", serializedTxE.getLeft().getDetail());

if (txBytesE.isEmpty()) {
log.error("Error serialising transaction, abort processing, issue: {}", txBytesE.getLeft().getDetail());
return Optional.empty();
return Either.left(serializedTxE.getLeft());
}

val txBytes = txBytesE.get();
val serTx = serializedTxE.get();
potentiallyStoreTxs(creationSlot, serTx);
val txBytes = serTx.txBytes;

log.info("Transaction size: {}", txBytes.length);

final var remaining = calculateRemainingTransactionLines(transactions, transactionsBatch);
val remaining = calculateRemainingTransactions(transactions, transactionsBatch);

return Optional.of(new BlockchainTransactions(organisationId, transactionsBatch, remaining, creationSlot, txBytes));
return Either.right(Optional.of(new BlockchainTransactions(organisationId, transactionsBatch, remaining, creationSlot, txBytes)));
}

return Optional.empty();
// no transactions to process
return Either.right(Optional.empty());
}

// for debug and inspection only
private void potentiallyStoreTxs(long creationSlot, SerializedCardanoL1Transaction tx) throws IOException {
if (debugStoreOutputTx) {
val timestamp = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
val name = STR."lob-txs-metadata-\{runId}-\{timestamp}-\{creationSlot}";
val tmpJsonTxFile = Files.createTempFile(name, ".json");
val tmpCborFile = Files.createTempFile(name, ".cbor");

log.info("DebugStoreTx enabled, storing JSON tx metadata to file: {}", tmpJsonTxFile);
Files.writeString(tmpJsonTxFile, tx.metadataJson);

log.info("DebugStoreTx enabled, storing CBOR tx metadata to file: {}", tmpCborFile);
Files.write(tmpCborFile, tx.metadataCbor);
}
}

private static Set<TransactionEntity> calculateRemainingTransactionLines(
private static Set<TransactionEntity> calculateRemainingTransactions(
Set<TransactionEntity> transactions,
Set<TransactionEntity> transactionsBatch) {

return Sets.difference(transactions, transactionsBatch);
}

private Either<Problem, byte[]> serialiseTransactionChunk(String organisationId,
Set<TransactionEntity> transactionsBatch,
long creationSlot) {
private Either<Problem, SerializedCardanoL1Transaction> serialiseTransactionChunk(String organisationId,
Set<TransactionEntity> transactionsBatch,
long creationSlot) {
try {
val metadataMap =
metadataSerialiser.serialiseToMetadataMap(organisationId, transactionsBatch, creationSlot);
Expand All @@ -162,41 +203,34 @@ private Either<Problem, byte[]> serialiseTransactionChunk(String organisationId,
val metadata = MetadataBuilder.createMetadata();
metadata.put(metadataLabel, metadataMap);

if (debugStoreOutputTx) {
val timestamp = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
val name = STR."lob-tx-metadata-\{runId}-\{timestamp}-\{creationSlot}";
val tmpTxFile = Files.createTempFile(name, ".json");

log.info("DebugStoreTx enabled, storing tx metadata to file: {}", tmpTxFile);

Files.writeString(tmpTxFile, json);
}

val isValid = jsonSchemaMetadataChecker.checkTransactionMetadata(json);

if (!isValid) {
return Either.left(Problem.builder()
.withTitle("INVALID_TRANSACTION_METADATA")
.withDetail("Metadata is not valid according to the transaction schema, we will not create a transaction!")
.withStatus(INTERNAL_SERVER_ERROR)
.build()
);
}

log.info("Metadata for tx validated, gonna serialise tx now...");

return Either.right(serialiseTransaction(metadata));
val serialisedTx = serialiseTransaction(metadata);

return Either.right(new SerializedCardanoL1Transaction(serialisedTx, bytes, json));
} catch (Exception e) {
log.error("Error serialising metadata to cbor", e);
return Either.left(Problem.builder()
.withTitle("ERROR_SERIALISING_METADATA")
.withDetail("Error serialising metadata to cbor")
.withStatus(INTERNAL_SERVER_ERROR)
.build()
);
}
}

@SneakyThrows
protected byte[] serialiseTransaction(Metadata metadata) {
protected byte[] serialiseTransaction(Metadata metadata) throws CborSerializationException {
val quickTxBuilder = new QuickTxBuilder(backendService);

val tx = new Tx()
Expand All @@ -210,4 +244,8 @@ protected byte[] serialiseTransaction(Metadata metadata) {
.serialize();
}

public record SerializedCardanoL1Transaction(byte[] txBytes,
byte[] metadataCbor,
String metadataJson) { }

}
Loading

0 comments on commit 4a36b5f

Please sign in to comment.