Skip to content

Commit

Permalink
[Backport to 1.7.latest] Fix assorted source freshness edgecases so c…
Browse files Browse the repository at this point in the history
…heck is run or actionable information (#9825) (#9826)

* Fix assorted source freshness edgecases so check is run or actionable information (#9825)

* Ensure BaseRunner handles nodes without `build_path`

Some nodes, like SourceDefinition nodes, don't have a `build_path` property.
This is problematic because we take in nodes with no type checking, and
assume they have properties sometimes, like `build_path`. This was just
the case in BaseRunner's `_handle_generic_exception` and
`_handle_interal_exception` methods. Thus to stop dbt from crashing when
trying to handle an exception related to a node without a `build_path`,
we added an private method to the BaseRunner class for safely trying
to get `build_path`.

* Use keyword arguments when instantiating `Note` events in freshness.py

Previously we were passing arguments during the `Note` event instantiations
in freshness.py as positional arguments. This would cause not the desired
`Note` event to be emitted, but instead get the message
```
[Note] Don't use positional arguments when constructing logging events
```
which was our fault, not the users'. Additionally, we were passing the
level for the event in the `Note` instantiation when we needed to be
passing it to the `fire_event` method.

* Raise error when `loaded_at_field` is `None` and metadata check isn't possible

Previously if a source freshness check didn't have a `loaded_at_field` and
metadata source freshness wasn't supported by the adapter, then we'd log
a warning message and let the source freshness check continue. This was problematic
because the source freshness check couldn't actually continue and the process
would raise an error in the form
```
type object argument after ** must be a mapping, not NoneType
```
because the `freshness` variable was never getting set. This error wasn't particularly
helpful for any person running into it. So instead of letting that error
happen we now deliberately raise an error with helpful information.

* Add test which ensures bad source freshness checks raise appropriate error

This test directly tests that when a source freshness check doesn't have a
`loaded_at_field` and the adapter in use doesn't support metadata checks,
then the appropriate error message gets raised. That is, it directly tests
the change made in a162d53. This test indirectly tests the changes in both
7ec2f82 and 7b0ff31 as the appropriate error can only be raised because
we've fixed other upstream issues via those commits.

* Add changelog entry for source freshness edgecase fixes

* Update imports of dbt.artifacts to pre-dbt.artifacts locations

Moving things to dbt.artifacts started to happen AFTER the release of
dbt-core 1.7, thus 1.7 has no concept of dbt.articacts. The changes brought
in via the previous commit, were cherry-picked from main (1.8.0 beta) and
thus reference dbt.artifacts. This caused everything to blow up, because
dbt.artifacts doesn't exist. Here we've updated the dbt.artifacts imports
to their pre-dbt.artifacts paths.
  • Loading branch information
QMalcolm authored Mar 29, 2024
1 parent dd070b9 commit b8681a3
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 33 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20240326-162100.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix assorted source freshness edgecases so check is run or actionable information
is given
time: 2024-03-26T16:21:00.008936-07:00
custom:
Author: QMalcolm
Issue: "9078"
7 changes: 5 additions & 2 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
def compile(self, manifest: Manifest) -> Any:
pass

def _node_build_path(self) -> Optional[str]:
return self.node.build_path if hasattr(self.node, "build_path") else None

def get_result_status(self, result) -> Dict[str, str]:
if result.status == NodeStatus.Error:
return {"node_status": "error", "node_error": str(result.message)}
Expand Down Expand Up @@ -339,15 +342,15 @@ def _handle_catchable_exception(self, e, ctx):
def _handle_internal_exception(self, e, ctx):
fire_event(
InternalErrorOnRun(
build_path=self.node.build_path, exc=str(e), node_info=get_node_info()
build_path=self._node_build_path(), exc=str(e), node_info=get_node_info()
)
)
return str(e)

def _handle_generic_exception(self, e, ctx):
fire_event(
GenericExceptionOnRun(
build_path=self.node.build_path,
build_path=self._node_build_path(),
unique_id=self.node.unique_id,
exc=str(e),
node_info=get_node_info(),
Expand Down
11 changes: 5 additions & 6 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ def execute(self, compiled_node, manifest):
if compiled_node.freshness.filter is not None:
fire_event(
Note(
f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'.",
EventLevel.WARN,
)
msg=f"A filter cannot be applied to a metadata freshness check on source '{compiled_node.name}'."
),
EventLevel.WARN,
)

adapter_response, freshness = self.adapter.calculate_freshness_from_metadata(
Expand All @@ -131,9 +131,8 @@ def execute(self, compiled_node, manifest):

status = compiled_node.freshness.status(freshness["age"])
else:
status = FreshnessStatus.Warn
fire_event(
Note(f"Skipping freshness for source {compiled_node.name}."),
raise DbtRuntimeError(
f"Could not compute freshness for source {compiled_node.name}: no 'loaded_at_field' provided and {self.adapter.type()} adapter does not support metadata-based freshness checks."
)

# adapter_response was not returned in previous versions, so this will be None
Expand Down
13 changes: 12 additions & 1 deletion tests/functional/sources/test_source_freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import dbt.version
from dbt.cli.main import dbtRunner
from dbt.contracts.results import FreshnessResult, FreshnessStatus
from tests.functional.sources.common_source_setup import BaseSourcesTest
from tests.functional.sources.fixtures import (
error_models_schema_yml,
Expand Down Expand Up @@ -384,7 +385,7 @@ class TestMetadataFreshnessFails:
def models(self):
return {"schema.yml": freshness_via_metadata_schema_yml}

def test_metadata_freshness_fails(self, project):
def test_metadata_freshness_unsupported_parse_warning(self, project):
"""Since the default test adapter (postgres) does not support metadata
based source freshness checks, trying to use that mechanism should
result in a parse-time warning."""
Expand All @@ -399,3 +400,13 @@ def warning_probe(e):
runner.invoke(["parse"])

assert got_warning

def test_metadata_freshness_unsupported_error_when_run(self, project):

runner = dbtRunner()
result = runner.invoke(["source", "freshness"])
assert isinstance(result.result, FreshnessResult)
assert len(result.result.results) == 1
freshness_result = result.result.results[0]
assert freshness_result.status == FreshnessStatus.RuntimeErr
assert "Could not compute freshness for source test_table" in freshness_result.message
30 changes: 30 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

from dbt.contracts.graph.model_config import SourceConfig
from dbt.contracts.graph.nodes import SourceDefinition
from dbt.contracts.graph.unparsed import Quoting
from dbt.node_types import NodeType


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)
24 changes: 24 additions & 0 deletions tests/unit/task/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from dbt.task.base import BaseRunner
from dbt.contracts.graph.nodes import SourceDefinition


class MockRunner(BaseRunner):
def compile(self):
pass


class TestBaseRunner:
def test_handle_generic_exception_handles_nodes_without_build_path(
self, basic_parsed_source_definition_object: SourceDefinition
):
# Source definition nodes don't have `build_path` attributes. Thus, this
# test will fail if _handle_generic_exception doesn't account for this
runner = MockRunner(
config=None,
adapter=None,
node=basic_parsed_source_definition_object,
node_index=None,
num_nodes=None,
)
assert not hasattr(basic_parsed_source_definition_object, "build_path")
runner._handle_generic_exception(Exception("bad thing happened"), ctx=None)
24 changes: 0 additions & 24 deletions tests/unit/test_contracts_graph_parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1944,30 +1944,6 @@ def basic_parsed_source_definition_dict():
}


@pytest.fixture
def basic_parsed_source_definition_object():
return SourceDefinition(
columns={},
database="some_db",
description="",
fqn=["test", "source", "my_source", "my_source_table"],
identifier="my_source_table",
loader="stitch",
name="my_source_table",
original_file_path="/root/models/sources.yml",
package_name="test",
path="/root/models/sources.yml",
quoting=Quoting(),
resource_type=NodeType.Source,
schema="some_schema",
source_description="my source description",
source_name="my_source",
unique_id="test.source.my_source.my_source_table",
tags=[],
config=SourceConfig(),
)


@pytest.fixture
def complex_parsed_source_definition_dict():
return {
Expand Down

0 comments on commit b8681a3

Please sign in to comment.