Add support for external engines (#84)
* Add Engine CRD

* Use datagen and blackhole connectors for demodb

* Add fat driver for integration tests
ryannedolan authored Jan 20, 2025
1 parent f10e6e8 commit 0e19953
Showing 58 changed files with 2,321 additions and 66 deletions.
33 changes: 16 additions & 17 deletions Makefile
@@ -1,17 +1,17 @@

install:
-	./gradlew compileJava installDist shadowJar
+	./gradlew compileJava installDist

test:
./gradlew test -x spotbugsMain -x spotbugsTest -x spotbugsTestFixtures

build:
-	./gradlew build
+	./gradlew build shadowJar
docker build . -t hoptimator
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-runner -t hoptimator-flink-runner
docker build hoptimator-flink-runner -f hoptimator-flink-runner/Dockerfile-flink-operator -t hoptimator-flink-operator

-bounce: build undeploy deploy deploy-samples deploy-config deploy-demo
+bounce: build undeploy deploy deploy-samples deploy-config

clean:
./gradlew clean
@@ -34,12 +34,6 @@ undeploy: undeploy-config

quickstart: build deploy

-deploy-demo: deploy
-	kubectl apply -f ./deploy/samples/demodb.yaml
-
-undeploy-demo: undeploy
-	kubectl delete -f ./deploy/samples/demodb.yaml

deploy-samples: deploy
kubectl wait --for=condition=Established=True \
crds/subscriptions.hoptimator.linkedin.com \
@@ -72,8 +66,6 @@ deploy-kafka: deploy deploy-flink
kubectl create namespace kafka || echo "skipping"
kubectl apply -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
-	kubectl apply -f ./deploy/dev
-	kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/kafkadb.yaml

undeploy-kafka:
@@ -83,7 +75,6 @@ undeploy-kafka:
kubectl delete -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka || echo "skipping"
kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping"
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
-	kubectl delete -f ./deploy/dev || echo "skipping"
kubectl delete namespace kafka || echo "skipping"

# Deploys Venice cluster in docker and creates two stores in Venice. Stores are not managed via K8s for now.
@@ -98,11 +89,19 @@ undeploy-venice:
docker compose -f ./deploy/docker/venice/docker-compose-single-dc-setup.yaml down

deploy-dev-environment: deploy deploy-flink deploy-kafka deploy-venice
+	kubectl wait --for=condition=Established=True \
+		crds/subscriptions.hoptimator.linkedin.com \
+		crds/kafkatopics.hoptimator.linkedin.com \
+		crds/sqljobs.hoptimator.linkedin.com
+	kubectl apply -f ./deploy/dev/
+	kubectl apply -f ./deploy/samples/demodb.yaml

undeploy-dev-environment: undeploy-venice undeploy-kafka undeploy-flink undeploy
+	kubectl delete -f ./deploy/dev || echo "skipping"
+	kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"

# Integration test setup intended to be run locally
-integration-tests: deploy-dev-environment deploy-samples
+integration-tests: deploy-dev-environment
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
@@ -111,7 +110,7 @@ integration-tests: deploy-dev-environment deploy-samples
kill `cat port-forward.pid`

# kind cluster used in github workflow needs to have different routing set up, avoiding the need to forward kafka ports
-integration-tests-kind: deploy-dev-environment deploy-samples
+integration-tests-kind: deploy-dev-environment
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka
@@ -125,15 +124,15 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

-build-zeppelin:
+build-zeppelin: build
docker build -t hoptimator-zeppelin -t hoptimator-zeppelin:0.11.2 -f ./deploy/docker/zeppelin/Dockerfile-zeppelin .

# attaches to terminal (not run as daemon)
-run-zeppelin:
+run-zeppelin: build-zeppelin
docker run --rm -p 8080:8080 \
--volume=${HOME}/.kube/config:/opt/zeppelin/.kube/config \
--add-host=docker-for-desktop:host-gateway \
--name hoptimator-zeppelin \
hoptimator-zeppelin

-.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-demo undeploy-demo deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice build-zeppelin run-zeppelin integration-tests integration-tests-kind deploy-dev-environment undeploy-dev-environment generate-models release
+.PHONY: install test build bounce clean quickstart deploy-config undeploy-config deploy undeploy deploy-samples undeploy-samples deploy-flink undeploy-flink deploy-kafka undeploy-kafka deploy-venice undeploy-venice build-zeppelin run-zeppelin integration-tests integration-tests-kind deploy-dev-environment undeploy-dev-environment generate-models release
2 changes: 1 addition & 1 deletion README.md
@@ -83,7 +83,7 @@ Once the Flink deployment pod has STATUS 'Running', you can forward port 8081 and
to access the Flink dashboard.

```
-$ kubectl port-forward basic-session-deployment-7b94b98b6b-d6jt5 8081 &
+$ kubectl port-forward svc/basic-session-deployment-rest 8081 &
```

See the [Flink SQL Gateway Documentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql-gateway/overview/)
4 changes: 3 additions & 1 deletion deploy/docker/flink/docker-compose-sql-gateway.yaml
@@ -3,10 +3,12 @@ services:
image: flink:1.18.1
restart: unless-stopped
entrypoint: >
-      /bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost"
+      /bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost -Drest.address=host.docker.internal"
ports:
- 8083:8083
deploy:
resources:
limits:
memory: 1024M
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
2 changes: 1 addition & 1 deletion deploy/docker/zeppelin/Dockerfile-zeppelin
@@ -2,7 +2,7 @@
FROM apache/zeppelin:0.11.2

# Add hoptimator fat jar to jdbc interpreter directory
-ADD ./hoptimator-jdbc-driver/build/libs/hoptimator-jdbc-driver-all.jar /opt/zeppelin/interpreter/jdbc/hoptimator-jdbc-driver-all.jar
+ADD ./hoptimator-jdbc-driver-int/build/libs/hoptimator-jdbc-driver-int-all.jar /opt/zeppelin/interpreter/jdbc/hoptimator-jdbc-driver-int-all.jar

# local copy of hoptimator-cli (for debugging)
ADD ./hoptimator-cli/build/install/hoptimator-cli /opt/hoptimator/hoptimator-cli/build/install/hoptimator-cli/
2 changes: 1 addition & 1 deletion deploy/docker/zeppelin/interpreter.json
@@ -541,7 +541,7 @@
],
"dependencies": [
{
"groupArtifactVersion": "/opt/zeppelin/interpreter/jdbc/hoptimator-jdbc-driver-all.jar",
"groupArtifactVersion": "/opt/zeppelin/interpreter/jdbc/hoptimator-jdbc-driver-int-all.jar",
"local": false
}
],
24 changes: 20 additions & 4 deletions deploy/samples/demodb.yaml
@@ -23,13 +23,29 @@ spec:
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: TableTemplate
metadata:
-name: demodb-template
+name: demodb-read-template
spec:
databases:
- profile-database
- ads-database
+methods:
+- Scan
connector: |
-connector = demo
-database = {{database}}
-table = {{table}}
+connector = datagen
+number-of-rows = 10
+---
+
+apiVersion: hoptimator.linkedin.com/v1alpha1
+kind: TableTemplate
+metadata:
+name: demodb-write-template
+spec:
+databases:
+- profile-database
+- ads-database
+methods:
+- Modify
+connector: |
+connector = blackhole
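With this change, demo tables read from Flink's built-in `datagen` connector and write to `blackhole`. A minimal sketch of the kind of Flink DDL these templates expand to, assuming flink-table-api-java and a planner on the classpath — the PROFILES table and its columns are hypothetical stand-ins for the schema the templates actually render:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DemoDbConnectorSketch {
  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Roughly what demodb-read-template ("connector = datagen") expands to:
    env.executeSql("CREATE TABLE PROFILES (NAME STRING, AGE INT) "
        + "WITH ('connector' = 'datagen', 'number-of-rows' = '10')");

    // Roughly what demodb-write-template ("connector = blackhole") expands to:
    env.executeSql("CREATE TABLE PROFILES_SINK (NAME STRING, AGE INT) "
        + "WITH ('connector' = 'blackhole')");

    // Scans yield generated rows; writes are accepted and discarded.
    env.executeSql("INSERT INTO PROFILES_SINK SELECT * FROM PROFILES");
  }
}
```

This lets integration tests run pipelines end to end without a live demo backend: `datagen` supplies synthetic input and `blackhole` absorbs output.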
2 changes: 1 addition & 1 deletion deploy/samples/flinkDeployment.yaml
@@ -7,7 +7,7 @@ spec:
imagePullPolicy: Never
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "3"
taskmanager.numberOfTaskSlots: "6"
serviceAccount: flink
jobManager:
resource:
8 changes: 8 additions & 0 deletions deploy/samples/flinkengine.yaml
@@ -0,0 +1,8 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: Engine
metadata:
name: flink-engine
spec:
url: jdbc:flink://localhost:8083
dialect: Flink

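This Engine points at the Flink SQL gateway's JDBC endpoint, served by the flink-sql-jdbc-driver-bundle added to the version catalog below. A minimal sketch of connecting through that URL, assuming the gateway from deploy/docker/flink is up on localhost:8083 and the driver bundle is on the classpath; the query is illustrative only:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class FlinkEngineSmokeTest {
  public static void main(String[] args) throws Exception {
    // Same URL as the Engine spec above; the driver registers itself via ServiceLoader.
    try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) {
      while (rs.next()) {
        System.out.println(rs.getInt(1)); // expect: 1
      }
    }
  }
}
```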
1 change: 1 addition & 0 deletions generate-models.sh
@@ -16,4 +16,5 @@ docker run \
-u "$(pwd)/hoptimator-k8s/src/main/resources/tabletemplates.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/views.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/subscriptions.crd.yaml" \
-u "$(pwd)/hoptimator-k8s/src/main/resources/engines.crd.yaml" \
&& echo "done."
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
@@ -8,6 +8,7 @@ flink-clients = "org.apache.flink:flink-clients:1.18.1"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
flink-core = "org.apache.flink:flink-core:1.18.1"
flink-csv = "org.apache.flink:flink-csv:1.18.1"
+flink-jdbc = "org.apache.flink:flink-sql-jdbc-driver-bundle:1.18.1"
flink-streaming-java = "org.apache.flink:flink-streaming-java:1.18.1"
flink-table-api-java = "org.apache.flink:flink-table-api-java:1.18.1"
flink-table-api-java-bridge = "org.apache.flink:flink-table-api-java-bridge:1.18.1"
5 changes: 4 additions & 1 deletion hoptimator
@@ -2,7 +2,10 @@

BASEDIR="$( cd "$( dirname "$0" )" && pwd )"

-$BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli sqlline.SqlLine \
+$BASEDIR/hoptimator-cli/build/install/hoptimator-cli/bin/hoptimator-cli \
+-Dorg.slf4j.simpleLogger.showThreadName=false \
+-Dorg.slf4j.simpleLogger.showLogName=false \
+sqlline.SqlLine \
-ac sqlline.HoptimatorAppConfig \
-u jdbc:hoptimator:// -n "" -p "" -nn "Hoptimator" $@

15 changes: 15 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Engine.java
@@ -0,0 +1,15 @@
package com.linkedin.hoptimator;

import javax.sql.DataSource;

/** An execution engine. */
public interface Engine {

String engineName();

DataSource dataSource();

SqlDialect dialect();

String url();
}
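As a rough illustration of how an Engine might be consumed — the caller and its wiring are hypothetical, not part of this commit:

```java
import java.sql.Connection;
import java.sql.Statement;

import javax.sql.DataSource;

import com.linkedin.hoptimator.Engine;

// Hypothetical caller: hand a statement to whichever engine was selected.
// Assumes `sql` is already rendered in the engine's dialect.
public final class EngineRunner {

  private EngineRunner() {
  }

  public static void execute(Engine engine, String sql) throws Exception {
    DataSource dataSource = engine.dataSource();
    try (Connection connection = dataSource.getConnection();
         Statement statement = connection.createStatement()) {
      statement.execute(sql);
    }
  }
}
```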
1 change: 1 addition & 0 deletions hoptimator-cli/build.gradle
@@ -17,6 +17,7 @@ dependencies {
implementation libs.calcite.core
implementation libs.sqlline
implementation libs.slf4j.simple
+implementation libs.flink.jdbc
}

publishing {
24 changes: 24 additions & 0 deletions hoptimator-jdbc-driver-int/build.gradle
@@ -0,0 +1,24 @@
/** fat jar for integration tests */

plugins {
id 'com.gradleup.shadow' version '8.3.5'
id 'java'
}

dependencies {
// include various plugins
implementation project(':hoptimator-avro')
implementation project(':hoptimator-demodb')
implementation project(":hoptimator-kafka")
implementation project(":hoptimator-venice")
implementation libs.flink.jdbc

implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-util')
implementation project(':hoptimator-k8s')
}

shadowJar {
zip64 true
mergeServiceFiles()
}
4 changes: 3 additions & 1 deletion hoptimator-k8s/build.gradle
@@ -10,8 +10,10 @@ dependencies {
implementation libs.calcite.server
implementation libs.kubernetes.client

-// These are included in case the demo databases are deployed.
+// These are included in case the respective Databases are installed
testRuntimeOnly project(':hoptimator-demodb')
+testRuntimeOnly project(':hoptimator-kafka')
+testRuntimeOnly project(':hoptimator-venice')

testImplementation(testFixtures(project(':hoptimator-jdbc')))
testImplementation(platform('org.junit:junit-bom:5.11.3'))
K8sApiEndpoints.java
@@ -9,6 +9,8 @@

import com.linkedin.hoptimator.k8s.models.V1alpha1Database;
import com.linkedin.hoptimator.k8s.models.V1alpha1DatabaseList;
+import com.linkedin.hoptimator.k8s.models.V1alpha1Engine;
+import com.linkedin.hoptimator.k8s.models.V1alpha1EngineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline;
@@ -34,6 +36,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1Database, V1alpha1DatabaseList> DATABASES =
new K8sApiEndpoint<>("Database", "hoptimator.linkedin.com", "v1alpha1", "databases", false,
V1alpha1Database.class, V1alpha1DatabaseList.class);
+public static final K8sApiEndpoint<V1alpha1Engine, V1alpha1EngineList> ENGINES =
+new K8sApiEndpoint<>("Engine", "hoptimator.linkedin.com", "v1alpha1", "engines", false,
+V1alpha1Engine.class, V1alpha1EngineList.class);
public static final K8sApiEndpoint<V1alpha1Pipeline, V1alpha1PipelineList> PIPELINES =
new K8sApiEndpoint<>("Pipeline", "hoptimator.linkedin.com", "v1alpha1", "pipelines", false,
V1alpha1Pipeline.class, V1alpha1PipelineList.class);
K8sDatabaseTable.java
@@ -40,14 +40,17 @@ public Row(String name, String url, String schema, String dialect, String driver
}
// CHECKSTYLE:ON

-public K8sDatabaseTable(K8sContext context) {
+private final K8sEngineTable engines;
+
+public K8sDatabaseTable(K8sContext context, K8sEngineTable engines) {
super(context, K8sApiEndpoints.DATABASES, Row.class);
+this.engines = engines;
}

public void addDatabases(SchemaPlus parentSchema) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
-HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
+HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row), engines.forDatabase(row.NAME)));
}
}

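`K8sEngineTable` and its `forDatabase` method aren't shown in this excerpt. A hypothetical sketch of what the call above presumably resolves to, using the K8sEngine class shown next — the row shape and matching rule are guesses modeled on K8sDatabaseTable.Row:

```java
import java.util.ArrayList;
import java.util.List;

import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.SqlDialect;

// Hypothetical stand-in for K8sEngineTable.forDatabase: collect the Engines
// registered for a given database. Not the actual implementation.
class EngineLookupSketch {

  static class EngineRow {
    String name;
    String url;
    String driver;
    SqlDialect dialect;
    List<String> databases = new ArrayList<>(); // empty means "all databases"
  }

  private final List<EngineRow> rows = new ArrayList<>();

  List<Engine> forDatabase(String database) {
    List<Engine> found = new ArrayList<>();
    for (EngineRow row : rows) {
      if (row.databases.isEmpty() || row.databases.contains(database)) {
        found.add(new K8sEngine(row.name, row.url, row.dialect, row.driver));
      }
    }
    return found;
  }
}
```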
K8sEngine.java
@@ -0,0 +1,47 @@
package com.linkedin.hoptimator.k8s;

import javax.sql.DataSource;

import com.linkedin.hoptimator.Engine;
import com.linkedin.hoptimator.SqlDialect;

import java.util.Objects;

import org.apache.calcite.adapter.jdbc.JdbcSchema;


public class K8sEngine implements Engine {

private final String name;
private final String url;
private final SqlDialect dialect;
private final String driver;

public K8sEngine(String name, String url, SqlDialect dialect, String driver) {
this.name = name;
this.url = Objects.requireNonNull(url, "url");
this.dialect = dialect;
this.driver = driver;
}

@Override
public String engineName() {
return name;
}

@Override
public DataSource dataSource() {
// TODO support username, password via Secrets
return JdbcSchema.dataSource(url, driver, null, null);
}

@Override
public String url() {
return url;
}

@Override
public SqlDialect dialect() {
return dialect;
}
}