diff --git a/dlt/destinations/impl/filesystem/filesystem.py b/dlt/destinations/impl/filesystem/filesystem.py
index 5dae4bf295..debfda06dc 100644
--- a/dlt/destinations/impl/filesystem/filesystem.py
+++ b/dlt/destinations/impl/filesystem/filesystem.py
@@ -73,7 +73,15 @@ def __init__(
             load_package_timestamp=dlt.current.load_package()["state"]["created_at"],  # type: ignore
             extra_placeholders=config.extra_placeholders,
         )
+
+        # We want to avoid failing for the local filesystem, where a deeply
+        # nested directory may not exist before a file is written.
+        # `auto_mkdir` is disabled by default in fsspec, so we weighed the
+        # trade-offs between the available options and settled on this.
         item = self.make_remote_path()
+        if self.config.protocol == "file":
+            fs_client.makedirs(posixpath.dirname(item), exist_ok=True)
+
         fs_client.put_file(local_path, item)

     def make_remote_path(self) -> str:
diff --git a/tests/load/pipeline/test_filesystem_pipeline.py b/tests/load/pipeline/test_filesystem_pipeline.py
index 5e94d42748..c641e6271f 100644
--- a/tests/load/pipeline/test_filesystem_pipeline.py
+++ b/tests/load/pipeline/test_filesystem_pipeline.py
@@ -3,7 +3,6 @@
 import posixpath
 from pathlib import Path
 from typing import Any, Callable, List, Dict, cast
-
 import dlt
 import pytest
 
@@ -13,7 +12,6 @@
 from dlt.common.storages.load_storage import LoadJobInfo
 from dlt.destinations import filesystem
 from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
-from dlt.common.schema.typing import LOADS_TABLE_NAME
 from tests.cases import arrow_table_all_data_types
 from tests.common.utils import load_json_case
 
@@ -282,7 +280,9 @@ def some_source():
 
 
 @pytest.mark.parametrize("layout", TEST_LAYOUTS)
-def test_filesystem_destination_extended_layout_placeholders(layout: str) -> None:
+def test_filesystem_destination_extended_layout_placeholders(
+    layout: str, default_buckets_env: str
+) -> None:
     data = load_json_case("simple_row")
     call_count = 0
 
@@ -303,30 +303,31 @@ def count(*args, **kwargs) -> Any:
         "hiphip": counter("Hurraaaa"),
     }
     now = pendulum.now()
-    os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
+    fs_destination = filesystem(
+        layout=layout,
+        extra_placeholders=extra_placeholders,
+        current_datetime=counter(now),
+    )
     pipeline = dlt.pipeline(
         pipeline_name="test_extended_layouts",
-        destination=filesystem(
-            layout=layout,
-            extra_placeholders=extra_placeholders,
-            kwargs={"auto_mkdir": True},
-            current_datetime=counter(now),
-        ),
+        destination=fs_destination,
     )
     load_info = pipeline.run(
         dlt.resource(data, name="simple_rows"),
         write_disposition="append",
     )
     client = pipeline.destination_client()
+
     expected_files = set()
     known_files = set()
     for basedir, _dirs, files in client.fs_client.walk(client.dataset_path):  # type: ignore[attr-defined]
         # strip out special tables
         if "_dlt" in basedir:
             continue
+
         for file in files:
             if ".jsonl" in file:
-                expected_files.add(os.path.join(basedir, file))
+                expected_files.add(posixpath.join(basedir, file))
 
     for load_package in load_info.load_packages:
         for load_info in load_package.jobs["completed_jobs"]:  # type: ignore[assignment]
@@ -343,8 +344,8 @@ def count(*args, **kwargs) -> Any:
                 load_package_timestamp=load_info.created_at.to_iso8601_string(),  # type: ignore[attr-defined]
                 extra_placeholders=extra_placeholders,
             )
-            full_path = os.path.join(client.dataset_path, path)  # type: ignore[attr-defined]
-            assert os.path.exists(full_path)
+            full_path = posixpath.join(client.dataset_path, path)  # type: ignore[attr-defined]
+            assert client.fs_client.exists(full_path)  # type: ignore[attr-defined]
             if ".jsonl" in full_path:
                 known_files.add(full_path)
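
The `filesystem.py` hunk guards `put_file` with an explicit `makedirs` because fsspec's local filesystem does not create missing parent directories by default. A minimal standalone sketch of that behavior (not part of this change; the temp directory and file names are illustrative only):

```python
import posixpath
import tempfile

import fsspec

# fsspec's LocalFileSystem ships with auto_mkdir=False, so writing to a
# nested path fails unless the parent directories already exist.
fs = fsspec.filesystem("file")

base = tempfile.mkdtemp()
target = posixpath.join(base, "deeply", "nested", "dir", "data.jsonl")

# Without this call, fs.open(target, "wb") raises FileNotFoundError.
fs.makedirs(posixpath.dirname(target), exist_ok=True)

with fs.open(target, "wb") as f:
    f.write(b'{"id": 1}\n')

assert fs.exists(target)
```

The alternative the test previously used, `kwargs={"auto_mkdir": True}`, would push directory creation into every fsspec call; a single `makedirs` done only for the `file` protocol leaves remote destinations untouched.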
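The test-side switch from `os.path` to `posixpath` (and from `os.path.exists` to `client.fs_client.exists`) is what lets the test run against any bucket supplied by `default_buckets_env`, not just `file://`. A short standard-library sketch of why `os.path.join` is unsafe for bucket URLs (paths are illustrative):

```python
import ntpath  # os.path as it behaves on Windows
import posixpath

bucket_path = "s3://bucket/dataset"

# posixpath always joins with "/", which every fsspec protocol expects.
assert posixpath.join(bucket_path, "table", "file.jsonl") == "s3://bucket/dataset/table/file.jsonl"

# On Windows, os.path.join inserts backslashes that no object store
# understands; likewise, os.path.exists cannot check a remote URL.
assert ntpath.join(bucket_path, "table", "file.jsonl") == "s3://bucket/dataset\\table\\file.jsonl"
```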