Skip to content

Commit

Permalink
[test] Improved some assertions in TestPushJobWithNativeReplication (#…
Browse files Browse the repository at this point in the history
…679)

To help with debugging flakiness. Also added a waitForNonDeterministicAssertion
in testNativeReplicationForBatchPush, which might help.
  • Loading branch information
FelixGV authored Oct 4, 2023
1 parent 97df1e7 commit 60220e3
Showing 1 changed file with 23 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
import static com.linkedin.venice.utils.TestWriteUtils.STRING_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
Expand All @@ -61,6 +63,7 @@
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
Expand Down Expand Up @@ -255,9 +258,15 @@ public void testNativeReplicationForBatchPush(int recordCount, int partitionCoun
}

String pushJobDetailsStoreName = VeniceSystemStoreUtils.getPushJobDetailsStoreName();
Assert.assertEquals(
parentControllerClient.getAllValueSchema(pushJobDetailsStoreName).getSchemas().length,
AvroProtocolDefinition.PUSH_JOB_DETAILS.currentProtocolVersion.get().intValue());
waitForNonDeterministicAssertion(5, TimeUnit.SECONDS, () -> {
MultiSchemaResponse multiSchemaResponse =
assertCommand(parentControllerClient.getAllValueSchema(pushJobDetailsStoreName));
assertNotNull(multiSchemaResponse.getSchemas(), pushJobDetailsStoreName + " schemas are null!");
Assert.assertEquals(
multiSchemaResponse.getSchemas().length,
AvroProtocolDefinition.PUSH_JOB_DETAILS.currentProtocolVersion.get().intValue(),
"Number of schemas for " + pushJobDetailsStoreName + " is not as expected!");
});

// Verify push job details are populated
try (AvroGenericStoreClient<PushJobStatusRecordKey, Object> client =
Expand Down Expand Up @@ -1189,32 +1198,22 @@ private void makeSureSystemStoreIsPushed(String clusterName, String storeName) {
for (int i = 0; i < childDatacenters.size(); i++) {
VeniceMultiClusterWrapper childDataCenter = childDatacenters.get(i);
final int iCopy = i;
childDataCenter.getClusters().get(clusterName).useControllerClient(controllerClient -> {
String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
StoreResponse storeResponse = controllerClient.getStore(systemStoreName);
Assert.assertFalse(storeResponse.isError());
Assert.assertTrue(
storeResponse.getStore().getCurrentVersion() > 0,
systemStoreName + " is not ready for DC-" + iCopy);

systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
StoreResponse storeResponse2 = controllerClient.getStore(systemStoreName);
Assert.assertFalse(storeResponse2.isError());
Assert.assertTrue(
storeResponse2.getStore().getCurrentVersion() > 0,
systemStoreName + " is not ready for DC-" + iCopy);

systemStoreName = VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE.getPrefix();
StoreResponse storeResponse3 = controllerClient.getStore(systemStoreName);
Assert.assertFalse(storeResponse3.isError());
Assert.assertTrue(
storeResponse3.getStore().getCurrentVersion() > 0,
systemStoreName + " is not ready for DC-" + iCopy);
childDataCenter.getClusters().get(clusterName).useControllerClient(cc -> {
assertStoreHealth(cc, VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName), iCopy);
assertStoreHealth(cc, VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName), iCopy);
assertStoreHealth(cc, VeniceSystemStoreType.BATCH_JOB_HEARTBEAT_STORE.getPrefix(), iCopy);
});
}
});
}

private void assertStoreHealth(ControllerClient controllerClient, String systemStoreName, int dcNumber) {
StoreResponse storeResponse = assertCommand(controllerClient.getStore(systemStoreName));
Assert.assertTrue(
storeResponse.getStore().getCurrentVersion() > 0,
systemStoreName + " is not ready for DC-" + dcNumber);
}

private VeniceWriter<String, String, byte[]> startIncrementalPush(
ControllerClient controllerClient,
String storeName,
Expand Down

0 comments on commit 60220e3

Please sign in to comment.