Skip to content

Commit

Permalink
Update example_cloud_run.py to show job resource limit setting (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
e-galan authored Jun 28, 2024
1 parent 57fb776 commit 26768d9
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 19 deletions.
15 changes: 11 additions & 4 deletions docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ Create a job
Before you create a job in Cloud Run, you need to define it.
For more information about the Job object fields, visit `Google Cloud Run Job description <https://cloud.google.com/run/docs/reference/rpc/google.cloud.run.v2#google.cloud.run.v2.Job>`__

A simple job configuration can look as follows:
A simple job configuration can be created with a Job object:

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
:language: python
:dedent: 0
:start-after: [START howto_operator_cloud_run_job_creation]
:end-before: [END howto_operator_cloud_run_job_creation]
:start-after: [START howto_cloud_run_job_instance_creation]
:end-before: [END howto_cloud_run_job_instance_creation]

or with a Python dictionary:

With this configuration we can create the job:
.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
:language: python
:dedent: 0
:start-after: [START howto_cloud_run_job_dict_creation]
:end-before: [END howto_cloud_run_job_dict_creation]

You can create a Cloud Run Job with any of these configurations :
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunCreateJobOperator`

.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
Expand Down
94 changes: 79 additions & 15 deletions tests/system/providers/google/cloud/cloud_run/example_cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that uses Google Cloud Run Operators.
"""

"""Example Airflow DAG that uses Google Cloud Run Operators."""

from __future__ import annotations

Expand All @@ -39,7 +38,7 @@
from airflow.utils.trigger_rule import TriggerRule

PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
DAG_ID = "example_cloud_run"
DAG_ID = "cloud_run"

region = "us-central1"
job_name_prefix = "cloudrun-system-test-job"
Expand Down Expand Up @@ -117,24 +116,89 @@ def _assert_jobs(ti):

def _assert_one_job(ti):
job_dicts = ti.xcom_pull(task_ids=[list_jobs_limit_task_name], key="return_value")

assert len(job_dicts[0]) == 1


# [START howto_operator_cloud_run_job_creation]
def _create_job():
# [START howto_cloud_run_job_instance_creation]
def _create_job_instance() -> Job:
"""
Create a Cloud Run job configuration with google.cloud.run_v2.Job object.
As a minimum the configuration must contain a container image name in its template.
The rest of the configuration parameters are optional and will be populated with default values if not set.
"""
job = Job()
container = k8s_min.Container()
container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
container.resources.limits = {"cpu": "2", "memory": "1Gi"}
job.template.template.containers.append(container)
return job


# [END howto_operator_cloud_run_job_creation]
# [END howto_cloud_run_job_instance_creation]


# [START howto_cloud_run_job_dict_creation]
def _create_job_dict() -> dict:
"""
Create a Cloud Run job configuration with a Python dict.
As a minimum the configuration must contain a container image name in its template.
"""
return {
"template": {
"template": {
"containers": [
{
"image": "us-docker.pkg.dev/cloudrun/container/job:latest",
"resources": {
"limits": {"cpu": "1", "memory": "512Mi"},
"cpu_idle": False,
"startup_cpu_boost": False,
},
"name": "",
"command": [],
"args": [],
"env": [],
"ports": [],
"volume_mounts": [],
"working_dir": "",
"depends_on": [],
}
],
"volumes": [],
"execution_environment": 0,
"encryption_key": "",
},
"labels": {},
"annotations": {},
"parallelism": 0,
"task_count": 0,
},
"name": "",
"uid": "",
"generation": "0",
"labels": {},
"annotations": {},
"creator": "",
"last_modifier": "",
"client": "",
"client_version": "",
"launch_stage": 0,
"observed_generation": "0",
"conditions": [],
"execution_count": 0,
"reconciling": False,
"satisfies_pzs": False,
"etag": "",
}


# [END howto_cloud_run_job_dict_creation]


def _create_job_with_label():
job = _create_job()
def _create_job_instance_with_label():
job = _create_job_instance()
job.labels = {"somelabel": "label1"}
return job

Expand All @@ -144,15 +208,15 @@ def _create_job_with_label():
schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
tags=["example", "cloud", "run"],
) as dag:
# [START howto_operator_cloud_run_create_job]
create1 = CloudRunCreateJobOperator(
task_id=create1_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
job=_create_job(),
job=_create_job_instance(),
dag=dag,
)
# [END howto_operator_cloud_run_create_job]
Expand All @@ -162,7 +226,7 @@ def _create_job_with_label():
project_id=PROJECT_ID,
region=region,
job_name=job2_name,
job=Job.to_dict(_create_job()),
job=_create_job_dict(),
dag=dag,
)

Expand All @@ -171,7 +235,7 @@ def _create_job_with_label():
project_id=PROJECT_ID,
region=region,
job_name=job3_name,
job=Job.to_dict(_create_job()),
job=Job.to_dict(_create_job_instance()),
dag=dag,
)

Expand Down Expand Up @@ -249,7 +313,7 @@ def _create_job_with_label():
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
job=_create_job_with_label(),
job=_create_job_instance_with_label(),
dag=dag,
)
# [END howto_operator_cloud_update_job]
Expand Down

0 comments on commit 26768d9

Please sign in to comment.