Skip to content

Commit

Permalink
Resolve conflicts.
Browse files Browse the repository at this point in the history
Signed-off-by: Bhupender Y <[email protected]>
  • Loading branch information
Bhupender-Y committed Jan 24, 2024
2 parents e04f51e + 2040b56 commit 6f6380a
Show file tree
Hide file tree
Showing 37 changed files with 134 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
export pravega_client_auth_method=Bearer
export pravega_client_auth_loadDynamic=true
export KEYCLOAK_SERVICE_ACCOUNT_FILE=/opt/pravega-sensor-collector/conf/keycloak.json
export JAVA_OPTS="-Xmx512m"
export JAVA_OPTS="-Xmx512m -Dlogback.configurationFile=/opt/pravega-sensor-collector/bin/logback.xml"
export PRAVEGA_SENSOR_COLLECTOR_NET1_CLASS=io.pravega.sensor.collector.network.NetworkDriver
export PRAVEGA_SENSOR_COLLECTOR_NET1_NETWORK_INTERFACE=ens33
export PRAVEGA_SENSOR_COLLECTOR_NET1_MEMORY_QUEUE_CAPACITY_ELEMENTS=10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
Expand All @@ -30,7 +31,7 @@ public abstract class DeviceDriver extends AbstractService implements AutoClosea
private static final String CREATE_SCOPE_KEY = "CREATE_SCOPE";

