diff --git a/CHANGELOG.md b/CHANGELOG.md index 76f50716d..49e798055 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog All notable changes to this project will be documented in this file. +## [3.66.0](https://github.com/Backbase/stream-services/compare/3.65.0...3.66.0) +### Changed +- Always check cursor and create if it does not exist. +- If `DateRangeEnd` is passed in the composition request set that as lastTxnDate instead of system date. + ## [3.65.0](https://github.com/Backbase/stream-services/compare/3.64.0...3.65.0) ### Changed - Move call to processAudiencesSegmentation after setupUsers diff --git a/stream-compositions/services/transaction-composition-service/src/main/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImpl.java b/stream-compositions/services/transaction-composition-service/src/main/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImpl.java index a6cd1ccc6..2920af6e4 100644 --- a/stream-compositions/services/transaction-composition-service/src/main/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImpl.java +++ b/stream-compositions/services/transaction-composition-service/src/main/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImpl.java @@ -57,7 +57,7 @@ public Mono ingestPull(TransactionIngestPullRequest i .map(f -> filterExisting(f, ingestPullRequest.getLastIngestedExternalIds())) .flatMap(this::sendToDbs) .doOnSuccess(list -> handleSuccess( - ingestPullRequest.getArrangementId(), true, list)) + ingestPullRequest.getArrangementId(), true, list, ingestPullRequest.getDateRangeEnd())) .onErrorResume(e -> handleError( ingestPullRequest.getArrangementId(), true, e)) .map(list -> buildResponse(list, ingestPullRequest)); @@ -88,7 +88,7 @@ private Mono buildIntegrationRequest(TransactionIn transactionIngestPullRequest.setDateRangeEnd(dateRangeEndFromRequest == null ? currentTime : dateRangeEndFromRequest); - if (dateRangeStartFromRequest == null && config.isCursorEnabled()) { + if (config.isCursorEnabled()) { log.info("Transaction Cursor is enabled and Request has no Start Date"); return getCursor(transactionIngestPullRequest) .switchIfEmpty(createCursor(transactionIngestPullRequest)); @@ -107,7 +107,7 @@ public Mono ingestPush(TransactionIngestPushRequest i return Mono.just(Flux.fromIterable(ingestPushRequest.getTransactions())) .flatMap(this::sendToDbs) .doOnSuccess(list -> handleSuccess( - ingestPushRequest.getArrangementId(), false, list)) + ingestPushRequest.getArrangementId(), false, list, null)) .onErrorResume(e -> handleError( ingestPushRequest.getArrangementId(), false, e)) .map(list -> buildResponse(list, ingestPushRequest)); @@ -222,7 +222,7 @@ private TransactionIngestResponse buildResponse(List transactions) { + List transactions, OffsetDateTime dateRangeEnd) { if (config.isCursorEnabled() && pullMode) { String lastTxnIds = null; if (config.isTransactionIdsFilterEnabled()) { @@ -231,15 +231,22 @@ private void handleSuccess(String arrangementId, boolean pullMode, .collect(Collectors.joining(DELIMITER)); } patchCursor(arrangementId, buildPatchCursorRequest( - TransactionCursor.StatusEnum.SUCCESS, - OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat)), - lastTxnIds)); + TransactionCursor.StatusEnum.SUCCESS, getLastTxnDate(dateRangeEnd), + lastTxnIds)); } transactionPostIngestionService.handleSuccess(transactions); log.debug("Ingested transactions: {}", transactions); } + + private String getLastTxnDate(OffsetDateTime dateRangeEnd) { + String lastTxnDate = OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat)); + if (dateRangeEnd != null) { + lastTxnDate = dateRangeEnd.format(DateTimeFormatter.ofPattern(dateFormat)); + } + return lastTxnDate; + } private Mono> handleError(String arrangementId, boolean pullMode, Throwable e) { diff --git a/stream-compositions/services/transaction-composition-service/src/test/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImplTest.java b/stream-compositions/services/transaction-composition-service/src/test/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImplTest.java index 21e77be66..35b3bfdd9 100644 --- a/stream-compositions/services/transaction-composition-service/src/test/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImplTest.java +++ b/stream-compositions/services/transaction-composition-service/src/test/java/com/backbase/stream/compositions/transaction/core/service/impl/TransactionIngestionServiceImplTest.java @@ -172,7 +172,8 @@ void ingestionInPullModePatchCursor_Success() { mockConfigForTransaction(); mockTransactionService(); - when(transactionCursorApi.patchByArrangementId(anyString(), any())).thenReturn(Mono.empty()); + TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse(); + mockCursorApiForTransactions(transactionCursorResponse, false); TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest(); transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now()); @@ -248,4 +249,28 @@ void ingestionInPushMode_Success() { StepVerifier.create(productIngestResponse) .assertNext(Assertions::assertNotNull).verifyComplete(); } + + @Test + void ingestionInPullModePatchCursor_Success_withDates() { + + mockConfigForTransaction(); + mockTransactionService(); + TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse(); + mockCursorApiForTransactions(transactionCursorResponse, false); + + TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest(); + transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now().minusDays(10)); + transactionIngestPullRequest.setDateRangeEnd(OffsetDateTime.now().minusDays(5)); + + when(transactionIntegrationService.pullTransactions(transactionIngestPullRequest)) + .thenReturn(Flux.just(new TransactionsPostRequestBody().withType("type1"). + withArrangementId("1234").withReference("ref") + .withExternalArrangementId("externalArrId"))); + + Mono productIngestResponse = transactionIngestionService + .ingestPull(transactionIngestPullRequest); + StepVerifier.create(productIngestResponse) + .assertNext(Assertions::assertNotNull) + .verifyComplete(); + } }