From ce5691f6309ba106e7f22418bffefd8e6dad3178 Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Wed, 20 Dec 2023 22:56:10 -0800 Subject: [PATCH 1/6] Skip txn check for non txn write. Signed-off-by: Bhupender Y --- .../pravega/sensor/collector/file/FileProcessor.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index 64f0022d..e0c17962 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -176,9 +176,14 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN /* Check if transactions can be aborted. * Will fail with {@link TxnFailedException} if the transaction has already been committed or aborted. */ - log.debug("processFile: Transaction status {} ", writer.getTransactionStatus()); - if(writer.getTransactionStatus() == Transaction.Status.OPEN){ - writer.abort(); + if (config.exactlyOnce) + { + log.debug("processFile: Transaction status {} ", writer.getTransactionStatus()); + if(writer.getTransactionStatus() == Transaction.Status.OPEN){ + writer.abort(); + } + } else { + log.debug("processFile: No need to check transaction status for non-transactional write."); } File pendingFile = new File(fileNameWithBeginOffset.fileName); From 1110aa09c66674ba561f6b24e8abea3d10e17d09 Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Thu, 21 Dec 2023 01:57:12 -0800 Subject: [PATCH 2/6] Enable large event in writer config. Signed-off-by: Bhupender Y --- .../java/io/pravega/sensor/collector/file/FileProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index e0c17962..ff58b864 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -77,6 +77,7 @@ public static FileProcessor create( EventWriterConfig.builder() .enableConnectionPooling(true) .transactionTimeoutTime((long) (config.transactionTimeoutMinutes * 60.0 * 1000.0)) + .enableLargeEvents(true) .build(), config.exactlyOnce); From 6b0215e401249d4f83d81c0267e8c17fdfac4bb0 Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Fri, 22 Dec 2023 04:38:35 -0800 Subject: [PATCH 3/6] Make large event support configurable. Signed-off-by: Bhupender Y --- .../io/pravega/sensor/collector/file/FileConfig.java | 10 +++++++++- .../sensor/collector/file/FileIngestService.java | 8 +++++++- .../sensor/collector/simple/SimpleDeviceDriver.java | 6 ++++++ .../simple/memoryless/SimpleMemorylessDriver.java | 7 +++++++ .../collector/stateful/StatefulSensorDeviceDriver.java | 6 ++++++ .../collector/file/FileProcessorFactoryTest.java | 6 +++--- .../sensor/collector/file/FileProcessorTests.java | 2 +- scripts/run-with-gradle-raw-file.sh | 2 +- windows-service/PravegaSensorCollectorApp.xml | 1 + 9 files changed, 41 insertions(+), 7 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java index d4b96d37..8ca5fa3f 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java @@ -31,7 +31,13 @@ public class FileConfig { public final long minTimeInMillisToUpdateFile; - public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType) { + public final boolean enableLargeEvent; + + + public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey, + String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, + boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType, + boolean enableLargeEvent) { this.stateDatabaseFileName = stateDatabaseFileName; this.fileSpec = fileSpec; this.fileExtension = fileExtension; @@ -44,6 +50,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte this.transactionTimeoutMinutes = transactionTimeoutMinutes; this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile; this.fileType = fileType; + this.enableLargeEvent = enableLargeEvent; } @Override @@ -61,6 +68,7 @@ public String toString() { ", exactlyOnce=" + exactlyOnce + ", transactionTimeoutMinutes=" + transactionTimeoutMinutes + ", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile + + ", enableLargeEvent=" + enableLargeEvent + '}'; } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 6926a8c0..b55cbdb9 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -42,6 +42,7 @@ public abstract class FileIngestService extends DeviceDriver { private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE"; private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE"; + private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT"; private final FileProcessor processor; private final ScheduledExecutorService executor; @@ -63,7 +64,8 @@ public FileIngestService(DeviceDriverConfig config) { getExactlyOnce(), getTransactionTimeoutMinutes(), getMinTimeInMillisToUpdateFile(), - config.getClassName()); + config.getClassName(), + getLargeEventEnable()); log.info("File Ingest Config: {}", fileSequenceConfig); final String scopeName = getScopeName(); log.info("Scope: {}", scopeName); @@ -129,6 +131,10 @@ long getMinTimeInMillisToUpdateFile() { return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000")); } + boolean getLargeEventEnable() { + return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false))); + } + protected void watchFiles() { log.trace("watchFiles: BEGIN"); try { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java index 5a75b8ab..387f76f9 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java @@ -43,6 +43,7 @@ public abstract class SimpleDeviceDriver extends DeviceDri private static final String DELAY_BETWEEN_WRITE_BATCHES_MS_KEY = "DELAY_BETWEEN_WRITE_BATCHES_MS"; private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE"; private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; + private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT"; private final String routingKey; private final DataCollectorService dataCollectorService; @@ -101,6 +102,7 @@ public SimpleDeviceDriver(DeviceDriverConfig config) { .enableConnectionPooling(true) .retryAttempts(Integer.MAX_VALUE) .transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0)) + .enableLargeEvents(getLargeEventEnable()) .build(), exactlyOnce); @@ -155,6 +157,10 @@ boolean getExactlyOnce() { return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true))); } + boolean getLargeEventEnable() { + return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false))); + } + /** * This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds). */ diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java index 5b22e7d8..11886065 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java @@ -30,6 +30,7 @@ public abstract class SimpleMemorylessDriver extends DeviceDriver { private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; private static final String ROUTING_KEY_KEY = "ROUTING_KEY"; private static final String SENSOR_POLL_PERIODICITY_MS = "POLL_PERIODICITY_MS"; + private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT"; private final DataCollectorService dataCollectorService; private final EventStreamClientFactory clientFactory; @@ -58,6 +59,7 @@ public SimpleMemorylessDriver(DeviceDriverConfig config) { .enableConnectionPooling(true) .retryAttempts(Integer.MAX_VALUE) .transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0)) + .enableLargeEvents(getLargeEventEnable()) .build(), exactlyOnce); dataCollectorService = new DataCollectorService<>(config.getInstanceName(), this, writer, readPeriodicityMs); @@ -134,6 +136,11 @@ private long getReadPeriodicityMs() { return Long.parseLong(getProperty(SENSOR_POLL_PERIODICITY_MS, Integer.toString(10))); } + boolean getLargeEventEnable() { + return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false))); + } + + public String getRoutingKey() { return routingKey; } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/StatefulSensorDeviceDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/StatefulSensorDeviceDriver.java index 3138c9b4..8030002f 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/StatefulSensorDeviceDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/StatefulSensorDeviceDriver.java @@ -37,6 +37,7 @@ abstract public class StatefulSensorDeviceDriver extends DeviceDriver { private static final String DELAY_BETWEEN_WRITE_BATCHES_MS_KEY = "DELAY_BETWEEN_WRITE_BATCHES_MS"; private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE"; private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; + private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT"; private final String routingKey; private final DataCollectorService dataCollectorService; @@ -85,6 +86,7 @@ public StatefulSensorDeviceDriver(DeviceDriverConfig config) { .enableConnectionPooling(true) .retryAttempts(Integer.MAX_VALUE) .transactionTimeoutTime((long) (transactionTimeoutMinutes * 60.0 * 1000.0)) + .enableLargeEvents(getLargeEventEnable()) .build(), exactlyOnce); @@ -141,6 +143,10 @@ String getStreamName() { return getProperty(STREAM_KEY); } + boolean getLargeEventEnable() { + return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false))); + } + @Override protected void doStart() { persistentQueueToPravegaService.startAsync(); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java index d1d63ee5..be1ec153 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java @@ -37,7 +37,7 @@ public void createRAWFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000l,"RawFileIngestService"); + true,20.0, 5000l,"RawFileIngestService", true); FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor); @@ -52,7 +52,7 @@ public void createCSVFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000L,"CsvFileIngestService"); + true,20.0, 5000L,"CsvFileIngestService", false); FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor); @@ -67,7 +67,7 @@ public void createParquetFileProcessorTest() throws Exception { String stateDatabaseFileName = ":memory:"; config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000L,"ParquetFileIngestService"); + true,20.0, 5000L,"ParquetFileIngestService", false); FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId"); Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java index 83e6f247..2ba1cf09 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java @@ -58,7 +58,7 @@ public void setup(){ String stateDatabaseFileName = ":memory:"; config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12", "stream1","{}",10, false, - true,20.0, 5000,"RawFileIngestService"); + true,20.0, 5000,"RawFileIngestService", true); } @Test diff --git a/scripts/run-with-gradle-raw-file.sh b/scripts/run-with-gradle-raw-file.sh index feaffdfd..7e378fc7 100644 --- a/scripts/run-with-gradle-raw-file.sh +++ b/scripts/run-with-gradle-raw-file.sh @@ -30,5 +30,5 @@ export PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES=false export PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0 export PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=false export PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 - +export PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT=true ./gradlew --no-daemon run diff --git a/windows-service/PravegaSensorCollectorApp.xml b/windows-service/PravegaSensorCollectorApp.xml index 05d75604..48293325 100644 --- a/windows-service/PravegaSensorCollectorApp.xml +++ b/windows-service/PravegaSensorCollectorApp.xml @@ -27,6 +27,7 @@ + From 8372430a0bf84a0b30989010f932679a321eda7b Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Tue, 26 Dec 2023 23:12:57 -0800 Subject: [PATCH 4/6] Read value from config. Signed-off-by: Bhupender Y --- .../java/io/pravega/sensor/collector/file/FileProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index 385b2614..e24f5558 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -77,7 +77,7 @@ public static FileProcessor create( EventWriterConfig.builder() .enableConnectionPooling(true) .transactionTimeoutTime((long) (config.transactionTimeoutMinutes * 60.0 * 1000.0)) - .enableLargeEvents(true) + .enableLargeEvents(config.enableLargeEvent) .build(), config.exactlyOnce); From 0c1cfff12a9a69c6314e3df00d7577d7e8125243 Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Thu, 4 Jan 2024 21:28:09 -0800 Subject: [PATCH 5/6] Correct indentation. Signed-off-by: Bhupender Y --- .../java/io/pravega/sensor/collector/file/FileProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index e24f5558..d1eb10e6 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -177,8 +177,7 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN /* Check if transactions can be aborted. * Will fail with {@link TxnFailedException} if the transaction has already been committed or aborted. */ - if (config.exactlyOnce) - { + if (config.exactlyOnce) { log.debug("processFile: Transaction status {} ", writer.getTransactionStatus()); if(writer.getTransactionStatus() == Transaction.Status.OPEN){ writer.abort(); From fa2e5cd32b224f2be703cb90d5ba10486ba46acc Mon Sep 17 00:00:00 2001 From: Bhupender Y Date: Mon, 8 Jan 2024 22:33:17 -0800 Subject: [PATCH 6/6] Update doc. Signed-off-by: Bhupender Y --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 6930232b..f757caf5 100644 --- a/README.md +++ b/README.md @@ -164,6 +164,8 @@ For a list of commonly-used configuration values, see the | `PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES` | `false` | If true, PSC immediately delete the file soon after processing | | `PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES` | `2.0` | Timeout for each transaction. Default value is 2 minutes | | `PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE` | `false` | If Pravega is on SDP, set this to `false`. Accept Boolean value. | +| `PRAVEGA_SENSOR_COLLECTOR_RAW1_EXACTLY_ONCE` | true | If true, it will use transactional write. For raw file ingestion it is recommended to set it as false as in transactional write, client can process maximum file size of 8mb. | +| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT` | false | if false, will not allow to write large event. It is recommended to set it as true for non transactional write. | | `HADOOP_HOME` | `${HOME}/dev` | For windows, Hadoop requires native libraries on Windows to work properly. You can download `Winutils.exe` to fix this.
See [here](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems). Add the location of bin/winutils.exe in the parameter HADOOP_HOME.
**This is required only for Parquet file type not for CSV and Raw file ingestion type** |