diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index 6b4ded49a..76570d56a 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -15,6 +15,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -135,6 +136,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) +class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core source freshness command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator): """ Executes a dbt core run command. diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py index 1582456d6..08b7ba999 100644 --- a/tests/operators/test_gcp_cloud_run_job.py +++ b/tests/operators/test_gcp_cloud_run_job.py @@ -16,6 +16,7 @@ DbtRunOperationGcpCloudRunJobOperator, DbtSeedGcpCloudRunJobOperator, DbtSnapshotGcpCloudRunJobOperator, + DbtSourceGcpCloudRunJobOperator, DbtTestGcpCloudRunJobOperator, ) @@ -171,12 +172,13 @@ def test_dbt_gcp_cloud_run_job_build_command(): "seed": DbtSeedGcpCloudRunJobOperator(**BASE_KWARGS), "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS), "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS), + "source": DbtSourceGcpCloudRunJobOperator(**BASE_KWARGS), "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS), } for command_name, command_operator in result_map.items(): command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock()) - if command_name != "run-operation": + if command_name not in ("run-operation", "source"): assert command_operator.command == [ "dbt", command_name, @@ -185,7 +187,7 @@ def test_dbt_gcp_cloud_run_job_build_command(): "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", ] - else: + elif command_name == "run-operation": assert command_operator.command == [ "dbt", command_name, @@ -195,6 +197,16 @@ def test_dbt_gcp_cloud_run_job_build_command(): "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", ] + else: + assert command_operator.command == [ + "dbt", + command_name, + "freshness", + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + ] @skip_on_empty_operator