Adding Integration Tests #27

Merged: 64 commits, Aug 7, 2024
Commits
688565d
initial test
venkatajagannath Jul 23, 2024
44147d1
path changed
venkatajagannath Jul 23, 2024
7a2a9dc
bug fix
venkatajagannath Jul 23, 2024
29df2a1
bug fix
venkatajagannath Jul 23, 2024
0331a19
bug fix
venkatajagannath Jul 23, 2024
16d13d3
bug fix
venkatajagannath Jul 23, 2024
564fed6
fix
venkatajagannath Jul 23, 2024
eb02d95
update
venkatajagannath Jul 23, 2024
085d299
bug fix
venkatajagannath Jul 23, 2024
e4d7a52
bug fix
venkatajagannath Jul 23, 2024
bc61da0
update
venkatajagannath Jul 23, 2024
5470475
bug fix
venkatajagannath Jul 23, 2024
51b629a
bug fix
venkatajagannath Jul 23, 2024
1ce4e6a
bug fix
venkatajagannath Jul 26, 2024
cdf7ade
bug fix
venkatajagannath Jul 26, 2024
4c54a6c
update
venkatajagannath Jul 26, 2024
44077ee
update
venkatajagannath Jul 26, 2024
7d10395
bug fix
venkatajagannath Jul 26, 2024
26cf958
Merge branch 'main' into integration-test
venkatajagannath Jul 28, 2024
da4618b
bug fix
venkatajagannath Jul 29, 2024
e3304c5
Bug fix
venkatajagannath Jul 29, 2024
9a1f155
bug fix
venkatajagannath Jul 29, 2024
e94b6b9
added more cpus
venkatajagannath Jul 29, 2024
620d6f1
using minikube
venkatajagannath Aug 1, 2024
a33a8f1
updated
venkatajagannath Aug 1, 2024
852fd40
14 cpus
venkatajagannath Aug 1, 2024
68a458c
GKE cluster
venkatajagannath Aug 1, 2024
4092c8d
updated
venkatajagannath Aug 3, 2024
dcaade1
update
venkatajagannath Aug 3, 2024
3030108
update
venkatajagannath Aug 3, 2024
9f7ea3a
update
venkatajagannath Aug 3, 2024
395b11e
update
venkatajagannath Aug 4, 2024
45acf00
update
venkatajagannath Aug 4, 2024
dd166a6
update
venkatajagannath Aug 4, 2024
de1c2ae
update
venkatajagannath Aug 4, 2024
916d8cf
update
venkatajagannath Aug 4, 2024
5f1ce1b
Merge branch 'main' into integration-test
venkatajagannath Aug 5, 2024
871fd65
print roles
venkatajagannath Aug 5, 2024
e0b91c8
update
venkatajagannath Aug 5, 2024
6b93c86
update
venkatajagannath Aug 5, 2024
fd38a0a
test
venkatajagannath Aug 5, 2024
31928e1
test
venkatajagannath Aug 5, 2024
074df99
update
venkatajagannath Aug 5, 2024
2e3bd49
minor update
venkatajagannath Aug 5, 2024
beb08ae
bug fix
venkatajagannath Aug 5, 2024
ab53383
more tests
venkatajagannath Aug 5, 2024
96ea474
update
venkatajagannath Aug 5, 2024
4c1442e
bug fix
venkatajagannath Aug 6, 2024
0dd01b8
update
venkatajagannath Aug 6, 2024
ccb1ec0
update
venkatajagannath Aug 6, 2024
453c5ff
try again
venkatajagannath Aug 6, 2024
5152d9e
update
venkatajagannath Aug 6, 2024
e40eb7f
updates
venkatajagannath Aug 6, 2024
16e3d91
update
venkatajagannath Aug 6, 2024
a8340c6
try again
venkatajagannath Aug 6, 2024
93cbda5
node svc account set
venkatajagannath Aug 6, 2024
a622d53
update
venkatajagannath Aug 6, 2024
a90651a
update
venkatajagannath Aug 6, 2024
96dbbb9
update
venkatajagannath Aug 6, 2024
3aa283f
update
venkatajagannath Aug 6, 2024
e82b3e4
yaml files updated
venkatajagannath Aug 6, 2024
5caff5e
making ip or dns generic choice
venkatajagannath Aug 6, 2024
76e6ff7
unit tests added
venkatajagannath Aug 6, 2024
642cbb1
minor updates
venkatajagannath Aug 6, 2024
80 changes: 74 additions & 6 deletions .github/workflows/test.yml
@@ -1,5 +1,4 @@
name: test

on:
push:
branches: [ "main" ]
@@ -45,30 +44,99 @@ jobs:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- uses: actions/cache@v4
with:
path: |
~/.cache/pip
.nox
key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install packages and dependencies
run: |
python -m pip install hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze

- name: Test Ray against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
run: |
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
with:
name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage

Run-Integration-Tests:
needs: Authorize
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.9"]
permissions:
contents: 'read'
id-token: 'write'
steps:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}
- uses: actions/cache@v4
with:
path: |
~/.cache/pip
.nox
key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install packages and dependencies
run: |
python -m pip install hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
- id: 'auth'
name: 'Authenticate to Google Cloud'
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_SA_KEY }}'
service_account: '${{ secrets.SERVICE_ACCOUNT_EMAIL }}'
create_credentials_file: true

- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v1'

- name: 'Set GCP Project ID'
run: gcloud config set project ${{ secrets.PROJECT_ID }}

- name: Create GKE cluster
run: |
gcloud container clusters create integration-test-cluster \
--zone us-central1-a \
--num-nodes 2 \
--machine-type e2-standard-8 \
--no-enable-autoupgrade \
--no-enable-autorepair \
--no-enable-ip-alias \
--service-account=${{ secrets.SERVICE_ACCOUNT_EMAIL }}

- name: Setup gcloud and get kubeconfig
run: |
gcloud components install gke-gcloud-auth-plugin
gcloud container clusters get-credentials integration-test-cluster --zone us-central1-a
kubectl config view --raw > kubeconfig.yaml
echo "KUBECONFIG=${{ github.workspace }}/kubeconfig.yaml" >> $GITHUB_ENV
echo "USE_GKE_GCLOUD_AUTH_PLUGIN=True" >> $GITHUB_ENV

- name: Run integration tests
run: |
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
- name: Upload coverage to Github
uses: actions/upload-artifact@v4
with:
name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
- name: Delete GKE cluster
if: always()
run: |
gcloud container clusters delete integration-test-cluster --zone us-central1-a --quiet
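For context, the GKE and kubeconfig steps above export KUBECONFIG and USE_GKE_GCLOUD_AUTH_PLUGIN through GITHUB_ENV so that later steps, including the integration test run, can reach the newly created cluster. A minimal sketch of how test code could pick that path up, assuming it reads the same environment variable (the helper name and fallback are illustrative, not part of this PR):

import os
from pathlib import Path


def resolve_kubeconfig() -> Path:
    # Hypothetical helper: locate the kubeconfig written by the GKE setup step.
    # The workflow exports KUBECONFIG via GITHUB_ENV; fall back to the default path locally.
    path = Path(os.environ.get("KUBECONFIG", "~/.kube/config")).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"kubeconfig not found at {path}")
    return path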
15 changes: 0 additions & 15 deletions example_dags/scripts/k8-gpu.yaml

This file was deleted.

15 changes: 0 additions & 15 deletions example_dags/scripts/k8.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions example_dags/scripts/ray-gpu.yaml
@@ -1,12 +1,12 @@
apiVersion: ray.io/v1alpha1
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-complete
spec:
rayVersion: "2.10.0"
enableInTreeAutoscaling: true
headGroupSpec:
serviceType: ClusterIP
serviceType: LoadBalancer
rayStartParams:
dashboard-host: "0.0.0.0"
block: "true"
27 changes: 0 additions & 27 deletions example_dags/scripts/ray-service.yaml

This file was deleted.

11 changes: 2 additions & 9 deletions example_dags/scripts/ray.yaml
@@ -1,12 +1,12 @@
apiVersion: ray.io/v1alpha1
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-complete
spec:
rayVersion: "2.10.0"
enableInTreeAutoscaling: true
headGroupSpec:
serviceType: ClusterIP
serviceType: LoadBalancer
rayStartParams:
dashboard-host: "0.0.0.0"
block: "true"
@@ -18,13 +18,6 @@ spec:
containers:
- name: ray-head
image: rayproject/ray-ml:latest
env: # Environment variables section starts here
- name: RAY_GRAFANA_IFRAME_HOST
value: "http://127.0.0.1:3000"
- name: RAY_GRAFANA_HOST
value: "http://prometheus-grafana.prometheus-system.svc:80"
- name: RAY_PROMETHEUS_HOST
value: "http://prometheus-kube-prometheus-prometheus.prometheus-system.svc:9090"
resources:
limits:
cpu: 4
62 changes: 62 additions & 0 deletions example_dags/setup-teardown.py
@@ -0,0 +1,62 @@
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG

from ray_provider.operators.ray import DeleteRayCluster, SetupRayCluster, SubmitRayJob

default_args = {
"owner": "airflow",
"start_date": datetime(2024, 3, 26),
"retries": 1,
"retry_delay": timedelta(minutes=0),
}


RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

dag = DAG(
"Setup_Teardown",
default_args=default_args,
description="Setup Ray cluster and submit a job",
schedule=None,
)

setup_cluster = SetupRayCluster(
task_id="SetupRayCluster",
conn_id="ray_conn",
ray_cluster_yaml=str(RAY_SPEC),
use_gpu=False,
update_if_exists=False,
dag=dag,
)

submit_ray_job = SubmitRayJob(
task_id="SubmitRayJob",
conn_id="ray_conn",
entrypoint="python script.py",
runtime_env={"working_dir": str(FOLDER_PATH)},
num_cpus=1,
num_gpus=0,
memory=0,
resources={},
fetch_logs=True,
wait_for_completion=True,
job_timeout_seconds=600,
xcom_task_key="SetupRayCluster.dashboard",
poll_interval=5,
dag=dag,
)

delete_cluster = DeleteRayCluster(
task_id="DeleteRayCluster",
conn_id="ray_conn",
ray_cluster_yaml=str(RAY_SPEC),
use_gpu=False,
dag=dag,
)

# Create ray cluster and submit ray job
setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
setup_cluster >> delete_cluster
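The SubmitRayJob task above runs the entrypoint "python script.py" from the ray_scripts folder, but that script is not part of this diff. A minimal placeholder of the kind it would need to be, assuming the job only has to run a trivial workload to completion (the task body is illustrative, not the actual script):

import ray

# When launched through the Ray job API, ray.init() with no arguments
# attaches to the cluster that is running the job.
ray.init()


@ray.remote
def square(x: int) -> int:
    return x * x


if __name__ == "__main__":
    # A trivial distributed computation, enough to prove the cluster works.
    print(ray.get([square.remote(i) for i in range(4)]))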
56 changes: 35 additions & 21 deletions ray_provider/hooks/ray.py
@@ -79,7 +79,7 @@ def __init__(
self.verify = self._get_field("verify") or False
self.ray_client_instance = None

self.namespace = self.get_namespace()
self.namespace = self.get_namespace() or self.DEFAULT_NAMESPACE
self.kubeconfig: str | None = None
self.in_cluster: bool | None = None
self.client_configuration = None
@@ -235,7 +235,6 @@ def _is_port_open(self, host: str, port: int) -> bool:
def _get_service(self, name: str, namespace: str) -> client.V1Service:
"""
Get the Kubernetes service.

:param name: The name of the service.
:param namespace: The namespace of the service.
:return: The Kubernetes service object.
@@ -250,16 +249,34 @@ def _get_service(self, name: str, namespace: str) -> client.V1Service:
def _get_load_balancer_details(self, service: client.V1Service) -> dict[str, Any] | None:
"""
Extract LoadBalancer details from the service.

:param service: The Kubernetes service object.
:return: A dictionary containing LoadBalancer details if available, None otherwise.
"""
if service.status.load_balancer.ingress:
ingress: client.V1LoadBalancerIngress = service.status.load_balancer.ingress[0]
ip_or_hostname: str | None = ingress.ip or ingress.hostname
if ip_or_hostname:
ip: str | None = ingress.ip
hostname: str | None = ingress.hostname
if ip or hostname:
ports: list[dict[str, Any]] = [{"name": port.name, "port": port.port} for port in service.spec.ports]
return {"ip_or_hostname": ip_or_hostname, "ports": ports}
return {"ip": ip, "hostname": hostname, "ports": ports}
return None

def _check_load_balancer_readiness(self, lb_details: dict[str, Any]) -> str | None:
"""
Check if the LoadBalancer is ready by testing port connectivity.
:param lb_details: Dictionary containing LoadBalancer details.
:return: The working address (IP or hostname) if ready, None otherwise.
"""
ip: str | None = lb_details["ip"]
hostname: str | None = lb_details["hostname"]

for port_info in lb_details["ports"]:
port = port_info["port"]
if ip and self._is_port_open(ip, port):
return ip
if hostname and self._is_port_open(hostname, port):
return hostname

return None

def wait_for_load_balancer(
@@ -271,7 +288,6 @@ def wait_for_load_balancer(
) -> dict[str, Any]:
"""
Wait for the LoadBalancer to be ready and return its details.

:param service_name: The name of the LoadBalancer service.
:param namespace: The namespace of the service.
:param max_retries: Maximum number of retries.
@@ -281,25 +297,23 @@
"""
for attempt in range(1, max_retries + 1):
self.log.info(f"Attempt {attempt}: Checking LoadBalancer status...")

try:
service: client.V1Service = self._get_service(service_name, namespace)
lb_details: dict[str, Any] | None = self._get_load_balancer_details(service)

if lb_details:
hostname = lb_details["ip_or_hostname"]
all_ports_open = True
for port in lb_details["ports"]:
if not self._is_port_open(hostname, port["port"]):
all_ports_open = False
break

if all_ports_open:
self.log.info("All ports are open. LoadBalancer is ready.")
return lb_details
else:
self.log.info("Not all ports are open. Waiting...")
else:
if not lb_details:
self.log.info("LoadBalancer details not available yet.")
continue

working_address = self._check_load_balancer_readiness(lb_details)

if working_address:
self.log.info("LoadBalancer is ready.")
lb_details["working_address"] = working_address
return lb_details

self.log.info("LoadBalancer is not ready yet. Waiting...")

except AirflowException:
self.log.info("LoadBalancer service is not available yet...")
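The new _check_load_balancer_readiness helper above delegates the actual connectivity test to _is_port_open, which this hunk does not show. As an assumption about what that existing helper does (a plain TCP connect with a timeout), a standalone sketch would be:

import socket


def is_port_open(host: str, port: int, timeout: float = 5.0) -> bool:
    # Try a TCP connection; success means the LoadBalancer port is reachable.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False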
2 changes: 1 addition & 1 deletion ray_provider/operators/ray.py
@@ -103,7 +103,7 @@ def _setup_load_balancer(self, name: str, namespace: str, context: Context) -> N

if lb_details:
self.log.info(lb_details)
dns = lb_details["ip_or_hostname"]
dns = lb_details["working_address"]
for port in lb_details["ports"]:
url = f"http://{dns}:{port['port']}"
context["task_instance"].xcom_push(key=port["name"], value=url)
8 changes: 8 additions & 0 deletions scripts/test/integration_test.sh
@@ -0,0 +1,8 @@
pytest -vv \
--cov=ray_provider \
--cov-report=term-missing \
--cov-report=xml \
--durations=0 \
-m integration \
-s \
--log-cli-level=DEBUG
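The -m integration flag above selects only tests carrying the integration marker. A minimal sketch of such a test, with an invented name and body purely to show the marker usage (the real integration tests in this PR exercise the provider against a live cluster):

import pytest


@pytest.mark.integration
def test_integration_marker_example() -> None:
    # Hypothetical stub: demonstrates how tests opt in to the integration run.
    assert True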