Skip to content

Commit

Permalink
Fix default watch namespacing (#97)
Browse files Browse the repository at this point in the history
* Fix default namespacing

* remove comment

* checkstyle fixes
  • Loading branch information
jogrogan authored Jan 29, 2025
1 parent 9ee160d commit 00e7864
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.kubernetes.client.apimachinery.GroupVersion;
import io.kubernetes.client.common.KubernetesListObject;
Expand All @@ -25,15 +27,18 @@

public class K8sContext {

private final static Logger LOG = LoggerFactory.getLogger(K8sContext.class);
public static final String DEFAULT_NAMESPACE = "default";
private static final String ENV_OVERRIDE_BASEPATH = "KUBECONFIG_BASEPATH";
private static K8sContext currentContext = null;

private final String name;
private final String namespace;
private final ApiClient apiClient;
private ApiClient apiClient;
private final SharedInformerFactory informerFactory;

public K8sContext(String name, String namespace, ApiClient apiClient) {
LOG.info("K8sContext created for namespace: {}", namespace);
this.name = name;
this.namespace = namespace;
this.apiClient = apiClient;
Expand All @@ -44,6 +49,12 @@ public ApiClient apiClient() {
return apiClient;
}

// Setter-style counterpart of apiClient(): replaces this context's ApiClient.
// Assigning a new api client should only happen once right after context creation.
// Re-assigning a new api client can have unexpected consequences, since the
// SharedInformerFactory created at construction time keeps its own client reference.
public void apiClient(ApiClient apiClient) {
this.apiClient = apiClient;
}

/** Returns the name of this Kubernetes context (e.g. the kubeconfig current-context). */
public String name() {
return name;
}
Expand All @@ -57,8 +68,8 @@ public SharedInformerFactory informerFactory() {
}

public <T extends KubernetesObject, U extends KubernetesListObject> void registerInformer(
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod) {
informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis());
K8sApiEndpoint<T, U> endpoint, Duration resyncPeriod, String watchNamespace) {
informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis(), watchNamespace);
}

public DynamicKubernetesApi dynamic(String apiVersion, String plural) {
Expand Down Expand Up @@ -113,19 +124,13 @@ static K8sContext defaultContext() throws IOException {
KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r);
kubeConfig.setFile(file);
ApiClient apiClient = addEnvOverrides(kubeConfig).build();
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default");
String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse(DEFAULT_NAMESPACE);
return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient);
}
} else {
ApiClient apiClient = Config.defaultClient();
String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
String namespace;
if (filePath == null) {
namespace = "default";
} else {
namespace = new String(Files.readAllBytes(Paths.get(filePath)));
}
return new K8sContext("default", namespace, apiClient);
String namespace = getNamespace();
return new K8sContext(namespace, namespace, apiClient);
}
}

Expand All @@ -139,4 +144,16 @@ private static ClientBuilder addEnvOverrides(KubeConfig kubeConfig) throws IOExc

return builder;
}

/**
 * Resolves the namespace this process is running in.
 *
 * <p>Resolution order:
 * <ol>
 *   <li>contents of the file named by the {@code POD_NAMESPACE_FILEPATH}
 *       environment variable (conventionally a downward-API mount),</li>
 *   <li>the {@code SELF_POD_NAMESPACE} system property,</li>
 *   <li>{@link #DEFAULT_NAMESPACE}.</li>
 * </ol>
 *
 * @return the resolved namespace, never null
 * @throws IOException if the namespace file exists but cannot be read
 */
private static String getNamespace() throws IOException {
  String filePath = System.getenv("POD_NAMESPACE_FILEPATH");
  if (filePath != null) {
    // Decode explicitly as UTF-8: new String(byte[]) uses the platform
    // default charset (pre-JDK 18), which makes namespace resolution
    // environment-dependent.
    // NOTE(review): the file contents are returned verbatim; if the mounted
    // file ever carries a trailing newline the namespace would be wrong —
    // consider trimming, but confirm against how the file is produced.
    return new String(Files.readAllBytes(Paths.get(filePath)), StandardCharsets.UTF_8);
  }
  String namespace = System.getProperty("SELF_POD_NAMESPACE");
  if (namespace != null) {
    return namespace;
  }
  return DEFAULT_NAMESPACE;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.hoptimator.operator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
Expand All @@ -23,6 +24,7 @@
import io.kubernetes.client.util.Config;

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.k8s.K8sApiEndpoints;
import com.linkedin.hoptimator.k8s.K8sContext;
import com.linkedin.hoptimator.models.V1alpha1Subscription;
import com.linkedin.hoptimator.models.V1alpha1SubscriptionList;
Expand All @@ -35,17 +37,17 @@ public class HoptimatorOperatorApp {
private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class);

final String url;
final String namespace;
final String watchNamespace;
final ApiClient apiClient;
final Predicate<V1alpha1Subscription> subscriptionFilter;
final Properties properties;
final Resource.Environment environment;

/** This constructor is likely to evolve and break. */
public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient,
public HoptimatorOperatorApp(String url, String watchNamespace, ApiClient apiClient,
Predicate<V1alpha1Subscription> subscriptionFilter, Properties properties) {
this.url = url;
this.namespace = namespace;
this.watchNamespace = watchNamespace;
this.apiClient = apiClient;
this.subscriptionFilter = subscriptionFilter;
this.properties = properties;
Expand All @@ -59,9 +61,10 @@ public static void main(String[] args) throws Exception {

Options options = new Options();

Option namespace = new Option("n", "namespace", true, "specified namespace");
namespace.setRequired(false);
options.addOption(namespace);
Option watchNamespace = new Option("w", "watch", true,
"namespace to watch for resource operations, empty string indicates all namespaces");
watchNamespace.setRequired(false);
options.addOption(watchNamespace);

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
Expand All @@ -78,9 +81,10 @@ public static void main(String[] args) throws Exception {
}

String urlInput = cmd.getArgs()[0];
String namespaceInput = cmd.getOptionValue("namespace", "default");
String watchNamespaceInput = cmd.getOptionValue("watch", "");

new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null, new Properties()).run();
new HoptimatorOperatorApp(urlInput, watchNamespaceInput,
Config.defaultClient(), null, new Properties()).run();
}

public void run() throws Exception {
Expand All @@ -91,15 +95,18 @@ public void run() throws Exception {

apiClient.setHttpClient(apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).build());
SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient);
Operator operator = new Operator(namespace, apiClient, informerFactory, properties);
Operator operator = new Operator(watchNamespace, apiClient, informerFactory, properties);
K8sContext context = K8sContext.currentContext();
context.apiClient(apiClient);

operator.registerApi("Subscription", "subscription", "subscriptions", "hoptimator.linkedin.com", "v1alpha1",
V1alpha1Subscription.class, V1alpha1SubscriptionList.class);

List<Controller> controllers = new ArrayList<>();
controllers.addAll(ControllerService.controllers(operator));
controllers.add(SubscriptionReconciler.controller(operator, plannerFactory, environment, subscriptionFilter));

context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace);
controllers.add(PipelineReconciler.controller(context));

