From af792187a76221921f63279b23a8309e4acc1d4f Mon Sep 17 00:00:00 2001 From: Kuldeep Kumar <100260049+kuldeepk3@users.noreply.github.com> Date: Wed, 6 Mar 2024 15:10:33 +0530 Subject: [PATCH] Adding validation for required parameters (#66) Adding validation for required parameters (#66) --- .../sensor/collector/file/FileConfig.java | 6 +++- .../collector/file/FileIngestService.java | 9 +++++- .../collector/watchdog/PscWatchdogApp.java | 1 + .../collector/watchdog/WatchDogConfig.java | 19 +++++++++++- .../collector/watchdog/WatchDogService.java | 29 +++++++++++++++++++ .../file/FileProcessorFactoryTest.java | 6 ++-- .../collector/file/FileProcessorTests.java | 4 +-- .../watchdog/WatchdogServiceTests.java | 8 ++--- .../RawFileIngest-integration-test.properties | 1 + .../resources/RawFileIngestService.properties | 1 + 10 files changed, 72 insertions(+), 12 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java index 8ca5fa3f..0bf92e0b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java @@ -33,11 +33,13 @@ public class FileConfig { public final boolean enableLargeEvent; + public final String pscId; + public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType, - boolean enableLargeEvent) { + boolean enableLargeEvent, String pscId) { this.stateDatabaseFileName = stateDatabaseFileName; this.fileSpec = fileSpec; this.fileExtension = fileExtension; @@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile; this.fileType = fileType; this.enableLargeEvent = enableLargeEvent; + this.pscId = pscId; } @Override @@ -69,6 +72,7 @@ public String toString() { ", transactionTimeoutMinutes=" + transactionTimeoutMinutes + ", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile + ", enableLargeEvent=" + enableLargeEvent + + ", pscId=" + pscId + '}'; } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 2577090c..fe85f38e 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -52,6 +52,8 @@ public abstract class FileIngestService extends DeviceDriver { private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100; private static final int DEFAULT_INTERVAL_MS_KEY = 10000; + + private static final String PSC_ID = "PSC_ID"; private final FileProcessor processor; private final MetricPublisher metricPublisher; private final ScheduledExecutorService executor; @@ -75,7 +77,8 @@ public FileIngestService(DeviceDriverConfig config) { getTransactionTimeoutMinutes(), getMinTimeInMillisToUpdateFile(), config.getClassName(), - getLargeEventEnable()); + getLargeEventEnable(), + getPscId()); LOG.info("File Ingest Config: {}", fileSequenceConfig); final String scopeName = getScopeName(); LOG.info("Scope: {}", scopeName); @@ -88,6 +91,10 @@ public FileIngestService(DeviceDriverConfig config) { executor = Executors.newScheduledThreadPool(1, namedThreadFactory); } + private String getPscId() { + return getProperty(PSC_ID); + } + String getFileSpec() { return getProperty(FILE_SPEC_KEY); } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java index ce229cad..dbe98db2 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java @@ -28,6 +28,7 @@ public static void main(String[] args) { WatchDogConfig config = new WatchDogConfig(properties); log.debug("Properties: {}", properties); final WatchDogService service = new WatchDogService(config); + service.checkPscServiceStatus(); service.startAsync(); service.awaitTerminated(); } catch (Exception e) { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java index 213b480b..aa42b136 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java @@ -12,6 +12,7 @@ import io.pravega.common.util.Property; import java.io.File; +import java.text.MessageFormat; import java.util.Map; public class WatchDogConfig { @@ -32,7 +33,7 @@ public class WatchDogConfig { private final String serviceName; public WatchDogConfig(Map properties) { - this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue()); + this.serviceName = getProperty(properties, PSC_SERVICE_NAME.toString()); this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue())); this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue()); this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue()); @@ -79,4 +80,20 @@ public String getRestartTriggerPath() { return restartTriggerPath; } + + /** + * Retrieves the value of a property from the given map of properties. + * + * @param properties the map of properties to retrieve the value from + * @param key the key of the property to retrieve + * @return the value of the property + */ + public String getProperty(Map properties, String key) { + final String value = properties.get(key); + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException(MessageFormat.format("Missing required parameter {0}", key)); + } + return value; + } + } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java index 066b61fe..eb716a3e 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java @@ -10,9 +10,14 @@ package io.pravega.sensor.collector.watchdog; import com.google.common.util.concurrent.AbstractService; +import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + /** * Watchdog service for Pravega Sensor Collector(PSC). @@ -48,4 +53,28 @@ protected void doStop() { notifyStopped(); } + public void checkPscServiceStatus() throws IOException { + Process psc = null; + log.info("Checking PSC service status {} ", this.config.getServiceName()); + if (SystemUtils.IS_OS_LINUX) { + psc = Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl status " + this.config.getServiceName()}); + } else if ( SystemUtils.IS_OS_WINDOWS) { + psc = Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", this.config.getServiceName() + ".exe", "status"}); + } + BufferedReader stdInput = new BufferedReader(new + InputStreamReader(psc.getInputStream())); + + String s = null; + Boolean isAlive = true; + while ((s = stdInput.readLine()) != null) { + if(s.equalsIgnoreCase("NonExistent")){ + isAlive = false; + } + } + log.debug("Process psc {}, and isAlive value is {} ", psc, isAlive); + if(!isAlive) { + log.error("PSC service is not running"); + throw new RuntimeException("PSC service is not running. Please start psc service before starting watchdog service."); + } + } } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java index 089871aa..51c34a51 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java @@ -46,7 +46,7 @@ public void createRAWFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000l,"RawFileIngestService", true); + true,20.0, 5000l,"RawFileIngestService", true, "psc1"); FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor); @@ -61,7 +61,7 @@ public void createCSVFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000L,"CsvFileIngestService", false); + true,20.0, 5000L,"CsvFileIngestService", false, "psc1"); FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor); @@ -76,7 +76,7 @@ public void createParquetFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000L,"ParquetFileIngestService", false); + true,20.0, 5000L,"ParquetFileIngestService", false, "psc1"); FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java index b7d1afd7..efedc3b4 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java @@ -71,7 +71,7 @@ protected void setup() { String stateDatabaseFileName = ":memory:"; config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, true, - true,20.0, 5000,"RawFileIngestService", true); + true,20.0, 5000,"RawFileIngestService", true, "pscId"); } @Test @@ -214,7 +214,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() { public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() { FileConfig newConfig = new FileConfig(null,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000,"RawFileIngestService", true); + true,20.0, 5000,"RawFileIngestService", true, "pscId"); Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test")); Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage())); } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java index 39e032be..959386cb 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java @@ -13,7 +13,7 @@ public class WatchdogServiceTests { @Test public void testWatchDogServiceStart() { - WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build()); WatchDogService service = new WatchDogService(config); service.startAsync(); Assert.assertTrue(service.isRunning()); @@ -23,7 +23,7 @@ public void testWatchDogServiceStart() { @Test public void testWatchdogMonitorStart() { - WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build()); PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); monitor.startAsync(); Assert.assertTrue(monitor.isRunning()); @@ -33,7 +33,7 @@ public void testWatchdogMonitorStart() { @Test public void testWatchdogRestartPSC() { - WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build()); PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); monitor.onUpdateMissed(); monitor.onUpdateMissed(); @@ -45,7 +45,7 @@ public void testWatchdogRestartPSC() { @Test public void testWatchdogResetCounterOnUpdates() { - WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build()); PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); monitor.onUpdateMissed(); monitor.onUpdateMissed(); diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties index 7782b040..6578a9a0 100644 --- a/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties +++ b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties @@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0 PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15 +PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties b/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties index 0237ea93..c77417d8 100644 --- a/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties +++ b/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties @@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0 PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15 +PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test