public DeviceDriver(DeviceDriverConfig config) {
this.config = config;
this.config = Preconditions.checkNotNull(config, "config");
LOGGER.info("Create Scope: {}", isCreateScope());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;

import java.util.Map;

public class DeviceDriverConfig {
Expand All @@ -18,10 +20,10 @@ public class DeviceDriverConfig {
private final DeviceDriverManager deviceDriverManager;

public DeviceDriverConfig(String instanceName, String className, Map<String, String> properties, DeviceDriverManager deviceDriverManager) {
this.instanceName = instanceName;
this.className = className;
this.properties = properties;
this.deviceDriverManager = deviceDriverManager;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.className = Preconditions.checkNotNull(className, "className");
this.properties = Preconditions.checkNotNull(properties, "deviceDriverProperties");
this.deviceDriverManager = Preconditions.checkNotNull(deviceDriverManager, "deviceDriverManager");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,6 +21,7 @@ public class DeviceDriverFactory {
*/
DeviceDriver create(DeviceDriverConfig config) {
try {
Preconditions.checkNotNull(config, "deviceDriverConfig");
LOGGER.info("Creating device driver instance {} with class {}", config.getInstanceName(), config.getClassName());
final Class<?> deviceDriverClass = Class.forName(config.getClassName());
final DeviceDriver driver = (DeviceDriver) deviceDriverClass.getConstructor(DeviceDriverConfig.class).newInstance(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,7 +32,7 @@ public class DeviceDriverManager extends AbstractService {
private List<DeviceDriver> drivers;

public DeviceDriverManager(Map<String, String> properties) {
configs = configFromProperties(PREFIX, SEPARATOR, properties);
configs = configFromProperties(PREFIX, SEPARATOR, Preconditions.checkNotNull(properties, "properties"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import io.pravega.client.ClientConfig;

import java.net.URI;
Expand All @@ -22,8 +23,8 @@ public class PravegaClientConfig {
private static final String PRAVEGA_CONTROLLER_URI_KEY = "PRAVEGA_CONTROLLER_URI";

public PravegaClientConfig(URI controllerURI, String scopeName) {
this.controllerURI = controllerURI;
this.scopeName = scopeName;
this.controllerURI = Preconditions.checkNotNull(controllerURI, "controllerURI");
this.scopeName = Preconditions.checkNotNull(scopeName, "scopeName");
}

public PravegaClientConfig(Map<String, String> properties, String scopeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import org.slf4j.Logger;
Expand All @@ -24,7 +25,7 @@ public class PravegaClientPool implements AutoCloseable {
private final Map<PravegaClientConfig, EventStreamClientFactory> eventStreamClientFactories = new HashMap<>();

public synchronized ClientConfig getClientConfig(PravegaClientConfig config) {
final ClientConfig clientConfig = clientConfigs.get(config);
final ClientConfig clientConfig = clientConfigs.get(Preconditions.checkNotNull(config, "pravegaClientConfig"));
if (clientConfig != null) {
log.info("Reusing client config for {}", config);
return clientConfig;
Expand All @@ -36,7 +37,7 @@ public synchronized ClientConfig getClientConfig(PravegaClientConfig config) {
}

public synchronized EventStreamClientFactory getEventStreamClientFactory(PravegaClientConfig config) {
final EventStreamClientFactory factory = eventStreamClientFactories.get(config);
final EventStreamClientFactory factory = eventStreamClientFactories.get(Preconditions.checkNotNull(config, "pravegaClientConfig"));
if (factory != null) {
log.info("Reusing client factory for {}", config);
return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.accelerometer;

import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Hex;

/**
Expand All @@ -18,7 +19,7 @@ public class AccelerometerRawData {
public final byte[] bytes;

public AccelerometerRawData(byte[] bytes) {
this.bytes = bytes;
this.bytes = Preconditions.checkNotNull(bytes, "bytes");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";
private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;
Expand All @@ -51,8 +52,9 @@ public abstract class FileIngestService extends DeviceDriver {
private final FileProcessor processor;
private final ScheduledExecutorService executor;

private ScheduledFuture<?> watchFiletask;
private ScheduledFuture<?> watchFileTask;
private ScheduledFuture<?> processFileTask;
private ScheduledFuture<?> deleteFileTask;

public FileIngestService(DeviceDriverConfig config) {
super(config);
Expand Down Expand Up @@ -135,6 +137,10 @@ long getMinTimeInMillisToUpdateFile() {
return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
}

long getDeleteCompletedFilesIntervalInMinutes() {
return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY, "720"));
}

boolean getLargeEventEnable() {
return Boolean.parseBoolean(getProperty(ENABLE_LARGE_EVENT, Boolean.toString(false)));
}
Expand All @@ -160,9 +166,20 @@ protected void processFiles() {
LOG.trace("processFiles: END");
}

protected void deleteCompletedFiles() {
LOG.trace("deleteCompletedFiles: BEGIN");
try {
processor.deleteCompletedFiles();
} catch (Exception e) {
LOG.error("deleteCompletedFiles: Delete file error", e);
// Continue on any errors. We will retry on the next iteration.
}
LOG.trace("deleteCompletedFiles: END");
}

@Override
protected void doStart() {
watchFiletask = executor.scheduleAtFixedRate(
watchFileTask = executor.scheduleAtFixedRate(
this::watchFiles,
0,
getIntervalMs(),
Expand All @@ -177,13 +194,22 @@ protected void doStart() {
0,
1,
TimeUnit.MILLISECONDS);


deleteFileTask = executor.scheduleAtFixedRate(
this::deleteCompletedFiles,
1,
getDeleteCompletedFilesIntervalInMinutes(),
TimeUnit.MINUTES);

notifyStarted();
}

@Override
protected void doStop() {
LOG.info("doStop: Cancelling ingestion task and process file task");
watchFiletask.cancel(false);
watchFileTask.cancel(false);
processFileTask.cancel(false);
deleteFileTask.cancel(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
Expand Down Expand Up @@ -59,11 +60,12 @@ public abstract class FileProcessor {
private final Path movedFilesDirectory;

public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {
this.config = config;
this.state = state;
this.writer = writer;
this.transactionCoordinator = transactionCoordinator;
this.eventGenerator = getEventGenerator(config);
this.config = Preconditions.checkNotNull(config, "config");
Preconditions.checkNotNull(config.stateDatabaseFileName, "config.stateDatabaseFileName");
this.state = Preconditions.checkNotNull(state, "state");
this.writer = Preconditions.checkNotNull(writer, "writer");
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
this.eventGenerator = Preconditions.checkNotNull(getEventGenerator(config), "eventGenerator");
this.movedFilesDirectory = Paths.get(config.stateDatabaseFileName).getParent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
Expand Down Expand Up @@ -41,10 +42,10 @@ public class CsvFileEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public CsvFileEventGenerator(String routingKey, int maxRecordsPerEvent, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.maxRecordsPerEvent = maxRecordsPerEvent;
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static CsvFileEventGenerator create(String routingKey, int maxRecordsPerEvent, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.CountingInputStream;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -57,10 +58,10 @@ public class ParquetEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public ParquetEventGenerator(String routingKey, int maxRecordsPerEvent, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.maxRecordsPerEvent = maxRecordsPerEvent;
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static ParquetEventGenerator create(String routingKey, int maxRecordsPerEvent, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.util.PravegaWriterEvent;
Expand All @@ -37,9 +38,9 @@ public class RawEventGenerator implements EventGenerator {
private final ObjectMapper mapper;

public RawEventGenerator(String routingKey, ObjectNode eventTemplate, ObjectMapper mapper) {
this.routingKey = routingKey;
this.routingKey = Preconditions.checkNotNull(routingKey, "routingKey");
this.eventTemplate = eventTemplate;
this.mapper = mapper;
this.mapper = Preconditions.checkNotNull(mapper, "objectMapper");
}

public static RawEventGenerator create(String routingKey, String eventTemplateStr, String writerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector.network;

import com.google.common.base.Preconditions;

import java.util.List;

public class NetworkRawData {
Expand All @@ -17,7 +19,7 @@ public class NetworkRawData {

public NetworkRawData(long timestampNanos, List<Long> statisticValues) {
this.timestampNanos = timestampNanos;
this.statisticValues = statisticValues;
this.statisticValues = Preconditions.checkNotNull(statisticValues, "statisticValues");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.sensor.collector.network;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.pravega.sensor.collector.simple.Samples;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -45,8 +46,8 @@ public class NetworkSamples implements Samples {
public String lastTimestampFormatted;

public NetworkSamples(String remoteAddr, String interfaceName) {
this.remoteAddr = remoteAddr;
this.interfaceName = interfaceName;
this.remoteAddr = Preconditions.checkNotNull(remoteAddr, "remoteAddr");
this.interfaceName = Preconditions.checkNotNull(interfaceName, "interfaceName");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector.network;

import com.google.common.base.Preconditions;

import java.io.RandomAccessFile;

public class NetworkStatisticFile {
Expand All @@ -19,8 +21,8 @@ public class NetworkStatisticFile {
public final RandomAccessFile randomAccessFile;

public NetworkStatisticFile(String interfaceName, String statisticName, RandomAccessFile randomAccessFile) {
this.interfaceName = interfaceName;
this.statisticName = statisticName;
this.randomAccessFile = randomAccessFile;
this.interfaceName = Preconditions.checkNotNull(interfaceName, "interfaceName");
this.statisticName = Preconditions.checkNotNull(statisticName, "statisticName");
this.randomAccessFile = Preconditions.checkNotNull(randomAccessFile, "randomAccessFile");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.sensor.collector.simple;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,9 +29,9 @@ public class DataCollectorService<R, S extends Samples> extends AbstractExecutio
private final SimpleDeviceDriver<R, S> driver;

public DataCollectorService(String instanceName, BlockingQueue<R> memoryQueue, SimpleDeviceDriver<R, S> driver) {
this.instanceName = instanceName;
this.memoryQueue = memoryQueue;
this.driver = driver;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.memoryQueue = Preconditions.checkNotNull(memoryQueue, "memoryQueue");
this.driver = Preconditions.checkNotNull(driver, "driver");
}

@Override
Expand Down
Loading

0 comments on commit 6f6380a

Please sign in to comment.