From 3b719c12014e14984da6ef871ce8f6fac9cb59fd Mon Sep 17 00:00:00 2001 From: Jose Date: Mon, 21 Oct 2024 13:52:39 +0200 Subject: [PATCH] SWATCH-3036: Use separated transactions when processing retry remittance Changes: - Add an index for billable_usage_remittance.retry_after column - Query the remittances to be retried in batches and single transactions - Perform update operations in single transactions (after sending the remittance to kafka) --- ...11200-retry-after-billable-usage-index.xml | 13 ++++ src/main/resources/liquibase/changelog.xml | 1 + .../api/InternalBillableUsageController.java | 60 +++++++++++++++---- .../ApplicationConfiguration.java | 5 ++ .../BillableUsageRemittanceRepository.java | 5 +- .../src/main/resources/application.properties | 1 + .../InternalBillableUsageControllerTest.java | 27 +++++++-- .../InternalBillableUsageResourceTest.java | 1 + 8 files changed, 94 insertions(+), 19 deletions(-) create mode 100644 src/main/resources/liquibase/202410211200-retry-after-billable-usage-index.xml diff --git a/src/main/resources/liquibase/202410211200-retry-after-billable-usage-index.xml b/src/main/resources/liquibase/202410211200-retry-after-billable-usage-index.xml new file mode 100644 index 0000000000..b1f0e52f24 --- /dev/null +++ b/src/main/resources/liquibase/202410211200-retry-after-billable-usage-index.xml @@ -0,0 +1,13 @@ + + + + + + + + + diff --git a/src/main/resources/liquibase/changelog.xml b/src/main/resources/liquibase/changelog.xml index ac0e408f91..e9c3d24d99 100644 --- a/src/main/resources/liquibase/changelog.xml +++ b/src/main/resources/liquibase/changelog.xml @@ -173,5 +173,6 @@ + diff --git a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageController.java b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageController.java index 2701fb2646..4269d077a2 100644 --- a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageController.java +++ b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageController.java @@ -22,6 +22,7 @@ import static java.util.Optional.ofNullable; +import com.redhat.swatch.billable.usage.configuration.ApplicationConfiguration; import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceEntity; import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceFilter; import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceRepository; @@ -51,6 +52,7 @@ public class InternalBillableUsageController { private final BillableUsageRemittanceRepository remittanceRepository; private final BillingProducer billingProducer; private final RemittanceMapper remittanceMapper; + private final ApplicationConfiguration applicationConfiguration; public List getRemittances(BillableUsageRemittanceFilter filter) { if (filter.getOrgId() == null) { @@ -90,23 +92,55 @@ public void deleteDataForOrg(String orgId) { remittanceRepository.deleteByOrgId(orgId); } - @Transactional public long processRetries(OffsetDateTime asOf) { - List remittances = - remittanceRepository.findByRetryAfterLessThan(asOf); - for (BillableUsageRemittanceEntity remittance : remittances) { - log.info("Processing retry of remittance {}", remittance); + int pageIndex = 0; + int total = 0; + + List remittances = findByRetryAfterLessThan(asOf, pageIndex); + while (!remittances.isEmpty()) { + remittances.forEach(this::processRetryForUsage); + total += remittances.size(); + pageIndex++; + remittances = findByRetryAfterLessThan(asOf, pageIndex); + } + + return total; + } + + @Transactional + List findByRetryAfterLessThan(OffsetDateTime asOf, int pageIndex) { + return remittanceRepository.findByRetryAfterLessThan( + asOf, pageIndex, applicationConfiguration.getRetryRemittancesBatchSize()); + } + + @Transactional + void updateRemittance( + BillableUsageRemittanceEntity remittance, + RemittanceStatus status, + OffsetDateTime retryAfter, + RemittanceErrorCode errorCode) { + remittance.setRetryAfter(retryAfter); + remittance.setErrorCode(errorCode); + remittance.setStatus(status); + // using merge because remittance is a detached entity + remittanceRepository.merge(remittance); + } + + private void processRetryForUsage(BillableUsageRemittanceEntity remittance) { + log.info("Processing retry of remittance {}", remittance); + RemittanceStatus previousStatus = remittance.getStatus(); + RemittanceErrorCode previousErrorCode = remittance.getErrorCode(); + OffsetDateTime previousRetryAfter = remittance.getRetryAfter(); + // reset the retry after column + updateRemittance(remittance, RemittanceStatus.PENDING, null, null); + try { // re-trigger billable usage billingProducer.produce(toBillableUsage(remittance)); - // reset the retry after column - remittance.setRetryAfter(null); - remittance.setErrorCode(null); - remittance.setStatus(RemittanceStatus.PENDING); + } catch (Exception ex) { + log.warn("Remittance {} failed to be sent over kafka. Restoring.", remittance); + // restoring previous state of the remittance because it fails to be sent + updateRemittance(remittance, previousStatus, previousRetryAfter, previousErrorCode); } - - // to save the retry after column for all the entities - remittanceRepository.persist(remittances); - return remittances.size(); } private BillableUsage toBillableUsage(BillableUsageRemittanceEntity remittance) { diff --git a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/configuration/ApplicationConfiguration.java b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/configuration/ApplicationConfiguration.java index e89929669b..5dd2161ed3 100644 --- a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/configuration/ApplicationConfiguration.java +++ b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/configuration/ApplicationConfiguration.java @@ -30,4 +30,9 @@ public class ApplicationConfiguration { @ConfigProperty(name = "rhsm-subscriptions.remittance-retention-policy.duration") Duration remittanceRetentionPolicyDuration; + + @ConfigProperty( + name = "rhsm-subscriptions.billable-usage.retry-remittances-batch-size", + defaultValue = "1024") + int retryRemittancesBatchSize; } diff --git a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/data/BillableUsageRemittanceRepository.java b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/data/BillableUsageRemittanceRepository.java index 2b38530a51..31ef59e739 100644 --- a/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/data/BillableUsageRemittanceRepository.java +++ b/swatch-billable-usage/src/main/java/com/redhat/swatch/billable/usage/data/BillableUsageRemittanceRepository.java @@ -86,8 +86,9 @@ public List getRemittanceSummaries( return entityManager.createQuery(query).getResultList(); } - public List findByRetryAfterLessThan(OffsetDateTime asOf) { - return find("retryAfter < ?1", asOf).list(); + public List findByRetryAfterLessThan( + OffsetDateTime asOf, int pageIndex, int pageSize) { + return find("retryAfter < ?1", asOf).page(pageIndex, pageSize).list(); } public void deleteAllByOrgIdAndRemittancePendingDateBefore( diff --git a/swatch-billable-usage/src/main/resources/application.properties b/swatch-billable-usage/src/main/resources/application.properties index ebc353e816..b704aade35 100644 --- a/swatch-billable-usage/src/main/resources/application.properties +++ b/swatch-billable-usage/src/main/resources/application.properties @@ -185,3 +185,4 @@ quarkus.rest-client."com.redhat.swatch.clients.contracts.api.resources.DefaultAp # remittance retention policy configuration: # 70 days worth rhsm-subscriptions.remittance-retention-policy.duration=${REMITTANCE_RETENTION_DURATION:70d} +rhsm-subscriptions.billable-usage.retry-remittances-batch-size=${RETRY_REMITTANCES_BATCH_SIZE:1024} diff --git a/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageControllerTest.java b/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageControllerTest.java index 2c86a68d58..630af9719b 100644 --- a/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageControllerTest.java +++ b/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageControllerTest.java @@ -22,8 +22,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceEntity; @@ -286,10 +290,25 @@ void testProcessRetries() { // verify remittance has been sent verify(billingProducer).produce(argThat(b -> b.getOrgId().equals(orgId))); // verify retry after is reset - assertTrue( - remittanceRepo.findAll().stream() - .filter(b -> b.getOrgId().equals(orgId)) - .allMatch(b -> b.getRetryAfter() == null)); + var found = + remittanceRepo.listAll().stream().filter(b -> b.getOrgId().equals(orgId)).findFirst(); + assertTrue(found.isPresent()); + assertNull(found.get().getRetryAfter()); + } + + @Test + void testProcessRetriesShouldRestoreStateIfSendFails() { + String orgId = "testProcessRetriesOrg123"; + givenRemittanceWithOldRetryAfter(orgId); + doThrow(RuntimeException.class).when(billingProducer).produce(any(BillableUsage.class)); + + controller.processRetries(OffsetDateTime.now()); + + // verify retry after is set + var found = + remittanceRepo.listAll().stream().filter(b -> b.getOrgId().equals(orgId)).findFirst(); + assertTrue(found.isPresent()); + assertNotNull(found.get().getRetryAfter()); } @Transactional diff --git a/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageResourceTest.java b/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageResourceTest.java index 7933f036f5..ebbcc35007 100644 --- a/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageResourceTest.java +++ b/swatch-billable-usage/src/test/java/com/redhat/swatch/billable/usage/admin/api/InternalBillableUsageResourceTest.java @@ -77,6 +77,7 @@ void setUp() { enabledOrgsSink.clear(); billableUsageSink = connector.sink(BILLABLE_USAGE_OUT); billableUsageSink.clear(); + when(configuration.getRetryRemittancesBatchSize()).thenReturn(10); } @Test