Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SWATCH-3036: Use separated transactions when processing retry remittance #3877

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.20.xsd">
<changeSet id="202410211200-01" author="jcarvaja">
<createIndex tableName="billable_usage_remittance" indexName="billable_usage_retry_after_idx">
<column name="retry_after"/>
</createIndex>
<!-- rollback automatically generated -->
</changeSet>
</databaseChangeLog>
1 change: 1 addition & 0 deletions src/main/resources/liquibase/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,6 @@
<include file="/liquibase/202407231110-subscription-capacity-index.xml"/>
<include file="/liquibase/202409251400-update-subscription-capacity-view-org-id.xml"/>
<include file="/liquibase/202410021300-update_usages_to_unknown.xml"/>
<include file="/liquibase/202410211200-retry-after-billable-usage-index.xml"/>
</databaseChangeLog>
<!-- vim: set expandtab sts=4 sw=4 ai: -->
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static java.util.Optional.ofNullable;

import com.redhat.swatch.billable.usage.configuration.ApplicationConfiguration;
import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceEntity;
import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceFilter;
import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceRepository;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class InternalBillableUsageController {
private final BillableUsageRemittanceRepository remittanceRepository;
private final BillingProducer billingProducer;
private final RemittanceMapper remittanceMapper;
private final ApplicationConfiguration applicationConfiguration;

public List<MonthlyRemittance> getRemittances(BillableUsageRemittanceFilter filter) {
if (filter.getOrgId() == null) {
Expand Down Expand Up @@ -90,23 +92,55 @@ public void deleteDataForOrg(String orgId) {
remittanceRepository.deleteByOrgId(orgId);
}

@Transactional
public long processRetries(OffsetDateTime asOf) {
List<BillableUsageRemittanceEntity> remittances =
remittanceRepository.findByRetryAfterLessThan(asOf);
for (BillableUsageRemittanceEntity remittance : remittances) {
log.info("Processing retry of remittance {}", remittance);
int pageIndex = 0;
int total = 0;

List<BillableUsageRemittanceEntity> remittances = findByRetryAfterLessThan(asOf, pageIndex);
while (!remittances.isEmpty()) {
remittances.forEach(this::processRetryForUsage);
total += remittances.size();
pageIndex++;
remittances = findByRetryAfterLessThan(asOf, pageIndex);
}

return total;
}

@Transactional
List<BillableUsageRemittanceEntity> findByRetryAfterLessThan(OffsetDateTime asOf, int pageIndex) {
return remittanceRepository.findByRetryAfterLessThan(
asOf, pageIndex, applicationConfiguration.getRetryRemittancesBatchSize());
}

@Transactional
void updateRemittance(
BillableUsageRemittanceEntity remittance,
RemittanceStatus status,
OffsetDateTime retryAfter,
RemittanceErrorCode errorCode) {
remittance.setRetryAfter(retryAfter);
remittance.setErrorCode(errorCode);
remittance.setStatus(status);
// using merge because remittance is a detached entity
remittanceRepository.merge(remittance);
}

private void processRetryForUsage(BillableUsageRemittanceEntity remittance) {
log.info("Processing retry of remittance {}", remittance);
RemittanceStatus previousStatus = remittance.getStatus();
RemittanceErrorCode previousErrorCode = remittance.getErrorCode();
OffsetDateTime previousRetryAfter = remittance.getRetryAfter();
// reset the retry after column
updateRemittance(remittance, RemittanceStatus.PENDING, null, null);
try {
// re-trigger billable usage
billingProducer.produce(toBillableUsage(remittance));
// reset the retry after column
remittance.setRetryAfter(null);
remittance.setErrorCode(null);
remittance.setStatus(RemittanceStatus.PENDING);
} catch (Exception ex) {
log.warn("Remittance {} failed to be sent over kafka. Restoring.", remittance);
// restoring previous state of the remittance because it fails to be sent
updateRemittance(remittance, previousStatus, previousRetryAfter, previousErrorCode);
}

// to save the retry after column for all the entities
remittanceRepository.persist(remittances);
return remittances.size();
}

private BillableUsage toBillableUsage(BillableUsageRemittanceEntity remittance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@
public class ApplicationConfiguration {
@ConfigProperty(name = "rhsm-subscriptions.remittance-retention-policy.duration")
Duration remittanceRetentionPolicyDuration;

@ConfigProperty(
name = "rhsm-subscriptions.billable-usage.retry-remittances-batch-size",
defaultValue = "1024")
int retryRemittancesBatchSize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ public List<RemittanceSummaryProjection> getRemittanceSummaries(
return entityManager.createQuery(query).getResultList();
}

public List<BillableUsageRemittanceEntity> findByRetryAfterLessThan(OffsetDateTime asOf) {
return find("retryAfter < ?1", asOf).list();
public List<BillableUsageRemittanceEntity> findByRetryAfterLessThan(
OffsetDateTime asOf, int pageIndex, int pageSize) {
return find("retryAfter < ?1", asOf).page(pageIndex, pageSize).list();
}

public void deleteAllByOrgIdAndRemittancePendingDateBefore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,4 @@ quarkus.rest-client."com.redhat.swatch.clients.contracts.api.resources.DefaultAp
# remittance retention policy configuration:
# 70 days worth
rhsm-subscriptions.remittance-retention-policy.duration=${REMITTANCE_RETENTION_DURATION:70d}
rhsm-subscriptions.billable-usage.retry-remittances-batch-size=${RETRY_REMITTANCES_BATCH_SIZE:1024}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;

import com.redhat.swatch.billable.usage.data.BillableUsageRemittanceEntity;
Expand Down Expand Up @@ -286,10 +290,25 @@ void testProcessRetries() {
// verify remittance has been sent
verify(billingProducer).produce(argThat(b -> b.getOrgId().equals(orgId)));
// verify retry after is reset
assertTrue(
remittanceRepo.findAll().stream()
.filter(b -> b.getOrgId().equals(orgId))
.allMatch(b -> b.getRetryAfter() == null));
var found =
remittanceRepo.listAll().stream().filter(b -> b.getOrgId().equals(orgId)).findFirst();
assertTrue(found.isPresent());
assertNull(found.get().getRetryAfter());
}

@Test
void testProcessRetriesShouldRestoreStateIfSendFails() {
String orgId = "testProcessRetriesOrg123";
givenRemittanceWithOldRetryAfter(orgId);
doThrow(RuntimeException.class).when(billingProducer).produce(any(BillableUsage.class));

controller.processRetries(OffsetDateTime.now());

// verify retry after is set
var found =
remittanceRepo.listAll().stream().filter(b -> b.getOrgId().equals(orgId)).findFirst();
assertTrue(found.isPresent());
assertNotNull(found.get().getRetryAfter());
}

@Transactional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ void setUp() {
enabledOrgsSink.clear();
billableUsageSink = connector.sink(BILLABLE_USAGE_OUT);
billableUsageSink.clear();
when(configuration.getRetryRemittancesBatchSize()).thenReturn(10);
}

@Test
Expand Down
Loading