
Enable auto_mkdir for local filesystem and run on all supported stora… #1263

Merged 6 commits on Apr 24, 2024
Changes from 5 commits
8 changes: 8 additions & 0 deletions dlt/destinations/impl/filesystem/filesystem.py
@@ -73,7 +73,15 @@ def __init__(
load_package_timestamp=dlt.current.load_package()["state"]["created_at"], # type: ignore
extra_placeholders=config.extra_placeholders,
)

# We want to avoid failing on the local filesystem, where a
# deeply nested directory may not exist before a file is written.
# `auto_mkdir` is disabled by default in fsspec, so after weighing
# the available options we create the parent directories explicitly.
item = self.make_remote_path()
if self.config.protocol.startswith("file"):
fs_client.makedirs(posixpath.dirname(item), exist_ok=True)

fs_client.put_file(local_path, item)

def make_remote_path(self) -> str:
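
For context, a minimal standalone sketch (file names and paths are assumed, not taken from this PR) contrasting the two options weighed in the comment above: enabling fsspec's auto_mkdir on the local filesystem versus creating parent directories explicitly before put_file, which is what the diff does.

import posixpath
import fsspec

# Assumed sample file, written only for illustration.
with open("local.jsonl", "w") as f:
    f.write('{"id": 1}\n')

# Option A: enable auto_mkdir so the local filesystem creates missing parents.
fs_auto = fsspec.filesystem("file", auto_mkdir=True)
fs_auto.put_file("local.jsonl", "_storage/auto/deep/nested/file.jsonl")

# Option B (the approach in the diff above): create parent directories
# explicitly, independent of the auto_mkdir default.
fs = fsspec.filesystem("file")
remote_path = "_storage/explicit/deep/nested/file.jsonl"
fs.makedirs(posixpath.dirname(remote_path), exist_ok=True)
fs.put_file("local.jsonl", remote_path)

In the diff the makedirs call is guarded to the file protocol, since object stores have no real directories to create.
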
27 changes: 14 additions & 13 deletions tests/load/pipeline/test_filesystem_pipeline.py
@@ -3,7 +3,6 @@
import posixpath
from pathlib import Path
from typing import Any, Callable, List, Dict, cast

import dlt
import pytest

@@ -13,7 +12,6 @@
from dlt.common.storages.load_storage import LoadJobInfo
from dlt.destinations import filesystem
from dlt.destinations.impl.filesystem.filesystem import FilesystemClient
from dlt.common.schema.typing import LOADS_TABLE_NAME

from tests.cases import arrow_table_all_data_types
from tests.common.utils import load_json_case
@@ -282,7 +280,9 @@ def some_source():


@pytest.mark.parametrize("layout", TEST_LAYOUTS)
def test_filesystem_destination_extended_layout_placeholders(layout: str) -> None:
def test_filesystem_destination_extended_layout_placeholders(
layout: str, default_buckets_env: str
) -> None:
data = load_json_case("simple_row")
call_count = 0

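The default_buckets_env parameter added above lets the test run against each configured bucket (the fixture sets the bucket URL through the environment), so it no longer hardcodes file://_storage. A minimal sketch (pipeline name and local bucket assumed) of how the filesystem destination picks its bucket up from the environment:

import os
import dlt
from dlt.destinations import filesystem

# Assumed local bucket; in the test suite a fixture sets this variable
# once per configured bucket so the same test covers all of them.
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"

# bucket_url is not passed here; it is resolved from the environment
# when the pipeline runs, just like in the updated test below.
pipeline = dlt.pipeline(
    pipeline_name="bucket_env_demo",  # hypothetical name
    destination=filesystem(),
)
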
@@ -303,30 +303,31 @@ def count(*args, **kwargs) -> Any:
"hiphip": counter("Hurraaaa"),
}
now = pendulum.now()
os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] = "file://_storage"
fs_destination = filesystem(
layout=layout,
extra_placeholders=extra_placeholders,
current_datetime=counter(now),
)
pipeline = dlt.pipeline(
pipeline_name="test_extended_layouts",
destination=filesystem(
layout=layout,
extra_placeholders=extra_placeholders,
kwargs={"auto_mkdir": True},
current_datetime=counter(now),
),
destination=fs_destination,
)
load_info = pipeline.run(
dlt.resource(data, name="simple_rows"),
write_disposition="append",
)
client = pipeline.destination_client()

expected_files = set()
known_files = set()
for basedir, _dirs, files in client.fs_client.walk(client.dataset_path): # type: ignore[attr-defined]
# strip out special tables
if "_dlt" in basedir:
continue

for file in files:
if ".jsonl" in file:
expected_files.add(os.path.join(basedir, file))
expected_files.add(posixpath.join(basedir, file))

for load_package in load_info.load_packages:
for load_info in load_package.jobs["completed_jobs"]: # type: ignore[assignment]
@@ -343,8 +344,8 @@ def count(*args, **kwargs) -> Any:
load_package_timestamp=load_info.created_at.to_iso8601_string(), # type: ignore[attr-defined]
extra_placeholders=extra_placeholders,
)
full_path = os.path.join(client.dataset_path, path) # type: ignore[attr-defined]
assert os.path.exists(full_path)
full_path = posixpath.join(client.dataset_path, path) # type: ignore[attr-defined]
assert client.fs_client.exists(full_path) # type: ignore[attr-defined]
if ".jsonl" in full_path:
known_files.add(full_path)

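
A side note on the assertion changes above: joining with posixpath and checking existence through the destination's fsspec client is what lets the same assertions work on non-local buckets, where the stored key is not a path on the local disk. A minimal sketch using fsspec's in-memory filesystem as a stand-in for a remote bucket (all names assumed):

import os
import posixpath
import fsspec

# A stand-in for a remote bucket; fsspec ships an in-memory filesystem.
fs = fsspec.filesystem("memory")
dataset_path = "test_dataset"  # assumed dataset path

# Write one object the way a completed load job would land.
full_path = posixpath.join(dataset_path, "simple_rows", "file.jsonl")
fs.makedirs(posixpath.dirname(full_path), exist_ok=True)
fs.pipe_file(full_path, b'{"id": 1}\n')

# The fsspec client sees the object...
assert fs.exists(full_path)
# ...while os.path does not, because the key is not on the local disk.
assert not os.path.exists(full_path)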