From 5d30bb5ef9fc3e2b2f3b4314777e47abcb638370 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Fri, 15 Nov 2024 13:12:57 -0500 Subject: [PATCH] Improved metrics output consistency and removed references to Thread and Global variables. --- README.md | 8 +- RELEASE.md | 4 + .../com/datastax/cdm/job/CopyJobSession.java | 37 ++--- .../com/datastax/cdm/job/CounterUnit.java | 29 ++-- .../com/datastax/cdm/job/DiffJobSession.java | 39 +++-- .../cdm/job/GuardrailCheckJobSession.java | 8 +- .../java/com/datastax/cdm/job/JobCounter.java | 99 ++++++------ src/resources/cdm-detailed.properties | 18 ++- .../com/datastax/cdm/job/JobCounterTest.java | 150 +++++++++++------- 9 files changed, 220 insertions(+), 172 deletions(-) diff --git a/README.md b/README.md index 0aa8459..9e079ca 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters. - **Java11** (minimum) as Spark binaries are compiled with it. - **Spark `3.5.x` with Scala `2.13` and Hadoop `3.3`** - Typically installed using [this binary](https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. This simple setup is recommended for most one-time migrations. - - However we recommend a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job. + - However we recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above mentioned versions) for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job. Spark can be installed by running the following: - @@ -150,7 +150,7 @@ spark-submit --properties-file cdm.properties \ - Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra) - Validate migration accuracy and performance using a smaller randomized data-set - Supports adding custom fixed `writetime` and/or `ttl` -- Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace +- Track run information (start-time, end-time, run-metrics, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace # Things to know - Each run (Migration or Validation) can be tracked (when enabled). You can find summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace. @@ -160,7 +160,7 @@ spark-submit --properties-file cdm.properties \ - If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be time the job was run. If you want to avoid this, we recommend setting `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios. - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue. - When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well. -- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) applies to individual Spark worker nodes. Hence this value should be set to `effective-rate-limit-you-need`/`number-of-spark-worker-nodes` . E.g. If you need an effective rate-limit of 10000, and the number of Spark worker nodes are 4, then you should set the above rate-limit params to a value of 2500. +- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) applies to individual Spark worker nodes. Hence this value should be set to the effective-rate-limit-you-need/number-of-spark-worker-nodes . E.g. If you need an effective rate-limit of 10000, and the number of Spark worker nodes are 4, then you should set the above rate-limit params to a value of 2500. # Performance recommendations Below recommendations may only be useful when migrating large tables where the default performance is not good enough @@ -175,7 +175,7 @@ Below recommendations may only be useful when migrating large tables where the d - `ratelimit`: Default is `20000`, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle. - Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance - We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` clusters -- Use a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job. +- We recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes) complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job. > [!NOTE] > For additional performance tuning, refer to details mentioned in the [`cdm-detailed.properties` file here](./src/resources/cdm-detailed.properties) diff --git a/RELEASE.md b/RELEASE.md index e61b164..d91de75 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,8 @@ # Release Notes +## [5.1.0] - 2024-11-15 +- Improves metrics output by producing stats labels in an intuitive and consistent order +- Refactored JobCounter by removing any references to `thread` or `global` as CDM operations are now isolated within partition-ranges (`parts`). Each such `part` is then parallelly processed and aggregated by Spark. + ## [5.0.0] - 2024-11-08 - CDM refactored to be fully Spark Native and more performant when deployed on a multi-node Spark Cluster - `trackRun` feature has been expanded to record `run-info` for each part in the `CDM_RUN_DETAILS` table. Along with granular metrics, this information can be used to troubleshoot any unbalanced problematic partitions. diff --git a/src/main/java/com/datastax/cdm/job/CopyJobSession.java b/src/main/java/com/datastax/cdm/job/CopyJobSession.java index ca147a2..aa24f16 100644 --- a/src/main/java/com/datastax/cdm/job/CopyJobSession.java +++ b/src/main/java/com/datastax/cdm/job/CopyJobSession.java @@ -82,52 +82,53 @@ protected void processPartitionRange(PartitionRange range) { for (Row originRow : resultSet) { rateLimiterOrigin.acquire(1); - jobCounter.threadIncrement(JobCounter.CounterType.READ); + jobCounter.increment(JobCounter.CounterType.READ); Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) { - jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); + jobCounter.increment(JobCounter.CounterType.SKIPPED); continue; } for (Record r : pkFactory.toValidRecordList(record)) { BoundStatement boundUpsert = bind(r); if (null == boundUpsert) { - jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); + jobCounter.increment(JobCounter.CounterType.SKIPPED); continue; } rateLimiterTarget.acquire(1); batch = writeAsync(batch, writeResults, boundUpsert); - jobCounter.threadIncrement(JobCounter.CounterType.UNFLUSHED); + jobCounter.increment(JobCounter.CounterType.UNFLUSHED); if (jobCounter.getCount(JobCounter.CounterType.UNFLUSHED) > fetchSize) { flushAndClearWrites(batch, writeResults); - jobCounter.threadIncrement(JobCounter.CounterType.WRITE, - jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); - jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); + jobCounter.increment(JobCounter.CounterType.WRITE, + jobCounter.getCount(JobCounter.CounterType.UNFLUSHED, true)); + jobCounter.reset(JobCounter.CounterType.UNFLUSHED); } } } flushAndClearWrites(batch, writeResults); - jobCounter.threadIncrement(JobCounter.CounterType.WRITE, - jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); - jobCounter.threadReset(JobCounter.CounterType.UNFLUSHED); - jobCounter.globalIncrement(); + jobCounter.increment(JobCounter.CounterType.WRITE, + jobCounter.getCount(JobCounter.CounterType.UNFLUSHED, true)); + jobCounter.reset(JobCounter.CounterType.UNFLUSHED); + jobCounter.flush(); if (null != trackRunFeature) { - trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getThreadCounters(true)); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getMetrics()); } } catch (Exception e) { - jobCounter.threadIncrement(JobCounter.CounterType.ERROR, - jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.WRITE) - - jobCounter.getCount(JobCounter.CounterType.SKIPPED)); + jobCounter.increment(JobCounter.CounterType.ERROR, + jobCounter.getCount(JobCounter.CounterType.READ, true) + - jobCounter.getCount(JobCounter.CounterType.WRITE, true) + - jobCounter.getCount(JobCounter.CounterType.SKIPPED, true)); logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max, e); - logger.error("Error stats " + jobCounter.getThreadCounters(false)); - jobCounter.globalIncrement(); + logger.error("Error stats " + jobCounter.getMetrics(true)); + jobCounter.flush(); if (null != trackRunFeature) { - trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getThreadCounters(true)); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getMetrics()); } } } diff --git a/src/main/java/com/datastax/cdm/job/CounterUnit.java b/src/main/java/com/datastax/cdm/job/CounterUnit.java index aaad7c2..c6672c4 100644 --- a/src/main/java/com/datastax/cdm/job/CounterUnit.java +++ b/src/main/java/com/datastax/cdm/job/CounterUnit.java @@ -20,30 +20,31 @@ public class CounterUnit implements Serializable { private static final long serialVersionUID = 2194336948011681878L; - private long globalCounter = 0; - private long threadLocalCounter = 0; + private long count = 0; + private long interimCount = 0; - public void incrementThreadCounter(long incrementBy) { - threadLocalCounter += incrementBy; + public void increment(long incrementBy) { + interimCount += incrementBy; } - public long getThreadCounter() { - return threadLocalCounter; + public long getInterimCount() { + return interimCount; } - public void resetThreadCounter() { - threadLocalCounter = 0; + public void reset() { + interimCount = 0; } - public void setGlobalCounter(long value) { - globalCounter = value; + public void setCount(long value) { + count = value; } - public void addThreadToGlobalCounter() { - globalCounter += threadLocalCounter; + public void addToCount() { + count += interimCount; + reset(); } - public long getGlobalCounter() { - return globalCounter; + public long getCount() { + return count; } } diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index 3d421f0..197ee6f 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -135,10 +135,10 @@ protected void processPartitionRange(PartitionRange range) { StreamSupport.stream(resultSet.spliterator(), false).forEach(originRow -> { rateLimiterOrigin.acquire(1); Record record = new Record(pkFactory.getTargetPK(originRow), originRow, null); - jobCounter.threadIncrement(JobCounter.CounterType.READ); + jobCounter.increment(JobCounter.CounterType.READ); if (originSelectByPartitionRangeStatement.shouldFilterRecord(record)) { - jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); + jobCounter.increment(JobCounter.CounterType.SKIPPED); } else { for (Record r : pkFactory.toValidRecordList(record)) { rateLimiterTarget.acquire(1); @@ -146,7 +146,7 @@ protected void processPartitionRange(PartitionRange range) { .getAsyncResult(r.getPk()); if (null == targetResult) { - jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED); + jobCounter.increment(JobCounter.CounterType.SKIPPED); } else { r.setAsyncTargetRow(targetResult); recordsToDiff.add(r); @@ -168,32 +168,31 @@ protected void processPartitionRange(PartitionRange range) { .getCount(JobCounter.CounterType.CORRECTED_MISSING) && jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter .getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) { - jobCounter.globalIncrement(); + jobCounter.flush(); trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF_CORRECTED, - jobCounter.getThreadCounters(true)); + jobCounter.getMetrics()); } else { - jobCounter.globalIncrement(); - trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF, - jobCounter.getThreadCounters(true)); + jobCounter.flush(); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.DIFF, jobCounter.getMetrics()); } } else if (null != trackRunFeature) { - jobCounter.globalIncrement(); - trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getThreadCounters(true)); + jobCounter.flush(); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.PASS, jobCounter.getMetrics()); } else { - jobCounter.globalIncrement(); + jobCounter.flush(); } } catch (Exception e) { - jobCounter.threadIncrement(JobCounter.CounterType.ERROR, + jobCounter.increment(JobCounter.CounterType.ERROR, jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.VALID) - jobCounter.getCount(JobCounter.CounterType.MISSING) - jobCounter.getCount(JobCounter.CounterType.MISMATCH) - jobCounter.getCount(JobCounter.CounterType.SKIPPED)); logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max, e); - logger.error("Error stats " + jobCounter.getThreadCounters(false)); - jobCounter.globalIncrement(); + logger.error("Error stats " + jobCounter.getMetrics(true)); + jobCounter.flush(); if (null != trackRunFeature) - trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getThreadCounters(true)); + trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.FAIL, jobCounter.getMetrics()); } } @@ -205,7 +204,7 @@ private boolean diffAndClear(List recordsToDiff, JobCounter jobCounter) private boolean diff(Record record, JobCounter jobCounter) { if (record.getTargetRow() == null) { - jobCounter.threadIncrement(JobCounter.CounterType.MISSING); + jobCounter.increment(JobCounter.CounterType.MISSING); logger.error("Missing target row found for key: {}", record.getPk()); if (autoCorrectMissing && isCounterTable && !forceCounterWhenMissing) { logger.error("{} is true, but not Inserting as {} is not enabled; key : {}", @@ -218,7 +217,7 @@ private boolean diff(Record record, JobCounter jobCounter) { if (autoCorrectMissing) { rateLimiterTarget.acquire(1); targetSession.getTargetUpsertStatement().putRecord(record); - jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING); + jobCounter.increment(JobCounter.CounterType.CORRECTED_MISSING); logger.error("Inserted missing row in target: {}", record.getPk()); } return true; @@ -226,19 +225,19 @@ private boolean diff(Record record, JobCounter jobCounter) { String diffData = isDifferent(record); if (!diffData.isEmpty()) { - jobCounter.threadIncrement(JobCounter.CounterType.MISMATCH); + jobCounter.increment(JobCounter.CounterType.MISMATCH); logger.error("Mismatch row found for key: {} Mismatch: {}", record.getPk(), diffData); if (autoCorrectMismatch) { rateLimiterTarget.acquire(1); targetSession.getTargetUpsertStatement().putRecord(record); - jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH); + jobCounter.increment(JobCounter.CounterType.CORRECTED_MISMATCH); logger.error("Corrected mismatch row in target: {}", record.getPk()); } return true; } else { - jobCounter.threadIncrement(JobCounter.CounterType.VALID); + jobCounter.increment(JobCounter.CounterType.VALID); return false; } } diff --git a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java index 394cf9b..2c540c6 100644 --- a/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java +++ b/src/main/java/com/datastax/cdm/job/GuardrailCheckJobSession.java @@ -54,14 +54,14 @@ protected void processPartitionRange(PartitionRange range) { String checkString; for (Row originRow : resultSet) { rateLimiterOrigin.acquire(1); - jobCounter.threadIncrement(JobCounter.CounterType.READ); + jobCounter.increment(JobCounter.CounterType.READ); checkString = guardrailFeature.guardrailChecks(originRow); if (checkString != null && !checkString.isEmpty()) { - jobCounter.threadIncrement(JobCounter.CounterType.LARGE); + jobCounter.increment(JobCounter.CounterType.LARGE); logger.error("Guardrails failed for row {}", checkString); } else { - jobCounter.threadIncrement(JobCounter.CounterType.VALID); + jobCounter.increment(JobCounter.CounterType.VALID); } } } catch (Exception e) { @@ -69,7 +69,7 @@ protected void processPartitionRange(PartitionRange range) { logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max); } finally { - jobCounter.globalIncrement(); + jobCounter.flush(); } ThreadContext.remove(THREAD_CONTEXT_LABEL); diff --git a/src/main/java/com/datastax/cdm/job/JobCounter.java b/src/main/java/com/datastax/cdm/job/JobCounter.java index 6613c9f..16aeb4b 100644 --- a/src/main/java/com/datastax/cdm/job/JobCounter.java +++ b/src/main/java/com/datastax/cdm/job/JobCounter.java @@ -18,6 +18,7 @@ import java.io.Serializable; import java.util.HashMap; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +30,7 @@ public class JobCounter implements Serializable { private static final long serialVersionUID = 7016816604237020549L; public enum CounterType { - READ, WRITE, VALID, ERROR, MISMATCH, MISSING, CORRECTED_MISSING, CORRECTED_MISMATCH, SKIPPED, UNFLUSHED, LARGE + READ, WRITE, MISMATCH, CORRECTED_MISMATCH, MISSING, CORRECTED_MISSING, VALID, SKIPPED, LARGE, ERROR, UNFLUSHED } private final Logger logger = LoggerFactory.getLogger(this.getClass().getName()); @@ -49,8 +50,6 @@ public JobCounter(JobType jobType) { case GUARDRAIL: setRegisteredTypes(CounterType.READ, CounterType.VALID, CounterType.SKIPPED, CounterType.LARGE); break; - default: - throw new IllegalArgumentException("JobType " + jobType + " is not registered"); } } @@ -68,43 +67,45 @@ private CounterUnit getCounterUnit(CounterType counterType) { return (counterMap.get(counterType)); } - public long getCount(CounterType counterType, boolean global) { - return global ? getCounterUnit(counterType).getGlobalCounter() : getCounterUnit(counterType).getThreadCounter(); + public long getCount(CounterType type, boolean interim) { + return interim ? getCounterUnit(type).getInterimCount() : getCounterUnit(type).getCount(); } public long getCount(CounterType counterType) { return getCount(counterType, false); } - // Method to reset thread-specific counters for given type - public void threadReset(CounterType counterType) { - getCounterUnit(counterType).resetThreadCounter(); + public void reset(CounterType type) { + getCounterUnit(type).reset(); } - // Method to increment thread-specific counters by a given value - public void threadIncrement(CounterType counterType, long incrementBy) { - getCounterUnit(counterType).incrementThreadCounter(incrementBy); + public void increment(CounterType counterType, long incrementBy) { + getCounterUnit(counterType).increment(incrementBy); } - // Method to increment thread-specific counters by 1 - public void threadIncrement(CounterType counterType) { - threadIncrement(counterType, 1); + public void increment(CounterType counterType) { + increment(counterType, 1); } - // Method to increment global counters based on thread-specific counters - public void globalIncrement() { + public void flush() { for (CounterType type : counterMap.keySet()) { - getCounterUnit(type).addThreadToGlobalCounter(); + getCounterUnit(type).addToCount(); } } - // Method to get current counts (both thread-specific and global) as a formatted - // string - public String getThreadCounters(boolean global) { + public String getMetrics() { + return getMetrics(false); + } + + public String getMetrics(boolean interim) { StringBuilder sb = new StringBuilder(); - for (CounterType type : counterMap.keySet()) { - long value = global ? getCounterUnit(type).getGlobalCounter() : getCounterUnit(type).getThreadCounter(); - sb.append(type.name()).append(": ").append(value).append("; "); + for (CounterType type : CounterType.values()) { + if (counterMap.containsKey(type)) { + if (!interim && type == CounterType.UNFLUSHED) { + continue; + } + sb.append(printFriendlyCase(type.name())).append(": ").append(getCount(type, interim)).append("; "); + } } // Remove the trailing comma and space if (sb.length() > 2) { @@ -113,21 +114,31 @@ public String getThreadCounters(boolean global) { return sb.toString(); } + // Capitalizes the first letter of each word in a sentence + private String printFriendlyCase(String sentence) { + String[] words = sentence.toLowerCase().split("_"); + StringBuilder sb = new StringBuilder(); + for (String word : words) { + sb.append(StringUtils.capitalize(word)).append(" "); + } + return sb.toString().trim(); + } + public void add(JobCounter v) { for (CounterType type : counterMap.keySet()) { - getCounterUnit(type).setGlobalCounter(getCounterUnit(type).getGlobalCounter() + v.getCount(type, true)); + getCounterUnit(type).setCount(getCounterUnit(type).getCount() + v.getCount(type)); } } public void reset() { for (CounterType type : counterMap.keySet()) { - getCounterUnit(type).setGlobalCounter(0); + reset(type); } } public boolean isZero() { for (CounterType type : counterMap.keySet()) { - if (getCounterUnit(type).getGlobalCounter() > 0) { + if (getCounterUnit(type).getCount() > 0 || getCounterUnit(type).getInterimCount() > 0) { return false; } } @@ -135,32 +146,20 @@ public boolean isZero() { } public void printMetrics(long runId, TrackRun trackRunFeature) { + logger.info("################################################################################################"); if (null != trackRunFeature) { - trackRunFeature.endCdmRun(runId, getThreadCounters(true)); + trackRunFeature.endCdmRun(runId, getMetrics()); + logger.info("RunId: {}", runId); + } + for (CounterType type : CounterType.values()) { + if (counterMap.containsKey(type)) { + if (type == CounterType.UNFLUSHED) { + continue; + } + logger.info("Final " + printFriendlyCase(type.name()) + " Record Count: {}", + counterMap.get(type).getCount()); + } } - logger.info("################################################################################################"); - if (counterMap.containsKey(CounterType.READ)) - logger.info("Final Read Record Count: {}", counterMap.get(CounterType.READ).getGlobalCounter()); - if (counterMap.containsKey(CounterType.MISMATCH)) - logger.info("Final Mismatch Record Count: {}", counterMap.get(CounterType.MISMATCH).getGlobalCounter()); - if (counterMap.containsKey(CounterType.CORRECTED_MISMATCH)) - logger.info("Final Corrected Mismatch Record Count: {}", - counterMap.get(CounterType.CORRECTED_MISMATCH).getGlobalCounter()); - if (counterMap.containsKey(CounterType.MISSING)) - logger.info("Final Missing Record Count: {}", counterMap.get(CounterType.MISSING).getGlobalCounter()); - if (counterMap.containsKey(CounterType.CORRECTED_MISSING)) - logger.info("Final Corrected Missing Record Count: {}", - counterMap.get(CounterType.CORRECTED_MISSING).getGlobalCounter()); - if (counterMap.containsKey(CounterType.VALID)) - logger.info("Final Valid Record Count: {}", counterMap.get(CounterType.VALID).getGlobalCounter()); - if (counterMap.containsKey(CounterType.SKIPPED)) - logger.info("Final Skipped Record Count: {}", counterMap.get(CounterType.SKIPPED).getGlobalCounter()); - if (counterMap.containsKey(CounterType.WRITE)) - logger.info("Final Write Record Count: {}", counterMap.get(CounterType.WRITE).getGlobalCounter()); - if (counterMap.containsKey(CounterType.ERROR)) - logger.info("Final Error Record Count: {}", counterMap.get(CounterType.ERROR).getGlobalCounter()); - if (counterMap.containsKey(CounterType.LARGE)) - logger.info("Final Large Record Count: {}", counterMap.get(CounterType.LARGE).getGlobalCounter()); logger.info("################################################################################################"); } diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index a02fbd8..8586770 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -207,12 +207,18 @@ spark.cdm.trackRun.runId localJobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISMATCH, 5)); + () -> localJobCounter.increment(JobCounter.CounterType.CORRECTED_MISMATCH, 5)); } + @Captor + private ArgumentCaptor trackRunInfoCaptorLong; + + @Captor + private ArgumentCaptor trackRunInfoCaptor; + @Test - public void testPrintProgressForGlobalAndThread() { - jobCounter.threadIncrement(JobCounter.CounterType.READ, 11); - jobCounter.globalIncrement(); + public void printMetricsMigrate() { + jobCounter = new JobCounter(JobType.MIGRATE); + + String expected = "Read: 10; Write: 7; Skipped: 1; Error: 2"; + jobCounter.increment(JobCounter.CounterType.READ, 10); + jobCounter.increment(JobCounter.CounterType.WRITE, 7); + jobCounter.increment(JobCounter.CounterType.ERROR, 2); + jobCounter.increment(JobCounter.CounterType.SKIPPED, 1); + jobCounter.increment(JobCounter.CounterType.UNFLUSHED, 3); + jobCounter.flush(); // You may use mocking to capture logger outputs - jobCounter.printMetrics(0, null); + jobCounter.printMetrics(0, trackRun); + Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptorLong.capture(), trackRunInfoCaptor.capture()); + assertEquals(expected, trackRunInfoCaptor.getValue()); } @Test - public void testPrintFinal() { - jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); - jobCounter.globalIncrement(); + public void printMetricsValidate() { + jobCounter = new JobCounter(JobType.VALIDATE); + + String expected = "Read: 5; Mismatch: 0; Corrected Mismatch: 0; Missing: 7; Corrected Missing: 7; Valid: 0; Skipped: 0; Error: 72"; + jobCounter.increment(JobCounter.CounterType.READ, 5); + jobCounter.increment(JobCounter.CounterType.CORRECTED_MISSING, 7); + jobCounter.increment(JobCounter.CounterType.ERROR, 72); + jobCounter.increment(JobCounter.CounterType.MISSING, 7); + jobCounter.flush(); // You may use mocking to capture logger outputs - jobCounter.printMetrics(0, null); + jobCounter.printMetrics(0, trackRun); + Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptorLong.capture(), trackRunInfoCaptor.capture()); + assertEquals(expected, trackRunInfoCaptor.getValue()); } - @Captor - private ArgumentCaptor trackRunInfoCaptorLong; - - @Captor - private ArgumentCaptor trackRunInfoCaptor; + @Test + public void add() { + JobCounter localJobCounter = new JobCounter(JobType.MIGRATE); + localJobCounter.increment(JobCounter.CounterType.READ, 4); + localJobCounter.increment(JobCounter.CounterType.WRITE, 5); + localJobCounter.increment(JobCounter.CounterType.SKIPPED, 6); + localJobCounter.increment(JobCounter.CounterType.ERROR, 7); + localJobCounter.flush(); + jobCounter.add(localJobCounter); + + assertAll(() -> { + assertEquals(4, jobCounter.getCount(JobCounter.CounterType.READ)); + assertEquals(5, jobCounter.getCount(JobCounter.CounterType.WRITE)); + assertEquals(6, jobCounter.getCount(JobCounter.CounterType.SKIPPED)); + assertEquals(7, jobCounter.getCount(JobCounter.CounterType.ERROR)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); + }); + } - // @Test - // public void testPrintFinalWithRunTracking() { - // jobCounter = new JobCounter(JobType.VALIDATE); - // - // String expected = "Read: 5; Mismatch: 0; Corrected Mismatch: 0; Missing: 7; Corrected Missing: 7; Valid: 0; - // Skipped: 0; Error: 72"; - // jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); - // jobCounter.threadIncrement(JobCounter.CounterType.CORRECTED_MISSING, 7); - // jobCounter.threadIncrement(JobCounter.CounterType.ERROR, 72); - // jobCounter.threadIncrement(JobCounter.CounterType.MISSING, 7); - // jobCounter.globalIncrement(); - // // You may use mocking to capture logger outputs - // jobCounter.printMetrics(0, trackRun); - // Mockito.verify(trackRun).endCdmRun(trackRunInfoCaptorLong.capture(), trackRunInfoCaptor.capture()); - // assertEquals(expected, trackRunInfoCaptor.getValue()); - // } + @Test + public void reset() { + jobCounter.reset(); + + assertAll(() -> { + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.READ)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.WRITE)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.SKIPPED)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.ERROR)); + assertEquals(0, jobCounter.getCount(JobCounter.CounterType.UNFLUSHED)); + }); + } @Test - public void testGetCountGlobal() { - jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); - jobCounter.globalIncrement(); - assertEquals(5, jobCounter.getCount(JobCounter.CounterType.READ, true)); + public void isZero() { + assertTrue(jobCounter.isZero()); + + jobCounter.increment(JobCounter.CounterType.READ, 5); + assertFalse(jobCounter.isZero()); } @Test - public void threadIncrementByOne() { - jobCounter.threadIncrement(JobCounter.CounterType.READ, 5); - jobCounter.threadIncrement(JobCounter.CounterType.READ); - assertEquals(6, jobCounter.getCount(JobCounter.CounterType.READ)); + public void initGuardrail() { + JobCounter localJobCounter = new JobCounter(JobType.GUARDRAIL); + assertThrows(IllegalArgumentException.class, () -> localJobCounter.increment(JobCounter.CounterType.WRITE, 5)); } }