Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always check cursor and create if it does not exist #383

Merged
merged 1 commit into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.

## [3.66.0](https://github.com/Backbase/stream-services/compare/3.65.0...3.66.0)
### Changed
- Always check cursor and create if it does not exist.
- If `DateRangeEnd` is passed in the composition request set that as lastTxnDate instead of system date.

## [3.65.0](https://github.com/Backbase/stream-services/compare/3.64.0...3.65.0)
### Changed
- Move call to processAudiencesSegmentation after setupUsers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Mono<TransactionIngestResponse> ingestPull(TransactionIngestPullRequest i
.map(f -> filterExisting(f, ingestPullRequest.getLastIngestedExternalIds()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPullRequest.getArrangementId(), true, list))
ingestPullRequest.getArrangementId(), true, list, ingestPullRequest.getDateRangeEnd()))
.onErrorResume(e -> handleError(
ingestPullRequest.getArrangementId(), true, e))
.map(list -> buildResponse(list, ingestPullRequest));
Expand Down Expand Up @@ -88,7 +88,7 @@ private Mono<TransactionIngestPullRequest> buildIntegrationRequest(TransactionIn
transactionIngestPullRequest.setDateRangeEnd(dateRangeEndFromRequest == null
? currentTime : dateRangeEndFromRequest);

if (dateRangeStartFromRequest == null && config.isCursorEnabled()) {
if (config.isCursorEnabled()) {
log.info("Transaction Cursor is enabled and Request has no Start Date");
return getCursor(transactionIngestPullRequest)
.switchIfEmpty(createCursor(transactionIngestPullRequest));
Expand All @@ -107,7 +107,7 @@ public Mono<TransactionIngestResponse> ingestPush(TransactionIngestPushRequest i
return Mono.just(Flux.fromIterable(ingestPushRequest.getTransactions()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPushRequest.getArrangementId(), false, list))
ingestPushRequest.getArrangementId(), false, list, null))
.onErrorResume(e -> handleError(
ingestPushRequest.getArrangementId(), false, e))
.map(list -> buildResponse(list, ingestPushRequest));
Expand Down Expand Up @@ -222,7 +222,7 @@ private TransactionIngestResponse buildResponse(List<TransactionsPostResponseBod
}

private void handleSuccess(String arrangementId, boolean pullMode,
List<TransactionsPostResponseBody> transactions) {
List<TransactionsPostResponseBody> transactions, OffsetDateTime dateRangeEnd) {
if (config.isCursorEnabled() && pullMode) {
String lastTxnIds = null;
if (config.isTransactionIdsFilterEnabled()) {
Expand All @@ -231,15 +231,22 @@ private void handleSuccess(String arrangementId, boolean pullMode,
.collect(Collectors.joining(DELIMITER));
}
patchCursor(arrangementId, buildPatchCursorRequest(
TransactionCursor.StatusEnum.SUCCESS,
OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat)),
lastTxnIds));
TransactionCursor.StatusEnum.SUCCESS, getLastTxnDate(dateRangeEnd),
lastTxnIds));
}

transactionPostIngestionService.handleSuccess(transactions);

log.debug("Ingested transactions: {}", transactions);
}

private String getLastTxnDate(OffsetDateTime dateRangeEnd) {
String lastTxnDate = OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat));
if (dateRangeEnd != null) {
lastTxnDate = dateRangeEnd.format(DateTimeFormatter.ofPattern(dateFormat));
}
return lastTxnDate;
}

private Mono<List<TransactionsPostResponseBody>> handleError(String arrangementId,
boolean pullMode, Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ void ingestionInPullModePatchCursor_Success() {

mockConfigForTransaction();
mockTransactionService();
when(transactionCursorApi.patchByArrangementId(anyString(), any())).thenReturn(Mono.empty());
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now());
Expand Down Expand Up @@ -248,4 +249,28 @@ void ingestionInPushMode_Success() {
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull).verifyComplete();
}

@Test
void ingestionInPullModePatchCursor_Success_withDates() {

mockConfigForTransaction();
mockTransactionService();
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now().minusDays(10));
transactionIngestPullRequest.setDateRangeEnd(OffsetDateTime.now().minusDays(5));

when(transactionIntegrationService.pullTransactions(transactionIngestPullRequest))
.thenReturn(Flux.just(new TransactionsPostRequestBody().withType("type1").
withArrangementId("1234").withReference("ref")
.withExternalArrangementId("externalArrId")));

Mono<TransactionIngestResponse> productIngestResponse = transactionIngestionService
.ingestPull(transactionIngestPullRequest);
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
}
}