From 26768d94085e8fb6ab188be01f860f18278293cc Mon Sep 17 00:00:00 2001 From: Eugene <53026723+e-galan@users.noreply.github.com> Date: Fri, 28 Jun 2024 06:43:26 +0000 Subject: [PATCH] Update example_cloud_run.py to show job resource limit setting (#40456) --- .../operators/cloud/cloud_run.rst | 15 ++- .../cloud/cloud_run/example_cloud_run.py | 94 ++++++++++++++++--- 2 files changed, 90 insertions(+), 19 deletions(-) diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst index cf90afde68e8..aedd807f27c2 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_run.rst @@ -33,16 +33,23 @@ Create a job Before you create a job in Cloud Run, you need to define it. For more information about the Job object fields, visit `Google Cloud Run Job description `__ -A simple job configuration can look as follows: +A simple job configuration can be created with a Job object: .. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py :language: python :dedent: 0 - :start-after: [START howto_operator_cloud_run_job_creation] - :end-before: [END howto_operator_cloud_run_job_creation] + :start-after: [START howto_cloud_run_job_instance_creation] + :end-before: [END howto_cloud_run_job_instance_creation] +or with a Python dictionary: -With this configuration we can create the job: +.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py + :language: python + :dedent: 0 + :start-after: [START howto_cloud_run_job_dict_creation] + :end-before: [END howto_cloud_run_job_dict_creation] + +You can create a Cloud Run Job with any of these configurations : :class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunCreateJobOperator` .. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_run/example_cloud_run.py diff --git a/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py b/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py index 3b62e2ade900..5a82cca2a7aa 100644 --- a/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py +++ b/tests/system/providers/google/cloud/cloud_run/example_cloud_run.py @@ -15,9 +15,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -""" -Example Airflow DAG that uses Google Cloud Run Operators. -""" + +"""Example Airflow DAG that uses Google Cloud Run Operators.""" from __future__ import annotations @@ -39,7 +38,7 @@ from airflow.utils.trigger_rule import TriggerRule PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") -DAG_ID = "example_cloud_run" +DAG_ID = "cloud_run" region = "us-central1" job_name_prefix = "cloudrun-system-test-job" @@ -117,24 +116,89 @@ def _assert_jobs(ti): def _assert_one_job(ti): job_dicts = ti.xcom_pull(task_ids=[list_jobs_limit_task_name], key="return_value") - assert len(job_dicts[0]) == 1 -# [START howto_operator_cloud_run_job_creation] -def _create_job(): +# [START howto_cloud_run_job_instance_creation] +def _create_job_instance() -> Job: + """ + Create a Cloud Run job configuration with google.cloud.run_v2.Job object. + + As a minimum the configuration must contain a container image name in its template. + The rest of the configuration parameters are optional and will be populated with default values if not set. + """ job = Job() container = k8s_min.Container() container.image = "us-docker.pkg.dev/cloudrun/container/job:latest" + container.resources.limits = {"cpu": "2", "memory": "1Gi"} job.template.template.containers.append(container) return job -# [END howto_operator_cloud_run_job_creation] +# [END howto_cloud_run_job_instance_creation] + + +# [START howto_cloud_run_job_dict_creation] +def _create_job_dict() -> dict: + """ + Create a Cloud Run job configuration with a Python dict. + + As a minimum the configuration must contain a container image name in its template. + """ + return { + "template": { + "template": { + "containers": [ + { + "image": "us-docker.pkg.dev/cloudrun/container/job:latest", + "resources": { + "limits": {"cpu": "1", "memory": "512Mi"}, + "cpu_idle": False, + "startup_cpu_boost": False, + }, + "name": "", + "command": [], + "args": [], + "env": [], + "ports": [], + "volume_mounts": [], + "working_dir": "", + "depends_on": [], + } + ], + "volumes": [], + "execution_environment": 0, + "encryption_key": "", + }, + "labels": {}, + "annotations": {}, + "parallelism": 0, + "task_count": 0, + }, + "name": "", + "uid": "", + "generation": "0", + "labels": {}, + "annotations": {}, + "creator": "", + "last_modifier": "", + "client": "", + "client_version": "", + "launch_stage": 0, + "observed_generation": "0", + "conditions": [], + "execution_count": 0, + "reconciling": False, + "satisfies_pzs": False, + "etag": "", + } + + +# [END howto_cloud_run_job_dict_creation] -def _create_job_with_label(): - job = _create_job() +def _create_job_instance_with_label(): + job = _create_job_instance() job.labels = {"somelabel": "label1"} return job @@ -144,7 +208,7 @@ def _create_job_with_label(): schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], + tags=["example", "cloud", "run"], ) as dag: # [START howto_operator_cloud_run_create_job] create1 = CloudRunCreateJobOperator( @@ -152,7 +216,7 @@ def _create_job_with_label(): project_id=PROJECT_ID, region=region, job_name=job1_name, - job=_create_job(), + job=_create_job_instance(), dag=dag, ) # [END howto_operator_cloud_run_create_job] @@ -162,7 +226,7 @@ def _create_job_with_label(): project_id=PROJECT_ID, region=region, job_name=job2_name, - job=Job.to_dict(_create_job()), + job=_create_job_dict(), dag=dag, ) @@ -171,7 +235,7 @@ def _create_job_with_label(): project_id=PROJECT_ID, region=region, job_name=job3_name, - job=Job.to_dict(_create_job()), + job=Job.to_dict(_create_job_instance()), dag=dag, ) @@ -249,7 +313,7 @@ def _create_job_with_label(): project_id=PROJECT_ID, region=region, job_name=job1_name, - job=_create_job_with_label(), + job=_create_job_instance_with_label(), dag=dag, ) # [END howto_operator_cloud_update_job]