Skip to content

Commit

Permalink
[vpj][controller] Emit push job status metrics from controller (#1185)
Browse files Browse the repository at this point in the history
VPJ communicates with the controller to write PushJobDetails to the PUSH_JOB_DETAILS_STORE_NAME system store. This PR introduces new metrics emitted by the controller for push job success/failure.

New Metrics Added (Count and CountSinceLastMeasurement added in Tehuti, hence using tehuti:0.12.2):
batch_push_job_success, batch_push_job_failed_user_error, batch_push_job_failed_non_user_error
incremental_push_job_success, incremental_push_job_failed_user_error, incremental_push_job_failed_non_user_error

Current flow is VPJ checks 'push.job.status.upload.enable' config and sends PushJobDetails to '/send_push_job_details' path in Venice-controller, which writes it to the push job details system store.  Derive success/failure in the controller and emit metrics, tying metric emission and push job details upload via the same config. This config is enabled everywhere, and this approach doesn’t require deployment ordering (controllers -> VPJ) unlike other options and no schema evolution needed.

Config introduced:
Added parent controller config 'push.job.failure.checkpoints.to.define.user.error' to provide a custom list of these checkpoints based on the use case to emit the metrics accordingly. DEFAULT_PUSH_JOB_USER_ERROR_CHECKPOINTS will be used by default.
  • Loading branch information
m-nagarajan authored Sep 24, 2024
1 parent 5619993 commit 328d72a
Show file tree
Hide file tree
Showing 25 changed files with 1,247 additions and 384 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.11.4',
tehuti: 'io.tehuti:tehuti:0.12.2',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.VENICE_STORE_NAME_PROP;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.PushJobCheckpoints;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controllerapi.ControllerClient;
Expand Down Expand Up @@ -259,35 +260,9 @@ public class VenicePushJob implements AutoCloseable {

private InputStorageQuotaTracker inputStorageQuotaTracker;
private final PushJobHeartbeatSenderFactory pushJobHeartbeatSenderFactory;
private PushJobHeartbeatSender pushJobHeartbeatSender = null;
private boolean pushJobStatusUploadDisabledHasBeenLogged = false;

/**
* Different successful checkpoints and known error scenarios of the VPJ flow.
* 1. The enums are not sequential
* 2. Non-negative enums are successful checkpoints
* 3. Negative enums are error scenarios (Can be user or system errors)
*/
public enum PushJobCheckpoints {
INITIALIZE_PUSH_JOB(0), NEW_VERSION_CREATED(1), START_DATA_WRITER_JOB(2), DATA_WRITER_JOB_COMPLETED(3),
START_JOB_STATUS_POLLING(4), JOB_STATUS_POLLING_COMPLETED(5), START_VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB(6),
VALIDATE_SCHEMA_AND_BUILD_DICT_MAP_JOB_COMPLETED(7), QUOTA_EXCEEDED(-1), WRITE_ACL_FAILED(-2),
DUP_KEY_WITH_DIFF_VALUE(-3), INPUT_DATA_SCHEMA_VALIDATION_FAILED(-4),
EXTENDED_INPUT_DATA_SCHEMA_VALIDATION_FAILED(-5), RECORD_TOO_LARGE_FAILED(-6), CONCURRENT_BATCH_PUSH(-7),
DATASET_CHANGED(-8), INVALID_INPUT_FILE(-9), ZSTD_DICTIONARY_CREATION_FAILED(-10),
DVC_INGESTION_ERROR_DISK_FULL(-11), DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED(-12),
DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES(-13), DVC_INGESTION_ERROR_OTHER(-14);

private final int value;

PushJobCheckpoints(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}

/**
* @param jobId id of the job
* @param vanillaProps Property bag for the job
Expand Down Expand Up @@ -675,7 +650,6 @@ DataWriterComputeJob getDataWriterComputeJob() {
* @throws VeniceException
*/
public void run() {
PushJobHeartbeatSender pushJobHeartbeatSender = null;
try {
Optional<SSLFactory> sslFactory = VPJSSLUtils.createSSLFactory(
pushJobSetting.enableSSL,
Expand Down Expand Up @@ -874,8 +848,6 @@ public void run() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.COMPLETED.getValue()));
}
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException(pushJobHeartbeatSender);
sendPushJobDetailsToController();

// only kick off the validation and post-validation flow when everything has to be done in a single VPJ
Expand Down Expand Up @@ -903,8 +875,6 @@ public void run() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
pushJobDetails.failureDetails = e.toString();
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException(pushJobHeartbeatSender);
sendPushJobDetailsToController();
closeVeniceWriter();
} catch (Exception ex) {
Expand All @@ -924,6 +894,7 @@ public void run() {
Utils.closeQuietlyWithErrorLogged(inputDataInfoProvider);
if (pushJobHeartbeatSender != null) {
pushJobHeartbeatSender.stop();
pushJobHeartbeatSender = null;
}
inputDataInfoProvider = null;
if (pushJobSetting.rmdSchemaDir != null) {
Expand Down Expand Up @@ -1018,7 +989,7 @@ private PushJobHeartbeatSender createPushJobHeartbeatSender(final boolean sslEna
}
}

private void updatePushJobDetailsWithLivenessHeartbeatException(PushJobHeartbeatSender pushJobHeartbeatSender) {
private void updatePushJobDetailsWithLivenessHeartbeatException() {
if (pushJobHeartbeatSender == null || this.pushJobDetails == null) {
return;
}
Expand Down Expand Up @@ -1840,6 +1811,12 @@ private void sendPushJobDetailsToController() {
LOGGER.warn("Unable to send push job details for monitoring purpose. The payload was not populated properly");
return;
}

// update push job details with more info if needed
updatePushJobDetailsWithConfigs();
updatePushJobDetailsWithLivenessHeartbeatException();

// send push job details to controller
try {
pushJobDetails.reportTimestamp = System.currentTimeMillis();
int version = pushJobSetting.version <= 0 ? UNCREATED_VERSION_NUMBER : pushJobSetting.version;
Expand Down Expand Up @@ -2779,7 +2756,6 @@ public void cancel() {
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.KILLED.getValue()));
}
pushJobDetails.jobDurationInMs = LatencyUtils.getElapsedTimeFromMsToMs(pushJobSetting.jobStartTimeMs);
updatePushJobDetailsWithConfigs();
sendPushJobDetailsToController();
}

Expand Down
Loading

0 comments on commit 328d72a

Please sign in to comment.