Skip to content

Commit

Permalink
Adding validation for required parameters (#66)
Browse files Browse the repository at this point in the history
Adding validation for required parameters (#66)
  • Loading branch information
kuldeepk3 authored Mar 6, 2024
1 parent 35a1487 commit af79218
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class FileConfig {

public final boolean enableLargeEvent;

public final String pscId;


public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
boolean enableLargeEvent) {
boolean enableLargeEvent, String pscId) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
this.pscId = pscId;
}

@Override
Expand All @@ -69,6 +72,7 @@ public String toString() {
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
", pscId=" + pscId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;

private static final String PSC_ID = "PSC_ID";
private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;
Expand All @@ -75,7 +77,8 @@ public FileIngestService(DeviceDriverConfig config) {
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName(),
getLargeEventEnable());
getLargeEventEnable(),
getPscId());
LOG.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
LOG.info("Scope: {}", scopeName);
Expand All @@ -88,6 +91,10 @@ public FileIngestService(DeviceDriverConfig config) {
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}

private String getPscId() {
return getProperty(PSC_ID);
}

String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static void main(String[] args) {
WatchDogConfig config = new WatchDogConfig(properties);
log.debug("Properties: {}", properties);
final WatchDogService service = new WatchDogService(config);
service.checkPscServiceStatus();
service.startAsync();
service.awaitTerminated();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.pravega.common.util.Property;

import java.io.File;
import java.text.MessageFormat;
import java.util.Map;

public class WatchDogConfig {
Expand All @@ -32,7 +33,7 @@ public class WatchDogConfig {
private final String serviceName;

public WatchDogConfig(Map<String, String> properties) {
this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue());
this.serviceName = getProperty(properties, PSC_SERVICE_NAME.toString());
this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue()));
this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue());
this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue());
Expand Down Expand Up @@ -79,4 +80,20 @@ public String getRestartTriggerPath() {
return restartTriggerPath;
}


/**
* Retrieves the value of a property from the given map of properties.
*
* @param properties the map of properties to retrieve the value from
* @param key the key of the property to retrieve
* @return the value of the property
*/
public String getProperty(Map<String, String> properties, String key) {
final String value = properties.get(key);
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(MessageFormat.format("Missing required parameter {0}", key));
}
return value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
package io.pravega.sensor.collector.watchdog;

import com.google.common.util.concurrent.AbstractService;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;


/**
* Watchdog service for Pravega Sensor Collector(PSC).
Expand Down Expand Up @@ -48,4 +53,28 @@ protected void doStop() {
notifyStopped();
}

public void checkPscServiceStatus() throws IOException {
Process psc = null;
log.info("Checking PSC service status {} ", this.config.getServiceName());
if (SystemUtils.IS_OS_LINUX) {
psc = Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl status " + this.config.getServiceName()});
} else if ( SystemUtils.IS_OS_WINDOWS) {
psc = Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", this.config.getServiceName() + ".exe", "status"});
}
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(psc.getInputStream()));

String s = null;
Boolean isAlive = true;
while ((s = stdInput.readLine()) != null) {
if(s.equalsIgnoreCase("NonExistent")){
isAlive = false;
}
}
log.debug("Process psc {}, and isAlive value is {} ", psc, isAlive);
if(!isAlive) {
log.error("PSC service is not running");
throw new RuntimeException("PSC service is not running. Please start psc service before starting watchdog service.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000l,"RawFileIngestService", true);
true,20.0, 5000l,"RawFileIngestService", true, "psc1");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);
Expand All @@ -61,7 +61,7 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"CsvFileIngestService", false);
true,20.0, 5000L,"CsvFileIngestService", false, "psc1");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);
Expand All @@ -76,7 +76,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"ParquetFileIngestService", false);
true,20.0, 5000L,"ParquetFileIngestService", false, "psc1");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void setup() {
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, true,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
}

@Test
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() {
public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() {
FileConfig newConfig = new FileConfig(null,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test"));
Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class WatchdogServiceTests {

@Test
public void testWatchDogServiceStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
WatchDogService service = new WatchDogService(config);
service.startAsync();
Assert.assertTrue(service.isRunning());
Expand All @@ -23,7 +23,7 @@ public void testWatchDogServiceStart() {

@Test
public void testWatchdogMonitorStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.startAsync();
Assert.assertTrue(monitor.isRunning());
Expand All @@ -33,7 +33,7 @@ public void testWatchdogMonitorStart() {

@Test
public void testWatchdogRestartPSC() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand All @@ -45,7 +45,7 @@ public void testWatchdogRestartPSC() {

@Test
public void testWatchdogResetCounterOnUpdates() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test

0 comments on commit af79218

Please sign in to comment.