ControllerManager controllerManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,15 +25,44 @@
public class PipelineOperatorApp {
private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class);

final String watchNamespace;

/**
 * Creates the operator app.
 *
 * @param watchNamespace namespace whose resources this operator watches;
 *                       an empty string indicates all namespaces
 */
public PipelineOperatorApp(String watchNamespace) {
this.watchNamespace = watchNamespace;
}

public static void main(String[] args) throws Exception {
new PipelineOperatorApp().run();
Options options = new Options();

Option watchNamespace = new Option("w", "watch", true,
"namespace to watch for resource operations, empty string indicates all namespaces");
watchNamespace.setRequired(false);
options.addOption(watchNamespace);

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd;

try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
formatter.printHelp("pipeline-operator", options);

System.exit(1);
return;
}

String watchNamespaceInput = cmd.getOptionValue("watch", "");

new PipelineOperatorApp(watchNamespaceInput).run();
}

public void run() throws Exception {
K8sContext context = K8sContext.currentContext();

// register informers
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5));
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace);

List<Controller> controllers = new ArrayList<>();
// TODO: add additional controllers from ControllerProvider SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ protected Duration pendingRetryDuration() {
}

public static Controller controller(K8sContext context) {
// Duplicate call, only needed while still using HoptimatorOperatorApp,
// when removed in favor of PipelineOperatorApp this call is redundant
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5));

Reconciler reconciler = new PipelineReconciler(context);
return ControllerBuilder.defaultBuilder(context.informerFactory())
.withReconciler(reconciler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,14 @@
import org.apache.calcite.adapter.jdbc.JdbcTable;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;

import com.linkedin.hoptimator.Database;
import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcConvention;


public class HoptimatorJdbcSchema extends JdbcSchema implements Database {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
import java.util.Collection;
import java.util.List;

import com.linkedin.hoptimator.util.DataTypeUtils;

import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.adapter.jdbc.JdbcTable;
import org.apache.calcite.adapter.jdbc.JdbcTableScan;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.linq4j.Queryable;
import org.apache.calcite.plan.RelOptCluster;
Expand All @@ -20,10 +16,10 @@
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTableQueryable;

import com.linkedin.hoptimator.util.DataTypeUtils;


public class HoptimatorJdbcTable extends AbstractQueryableTable implements TranslatableTable,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
package com.linkedin.hoptimator.util.planner;

import org.apache.calcite.adapter.jdbc.JdbcTable;
import java.util.List;

import org.apache.calcite.adapter.jdbc.JdbcTableScan;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.hint.RelHint;

import com.google.common.collect.ImmutableList;

import java.util.List;

import static org.apache.calcite.linq4j.Nullness.castNonNull;


public class HoptimatorJdbcTableScan extends JdbcTableScan {
public final HoptimatorJdbcTable jdbcTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
RelDataType key = rel(keySchema, typeFactory);
RelDataType value = rel(valueSchema, typeFactory);
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
builder.addAll(value.getFieldList());
if (key.isStruct()) {
for (RelDataTypeField field: key.getFieldList()) {
builder.add(KEY_PREFIX + field.getName(), field.getType());
}
} else {
builder.add("KEY", key);
}
builder.addAll(value.getFieldList());
RelDataType combinedSchema = builder.build();
return DataTypeUtils.flatten(combinedSchema, typeFactory);
}
Expand Down
6 changes: 3 additions & 3 deletions hoptimator-venice/src/test/resources/venice-ddl-insert-all.id
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ spec:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ spec:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store`
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down
6 changes: 3 additions & 3 deletions hoptimator-venice/src/test/resources/venice-ddl-select.id
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ spec:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH ()
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY')
- CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH ()
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ()
- INSERT INTO `PIPELINE`.`SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
- CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ()
- INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
Expand Down

0 comments on commit 00e7864

Please sign in to comment.