From 8e111c11997451950e1effaa844da9c4f8a12750 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 10 Apr 2024 11:31:55 +0400 Subject: [PATCH 01/22] fix(filesystem): UNC paths are not supported --- dlt/common/storages/fsspec_filesystem.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index b1cbc11bf9..6b4407a3ba 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -210,8 +210,10 @@ def open( # noqa: A003 elif compression == "disable": compression_arg = None else: - raise ValueError("""The argument `compression` must have one of the following values: - "auto", "enable", "disable".""") + raise ValueError( + """The argument `compression` must have one of the following values: + "auto", "enable", "disable".""" + ) opened_file: IO[Any] # if the user has already extracted the content, we use it so there is no need to @@ -282,12 +284,16 @@ def glob_files( # this is a file so create a proper file url bucket_url = pathlib.Path(bucket_url).absolute().as_uri() bucket_url_parsed = urlparse(bucket_url) + bucket_url_no_schema = bucket_url_parsed._replace(scheme="", query="").geturl() bucket_url_no_schema = ( bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema ) filter_url = posixpath.join(bucket_url_no_schema, file_glob) + if filter_url.startswith(r"/\\"): + filter_url = filter_url[1:] + glob_result = fs_client.glob(filter_url, detail=True) if isinstance(glob_result, list): raise NotImplementedError( @@ -302,10 +308,16 @@ def glob_files( # make that absolute path on a file:// if bucket_url_parsed.scheme == "file" and not file.startswith("/"): file = f"/{file}" - file_name = posixpath.relpath(file, bucket_url_no_schema) - file_url = bucket_url_parsed._replace( - path=posixpath.join(bucket_url_parsed.path, file_name) - ).geturl() + + if file.startswith("//"): + 
file_name = file.replace("//", "\\\\") + file_url = "file:///" + file_name + else: + file_name = posixpath.relpath(file, bucket_url_no_schema) + + file_url = bucket_url_parsed._replace( + path=posixpath.join(bucket_url_parsed.path, file_name) + ).geturl() mime_type, encoding = guess_mime_type(file_name) yield FileItem( From 45d1d7b42a4d39c7639bbe73577146f1c0a11462 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 11 Apr 2024 13:43:39 +0400 Subject: [PATCH 02/22] small fix --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 6b4407a3ba..65f02eb080 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -311,7 +311,7 @@ def glob_files( if file.startswith("//"): file_name = file.replace("//", "\\\\") - file_url = "file:///" + file_name + file_url = "file://" + file_name else: file_name = posixpath.relpath(file, bucket_url_no_schema) From e39b8bc363f277b18abb3366a83aa19101b7228f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Apr 2024 12:06:16 +0400 Subject: [PATCH 03/22] use glob for UNC paths --- dlt/common/storages/fsspec_filesystem.py | 11 +++-- .../load/filesystem/test_filesystem_common.py | 41 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 65f02eb080..65dee1ce76 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -1,4 +1,5 @@ import io +import glob import gzip import mimetypes import pathlib @@ -291,10 +292,14 @@ def glob_files( ) filter_url = posixpath.join(bucket_url_no_schema, file_glob) - if filter_url.startswith(r"/\\"): - filter_url = filter_url[1:] + if "$" in filter_url: # process UNC paths with Python glob module + files = glob.glob(filter_url, recursive=True) + glob_result = {} + 
for file in files: + glob_result[file] = fs_client.info(file) + else: + glob_result = fs_client.glob(filter_url, detail=True) - glob_result = fs_client.glob(filter_url, detail=True) if isinstance(glob_result, list): raise NotImplementedError( "Cannot request details when using fsspec.glob. For adlfs (Azure) please use version" diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 4c94766097..c7614e48ab 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -1,4 +1,5 @@ import os +import platform import posixpath from typing import Union, Dict from urllib.parse import urlparse @@ -18,6 +19,7 @@ from tests.utils import preserve_environ, autouse_test_storage from .utils import self_signed_cert from tests.common.configuration.utils import environment +from tests.pipeline.utils import assert_load_info, load_table_counts @with_config(spec=FilesystemConfiguration, sections=("destination", "filesystem")) @@ -172,3 +174,42 @@ def test_s3_wrong_client_certificate(default_buckets_env: str, self_signed_cert: with pytest.raises(SSLError, match="SSL: CERTIFICATE_VERIFY_FAILED"): print(filesystem.ls("", detail=False)) + + +@pytest.mark.skipif(platform.system() != "Windows", reason="Test it only on Windows") +def test_windows_unc_path() -> None: + config = get_config() + config.read_only = True + + unc_path = r"\\localhost\\" + os.path.abspath("tests/common/storages/samples").replace(":", "$") + abs_path = os.path.abspath(r"tests/common/storages/samples") + + for bucket_url in [ + unc_path, + "file://" + unc_path, + abs_path, + abs_path.replace(r"\\", "/"), + ]: + + filesystem, _ = fsspec_from_config(config) + + try: + all_file_items = list(glob_files(filesystem, bucket_url)) + expected_files = [ + "freshman_kgs.csv", + "freshman_lbs.csv", + "mlb_players.csv", + "mlb_teams_2012.csv", + "mlb_players.jsonl", + "A881_20230920.csv", + "A803_20230919.csv", + 
"A803_20230920.csv", + "mlb_players.parquet", + "taxi.csv.gz", + "sample.txt", + ] + for file in all_file_items: + file_name = file["file_name"].split("\\")[-1].split("/")[-1] + assert file_name in expected_files + except NotImplementedError as ex: + pytest.skip(f"Skipping due to {str(ex)}") From 7c465bc058b19f07ec1542c1162865ff550e8dea Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Apr 2024 14:23:09 +0400 Subject: [PATCH 04/22] add the third slash --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 154cf55df8..e6b1df5f9e 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -316,7 +316,7 @@ def glob_files( if file.startswith("//"): file_name = file.replace("//", "\\\\") - file_url = "file://" + file_name + file_url = "file:///" + file_name else: file_name = posixpath.relpath(file, bucket_url_no_schema) From 6fee7280e007a9388520398ee2bd631d2cf5bfbc Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Apr 2024 14:25:31 +0400 Subject: [PATCH 05/22] fix slahes --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index e6b1df5f9e..283dd5aad7 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -315,7 +315,7 @@ def glob_files( file = f"/{file}" if file.startswith("//"): - file_name = file.replace("//", "\\\\") + file_name = file file_url = "file:///" + file_name else: file_name = posixpath.relpath(file, bucket_url_no_schema) From f43a13a3125d4ae1291b0732cee6b0bebe292c79 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Apr 2024 14:26:46 +0400 Subject: [PATCH 06/22] fix if --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 283dd5aad7..2b79add1c2 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -314,7 +314,7 @@ def glob_files( if bucket_url_parsed.scheme == "file" and not file.startswith("/"): file = f"/{file}" - if file.startswith("//"): + if "$" in file: file_name = file file_url = "file:///" + file_name else: From 95edfafdcb912c3ef2a6b269809d9d821d496e90 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 17 Apr 2024 23:14:07 +0200 Subject: [PATCH 07/22] less invasive switch to Python glob --- dlt/common/storages/fsspec_filesystem.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 2b79add1c2..194a1435a1 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -291,8 +291,9 @@ def glob_files( bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema ) filter_url = posixpath.join(bucket_url_no_schema, file_glob) - - if "$" in filter_url: # process UNC paths with Python glob module + is_unc_path = fs_client.protocol == "file" and "$" in filter_url + if is_unc_path: + # process UNC paths with Python glob module files = glob.glob(filter_url, recursive=True) glob_result = {} for file in files: @@ -314,7 +315,7 @@ def glob_files( if bucket_url_parsed.scheme == "file" and not file.startswith("/"): file = f"/{file}" - if "$" in file: + if is_unc_path: file_name = file file_url = "file:///" + file_name else: From 50c6f2b471e5bc8a72c3bb1751312952f1c1ca64 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 17 Apr 2024 23:42:22 +0200 Subject: [PATCH 08/22] moves unc test to local so it runs on windows --- dlt/common/storages/fsspec_filesystem.py | 6 +-- .../common/storages/test_local_filesystem.py | 39 ++++++++++++++++++ 
.../load/filesystem/test_filesystem_common.py | 41 ------------------- 3 files changed, 41 insertions(+), 45 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 194a1435a1..5117159e50 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -211,10 +211,8 @@ def open( # noqa: A003 elif compression == "disable": compression_arg = None else: - raise ValueError( - """The argument `compression` must have one of the following values: - "auto", "enable", "disable".""" - ) + raise ValueError("""The argument `compression` must have one of the following values: + "auto", "enable", "disable".""") opened_file: IO[Any] # if the user has already extracted the content, we use it so there is no need to diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index ea6adec2c7..3f4e6ed885 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -2,6 +2,7 @@ import itertools import pytest import pathlib +import platform from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files @@ -54,3 +55,41 @@ def test_filesystem_decompress() -> None: # read as uncompressed binary with file_dict.open(compression="enable") as f: assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') + + +@pytest.mark.skipif(platform.system() != "Windows", reason="Test it only on Windows") +def test_windows_unc_path() -> None: + config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES) + config.read_only = True + + unc_path = r"\\localhost\\" + os.path.abspath("tests/common/storages/samples").replace(":", "$") + abs_path = os.path.abspath(r"tests/common/storages/samples") + + for bucket_url in [ + unc_path, + "file://" + unc_path, + abs_path, + abs_path.replace(r"\\", "/"), + ]: + 
filesystem, _ = fsspec_from_config(config) + + try: + all_file_items = list(glob_files(filesystem, bucket_url)) + expected_files = [ + "freshman_kgs.csv", + "freshman_lbs.csv", + "mlb_players.csv", + "mlb_teams_2012.csv", + "mlb_players.jsonl", + "A881_20230920.csv", + "A803_20230919.csv", + "A803_20230920.csv", + "mlb_players.parquet", + "taxi.csv.gz", + "sample.txt", + ] + for file in all_file_items: + file_name = file["file_name"].split("\\")[-1].split("/")[-1] + assert file_name in expected_files + except NotImplementedError as ex: + pytest.skip(f"Skipping due to {str(ex)}") diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index ab1c7e2769..5cb064a3b2 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -1,5 +1,4 @@ import os -import platform import posixpath from typing import Union, Dict @@ -29,7 +28,6 @@ from tests.utils import preserve_environ, autouse_test_storage from .utils import self_signed_cert from tests.common.configuration.utils import environment -from tests.pipeline.utils import assert_load_info, load_table_counts # mark all tests as essential, do not remove @@ -258,42 +256,3 @@ def test_filesystem_destination_passed_parameters_override_config_values() -> No bound_config = filesystem_destination.configuration(filesystem_config) assert bound_config.current_datetime == config_now assert bound_config.extra_placeholders == config_extra_placeholders - - -@pytest.mark.skipif(platform.system() != "Windows", reason="Test it only on Windows") -def test_windows_unc_path() -> None: - config = get_config() - config.read_only = True - - unc_path = r"\\localhost\\" + os.path.abspath("tests/common/storages/samples").replace(":", "$") - abs_path = os.path.abspath(r"tests/common/storages/samples") - - for bucket_url in [ - unc_path, - "file://" + unc_path, - abs_path, - abs_path.replace(r"\\", "/"), - ]: - - filesystem, _ = 
fsspec_from_config(config) - - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - expected_files = [ - "freshman_kgs.csv", - "freshman_lbs.csv", - "mlb_players.csv", - "mlb_teams_2012.csv", - "mlb_players.jsonl", - "A881_20230920.csv", - "A803_20230919.csv", - "A803_20230920.csv", - "mlb_players.parquet", - "taxi.csv.gz", - "sample.txt", - ] - for file in all_file_items: - file_name = file["file_name"].split("\\")[-1].split("/")[-1] - assert file_name in expected_files - except NotImplementedError as ex: - pytest.skip(f"Skipping due to {str(ex)}") From 2f1346c379dc84d15c2bd5ea462c9813f38760a8 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 18 Apr 2024 12:55:41 +0400 Subject: [PATCH 09/22] review fixes --- dlt/common/storages/fsspec_filesystem.py | 20 ++++++---- .../common/storages/test_local_filesystem.py | 38 +++++++++---------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 5117159e50..5903aed61a 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -211,8 +211,10 @@ def open( # noqa: A003 elif compression == "disable": compression_arg = None else: - raise ValueError("""The argument `compression` must have one of the following values: - "auto", "enable", "disable".""") + raise ValueError( + """The argument `compression` must have one of the following values: + "auto", "enable", "disable".""" + ) opened_file: IO[Any] # if the user has already extracted the content, we use it so there is no need to @@ -277,9 +279,12 @@ def glob_files( """ import os + is_unc_path = "$" in bucket_url bucket_url_parsed = urlparse(bucket_url) # if this is a file path without a scheme - if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url): + if ( + not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url) + ) and not is_unc_path: # this is a file so create 
a proper file url bucket_url = pathlib.Path(bucket_url).absolute().as_uri() bucket_url_parsed = urlparse(bucket_url) @@ -289,7 +294,6 @@ def glob_files( bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema ) filter_url = posixpath.join(bucket_url_no_schema, file_glob) - is_unc_path = fs_client.protocol == "file" and "$" in filter_url if is_unc_path: # process UNC paths with Python glob module files = glob.glob(filter_url, recursive=True) @@ -309,13 +313,15 @@ def glob_files( if md["type"] != "file": continue + scheme = bucket_url_parsed.scheme # make that absolute path on a file:// if bucket_url_parsed.scheme == "file" and not file.startswith("/"): file = f"/{file}" if is_unc_path: - file_name = file - file_url = "file:///" + file_name + file_url = "file:///" + file + file_name = file.replace(bucket_url_no_schema, "").replace("\\", "/").lstrip("/") + scheme = "file" else: file_name = posixpath.relpath(file, bucket_url_no_schema) @@ -329,6 +335,6 @@ def glob_files( file_url=file_url, mime_type=mime_type, encoding=encoding, - modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md), + modification_date=MTIME_DISPATCH[scheme](md), size_in_bytes=int(md["size"]), ) diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index 3f4e6ed885..6ad03e99f9 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -73,23 +73,21 @@ def test_windows_unc_path() -> None: ]: filesystem, _ = fsspec_from_config(config) - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - expected_files = [ - "freshman_kgs.csv", - "freshman_lbs.csv", - "mlb_players.csv", - "mlb_teams_2012.csv", - "mlb_players.jsonl", - "A881_20230920.csv", - "A803_20230919.csv", - "A803_20230920.csv", - "mlb_players.parquet", - "taxi.csv.gz", - "sample.txt", - ] - for file in all_file_items: - file_name = 
file["file_name"].split("\\")[-1].split("/")[-1] - assert file_name in expected_files - except NotImplementedError as ex: - pytest.skip(f"Skipping due to {str(ex)}") + all_file_items = list(glob_files(filesystem, bucket_url)) + expected_files = [ + "csv/freshman_kgs.csv", + "csv/freshman_lbs.csv", + "csv/mlb_players.csv", + "csv/mlb_teams_2012.csv", + "jsonl/mlb_players.jsonl", + "met_csv/A801/A881_20230920.csv", + "met_csv/A803/A803_20230919.csv", + "met_csv/A803/A803_20230920.csv", + "parquet/mlb_players.parquet", + "gzip/taxi.csv.gz", + "sample.txt", + ] + assert len(all_file_items) == len(expected_files) + + for file in all_file_items: + assert file["file_name"] in expected_files From 0b8e04ef1dc8dd4b340777c1030d396142b18da2 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 22 Apr 2024 16:41:34 +0400 Subject: [PATCH 10/22] test UNC path reading --- dlt/common/storages/fsspec_filesystem.py | 48 +++++++++++++------ .../common/storages/test_local_filesystem.py | 26 +++------- tests/common/storages/utils.py | 9 +++- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 5903aed61a..9ebfa17b2c 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -2,6 +2,7 @@ import glob import gzip import mimetypes +import os import pathlib import posixpath from io import BytesIO @@ -182,6 +183,15 @@ def fsspec(self) -> AbstractFileSystem: else: return fsspec_filesystem(self["file_url"], self.credentials)[0] + @property + def unc_url(self) -> str: + """Get the URL in a form of UNC path. + + Returns: + str: UNC path. 
+ """ + return self["file_url"].replace("file:", "").lstrip("/") + def open( # noqa: A003 self, mode: str = "rb", @@ -237,9 +247,15 @@ def open( # noqa: A003 **text_kwargs, ) else: - opened_file = self.fsspec.open( - self["file_url"], mode=mode, compression=compression_arg, **kwargs - ) + if "$" in self["file_url"]: + if self["file_name"].endswith(".gz") and compression_arg == "gzip": + opened_file = gzip.open(self.unc_url, mode=mode, **kwargs) + else: + opened_file = open(self.unc_url, mode=mode, **kwargs) + else: + opened_file = self.fsspec.open( + self["file_url"], mode=mode, compression=compression_arg, **kwargs + ) return opened_file def read_bytes(self) -> bytes: @@ -248,11 +264,14 @@ def read_bytes(self) -> bytes: Returns: bytes: The file content. """ - return ( # type: ignore - self["file_content"] - if "file_content" in self and self["file_content"] is not None - else self.fsspec.read_bytes(self["file_url"]) - ) + if "file_content" in self and self["file_content"] is not None: + return self["file_content"] + else: + if "$" in self["file_url"]: + with open(self.unc_url, "rb") as f: + return f.read() + + return self.fsspec.read_bytes(self["file_url"]) def guess_mime_type(file_name: str) -> Sequence[str]: @@ -277,8 +296,6 @@ def glob_files( Returns: Iterable[FileItem]: The list of files. 
""" - import os - is_unc_path = "$" in bucket_url bucket_url_parsed = urlparse(bucket_url) # if this is a file path without a scheme @@ -320,18 +337,19 @@ def glob_files( if is_unc_path: file_url = "file:///" + file - file_name = file.replace(bucket_url_no_schema, "").replace("\\", "/").lstrip("/") + rel_path = file.replace(bucket_url_no_schema, "").replace("\\", "/").lstrip("/") scheme = "file" else: - file_name = posixpath.relpath(file, bucket_url_no_schema) + rel_path = posixpath.relpath(file, bucket_url_no_schema) file_url = bucket_url_parsed._replace( - path=posixpath.join(bucket_url_parsed.path, file_name) + path=posixpath.join(bucket_url_parsed.path, rel_path) ).geturl() - mime_type, encoding = guess_mime_type(file_name) + mime_type, encoding = guess_mime_type(rel_path) yield FileItem( - file_name=file_name, + file_name=os.path.basename(rel_path), + relative_path=rel_path, file_url=file_url, mime_type=mime_type, encoding=encoding, diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index 6ad03e99f9..08ebf28807 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -58,7 +58,8 @@ def test_filesystem_decompress() -> None: @pytest.mark.skipif(platform.system() != "Windows", reason="Test it only on Windows") -def test_windows_unc_path() -> None: +@pytest.mark.parametrize("load_content", [True, False]) +def test_windows_unc_path(load_content: bool) -> None: config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES) config.read_only = True @@ -73,21 +74,8 @@ def test_windows_unc_path() -> None: ]: filesystem, _ = fsspec_from_config(config) - all_file_items = list(glob_files(filesystem, bucket_url)) - expected_files = [ - "csv/freshman_kgs.csv", - "csv/freshman_lbs.csv", - "csv/mlb_players.csv", - "csv/mlb_teams_2012.csv", - "jsonl/mlb_players.jsonl", - "met_csv/A801/A881_20230920.csv", - "met_csv/A803/A803_20230919.csv", - 
"met_csv/A803/A803_20230920.csv", - "parquet/mlb_players.parquet", - "gzip/taxi.csv.gz", - "sample.txt", - ] - assert len(all_file_items) == len(expected_files) - - for file in all_file_items: - assert file["file_name"] in expected_files + try: + all_file_items = list(glob_files(filesystem, bucket_url)) + assert_sample_files(all_file_items, filesystem, config, load_content) + except NotImplementedError as ex: + pytest.skip("Skipping due to " + str(ex)) diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index 0eb472a9af..0782151c7f 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -53,7 +53,7 @@ def assert_sample_files( for item in all_file_items: # only accept file items we know - assert item["file_name"] in minimally_expected_file_items + assert item["relative_path"] in minimally_expected_file_items # is valid url file_url_parsed = urlparse(item["file_url"]) @@ -63,7 +63,12 @@ def assert_sample_files( assert isinstance(item["mime_type"], str) assert isinstance(item["size_in_bytes"], int) assert isinstance(item["modification_date"], pendulum.DateTime) - content = filesystem.read_bytes(item["file_url"]) + + if "$" in item["file_url"]: + with open(item["file_url"].replace("file:", "").lstrip("/"), "rb") as f: + content = f.read() + else: + content = filesystem.read_bytes(item["file_url"]) assert len(content) == item["size_in_bytes"] if load_content: item["file_content"] = content From 7c1bdfb360e8b0cfdd32f375b9dcde81b1310b71 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 10:55:59 +0400 Subject: [PATCH 11/22] add docs --- dlt/common/storages/fsspec_filesystem.py | 4 +++- .../website/docs/dlt-ecosystem/verified-sources/filesystem.md | 3 ++- tests/common/storages/utils.py | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 9ebfa17b2c..9da904ae5c 100644 --- 
a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -6,6 +6,7 @@ import pathlib import posixpath from io import BytesIO +from gzip import GzipFile from typing import ( Literal, cast, @@ -43,6 +44,7 @@ class FileItem(TypedDict, total=False): file_url: str file_name: str + relative_path: str mime_type: str encoding: Optional[str] modification_date: pendulum.DateTime @@ -197,7 +199,7 @@ def open( # noqa: A003 mode: str = "rb", compression: Literal["auto", "disable", "enable"] = "auto", **kwargs: Any, - ) -> IO[Any]: + ) -> Union[GzipFile, IO[Any]]: """Open the file as a fsspec file. This method opens the file represented by this dictionary as a file-like object using diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md index c4cb0e536e..8ff4e2363a 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md @@ -296,7 +296,8 @@ data. You can quickly build pipelines to: #### `FileItem` Fields: - `file_url` - Complete URL of the file; also the primary key (e.g. `file://`). -- `file_name` - Name or relative path of the file from the bucket URL. +- `file_name` - Name of the file from the bucket URL. +- `relative_path` - Relative path of the file in the bucket. - `mime_type` - File's mime type; sourced from the bucket provider or inferred from its extension. - `modification_date` - File's last modification time (format: `pendulum.DateTime`). - `size_in_bytes` - File size. 
diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index 0782151c7f..dabcb99881 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -49,6 +49,7 @@ def assert_sample_files( "gzip/taxi.csv.gz", "sample.txt", } + expected_file_names = [path.split("/")[-1] for path in minimally_expected_file_items] assert len(all_file_items) == len(minimally_expected_file_items) for item in all_file_items: @@ -58,6 +59,7 @@ def assert_sample_files( # is valid url file_url_parsed = urlparse(item["file_url"]) assert isinstance(item["file_name"], str) + assert item["file_name"] in expected_file_names assert file_url_parsed.path.endswith(item["file_name"]) assert item["file_url"].startswith(config.protocol) assert isinstance(item["mime_type"], str) From e7a893f6365864c2c46c21ff6cb84f6e432c8798 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 11:36:32 +0400 Subject: [PATCH 12/22] lint ignores --- dlt/common/storages/fsspec_filesystem.py | 8 ++++---- tests/common/storages/utils.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 9da904ae5c..23e993b038 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -192,7 +192,7 @@ def unc_url(self) -> str: Returns: str: UNC path. """ - return self["file_url"].replace("file:", "").lstrip("/") + return self["file_url"].replace("file:", "").lstrip("/") # type: ignore def open( # noqa: A003 self, @@ -228,7 +228,7 @@ def open( # noqa: A003 "auto", "enable", "disable".""" ) - opened_file: IO[Any] + opened_file: Union[IO[Any], GzipFile] # if the user has already extracted the content, we use it so there is no need to # download the file again. if "file_content" in self: @@ -267,13 +267,13 @@ def read_bytes(self) -> bytes: bytes: The file content. 
""" if "file_content" in self and self["file_content"] is not None: - return self["file_content"] + return self["file_content"] # type: ignore else: if "$" in self["file_url"]: with open(self.unc_url, "rb") as f: return f.read() - return self.fsspec.read_bytes(self["file_url"]) + return self.fsspec.read_bytes(self["file_url"]) # type: ignore def guess_mime_type(file_name: str) -> Sequence[str]: diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index dabcb99881..c620c54220 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -93,7 +93,7 @@ def assert_sample_files( # fieldnames below are not really correct but allow to load first 3 columns # even if first row does not have header names - elements = list(DictReader(f, fieldnames=["A", "B", "C"])) + elements = list(DictReader(f, fieldnames=["A", "B", "C"])) # type: ignore assert len(elements) > 0 if item["mime_type"] == "application/parquet": # verify it is a real parquet From fbcffbb6c3b530ac68d16b241ab7199095771859 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 12:24:20 +0400 Subject: [PATCH 13/22] lint fix --- tests/common/storages/test_local_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index 08ebf28807..2e62043a7e 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -51,7 +51,7 @@ def test_filesystem_decompress() -> None: with file_dict.open(mode="tr") as f: lines = f.readlines() assert len(lines) > 1 - assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"') + assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"') # type: ignore # read as uncompressed binary with file_dict.open(compression="enable") as f: assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') From f11171412277ef7e10bfa446c42158f985f7fba0 Mon Sep 17 
00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 12:34:20 +0400 Subject: [PATCH 14/22] lint fix --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 23e993b038..d43d2a7ebc 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -253,7 +253,7 @@ def open( # noqa: A003 if self["file_name"].endswith(".gz") and compression_arg == "gzip": opened_file = gzip.open(self.unc_url, mode=mode, **kwargs) else: - opened_file = open(self.unc_url, mode=mode, **kwargs) + opened_file = open(self.unc_url, mode=mode, **kwargs) # type: ignore else: opened_file = self.fsspec.open( self["file_url"], mode=mode, compression=compression_arg, **kwargs From 7c0a918b00994178270b79cbbbbad23c92add964 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 12:38:22 +0400 Subject: [PATCH 15/22] excess ignore --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index d43d2a7ebc..23e993b038 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -253,7 +253,7 @@ def open( # noqa: A003 if self["file_name"].endswith(".gz") and compression_arg == "gzip": opened_file = gzip.open(self.unc_url, mode=mode, **kwargs) else: - opened_file = open(self.unc_url, mode=mode, **kwargs) # type: ignore + opened_file = open(self.unc_url, mode=mode, **kwargs) else: opened_file = self.fsspec.open( self["file_url"], mode=mode, compression=compression_arg, **kwargs From 5232ce6959f999ecd685a768bc98e19074a0b0ab Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 23 Apr 2024 13:15:41 +0400 Subject: [PATCH 16/22] lint fix --- dlt/common/storages/fsspec_filesystem.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 23e993b038..f0088565d0 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -253,7 +253,7 @@ def open( # noqa: A003 if self["file_name"].endswith(".gz") and compression_arg == "gzip": opened_file = gzip.open(self.unc_url, mode=mode, **kwargs) else: - opened_file = open(self.unc_url, mode=mode, **kwargs) + opened_file = open(self.unc_url, mode=mode, **kwargs) # noqa: ENC003 else: opened_file = self.fsspec.open( self["file_url"], mode=mode, compression=compression_arg, **kwargs From e0ef3a957e29b9c033474ed458ac32c3f8d8e020 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 12:56:02 +0200 Subject: [PATCH 17/22] adds utils to convert from file to local path to filesystem config --- dlt/common/storages/configuration.py | 86 ++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 19 deletions(-) diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index d0100c335d..ef70f49094 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -1,6 +1,7 @@ import os -from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union -from urllib.parse import urlparse +import pathlib +from typing import Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union +from urllib.parse import urlparse, unquote from dlt.common.configuration import configspec, resolve_type from dlt.common.configuration.exceptions import ConfigurationValueError @@ -87,24 +88,22 @@ class FilesystemConfiguration(BaseConfiguration): @property def protocol(self) -> str: """`bucket_url` protocol""" - url = urlparse(self.bucket_url) - # this prevents windows absolute paths to be recognized as schemas - if not url.scheme or (os.path.isabs(self.bucket_url) and "\\" in self.bucket_url): + if self.is_local_path(self.bucket_url): return "file" else: - 
return url.scheme + return urlparse(self.bucket_url).scheme def on_resolved(self) -> None: - url = urlparse(self.bucket_url) - if not url.path and not url.netloc: + uri = urlparse(self.bucket_url) + if not uri.path and not uri.netloc: raise ConfigurationValueError( - "File path or netloc missing. Field bucket_url of FilesystemClientConfiguration" - " must contain valid url with a path or host:password component." + "File path and netloc are missing. Field bucket_url of" + " FilesystemClientConfiguration must contain valid uri with a path or host:password" + " component." ) # this is just a path in a local file system - if url.path == self.bucket_url: - url = url._replace(scheme="file") - self.bucket_url = url.geturl() + if self.is_local_path(self.bucket_url): + self.bucket_url = self.make_file_uri(self.bucket_url) @resolve_type("credentials") def resolve_credentials_type(self) -> Type[CredentialsConfiguration]: @@ -117,11 +116,60 @@ def fingerprint(self) -> str: def __str__(self) -> str: """Return displayable destination location""" - url = urlparse(self.bucket_url) + uri = urlparse(self.bucket_url) # do not show passwords - if url.password: - new_netloc = f"{url.username}:****@{url.hostname}" - if url.port: - new_netloc += f":{url.port}" - return url._replace(netloc=new_netloc).geturl() + if uri.password: + new_netloc = f"{uri.username}:****@{uri.hostname}" + if uri.port: + new_netloc += f":{uri.port}" + return uri._replace(netloc=new_netloc).geturl() return self.bucket_url + + @staticmethod + def is_local_path(uri: str) -> bool: + """Checks if `uri` is a local path, without a schema""" + uri_parsed = urlparse(uri) + # this prevents windows absolute paths to be recognized as schemas + return not uri_parsed.scheme or os.path.isabs(uri) + + @staticmethod + def make_local_path(file_uri: str) -> str: + """Gets a valid local filesystem path from file:// scheme. 
+ Supports POSIX/Windows/UNC paths + + Returns: + str: local filesystem path + """ + uri = urlparse(file_uri) + if uri.scheme != "file": + raise ValueError(f"Must be file scheme but is {uri.scheme}") + if not uri.path and not uri.netloc: + raise ConfigurationValueError("File path and netloc are missing.") + local_path = unquote(uri.path) + if uri.netloc: + # or UNC file://localhost/path + local_path = "//" + unquote(uri.netloc) + local_path + else: + # if we are on windows, strip the POSIX root from path which is always absolute + if os.path.sep != local_path[0]: + # filesystem root + if local_path == "/": + return str(pathlib.Path("/").resolve()) + # this prevents /C:/ or ///share/ where both POSIX and Windows root are present + if os.path.isabs(local_path[1:]): + local_path = local_path[1:] + return str(pathlib.Path(local_path)) + + @staticmethod + def make_file_uri(local_path: str) -> str: + """Creates a normalized file:// uri from a local path + + netloc is never set. UNC paths are represented as file:////host/path + """ + # we do not go straight into `uri` here because this converts UNC paths + # into file://host/path and we want file://///host/path + p_ = pathlib.Path(local_path) + p_ = p_.expanduser() + # if not p_.is_absolute(): + p_ = p_.resolve() + return "file:///" + p_.as_posix().lstrip("/") From d983976520fec0aa3468ae431cabab2a691fe4c2 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 12:57:02 +0200 Subject: [PATCH 18/22] offloads globbing to Python glob for locat paths, allows UNC and native paths forms, improves tests --- dlt/common/storages/fsspec_filesystem.py | 100 +++----- .../common/storages/test_local_filesystem.py | 238 +++++++++++++++--- tests/common/storages/utils.py | 68 +++-- 3 files changed, 290 insertions(+), 116 deletions(-) diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index f0088565d0..0c108e67d4 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ 
b/dlt/common/storages/fsspec_filesystem.py @@ -2,7 +2,6 @@ import glob import gzip import mimetypes -import os import pathlib import posixpath from io import BytesIO @@ -186,13 +185,14 @@ def fsspec(self) -> AbstractFileSystem: return fsspec_filesystem(self["file_url"], self.credentials)[0] @property - def unc_url(self) -> str: - """Get the URL in a form of UNC path. + def local_file_path(self) -> str: + """Gets a valid local filesystem path from file:// scheme. + Supports POSIX/Windows/UNC paths Returns: - str: UNC path. + str: local filesystem path """ - return self["file_url"].replace("file:", "").lstrip("/") # type: ignore + return FilesystemConfiguration.make_local_path(self["file_url"]) def open( # noqa: A003 self, @@ -223,10 +223,8 @@ def open( # noqa: A003 elif compression == "disable": compression_arg = None else: - raise ValueError( - """The argument `compression` must have one of the following values: - "auto", "enable", "disable".""" - ) + raise ValueError("""The argument `compression` must have one of the following values: + "auto", "enable", "disable".""") opened_file: Union[IO[Any], GzipFile] # if the user has already extracted the content, we use it so there is no need to @@ -249,15 +247,14 @@ def open( # noqa: A003 **text_kwargs, ) else: - if "$" in self["file_url"]: - if self["file_name"].endswith(".gz") and compression_arg == "gzip": - opened_file = gzip.open(self.unc_url, mode=mode, **kwargs) - else: - opened_file = open(self.unc_url, mode=mode, **kwargs) # noqa: ENC003 + if "local" in self.fsspec.protocol: + # use native local file path to open file:// uris + file_url = self.local_file_path else: - opened_file = self.fsspec.open( - self["file_url"], mode=mode, compression=compression_arg, **kwargs - ) + file_url = self["file_url"] + opened_file = self.fsspec.open( + file_url, mode=mode, compression=compression_arg, **kwargs + ) return opened_file def read_bytes(self) -> bytes: @@ -269,11 +266,8 @@ def read_bytes(self) -> bytes: if 
"file_content" in self and self["file_content"] is not None: return self["file_content"] # type: ignore else: - if "$" in self["file_url"]: - with open(self.unc_url, "rb") as f: - return f.read() - - return self.fsspec.read_bytes(self["file_url"]) # type: ignore + with self.open(mode="rb", compression="disable") as f: + return f.read() def guess_mime_type(file_name: str) -> Sequence[str]: @@ -298,59 +292,45 @@ def glob_files( Returns: Iterable[FileItem]: The list of files. """ - is_unc_path = "$" in bucket_url - bucket_url_parsed = urlparse(bucket_url) - # if this is a file path without a scheme - if ( - not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url) - ) and not is_unc_path: - # this is a file so create a proper file url - bucket_url = pathlib.Path(bucket_url).absolute().as_uri() + is_local_fs = "local" in fs_client.protocol + if is_local_fs and FilesystemConfiguration.is_local_path(bucket_url): + bucket_url = FilesystemConfiguration.make_file_uri(bucket_url) + bucket_url_parsed = urlparse(bucket_url) + else: bucket_url_parsed = urlparse(bucket_url) - bucket_url_no_schema = bucket_url_parsed._replace(scheme="", query="").geturl() - bucket_url_no_schema = ( - bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema - ) - filter_url = posixpath.join(bucket_url_no_schema, file_glob) - if is_unc_path: - # process UNC paths with Python glob module - files = glob.glob(filter_url, recursive=True) - glob_result = {} - for file in files: - glob_result[file] = fs_client.info(file) + if is_local_fs: + root_dir = FilesystemConfiguration.make_local_path(bucket_url) + # use a Python glob to get files + files = glob.glob(str(pathlib.Path(root_dir).joinpath(file_glob)), recursive=True) + glob_result = {file: fs_client.info(file) for file in files} else: + root_dir = bucket_url_parsed._replace(scheme="", query="").geturl().lstrip("/") + filter_url = posixpath.join(root_dir, file_glob) glob_result = 
fs_client.glob(filter_url, detail=True) - - if isinstance(glob_result, list): - raise NotImplementedError( - "Cannot request details when using fsspec.glob. For adlfs (Azure) please use version" - " 2023.9.0 or later" - ) + if isinstance(glob_result, list): + raise NotImplementedError( + "Cannot request details when using fsspec.glob. For adlfs (Azure) please use" + " version 2023.9.0 or later" + ) for file, md in glob_result.items(): if md["type"] != "file": continue - - scheme = bucket_url_parsed.scheme - # make that absolute path on a file:// - if bucket_url_parsed.scheme == "file" and not file.startswith("/"): - file = f"/{file}" - - if is_unc_path: - file_url = "file:///" + file - rel_path = file.replace(bucket_url_no_schema, "").replace("\\", "/").lstrip("/") - scheme = "file" + # relative paths are always POSIX + if is_local_fs: + rel_path = pathlib.Path(file).relative_to(root_dir).as_posix() + file_url = FilesystemConfiguration.make_file_uri(file) else: - rel_path = posixpath.relpath(file, bucket_url_no_schema) - + rel_path = posixpath.relpath(file, root_dir) file_url = bucket_url_parsed._replace( path=posixpath.join(bucket_url_parsed.path, rel_path) ).geturl() + scheme = bucket_url_parsed.scheme mime_type, encoding = guess_mime_type(rel_path) yield FileItem( - file_name=os.path.basename(rel_path), + file_name=posixpath.basename(rel_path), relative_path=rel_path, file_url=file_url, mime_type=mime_type, diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index 2e62043a7e..b94b16cd96 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -1,38 +1,209 @@ import os -import itertools import pytest import pathlib -import platform +from dlt.common.configuration.exceptions import ConfigurationValueError +from dlt.common.configuration.resolve import resolve_configuration from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from 
dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files -from tests.common.storages.utils import assert_sample_files +from tests.common.storages.utils import assert_sample_files, TEST_SAMPLE_FILES +from tests.utils import skipifnotwindows, skipifwindows -TEST_SAMPLE_FILES = "tests/common/storages/samples" +UNC_LOCAL_PATH = r"\\localhost\c$\tests\common\test.csv" +UNC_WSL_PATH = r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\ .dlt" +@skipifnotwindows @pytest.mark.parametrize( - "bucket_url,load_content", itertools.product(["file:///", "/", ""], [True, False]) + "bucket_url,file_url", + ( + (UNC_LOCAL_PATH, "file:///" + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_posix()), + (UNC_WSL_PATH, "file:///" + pathlib.PureWindowsPath(UNC_WSL_PATH).as_posix()), + (r"C:\hello", "file:///C:/hello"), + (r"a\b $\b", "file:///" + pathlib.Path(r"a\b $\b").resolve().as_posix()), + # same paths but with POSIX separators + ( + UNC_LOCAL_PATH.replace("\\", "/"), + "file:///" + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_posix(), + ), + ( + UNC_WSL_PATH.replace("\\", "/"), + "file:///" + pathlib.PureWindowsPath(UNC_WSL_PATH).as_posix(), + ), + (r"C:\hello".replace("\\", "/"), "file:///C:/hello"), + ( + r"a\b $\b".replace("\\", "/"), + "file:///" + pathlib.Path(r"a\b $\b").resolve().as_posix(), + ), + ), ) -def test_filesystem_dict_local(bucket_url: str, load_content: bool) -> None: +def test_local_path_win_configuration(bucket_url: str, file_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).resolve() + ) + + +@skipifnotwindows +@pytest.mark.parametrize( + "bucket_url", + ( + r"~\Documents\csv_files\a", + r"~\Documents\csv_files\a".replace("\\", 
"/"), + ), +) +def test_local_user_win_path_configuration(bucket_url: str) -> None: + file_url = "file:///" + pathlib.Path(bucket_url).expanduser().as_posix().lstrip("/") + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).expanduser() + ) + + +@skipifnotwindows +def test_file_win_configuration() -> None: + assert ( + FilesystemConfiguration.make_local_path("file://localhost/c$/samples") + == r"\\localhost\c$\samples" + ) + assert ( + FilesystemConfiguration.make_local_path("file://///localhost/c$/samples") + == r"\\localhost\c$\samples" + ) + assert FilesystemConfiguration.make_local_path("file:///C:/samples") == r"C:\samples" + + +@skipifwindows +@pytest.mark.parametrize( + "bucket_url,file_url", + ( + (r"/var/logs", "file:///var/logs"), + (r"_storage", "file://" + pathlib.Path("_storage").resolve().as_posix()), + ), +) +def test_file_posix_configuration(bucket_url: str, file_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).resolve() + ) + + +@skipifwindows +@pytest.mark.parametrize( + "bucket_url", + ("~/docs",), +) +def test_local_user_posix_path_configuration(bucket_url: str) -> None: + file_url = "file:///" + pathlib.Path(bucket_url).expanduser().as_posix().lstrip("/") + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = 
resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).expanduser() + ) + + +@pytest.mark.parametrize( + "bucket_url,local_path", + ( + ("file://_storage", "_storage"), + ("file://_storage/a/b/c.txt", "_storage/a/b/c.txt"), + ), +) +def test_file_host_configuration(bucket_url: str, local_path: str) -> None: + # recognized as UNC path on windows. on POSIX the path does not make sense and start with "//" + assert FilesystemConfiguration.is_local_path(bucket_url) is False + assert FilesystemConfiguration.make_local_path(bucket_url) == str( + pathlib.Path("//" + local_path) + ) + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == bucket_url + + +@pytest.mark.parametrize( + "bucket_url,local_path,norm_bucket_url", + ( + ("file:", "", ""), + ("file://", "", ""), + ("file:/", "/", "file:///" + pathlib.Path("/").resolve().as_posix().lstrip("/")), + ), +) +def test_file_filesystem_configuration( + bucket_url: str, local_path: str, norm_bucket_url: str +) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is False + if local_path == "": + # those paths are invalid + with pytest.raises(ConfigurationValueError): + FilesystemConfiguration.make_local_path(bucket_url) + else: + assert FilesystemConfiguration.make_local_path(bucket_url) == str( + pathlib.Path(local_path).resolve() + ) + assert FilesystemConfiguration.make_file_uri(local_path) == norm_bucket_url + + if local_path == "": + with pytest.raises(ConfigurationValueError): + resolve_configuration(FilesystemConfiguration(bucket_url)) + else: + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == bucket_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(local_path).resolve() 
+ ) + + +@pytest.mark.parametrize( + "bucket_url", + ( + "s3://dlt-ci-test-bucket/", + "gdrive://Xaie902-1/a/c", + ), +) +def test_filesystem_bucket_configuration(bucket_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is False + + +@pytest.mark.parametrize("bucket_url", ("file:/", "file:///", "/", "")) +@pytest.mark.parametrize("load_content", (True, False)) +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_filesystem_dict_local(bucket_url: str, load_content: bool, glob_filter: str) -> None: if bucket_url in [""]: # relative paths - bucket_url = TEST_SAMPLE_FILES + bucket_url = str(pathlib.Path(TEST_SAMPLE_FILES)) else: if bucket_url == "/": bucket_url = os.path.abspath(TEST_SAMPLE_FILES) else: - bucket_url = pathlib.Path(TEST_SAMPLE_FILES).absolute().as_uri() + bucket_url = bucket_url + pathlib.Path(TEST_SAMPLE_FILES).resolve().as_posix() config = FilesystemConfiguration(bucket_url=bucket_url) filesystem, _ = fsspec_from_config(config) # use glob to get data - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - assert_sample_files(all_file_items, filesystem, config, load_content) - except NotImplementedError as ex: - pytest.skip("Skipping due to " + str(ex)) + all_file_items = list(glob_files(filesystem, bucket_url, file_glob=glob_filter)) + assert_sample_files(all_file_items, filesystem, config, load_content, glob_filter) def test_filesystem_decompress() -> None: @@ -57,25 +228,28 @@ def test_filesystem_decompress() -> None: assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') -@pytest.mark.skipif(platform.system() != "Windows", reason="Test it only on Windows") +# create windows UNC paths, on POSIX systems they are not used +WIN_ABS_PATH = os.path.abspath(TEST_SAMPLE_FILES) +WIN_UNC_PATH = "\\\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower() + + +@skipifnotwindows +@pytest.mark.parametrize( + "bucket_url", + ( + WIN_UNC_PATH, + "file:///" + 
pathlib.Path(WIN_UNC_PATH).as_posix(), + "file://localhost/" + pathlib.Path(WIN_ABS_PATH).as_posix().replace(":", "$"), + WIN_ABS_PATH, + "file:///" + pathlib.Path(WIN_ABS_PATH).as_posix(), + # r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\src\dlt\tests\common\storages\samples" + ), +) @pytest.mark.parametrize("load_content", [True, False]) -def test_windows_unc_path(load_content: bool) -> None: +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_windows_unc_path(load_content: bool, bucket_url: str, glob_filter: str) -> None: config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES) config.read_only = True - - unc_path = r"\\localhost\\" + os.path.abspath("tests/common/storages/samples").replace(":", "$") - abs_path = os.path.abspath(r"tests/common/storages/samples") - - for bucket_url in [ - unc_path, - "file://" + unc_path, - abs_path, - abs_path.replace(r"\\", "/"), - ]: - filesystem, _ = fsspec_from_config(config) - - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - assert_sample_files(all_file_items, filesystem, config, load_content) - except NotImplementedError as ex: - pytest.skip("Skipping due to " + str(ex)) + fs_client, _ = fsspec_from_config(config) + all_file_items = list(glob_files(fs_client, bucket_url, file_glob=glob_filter)) + assert_sample_files(all_file_items, fs_client, config, load_content, glob_filter) diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index c620c54220..0737136622 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -1,3 +1,5 @@ +import os +import glob from pathlib import Path from urllib.parse import urlparse import pytest @@ -22,6 +24,21 @@ from dlt.common.typing import StrAny, TDataItems from dlt.common.utils import uniq_id +TEST_SAMPLE_FILES = "tests/common/storages/samples" +MINIMALLY_EXPECTED_RELATIVE_PATHS = { + "csv/freshman_kgs.csv", + "csv/freshman_lbs.csv", + "csv/mlb_players.csv", + 
"csv/mlb_teams_2012.csv", + "jsonl/mlb_players.jsonl", + "met_csv/A801/A881_20230920.csv", + "met_csv/A803/A803_20230919.csv", + "met_csv/A803/A803_20230920.csv", + "parquet/mlb_players.parquet", + "gzip/taxi.csv.gz", + "sample.txt", +} + @pytest.fixture def load_storage() -> LoadStorage: @@ -30,31 +47,33 @@ def load_storage() -> LoadStorage: return s +def glob_local_case(glob_filter: str) -> List[str]: + all_files = [ + os.path.relpath(p, TEST_SAMPLE_FILES) + for p in glob.glob(os.path.join(TEST_SAMPLE_FILES, glob_filter), recursive=True) + if os.path.isfile(p) + ] + return [Path(p).as_posix() for p in all_files] + + def assert_sample_files( all_file_items: List[FileItem], filesystem: AbstractFileSystem, config: FilesystemConfiguration, load_content: bool, + glob_filter: str = "**", ) -> None: - minimally_expected_file_items = { - "csv/freshman_kgs.csv", - "csv/freshman_lbs.csv", - "csv/mlb_players.csv", - "csv/mlb_teams_2012.csv", - "jsonl/mlb_players.jsonl", - "met_csv/A801/A881_20230920.csv", - "met_csv/A803/A803_20230919.csv", - "met_csv/A803/A803_20230920.csv", - "parquet/mlb_players.parquet", - "gzip/taxi.csv.gz", - "sample.txt", - } - expected_file_names = [path.split("/")[-1] for path in minimally_expected_file_items] - assert len(all_file_items) == len(minimally_expected_file_items) + # sanity checks: all expected files are in the fixtures + assert MINIMALLY_EXPECTED_RELATIVE_PATHS == set(glob_local_case("**")) + # filter expected files by glob filter + expected_relative_paths = glob_local_case(glob_filter) + expected_file_names = [path.split("/")[-1] for path in expected_relative_paths] + + assert len(all_file_items) == len(expected_relative_paths) for item in all_file_items: # only accept file items we know - assert item["relative_path"] in minimally_expected_file_items + assert item["relative_path"] in expected_relative_paths # is valid url file_url_parsed = urlparse(item["file_url"]) @@ -62,21 +81,22 @@ def assert_sample_files( assert 
item["file_name"] in expected_file_names assert file_url_parsed.path.endswith(item["file_name"]) assert item["file_url"].startswith(config.protocol) + assert item["file_url"].endswith(item["relative_path"]) assert isinstance(item["mime_type"], str) assert isinstance(item["size_in_bytes"], int) assert isinstance(item["modification_date"], pendulum.DateTime) - if "$" in item["file_url"]: - with open(item["file_url"].replace("file:", "").lstrip("/"), "rb") as f: - content = f.read() - else: - content = filesystem.read_bytes(item["file_url"]) + # create file dict + file_dict = FileItemDict(item, config.credentials) + + try: + # try to load using local filesystem + content = filesystem.read_bytes(file_dict.local_file_path) + except ValueError: + content = filesystem.read_bytes(file_dict["file_url"]) assert len(content) == item["size_in_bytes"] if load_content: item["file_content"] = content - - # create file dict - file_dict = FileItemDict(item, config.credentials) dict_content = file_dict.read_bytes() assert content == dict_content with file_dict.open() as f: From 3810f80782c2cf3e4b4722f4c84c909233850563 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 12:57:46 +0200 Subject: [PATCH 19/22] allows the test file bucket to be relative path, fixes tests, adds several globs to tests --- tests/.dlt/config.toml | 2 +- .../load/filesystem/test_filesystem_common.py | 46 +++++++++++-------- tests/load/utils.py | 5 +- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/tests/.dlt/config.toml b/tests/.dlt/config.toml index cce4335f5a..ba86edf417 100644 --- a/tests/.dlt/config.toml +++ b/tests/.dlt/config.toml @@ -4,7 +4,7 @@ sentry_dsn="https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4 [tests] bucket_url_gs="gs://ci-test-bucket" bucket_url_s3="s3://dlt-ci-test-bucket" -bucket_url_file="file://_storage" +bucket_url_file="_storage" bucket_url_az="az://dlt-ci-test-bucket" bucket_url_r2="s3://dlt-ci-test-bucket" # use "/" as root path 
diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 5cb064a3b2..3677765c9f 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -57,41 +57,54 @@ def test_filesystem_configuration() -> None: def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: - @retry(stop=stop_after_attempt(10), wait=wait_fixed(1)) - def check_file_exists(): - files = filesystem.ls(url, detail=True) - details = next(d for d in files if d["name"] == file_url) - assert details["size"] == 10 - - def check_file_changed(): - details = filesystem.info(file_url) + @retry(stop=stop_after_attempt(10), wait=wait_fixed(1), reraise=True) + def check_file_exists(filedir_: str, file_url_: str): + try: + files = filesystem.ls(filedir_, detail=True) + details = next(d for d in files if d["name"] == file_url_) + assert details["size"] == 10 + except Exception as ex: + print(ex) + raise + + def check_file_changed(file_url_: str): + details = filesystem.info(file_url_) assert details["size"] == 11 assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 160 bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config() - assert bucket_url.startswith(config.protocol) + # we do not add protocol to bucket_url (we need relative path) + assert bucket_url.startswith(config.protocol) or config.protocol == "file" filesystem, url = fsspec_from_config(config) if config.protocol != "file": assert bucket_url.endswith(url) # do a few file ops now = pendulum.now() filename = f"filesystem_common_{uniq_id()}" - file_url = posixpath.join(url, filename) + file_dir = posixpath.join(url, f"filesystem_common_dir_{uniq_id()}") + file_url = posixpath.join(file_dir, filename) try: + filesystem.mkdir(file_dir, create_parents=False) filesystem.pipe(file_url, b"test bytes") - check_file_exists() + check_file_exists(file_dir, file_url) filesystem.pipe(file_url, b"test 
bytes2") - check_file_changed() + check_file_changed(file_url) finally: filesystem.rm(file_url) + # s3 does not create folder with mkdir + if config.protocol != "s3": + filesystem.rmdir(file_dir) assert not filesystem.exists(file_url) with pytest.raises(FileNotFoundError): filesystem.info(file_url) @pytest.mark.parametrize("load_content", (True, False)) -def test_filesystem_dict(with_gdrive_buckets_env: str, load_content: bool) -> None: +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_filesystem_dict( + with_gdrive_buckets_env: str, load_content: bool, glob_filter: str +) -> None: bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config() # enable caches @@ -106,11 +119,8 @@ def test_filesystem_dict(with_gdrive_buckets_env: str, load_content: bool) -> No ).geturl() filesystem, _ = fsspec_from_config(config) # use glob to get data - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - assert_sample_files(all_file_items, filesystem, config, load_content) - except NotImplementedError as ex: - pytest.skip(f"Skipping due to {str(ex)}") + all_file_items = list(glob_files(filesystem, bucket_url, glob_filter)) + assert_sample_files(all_file_items, filesystem, config, load_content, glob_filter) @pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured") diff --git a/tests/load/utils.py b/tests/load/utils.py index 74cd782a56..b0c9290970 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -5,6 +5,7 @@ from typing import Any, Iterator, List, Sequence, IO, Tuple, Optional, Dict, Union, Generator import shutil from pathlib import Path +from urllib.parse import urlparse from dataclasses import dataclass import dlt @@ -67,7 +68,9 @@ # Filter out buckets not in all filesystem drivers WITH_GDRIVE_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET, GDRIVE_BUCKET] WITH_GDRIVE_BUCKETS = [ - bucket for bucket in 
WITH_GDRIVE_BUCKETS if bucket.split(":")[0] in ALL_FILESYSTEM_DRIVERS + bucket + for bucket in WITH_GDRIVE_BUCKETS + if (urlparse(bucket).scheme or "file") in ALL_FILESYSTEM_DRIVERS ] # temporary solution to include gdrive bucket in tests, From 44cca367f25493a5a32ec2ee2b72db9eeb8c8c00 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 12:58:28 +0200 Subject: [PATCH 20/22] bumps dev adlfs version up to have working glob --- poetry.lock | 61 ++++++----------------------------------------------- 1 file changed, 6 insertions(+), 55 deletions(-) diff --git a/poetry.lock b/poetry.lock index d6e78f4144..56e44a00d8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "about-time" @@ -13,13 +13,13 @@ files = [ [[package]] name = "adlfs" -version = "2023.8.0" +version = "2024.4.1" description = "Access Azure Datalake Gen1 with fsspec and dask" optional = true python-versions = ">=3.8" files = [ - {file = "adlfs-2023.8.0-py3-none-any.whl", hash = "sha256:3eb248a3c2a30b419f1147bd7676d156b5219f96ef7f11d47166afd2a3bdb07e"}, - {file = "adlfs-2023.8.0.tar.gz", hash = "sha256:07e804f6df4593acfcaf01025b162e30ac13e523d3570279c98b2d91a18026d9"}, + {file = "adlfs-2024.4.1-py3-none-any.whl", hash = "sha256:acea94612ddacaa34ea8c6babcc95b8da6982f930cdade7a86fbd17382403e16"}, + {file = "adlfs-2024.4.1.tar.gz", hash = "sha256:75530a45447f358ae53c5c39c298b8d966dae684be84db899f63b94cd96fc000"}, ] [package.dependencies] @@ -28,10 +28,11 @@ azure-core = ">=1.23.1,<2.0.0" azure-datalake-store = ">=0.0.46,<0.1" azure-identity = "*" azure-storage-blob = ">=12.12.0" -fsspec = ">=2021.10.1" +fsspec = ">=2023.12.0" [package.extras] docs = ["furo", "myst-parser", "numpydoc", "sphinx"] +tests = ["arrow", "dask[dataframe]", "docker", "pytest", "pytest-mock"] [[package]] name = 
"agate" @@ -3727,56 +3728,6 @@ files = [ {file = "google_re2-1.1-4-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f4d4f0823e8b2f6952a145295b1ff25245ce9bb136aff6fe86452e507d4c1dd"}, {file = "google_re2-1.1-4-cp39-cp39-win32.whl", hash = "sha256:1afae56b2a07bb48cfcfefaa15ed85bae26a68f5dc7f9e128e6e6ea36914e847"}, {file = "google_re2-1.1-4-cp39-cp39-win_amd64.whl", hash = "sha256:aa7d6d05911ab9c8adbf3c225a7a120ab50fd2784ac48f2f0d140c0b7afc2b55"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:222fc2ee0e40522de0b21ad3bc90ab8983be3bf3cec3d349c80d76c8bb1a4beb"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:d4763b0b9195b72132a4e7de8e5a9bf1f05542f442a9115aa27cfc2a8004f581"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:209649da10c9d4a93d8a4d100ecbf9cc3b0252169426bec3e8b4ad7e57d600cf"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:68813aa333c1604a2df4a495b2a6ed065d7c8aebf26cc7e7abb5a6835d08353c"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:370a23ec775ad14e9d1e71474d56f381224dcf3e72b15d8ca7b4ad7dd9cd5853"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:14664a66a3ddf6bc9e56f401bf029db2d169982c53eff3f5876399104df0e9a6"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ea3722cc4932cbcebd553b69dce1b4a73572823cff4e6a244f1c855da21d511"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e14bb264c40fd7c627ef5678e295370cd6ba95ca71d835798b6e37502fc4c690"}, - {file = "google_re2-1.1-5-cp310-cp310-win32.whl", hash = "sha256:39512cd0151ea4b3969c992579c79b423018b464624ae955be685fc07d94556c"}, - {file = "google_re2-1.1-5-cp310-cp310-win_amd64.whl", hash = "sha256:ac66537aa3bc5504320d922b73156909e3c2b6da19739c866502f7827b3f9fdf"}, - {file = 
"google_re2-1.1-5-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b5ea68d54890c9edb1b930dcb2658819354e5d3f2201f811798bbc0a142c2b4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:33443511b6b83c35242370908efe2e8e1e7cae749c766b2b247bf30e8616066c"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:413d77bdd5ba0bfcada428b4c146e87707452ec50a4091ec8e8ba1413d7e0619"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:5171686e43304996a34baa2abcee6f28b169806d0e583c16d55e5656b092a414"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:3b284db130283771558e31a02d8eb8fb756156ab98ce80035ae2e9e3a5f307c4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:296e6aed0b169648dc4b870ff47bd34c702a32600adb9926154569ef51033f47"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:38d50e68ead374160b1e656bbb5d101f0b95fb4cc57f4a5c12100155001480c5"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2a0416a35921e5041758948bcb882456916f22845f66a93bc25070ef7262b72a"}, - {file = "google_re2-1.1-5-cp311-cp311-win32.whl", hash = "sha256:a1d59568bbb5de5dd56dd6cdc79907db26cce63eb4429260300c65f43469e3e7"}, - {file = "google_re2-1.1-5-cp311-cp311-win_amd64.whl", hash = "sha256:72f5a2f179648b8358737b2b493549370debd7d389884a54d331619b285514e3"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:cbc72c45937b1dc5acac3560eb1720007dccca7c9879138ff874c7f6baf96005"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5fadd1417fbef7235fa9453dba4eb102e6e7d94b1e4c99d5fa3dd4e288d0d2ae"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:040f85c63cc02696485b59b187a5ef044abe2f99b92b4fb399de40b7d2904ccc"}, - {file = 
"google_re2-1.1-5-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:64e3b975ee6d9bbb2420494e41f929c1a0de4bcc16d86619ab7a87f6ea80d6bd"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:8ee370413e00f4d828eaed0e83b8af84d7a72e8ee4f4bd5d3078bc741dfc430a"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:5b89383001079323f693ba592d7aad789d7a02e75adb5d3368d92b300f5963fd"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:63cb4fdfbbda16ae31b41a6388ea621510db82feb8217a74bf36552ecfcd50ad"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9ebedd84ae8be10b7a71a16162376fd67a2386fe6361ef88c622dcf7fd679daf"}, - {file = "google_re2-1.1-5-cp312-cp312-win32.whl", hash = "sha256:c8e22d1692bc2c81173330c721aff53e47ffd3c4403ff0cd9d91adfd255dd150"}, - {file = "google_re2-1.1-5-cp312-cp312-win_amd64.whl", hash = "sha256:5197a6af438bb8c4abda0bbe9c4fbd6c27c159855b211098b29d51b73e4cbcf6"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:b6727e0b98417e114b92688ad2aa256102ece51f29b743db3d831df53faf1ce3"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:711e2b6417eb579c61a4951029d844f6b95b9b373b213232efd413659889a363"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_arm64.whl", hash = "sha256:71ae8b3df22c5c154c8af0f0e99d234a450ef1644393bc2d7f53fc8c0a1e111c"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:94a04e214bc521a3807c217d50cf099bbdd0c0a80d2d996c0741dbb995b5f49f"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:a770f75358508a9110c81a1257721f70c15d9bb592a2fb5c25ecbd13566e52a5"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_x86_64.whl", hash = "sha256:07c9133357f7e0b17c6694d5dcb82e0371f695d7c25faef2ff8117ef375343ff"}, - {file = 
"google_re2-1.1-5-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:204ca6b1cf2021548f4a9c29ac015e0a4ab0a7b6582bf2183d838132b60c8fda"}, - {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f0b95857c2c654f419ca684ec38c9c3325c24e6ba7d11910a5110775a557bb18"}, - {file = "google_re2-1.1-5-cp38-cp38-win32.whl", hash = "sha256:347ac770e091a0364e822220f8d26ab53e6fdcdeaec635052000845c5a3fb869"}, - {file = "google_re2-1.1-5-cp38-cp38-win_amd64.whl", hash = "sha256:ec32bb6de7ffb112a07d210cf9f797b7600645c2d5910703fa07f456dd2150e0"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:eb5adf89060f81c5ff26c28e261e6b4997530a923a6093c9726b8dec02a9a326"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a22630c9dd9ceb41ca4316bccba2643a8b1d5c198f21c00ed5b50a94313aaf10"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:544dc17fcc2d43ec05f317366375796351dec44058e1164e03c3f7d050284d58"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:19710af5ea88751c7768575b23765ce0dfef7324d2539de576f75cdc319d6654"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:f82995a205e08ad896f4bd5ce4847c834fab877e1772a44e5f262a647d8a1dec"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:63533c4d58da9dc4bc040250f1f52b089911699f0368e0e6e15f996387a984ed"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:79e00fcf0cb04ea35a22b9014712d448725ce4ddc9f08cc818322566176ca4b0"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bc41afcefee2da6c4ed883a93d7f527c4b960cd1d26bbb0020a7b8c2d341a60a"}, - {file = "google_re2-1.1-5-cp39-cp39-win32.whl", hash = "sha256:486730b5e1f1c31b0abc6d80abe174ce4f1188fe17d1b50698f2bf79dc6e44be"}, - {file = 
"google_re2-1.1-5-cp39-cp39-win_amd64.whl", hash = "sha256:4de637ca328f1d23209e80967d1b987d6b352cd01b3a52a84b4d742c69c3da6c"}, ] [[package]] From 23aee6d4ae543ba6a490dcfe27d0a10b42f42acf Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 12:58:38 +0200 Subject: [PATCH 21/22] updates filesystem docs --- dlt/sources/helpers/rest_client/paginators.py | 2 + docs/tools/fix_grammar_gpt.py | 4 +- .../dlt-ecosystem/destinations/filesystem.md | 41 +++++++++++++++++-- .../verified-sources/filesystem.md | 20 ++++++--- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/dlt/sources/helpers/rest_client/paginators.py b/dlt/sources/helpers/rest_client/paginators.py index a46558e4ab..cf06284c61 100644 --- a/dlt/sources/helpers/rest_client/paginators.py +++ b/dlt/sources/helpers/rest_client/paginators.py @@ -171,6 +171,7 @@ class BaseReferencePaginator(BasePaginator): 2. `update_request` method to update the request object with the next page reference. """ + def __init__(self) -> None: super().__init__() self.__next_reference: Optional[str] = None @@ -208,6 +209,7 @@ class BaseNextUrlPaginator(BaseReferencePaginator): See `HeaderLinkPaginator` and `JSONResponsePaginator` for examples. """ + def update_request(self, request: Request) -> None: # Handle relative URLs if self._next_reference: diff --git a/docs/tools/fix_grammar_gpt.py b/docs/tools/fix_grammar_gpt.py index 420f7cfa57..065b53d470 100644 --- a/docs/tools/fix_grammar_gpt.py +++ b/docs/tools/fix_grammar_gpt.py @@ -18,7 +18,7 @@ # constants BASE_DIR = "../website/docs" GPT_MODEL = "gpt-3.5-turbo-0125" -MAX_CHUNK_SIZE = 14000 # make sure that this is below the context window size of the model to not have cut off files +MAX_CHUNK_SIZE = 14000 # make sure that this is below the context window size of the model to not have cut off files SYSTEM_PROMPT = """\ You are a grammar checker. Every message you get will be a document that is to be grammarchecked and returned as such. 
@@ -104,7 +104,7 @@ def get_chunk_length(chunk: List[str]) -> int: chunks.append(current_chunk) # sanity test, make sure we still have the full doc - assert doc == functools.reduce(lambda a, b: a+b, chunks) + assert doc == functools.reduce(lambda a, b: a + b, chunks) fmt.note(f"Created {len(chunks)} chunks") diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index fce417b8d2..56012ac773 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -174,7 +174,43 @@ If for any reason you want to have those files in a local folder, set up the `bu ```toml [destination.filesystem] bucket_url = "file:///absolute/path" # three / for an absolute path -# bucket_url = "file://relative/path" # two / for a relative path +``` + +`dlt` correctly handles the native local file paths. Indeed, using the `file://` schema may not be intuitive, especially for Windows users. + +```toml +[destination.filesystem] +bucket_url = 'C:\a\b\c' +``` + +In the example above we specify `bucket_url` using **toml's literal strings** that do not require [escaping of backslashes](https://github.com/toml-lang/toml/blob/main/toml.md#string). + +```toml +[destination.unc_destination] +bucket_url = '\\localhost\c$\a\b\c' # UNC equivalent of C:\a\b\c + +[destination.posix_destination] +bucket_url = '/var/local/data' # absolute POSIX style path + +[destination.relative_destination] +bucket_url = '_storage/data' # relative POSIX style path +``` + +In the examples above we define a few named filesystem destinations: +* **unc_destination** demonstrates Windows UNC path in native form +* **posix_destination** demonstrates native POSIX (Linux/Mac) absolute path +* **relative_destination** demonstrates native POSIX (Linux/Mac) relative path. 
In this case `filesystem` destination will store files in `$cwd/_storage/data` path +where **$cwd** is your current working directory. + +`dlt` supports Windows [UNC paths with file:// scheme](https://en.wikipedia.org/wiki/File_URI_scheme). They can be specified using **host** or purely as **path** +component. + +```toml +[destination.unc_with_host] +bucket_url="file://localhost/c$/a/b/c" + +[destination.unc_with_path] +bucket_url="file:////localhost/c$/a/b/c" ``` ## Write disposition @@ -408,7 +444,4 @@ managed in the regular way by the final destination you have configured. You will also notice `init` files being present in the root folder and the special `dlt` folders. In the absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special files to harmonize the behavior of the `filesystem` destination with the other implemented destinations. - - - \ No newline at end of file diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md index 8ff4e2363a..7184d4ccf1 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md @@ -122,13 +122,15 @@ For more information, read the 1. Finally, enter credentials for your chosen destination as per the [docs](../destinations/). 1. You can pass the bucket URL and glob pattern or use `config.toml`. For local filesystems, use - `file://` or skip the schema. + `file://` or skip the schema and provide the local path in a format native for your operating system. ```toml [sources.filesystem] # use [sources.readers.credentials] for the "readers" source - bucket_url="~/Documents/csv_files/" + bucket_url='~\Documents\csv_files\' file_glob="*" ``` + In the example above we use Windows path to current user's Documents folder. 
Mind that a literal toml string (single quotes) + was used so that the backslashes do not need to be escaped. For remote file systems you need to add the schema, it will be used to get the protocol being used: @@ -143,6 +145,12 @@ For more information, read the :::caution For Azure, use adlfs>=2023.9.0. Older versions mishandle globs. ::: + +### Use local file system paths +You can use both native local file system paths and `file:` URIs. Absolute, relative and UNC Windows paths are supported. +You can find relevant examples in [filesystem destination documentation](../destinations/filesystem.md#local-file-system) which follows +the same rules to specify the `bucket_url`. + ## Run the pipeline 1. Before running the pipeline, ensure that you have installed all the necessary dependencies by @@ -295,20 +303,20 @@ data. You can quickly build pipelines to: #### `FileItem` Fields: -- `file_url` - Complete URL of the file; also the primary key (e.g. `file://`). +- `file_url` - Complete URL of the file; also the primary key (e.g. `s3://bucket-name/path/file`). - `file_name` - Name of the file from the bucket URL. -- `relative_path` - Relative path of the file in the bucket. +- `relative_path` - Set when doing `glob`; the file's path relative to the `bucket_url` argument. - `mime_type` - File's mime type; sourced from the bucket provider or inferred from its extension. - `modification_date` - File's last modification time (format: `pendulum.DateTime`). - `size_in_bytes` - File size. - `file_content` - Content, provided upon request. :::info -When using a nested or recursive glob pattern, `file_name` will include the file's path. For +When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`. 
For instance, using the resource: `filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")` will produce file names relative to the `/standard_source/samples` path, such as -`met_csv/A801/A881_20230920.csv`. +`met_csv/A801/A881_20230920.csv`. For local filesystems, POSIX paths (using "/" as separator) are returned. ::: ### File Manipulation From 556a2560ed39d80f69ae395737df72be1ebef1e2 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Sat, 27 Apr 2024 13:34:01 +0200 Subject: [PATCH 22/22] fixes UNC to file uri --- dlt/common/storages/configuration.py | 11 ++++------- tests/common/storages/test_local_filesystem.py | 15 ++++++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py index ef70f49094..1c5f39ea82 100644 --- a/dlt/common/storages/configuration.py +++ b/dlt/common/storages/configuration.py @@ -164,12 +164,9 @@ def make_local_path(file_uri: str) -> str: def make_file_uri(local_path: str) -> str: """Creates a normalized file:// uri from a local path - netloc is never set. UNC paths are represented as file:////host/path + netloc is set only for UNC paths. 
UNC paths are represented as file://host/path """ - # we do not go straight into `uri` here because this converts UNC paths - # into file://host/path and we want file://///host/path p_ = pathlib.Path(local_path) - p_ = p_.expanduser() - # if not p_.is_absolute(): - p_ = p_.resolve() - return "file:///" + p_.as_posix().lstrip("/") + p_ = p_.expanduser().resolve() + # return "file:///" + p_.as_posix().lstrip("/") + return p_.as_uri() diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index b94b16cd96..6bcf8234e6 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -1,6 +1,7 @@ import os import pytest import pathlib +from urllib.parse import quote from dlt.common.configuration.exceptions import ConfigurationValueError from dlt.common.configuration.resolve import resolve_configuration @@ -18,23 +19,23 @@ @pytest.mark.parametrize( "bucket_url,file_url", ( - (UNC_LOCAL_PATH, "file:///" + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_posix()), - (UNC_WSL_PATH, "file:///" + pathlib.PureWindowsPath(UNC_WSL_PATH).as_posix()), + (UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()), + (UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()), (r"C:\hello", "file:///C:/hello"), - (r"a\b $\b", "file:///" + pathlib.Path(r"a\b $\b").resolve().as_posix()), + (r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()), # same paths but with POSIX separators ( UNC_LOCAL_PATH.replace("\\", "/"), - "file:///" + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_posix(), + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri(), ), ( UNC_WSL_PATH.replace("\\", "/"), - "file:///" + pathlib.PureWindowsPath(UNC_WSL_PATH).as_posix(), + pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri(), ), (r"C:\hello".replace("\\", "/"), "file:///C:/hello"), ( r"a\b $\b".replace("\\", "/"), - "file:///" + pathlib.Path(r"a\b $\b").resolve().as_posix(), + 
"file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix(), ), ), ) @@ -88,7 +89,7 @@ def test_file_win_configuration() -> None: @pytest.mark.parametrize( "bucket_url,file_url", ( - (r"/var/logs", "file:///var/logs"), + (r"/src/local/app", "file:///src/local/app"), (r"_storage", "file://" + pathlib.Path("_storage").resolve().as_posix()), ), )