diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py
index d0100c335d..1c5f39ea82 100644
--- a/dlt/common/storages/configuration.py
+++ b/dlt/common/storages/configuration.py
@@ -1,6 +1,7 @@
 import os
-from typing import TYPE_CHECKING, Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
-from urllib.parse import urlparse
+import pathlib
+from typing import Any, Literal, Optional, Type, get_args, ClassVar, Dict, Union
+from urllib.parse import urlparse, unquote

 from dlt.common.configuration import configspec, resolve_type
 from dlt.common.configuration.exceptions import ConfigurationValueError
@@ -87,24 +88,22 @@ class FilesystemConfiguration(BaseConfiguration):
     @property
     def protocol(self) -> str:
         """`bucket_url` protocol"""
-        url = urlparse(self.bucket_url)
-        # this prevents windows absolute paths to be recognized as schemas
-        if not url.scheme or (os.path.isabs(self.bucket_url) and "\\" in self.bucket_url):
+        if self.is_local_path(self.bucket_url):
             return "file"
         else:
-            return url.scheme
+            return urlparse(self.bucket_url).scheme

     def on_resolved(self) -> None:
-        url = urlparse(self.bucket_url)
-        if not url.path and not url.netloc:
+        uri = urlparse(self.bucket_url)
+        if not uri.path and not uri.netloc:
             raise ConfigurationValueError(
-                "File path or netloc missing. Field bucket_url of FilesystemClientConfiguration"
-                " must contain valid url with a path or host:password component."
+                "File path and netloc are missing. Field bucket_url of"
+                " FilesystemClientConfiguration must contain a valid uri with a path or"
+                " host:password component."
             )
         # this is just a path in a local file system
-        if url.path == self.bucket_url:
-            url = url._replace(scheme="file")
-            self.bucket_url = url.geturl()
+        if self.is_local_path(self.bucket_url):
+            self.bucket_url = self.make_file_uri(self.bucket_url)

     @resolve_type("credentials")
     def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
@@ -117,11 +116,57 @@ def fingerprint(self) -> str:

     def __str__(self) -> str:
         """Return displayable destination location"""
-        url = urlparse(self.bucket_url)
+        uri = urlparse(self.bucket_url)
         # do not show passwords
-        if url.password:
-            new_netloc = f"{url.username}:****@{url.hostname}"
-            if url.port:
-                new_netloc += f":{url.port}"
-            return url._replace(netloc=new_netloc).geturl()
+        if uri.password:
+            new_netloc = f"{uri.username}:****@{uri.hostname}"
+            if uri.port:
+                new_netloc += f":{uri.port}"
+            return uri._replace(netloc=new_netloc).geturl()
         return self.bucket_url
+
+    @staticmethod
+    def is_local_path(uri: str) -> bool:
+        """Checks if `uri` is a local path, without a scheme"""
+        uri_parsed = urlparse(uri)
+        # this prevents Windows absolute paths from being recognized as schemes
+        return not uri_parsed.scheme or os.path.isabs(uri)
+
+    @staticmethod
+    def make_local_path(file_uri: str) -> str:
+        """Gets a valid local filesystem path from file:// scheme.
+
+        Supports POSIX/Windows/UNC paths
+
+        Returns:
+            str: local filesystem path
+        """
+        uri = urlparse(file_uri)
+        if uri.scheme != "file":
+            raise ValueError(f"Must be file scheme but is {uri.scheme}")
+        if not uri.path and not uri.netloc:
+            raise ConfigurationValueError("File path and netloc are missing.")
+        local_path = unquote(uri.path)
+        if uri.netloc:
+            # netloc carries the host of a UNC path, e.g. file://localhost/path
+            local_path = "//" + unquote(uri.netloc) + local_path
+        else:
+            # if we are on Windows, strip the POSIX root from the path, which is always absolute
+            if os.path.sep != local_path[0]:
+                # filesystem root
+                if local_path == "/":
+                    return str(pathlib.Path("/").resolve())
+                # this prevents /C:/ or ///share/ where both POSIX and Windows roots are present
+                if os.path.isabs(local_path[1:]):
+                    local_path = local_path[1:]
+        return str(pathlib.Path(local_path))
+
+    @staticmethod
+    def make_file_uri(local_path: str) -> str:
+        """Creates a normalized file:// uri from a local path
+
+        netloc is set only for Windows UNC paths, which are represented as file://host/path
+        """
+        p_ = pathlib.Path(local_path)
+        p_ = p_.expanduser().resolve()
+        # return "file:///" + p_.as_posix().lstrip("/")
+        return p_.as_uri()
diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py
index 3a2b483970..0c108e67d4 100644
--- a/dlt/common/storages/fsspec_filesystem.py
+++ b/dlt/common/storages/fsspec_filesystem.py
@@ -1,9 +1,11 @@
 import io
+import glob
 import gzip
 import mimetypes
 import pathlib
 import posixpath
 from io import BytesIO
+from gzip import GzipFile
 from typing import (
     Literal,
     cast,
@@ -41,6 +43,7 @@ class FileItem(TypedDict, total=False):

     file_url: str
     file_name: str
+    relative_path: str
     mime_type: str
     encoding: Optional[str]
     modification_date: pendulum.DateTime
@@ -181,12 +184,22 @@ def fsspec(self) -> AbstractFileSystem:
         else:
             return fsspec_filesystem(self["file_url"], self.credentials)[0]

+    @property
+    def local_file_path(self) -> str:
+        """Gets a valid local filesystem path from file:// scheme.
+
+        Supports POSIX/Windows/UNC paths
+
+        Returns:
+            str: local filesystem path
+        """
+        return FilesystemConfiguration.make_local_path(self["file_url"])
+
     def open(  # noqa: A003
         self,
         mode: str = "rb",
         compression: Literal["auto", "disable", "enable"] = "auto",
         **kwargs: Any,
-    ) -> IO[Any]:
+    ) -> Union[GzipFile, IO[Any]]:
         """Open the file as a fsspec file.

         This method opens the file represented by this dictionary as a file-like object using
@@ -213,7 +226,7 @@ def open(  # noqa: A003
             raise ValueError("""The argument `compression` must have one of the following values:
                 "auto", "enable", "disable".""")

-        opened_file: IO[Any]
+        opened_file: Union[IO[Any], GzipFile]
         # if the user has already extracted the content, we use it so there is no need to
         # download the file again.
         if "file_content" in self:
@@ -234,8 +247,13 @@ def open(  # noqa: A003
                 **text_kwargs,
             )
         else:
+            if "local" in self.fsspec.protocol:
+                # use native local file path to open file:// uris
+                file_url = self.local_file_path
+            else:
+                file_url = self["file_url"]
             opened_file = self.fsspec.open(
-                self["file_url"], mode=mode, compression=compression_arg, **kwargs
+                file_url, mode=mode, compression=compression_arg, **kwargs
             )
         return opened_file

@@ -245,11 +263,11 @@ def read_bytes(self) -> bytes:

         Returns:
             bytes: The file content.
""" - return ( # type: ignore - self["file_content"] - if "file_content" in self and self["file_content"] is not None - else self.fsspec.read_bytes(self["file_url"]) - ) + if "file_content" in self and self["file_content"] is not None: + return self["file_content"] # type: ignore + else: + with self.open(mode="rb", compression="disable") as f: + return f.read() def guess_mime_type(file_name: str) -> Sequence[str]: @@ -274,45 +292,49 @@ def glob_files( Returns: Iterable[FileItem]: The list of files. """ - import os - - bucket_url_parsed = urlparse(bucket_url) - # if this is a file path without a scheme - if not bucket_url_parsed.scheme or (os.path.isabs(bucket_url) and "\\" in bucket_url): - # this is a file so create a proper file url - bucket_url = pathlib.Path(bucket_url).absolute().as_uri() + is_local_fs = "local" in fs_client.protocol + if is_local_fs and FilesystemConfiguration.is_local_path(bucket_url): + bucket_url = FilesystemConfiguration.make_file_uri(bucket_url) + bucket_url_parsed = urlparse(bucket_url) + else: bucket_url_parsed = urlparse(bucket_url) - bucket_url_no_schema = bucket_url_parsed._replace(scheme="", query="").geturl() - bucket_url_no_schema = ( - bucket_url_no_schema[2:] if bucket_url_no_schema.startswith("//") else bucket_url_no_schema - ) - filter_url = posixpath.join(bucket_url_no_schema, file_glob) - glob_result = fs_client.glob(filter_url, detail=True) - if isinstance(glob_result, list): - raise NotImplementedError( - "Cannot request details when using fsspec.glob. For adlfs (Azure) please use version" - " 2023.9.0 or later" - ) + if is_local_fs: + root_dir = FilesystemConfiguration.make_local_path(bucket_url) + # use a Python glob to get files + files = glob.glob(str(pathlib.Path(root_dir).joinpath(file_glob)), recursive=True) + glob_result = {file: fs_client.info(file) for file in files} + else: + root_dir = bucket_url_parsed._replace(scheme="", query="").geturl().lstrip("/") + filter_url = posixpath.join(root_dir, file_glob) + glob_result = fs_client.glob(filter_url, detail=True) + if isinstance(glob_result, list): + raise NotImplementedError( + "Cannot request details when using fsspec.glob. 
For adlfs (Azure) please use" + " version 2023.9.0 or later" + ) for file, md in glob_result.items(): if md["type"] != "file": continue + # relative paths are always POSIX + if is_local_fs: + rel_path = pathlib.Path(file).relative_to(root_dir).as_posix() + file_url = FilesystemConfiguration.make_file_uri(file) + else: + rel_path = posixpath.relpath(file, root_dir) + file_url = bucket_url_parsed._replace( + path=posixpath.join(bucket_url_parsed.path, rel_path) + ).geturl() - # make that absolute path on a file:// - if bucket_url_parsed.scheme == "file" and not file.startswith("/"): - file = f"/{file}" - file_name = posixpath.relpath(file, bucket_url_no_schema) - file_url = bucket_url_parsed._replace( - path=posixpath.join(bucket_url_parsed.path, file_name) - ).geturl() - - mime_type, encoding = guess_mime_type(file_name) + scheme = bucket_url_parsed.scheme + mime_type, encoding = guess_mime_type(rel_path) yield FileItem( - file_name=file_name, + file_name=posixpath.basename(rel_path), + relative_path=rel_path, file_url=file_url, mime_type=mime_type, encoding=encoding, - modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md), + modification_date=MTIME_DISPATCH[scheme](md), size_in_bytes=int(md["size"]), ) diff --git a/dlt/sources/helpers/rest_client/paginators.py b/dlt/sources/helpers/rest_client/paginators.py index a46558e4ab..cf06284c61 100644 --- a/dlt/sources/helpers/rest_client/paginators.py +++ b/dlt/sources/helpers/rest_client/paginators.py @@ -171,6 +171,7 @@ class BaseReferencePaginator(BasePaginator): 2. `update_request` method to update the request object with the next page reference. """ + def __init__(self) -> None: super().__init__() self.__next_reference: Optional[str] = None @@ -208,6 +209,7 @@ class BaseNextUrlPaginator(BaseReferencePaginator): See `HeaderLinkPaginator` and `JSONResponsePaginator` for examples. """ + def update_request(self, request: Request) -> None: # Handle relative URLs if self._next_reference: diff --git a/docs/tools/fix_grammar_gpt.py b/docs/tools/fix_grammar_gpt.py index 420f7cfa57..065b53d470 100644 --- a/docs/tools/fix_grammar_gpt.py +++ b/docs/tools/fix_grammar_gpt.py @@ -18,7 +18,7 @@ # constants BASE_DIR = "../website/docs" GPT_MODEL = "gpt-3.5-turbo-0125" -MAX_CHUNK_SIZE = 14000 # make sure that this is below the context window size of the model to not have cut off files +MAX_CHUNK_SIZE = 14000 # make sure that this is below the context window size of the model to not have cut off files SYSTEM_PROMPT = """\ You are a grammar checker. Every message you get will be a document that is to be grammarchecked and returned as such. 
@@ -104,7 +104,7 @@ def get_chunk_length(chunk: List[str]) -> int:
         chunks.append(current_chunk)

     # sanity test, make sure we still have the full doc
-    assert doc == functools.reduce(lambda a, b: a+b, chunks)
+    assert doc == functools.reduce(lambda a, b: a + b, chunks)

     fmt.note(f"Created {len(chunks)} chunks")

diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
index fce417b8d2..56012ac773 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -174,7 +174,43 @@ If for any reason you want to have those files in a local folder, set up the `bu
 ```toml
 [destination.filesystem]
 bucket_url = "file:///absolute/path" # three / for an absolute path
-# bucket_url = "file://relative/path" # two / for a relative path
+```
+
+`dlt` correctly handles native local file paths. Indeed, the `file://` scheme may not be intuitive, especially for Windows users.
+
+```toml
+[destination.filesystem]
+bucket_url = 'C:\a\b\c'
+```
+
+In the example above we specify `bucket_url` using **TOML's literal strings**, which do not require [escaping of backslashes](https://github.com/toml-lang/toml/blob/main/toml.md#string).
+
+```toml
+[destination.unc_destination]
+bucket_url = '\\localhost\c$\a\b\c' # UNC equivalent of C:\a\b\c
+
+[destination.posix_destination]
+bucket_url = '/var/local/data' # absolute POSIX style path
+
+[destination.relative_destination]
+bucket_url = '_storage/data' # relative POSIX style path
+```
+
+In the examples above we define a few named filesystem destinations:
+* **unc_destination** demonstrates a Windows UNC path in native form
+* **posix_destination** demonstrates a native POSIX (Linux/Mac) absolute path
+* **relative_destination** demonstrates a native POSIX (Linux/Mac) relative path. In this case the `filesystem` destination will store files under `$cwd/_storage/data`,
+where **$cwd** is your current working directory.
+
+`dlt` supports Windows [UNC paths with the file:// scheme](https://en.wikipedia.org/wiki/File_URI_scheme). They can be specified using the **host**
+component or purely as the **path** component.
+
+```toml
+[destination.unc_with_host]
+bucket_url="file://localhost/c$/a/b/c"
+
+[destination.unc_with_path]
+bucket_url="file:////localhost/c$/a/b/c"
 ```

 ## Write disposition
@@ -408,7 +444,4 @@ managed in the regular way by the final destination you have configured.

 You will also notice `init` files being present in the root folder and the special `dlt` folders. In the
 absence of the concepts of schemas and tables in blob storages and directories, `dlt` uses these special
 files to harmonize the behavior of the `filesystem` destination with the other implemented destinations.
-
-
-
\ No newline at end of file
diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
index c4cb0e536e..7184d4ccf1 100644
--- a/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
+++ b/docs/website/docs/dlt-ecosystem/verified-sources/filesystem.md
@@ -122,13 +122,15 @@ For more information, read the
 1. Finally, enter credentials for your chosen destination as per the [docs](../destinations/).

 1. You can pass the bucket URL and glob pattern or use `config.toml`. For local filesystems, use
-   `file://` or skip the schema.
+   `file://` or skip the scheme and provide the local path in a format native to your operating system.
   ```toml
   [sources.filesystem] # use [sources.readers.credentials] for the "readers" source
-   bucket_url="~/Documents/csv_files/"
+   bucket_url='~\Documents\csv_files\'
   file_glob="*"
   ```
+   In the example above we use a Windows path to the current user's Documents folder. Note that a literal TOML string (single quotes)
+   is used so that the backslashes do not need to be escaped.

 For remote file systems you need to add the schema, it will be used to get the protocol being used:

@@ -143,6 +145,12 @@ For more information, read the
 :::caution
 For Azure, use adlfs>=2023.9.0. Older versions mishandle globs.
 :::
+
+### Use local file system paths
+You can use both native local file system paths and the `file:` URI form. Absolute, relative, and UNC Windows paths are supported.
+You can find relevant examples in the [filesystem destination documentation](../destinations/filesystem.md#local-file-system), which follows
+the same rules to specify the `bucket_url`.
+
 ## Run the pipeline

 1. Before running the pipeline, ensure that you have installed all the necessary dependencies by
@@ -295,19 +303,20 @@ data. You can quickly build pipelines to:

 #### `FileItem` Fields:

-- `file_url` - Complete URL of the file; also the primary key (e.g. `file://`).
-- `file_name` - Name or relative path of the file from the bucket URL.
+- `file_url` - Complete URL of the file; also the primary key (e.g. `s3://bucket-name/path/file`).
+- `file_name` - Name of the file from the bucket URL.
+- `relative_path` - Set when doing `glob`; the path of the file relative to the `bucket_url` argument.
 - `mime_type` - File's mime type; sourced from the bucket provider or inferred from its extension.
 - `modification_date` - File's last modification time (format: `pendulum.DateTime`).
 - `size_in_bytes` - File size.
 - `file_content` - Content, provided upon request.

 :::info
-When using a nested or recursive glob pattern, `file_name` will include the file's path. For
+When using a nested or recursive glob pattern, `relative_path` will include the file's path relative to `bucket_url`. For
 instance, using the resource:
 `filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")`
 will produce file names relative to the `/standard_source/samples` path, such as
-`met_csv/A801/A881_20230920.csv`.
+`met_csv/A801/A881_20230920.csv`. For local filesystems, POSIX paths (using "/" as the separator) are returned.
 :::

 ### File Manipulation
diff --git a/poetry.lock b/poetry.lock
index d6e78f4144..56e44a00d8 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
[[package]] name = "about-time" @@ -13,13 +13,13 @@ files = [ [[package]] name = "adlfs" -version = "2023.8.0" +version = "2024.4.1" description = "Access Azure Datalake Gen1 with fsspec and dask" optional = true python-versions = ">=3.8" files = [ - {file = "adlfs-2023.8.0-py3-none-any.whl", hash = "sha256:3eb248a3c2a30b419f1147bd7676d156b5219f96ef7f11d47166afd2a3bdb07e"}, - {file = "adlfs-2023.8.0.tar.gz", hash = "sha256:07e804f6df4593acfcaf01025b162e30ac13e523d3570279c98b2d91a18026d9"}, + {file = "adlfs-2024.4.1-py3-none-any.whl", hash = "sha256:acea94612ddacaa34ea8c6babcc95b8da6982f930cdade7a86fbd17382403e16"}, + {file = "adlfs-2024.4.1.tar.gz", hash = "sha256:75530a45447f358ae53c5c39c298b8d966dae684be84db899f63b94cd96fc000"}, ] [package.dependencies] @@ -28,10 +28,11 @@ azure-core = ">=1.23.1,<2.0.0" azure-datalake-store = ">=0.0.46,<0.1" azure-identity = "*" azure-storage-blob = ">=12.12.0" -fsspec = ">=2021.10.1" +fsspec = ">=2023.12.0" [package.extras] docs = ["furo", "myst-parser", "numpydoc", "sphinx"] +tests = ["arrow", "dask[dataframe]", "docker", "pytest", "pytest-mock"] [[package]] name = "agate" @@ -3727,56 +3728,6 @@ files = [ {file = "google_re2-1.1-4-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f4d4f0823e8b2f6952a145295b1ff25245ce9bb136aff6fe86452e507d4c1dd"}, {file = "google_re2-1.1-4-cp39-cp39-win32.whl", hash = "sha256:1afae56b2a07bb48cfcfefaa15ed85bae26a68f5dc7f9e128e6e6ea36914e847"}, {file = "google_re2-1.1-4-cp39-cp39-win_amd64.whl", hash = "sha256:aa7d6d05911ab9c8adbf3c225a7a120ab50fd2784ac48f2f0d140c0b7afc2b55"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:222fc2ee0e40522de0b21ad3bc90ab8983be3bf3cec3d349c80d76c8bb1a4beb"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:d4763b0b9195b72132a4e7de8e5a9bf1f05542f442a9115aa27cfc2a8004f581"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_arm64.whl", hash = "sha256:209649da10c9d4a93d8a4d100ecbf9cc3b0252169426bec3e8b4ad7e57d600cf"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_13_0_x86_64.whl", hash = "sha256:68813aa333c1604a2df4a495b2a6ed065d7c8aebf26cc7e7abb5a6835d08353c"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:370a23ec775ad14e9d1e71474d56f381224dcf3e72b15d8ca7b4ad7dd9cd5853"}, - {file = "google_re2-1.1-5-cp310-cp310-macosx_14_0_x86_64.whl", hash = "sha256:14664a66a3ddf6bc9e56f401bf029db2d169982c53eff3f5876399104df0e9a6"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3ea3722cc4932cbcebd553b69dce1b4a73572823cff4e6a244f1c855da21d511"}, - {file = "google_re2-1.1-5-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e14bb264c40fd7c627ef5678e295370cd6ba95ca71d835798b6e37502fc4c690"}, - {file = "google_re2-1.1-5-cp310-cp310-win32.whl", hash = "sha256:39512cd0151ea4b3969c992579c79b423018b464624ae955be685fc07d94556c"}, - {file = "google_re2-1.1-5-cp310-cp310-win_amd64.whl", hash = "sha256:ac66537aa3bc5504320d922b73156909e3c2b6da19739c866502f7827b3f9fdf"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b5ea68d54890c9edb1b930dcb2658819354e5d3f2201f811798bbc0a142c2b4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:33443511b6b83c35242370908efe2e8e1e7cae749c766b2b247bf30e8616066c"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:413d77bdd5ba0bfcada428b4c146e87707452ec50a4091ec8e8ba1413d7e0619"}, - {file = 
"google_re2-1.1-5-cp311-cp311-macosx_13_0_x86_64.whl", hash = "sha256:5171686e43304996a34baa2abcee6f28b169806d0e583c16d55e5656b092a414"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:3b284db130283771558e31a02d8eb8fb756156ab98ce80035ae2e9e3a5f307c4"}, - {file = "google_re2-1.1-5-cp311-cp311-macosx_14_0_x86_64.whl", hash = "sha256:296e6aed0b169648dc4b870ff47bd34c702a32600adb9926154569ef51033f47"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:38d50e68ead374160b1e656bbb5d101f0b95fb4cc57f4a5c12100155001480c5"}, - {file = "google_re2-1.1-5-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2a0416a35921e5041758948bcb882456916f22845f66a93bc25070ef7262b72a"}, - {file = "google_re2-1.1-5-cp311-cp311-win32.whl", hash = "sha256:a1d59568bbb5de5dd56dd6cdc79907db26cce63eb4429260300c65f43469e3e7"}, - {file = "google_re2-1.1-5-cp311-cp311-win_amd64.whl", hash = "sha256:72f5a2f179648b8358737b2b493549370debd7d389884a54d331619b285514e3"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:cbc72c45937b1dc5acac3560eb1720007dccca7c9879138ff874c7f6baf96005"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5fadd1417fbef7235fa9453dba4eb102e6e7d94b1e4c99d5fa3dd4e288d0d2ae"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:040f85c63cc02696485b59b187a5ef044abe2f99b92b4fb399de40b7d2904ccc"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:64e3b975ee6d9bbb2420494e41f929c1a0de4bcc16d86619ab7a87f6ea80d6bd"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:8ee370413e00f4d828eaed0e83b8af84d7a72e8ee4f4bd5d3078bc741dfc430a"}, - {file = "google_re2-1.1-5-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:5b89383001079323f693ba592d7aad789d7a02e75adb5d3368d92b300f5963fd"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:63cb4fdfbbda16ae31b41a6388ea621510db82feb8217a74bf36552ecfcd50ad"}, - {file = "google_re2-1.1-5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9ebedd84ae8be10b7a71a16162376fd67a2386fe6361ef88c622dcf7fd679daf"}, - {file = "google_re2-1.1-5-cp312-cp312-win32.whl", hash = "sha256:c8e22d1692bc2c81173330c721aff53e47ffd3c4403ff0cd9d91adfd255dd150"}, - {file = "google_re2-1.1-5-cp312-cp312-win_amd64.whl", hash = "sha256:5197a6af438bb8c4abda0bbe9c4fbd6c27c159855b211098b29d51b73e4cbcf6"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_arm64.whl", hash = "sha256:b6727e0b98417e114b92688ad2aa256102ece51f29b743db3d831df53faf1ce3"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:711e2b6417eb579c61a4951029d844f6b95b9b373b213232efd413659889a363"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_arm64.whl", hash = "sha256:71ae8b3df22c5c154c8af0f0e99d234a450ef1644393bc2d7f53fc8c0a1e111c"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_13_0_x86_64.whl", hash = "sha256:94a04e214bc521a3807c217d50cf099bbdd0c0a80d2d996c0741dbb995b5f49f"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_arm64.whl", hash = "sha256:a770f75358508a9110c81a1257721f70c15d9bb592a2fb5c25ecbd13566e52a5"}, - {file = "google_re2-1.1-5-cp38-cp38-macosx_14_0_x86_64.whl", hash = "sha256:07c9133357f7e0b17c6694d5dcb82e0371f695d7c25faef2ff8117ef375343ff"}, - {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = 
"sha256:204ca6b1cf2021548f4a9c29ac015e0a4ab0a7b6582bf2183d838132b60c8fda"}, - {file = "google_re2-1.1-5-cp38-cp38-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:f0b95857c2c654f419ca684ec38c9c3325c24e6ba7d11910a5110775a557bb18"}, - {file = "google_re2-1.1-5-cp38-cp38-win32.whl", hash = "sha256:347ac770e091a0364e822220f8d26ab53e6fdcdeaec635052000845c5a3fb869"}, - {file = "google_re2-1.1-5-cp38-cp38-win_amd64.whl", hash = "sha256:ec32bb6de7ffb112a07d210cf9f797b7600645c2d5910703fa07f456dd2150e0"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:eb5adf89060f81c5ff26c28e261e6b4997530a923a6093c9726b8dec02a9a326"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:a22630c9dd9ceb41ca4316bccba2643a8b1d5c198f21c00ed5b50a94313aaf10"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_arm64.whl", hash = "sha256:544dc17fcc2d43ec05f317366375796351dec44058e1164e03c3f7d050284d58"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_13_0_x86_64.whl", hash = "sha256:19710af5ea88751c7768575b23765ce0dfef7324d2539de576f75cdc319d6654"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_arm64.whl", hash = "sha256:f82995a205e08ad896f4bd5ce4847c834fab877e1772a44e5f262a647d8a1dec"}, - {file = "google_re2-1.1-5-cp39-cp39-macosx_14_0_x86_64.whl", hash = "sha256:63533c4d58da9dc4bc040250f1f52b089911699f0368e0e6e15f996387a984ed"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:79e00fcf0cb04ea35a22b9014712d448725ce4ddc9f08cc818322566176ca4b0"}, - {file = "google_re2-1.1-5-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bc41afcefee2da6c4ed883a93d7f527c4b960cd1d26bbb0020a7b8c2d341a60a"}, - {file = "google_re2-1.1-5-cp39-cp39-win32.whl", hash = "sha256:486730b5e1f1c31b0abc6d80abe174ce4f1188fe17d1b50698f2bf79dc6e44be"}, - {file = "google_re2-1.1-5-cp39-cp39-win_amd64.whl", hash = "sha256:4de637ca328f1d23209e80967d1b987d6b352cd01b3a52a84b4d742c69c3da6c"}, ] [[package]] diff --git a/tests/.dlt/config.toml b/tests/.dlt/config.toml index cce4335f5a..ba86edf417 100644 --- a/tests/.dlt/config.toml +++ b/tests/.dlt/config.toml @@ -4,7 +4,7 @@ sentry_dsn="https://6f6f7b6f8e0f458a89be4187603b55fe@o1061158.ingest.sentry.io/4 [tests] bucket_url_gs="gs://ci-test-bucket" bucket_url_s3="s3://dlt-ci-test-bucket" -bucket_url_file="file://_storage" +bucket_url_file="_storage" bucket_url_az="az://dlt-ci-test-bucket" bucket_url_r2="s3://dlt-ci-test-bucket" # use "/" as root path diff --git a/tests/common/storages/test_local_filesystem.py b/tests/common/storages/test_local_filesystem.py index ea6adec2c7..6bcf8234e6 100644 --- a/tests/common/storages/test_local_filesystem.py +++ b/tests/common/storages/test_local_filesystem.py @@ -1,37 +1,210 @@ import os -import itertools import pytest import pathlib +from urllib.parse import quote +from dlt.common.configuration.exceptions import ConfigurationValueError +from dlt.common.configuration.resolve import resolve_configuration from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from dlt.common.storages.fsspec_filesystem import FileItemDict, glob_files -from tests.common.storages.utils import assert_sample_files +from tests.common.storages.utils import assert_sample_files, TEST_SAMPLE_FILES +from tests.utils import skipifnotwindows, skipifwindows -TEST_SAMPLE_FILES = "tests/common/storages/samples" +UNC_LOCAL_PATH = r"\\localhost\c$\tests\common\test.csv" +UNC_WSL_PATH = r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\ .dlt" 
+@skipifnotwindows @pytest.mark.parametrize( - "bucket_url,load_content", itertools.product(["file:///", "/", ""], [True, False]) + "bucket_url,file_url", + ( + (UNC_LOCAL_PATH, pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri()), + (UNC_WSL_PATH, pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri()), + (r"C:\hello", "file:///C:/hello"), + (r"a\b $\b", "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix()), + # same paths but with POSIX separators + ( + UNC_LOCAL_PATH.replace("\\", "/"), + pathlib.PureWindowsPath(UNC_LOCAL_PATH).as_uri(), + ), + ( + UNC_WSL_PATH.replace("\\", "/"), + pathlib.PureWindowsPath(UNC_WSL_PATH).as_uri(), + ), + (r"C:\hello".replace("\\", "/"), "file:///C:/hello"), + ( + r"a\b $\b".replace("\\", "/"), + "file:///" + pathlib.Path(r"a\\" + quote("b $") + r"\b").resolve().as_posix(), + ), + ), ) -def test_filesystem_dict_local(bucket_url: str, load_content: bool) -> None: +def test_local_path_win_configuration(bucket_url: str, file_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).resolve() + ) + + +@skipifnotwindows +@pytest.mark.parametrize( + "bucket_url", + ( + r"~\Documents\csv_files\a", + r"~\Documents\csv_files\a".replace("\\", "/"), + ), +) +def test_local_user_win_path_configuration(bucket_url: str) -> None: + file_url = "file:///" + pathlib.Path(bucket_url).expanduser().as_posix().lstrip("/") + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).expanduser() + ) + + +@skipifnotwindows +def test_file_win_configuration() -> None: + assert ( + FilesystemConfiguration.make_local_path("file://localhost/c$/samples") + == r"\\localhost\c$\samples" + ) + assert ( + FilesystemConfiguration.make_local_path("file://///localhost/c$/samples") + == r"\\localhost\c$\samples" + ) + assert FilesystemConfiguration.make_local_path("file:///C:/samples") == r"C:\samples" + + +@skipifwindows +@pytest.mark.parametrize( + "bucket_url,file_url", + ( + (r"/src/local/app", "file:///src/local/app"), + (r"_storage", "file://" + pathlib.Path("_storage").resolve().as_posix()), + ), +) +def test_file_posix_configuration(bucket_url: str, file_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).resolve() + ) + + +@skipifwindows +@pytest.mark.parametrize( + "bucket_url", + ("~/docs",), +) +def test_local_user_posix_path_configuration(bucket_url: str) -> None: + file_url = "file:///" + pathlib.Path(bucket_url).expanduser().as_posix().lstrip("/") + assert FilesystemConfiguration.is_local_path(bucket_url) is True + assert FilesystemConfiguration.make_file_uri(bucket_url) == file_url + + c = 
resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == file_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(bucket_url).expanduser() + ) + + +@pytest.mark.parametrize( + "bucket_url,local_path", + ( + ("file://_storage", "_storage"), + ("file://_storage/a/b/c.txt", "_storage/a/b/c.txt"), + ), +) +def test_file_host_configuration(bucket_url: str, local_path: str) -> None: + # recognized as UNC path on windows. on POSIX the path does not make sense and start with "//" + assert FilesystemConfiguration.is_local_path(bucket_url) is False + assert FilesystemConfiguration.make_local_path(bucket_url) == str( + pathlib.Path("//" + local_path) + ) + + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == bucket_url + + +@pytest.mark.parametrize( + "bucket_url,local_path,norm_bucket_url", + ( + ("file:", "", ""), + ("file://", "", ""), + ("file:/", "/", "file:///" + pathlib.Path("/").resolve().as_posix().lstrip("/")), + ), +) +def test_file_filesystem_configuration( + bucket_url: str, local_path: str, norm_bucket_url: str +) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is False + if local_path == "": + # those paths are invalid + with pytest.raises(ConfigurationValueError): + FilesystemConfiguration.make_local_path(bucket_url) + else: + assert FilesystemConfiguration.make_local_path(bucket_url) == str( + pathlib.Path(local_path).resolve() + ) + assert FilesystemConfiguration.make_file_uri(local_path) == norm_bucket_url + + if local_path == "": + with pytest.raises(ConfigurationValueError): + resolve_configuration(FilesystemConfiguration(bucket_url)) + else: + c = resolve_configuration(FilesystemConfiguration(bucket_url)) + assert c.protocol == "file" + assert c.bucket_url == bucket_url + assert FilesystemConfiguration.make_local_path(c.bucket_url) == str( + pathlib.Path(local_path).resolve() + ) + + +@pytest.mark.parametrize( + "bucket_url", + ( + "s3://dlt-ci-test-bucket/", + "gdrive://Xaie902-1/a/c", + ), +) +def test_filesystem_bucket_configuration(bucket_url: str) -> None: + assert FilesystemConfiguration.is_local_path(bucket_url) is False + + +@pytest.mark.parametrize("bucket_url", ("file:/", "file:///", "/", "")) +@pytest.mark.parametrize("load_content", (True, False)) +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_filesystem_dict_local(bucket_url: str, load_content: bool, glob_filter: str) -> None: if bucket_url in [""]: # relative paths - bucket_url = TEST_SAMPLE_FILES + bucket_url = str(pathlib.Path(TEST_SAMPLE_FILES)) else: if bucket_url == "/": bucket_url = os.path.abspath(TEST_SAMPLE_FILES) else: - bucket_url = pathlib.Path(TEST_SAMPLE_FILES).absolute().as_uri() + bucket_url = bucket_url + pathlib.Path(TEST_SAMPLE_FILES).resolve().as_posix() config = FilesystemConfiguration(bucket_url=bucket_url) filesystem, _ = fsspec_from_config(config) # use glob to get data - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - assert_sample_files(all_file_items, filesystem, config, load_content) - except NotImplementedError as ex: - pytest.skip("Skipping due to " + str(ex)) + all_file_items = list(glob_files(filesystem, bucket_url, file_glob=glob_filter)) + assert_sample_files(all_file_items, filesystem, config, load_content, glob_filter) def test_filesystem_decompress() -> None: @@ -50,7 +223,34 @@ def test_filesystem_decompress() -> None: with 
file_dict.open(mode="tr") as f: lines = f.readlines() assert len(lines) > 1 - assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"') + assert lines[0].startswith('"1200864931","2015-07-01 00:00:13"') # type: ignore # read as uncompressed binary with file_dict.open(compression="enable") as f: assert f.read().startswith(b'"1200864931","2015-07-01 00:00:13"') + + +# create windows UNC paths, on POSIX systems they are not used +WIN_ABS_PATH = os.path.abspath(TEST_SAMPLE_FILES) +WIN_UNC_PATH = "\\\\localhost\\" + WIN_ABS_PATH.replace(":", "$").lower() + + +@skipifnotwindows +@pytest.mark.parametrize( + "bucket_url", + ( + WIN_UNC_PATH, + "file:///" + pathlib.Path(WIN_UNC_PATH).as_posix(), + "file://localhost/" + pathlib.Path(WIN_ABS_PATH).as_posix().replace(":", "$"), + WIN_ABS_PATH, + "file:///" + pathlib.Path(WIN_ABS_PATH).as_posix(), + # r"\\wsl.localhost\Ubuntu-18.04\home\rudolfix\src\dlt\tests\common\storages\samples" + ), +) +@pytest.mark.parametrize("load_content", [True, False]) +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_windows_unc_path(load_content: bool, bucket_url: str, glob_filter: str) -> None: + config = FilesystemConfiguration(bucket_url=TEST_SAMPLE_FILES) + config.read_only = True + fs_client, _ = fsspec_from_config(config) + all_file_items = list(glob_files(fs_client, bucket_url, file_glob=glob_filter)) + assert_sample_files(all_file_items, fs_client, config, load_content, glob_filter) diff --git a/tests/common/storages/utils.py b/tests/common/storages/utils.py index 0eb472a9af..0737136622 100644 --- a/tests/common/storages/utils.py +++ b/tests/common/storages/utils.py @@ -1,3 +1,5 @@ +import os +import glob from pathlib import Path from urllib.parse import urlparse import pytest @@ -22,6 +24,21 @@ from dlt.common.typing import StrAny, TDataItems from dlt.common.utils import uniq_id +TEST_SAMPLE_FILES = "tests/common/storages/samples" +MINIMALLY_EXPECTED_RELATIVE_PATHS = { + "csv/freshman_kgs.csv", + "csv/freshman_lbs.csv", + "csv/mlb_players.csv", + "csv/mlb_teams_2012.csv", + "jsonl/mlb_players.jsonl", + "met_csv/A801/A881_20230920.csv", + "met_csv/A803/A803_20230919.csv", + "met_csv/A803/A803_20230920.csv", + "parquet/mlb_players.parquet", + "gzip/taxi.csv.gz", + "sample.txt", +} + @pytest.fixture def load_storage() -> LoadStorage: @@ -30,46 +47,56 @@ def load_storage() -> LoadStorage: return s +def glob_local_case(glob_filter: str) -> List[str]: + all_files = [ + os.path.relpath(p, TEST_SAMPLE_FILES) + for p in glob.glob(os.path.join(TEST_SAMPLE_FILES, glob_filter), recursive=True) + if os.path.isfile(p) + ] + return [Path(p).as_posix() for p in all_files] + + def assert_sample_files( all_file_items: List[FileItem], filesystem: AbstractFileSystem, config: FilesystemConfiguration, load_content: bool, + glob_filter: str = "**", ) -> None: - minimally_expected_file_items = { - "csv/freshman_kgs.csv", - "csv/freshman_lbs.csv", - "csv/mlb_players.csv", - "csv/mlb_teams_2012.csv", - "jsonl/mlb_players.jsonl", - "met_csv/A801/A881_20230920.csv", - "met_csv/A803/A803_20230919.csv", - "met_csv/A803/A803_20230920.csv", - "parquet/mlb_players.parquet", - "gzip/taxi.csv.gz", - "sample.txt", - } - assert len(all_file_items) == len(minimally_expected_file_items) + # sanity checks: all expected files are in the fixtures + assert MINIMALLY_EXPECTED_RELATIVE_PATHS == set(glob_local_case("**")) + # filter expected files by glob filter + expected_relative_paths = glob_local_case(glob_filter) + expected_file_names = 
[path.split("/")[-1] for path in expected_relative_paths] + + assert len(all_file_items) == len(expected_relative_paths) for item in all_file_items: # only accept file items we know - assert item["file_name"] in minimally_expected_file_items + assert item["relative_path"] in expected_relative_paths # is valid url file_url_parsed = urlparse(item["file_url"]) assert isinstance(item["file_name"], str) + assert item["file_name"] in expected_file_names assert file_url_parsed.path.endswith(item["file_name"]) assert item["file_url"].startswith(config.protocol) + assert item["file_url"].endswith(item["relative_path"]) assert isinstance(item["mime_type"], str) assert isinstance(item["size_in_bytes"], int) assert isinstance(item["modification_date"], pendulum.DateTime) - content = filesystem.read_bytes(item["file_url"]) - assert len(content) == item["size_in_bytes"] - if load_content: - item["file_content"] = content # create file dict file_dict = FileItemDict(item, config.credentials) + + try: + # try to load using local filesystem + content = filesystem.read_bytes(file_dict.local_file_path) + except ValueError: + content = filesystem.read_bytes(file_dict["file_url"]) + assert len(content) == item["size_in_bytes"] + if load_content: + item["file_content"] = content dict_content = file_dict.read_bytes() assert content == dict_content with file_dict.open() as f: @@ -86,7 +113,7 @@ def assert_sample_files( # fieldnames below are not really correct but allow to load first 3 columns # even if first row does not have header names - elements = list(DictReader(f, fieldnames=["A", "B", "C"])) + elements = list(DictReader(f, fieldnames=["A", "B", "C"])) # type: ignore assert len(elements) > 0 if item["mime_type"] == "application/parquet": # verify it is a real parquet diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 5cb064a3b2..3677765c9f 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -57,41 +57,54 @@ def test_filesystem_configuration() -> None: def test_filesystem_instance(with_gdrive_buckets_env: str) -> None: - @retry(stop=stop_after_attempt(10), wait=wait_fixed(1)) - def check_file_exists(): - files = filesystem.ls(url, detail=True) - details = next(d for d in files if d["name"] == file_url) - assert details["size"] == 10 - - def check_file_changed(): - details = filesystem.info(file_url) + @retry(stop=stop_after_attempt(10), wait=wait_fixed(1), reraise=True) + def check_file_exists(filedir_: str, file_url_: str): + try: + files = filesystem.ls(filedir_, detail=True) + details = next(d for d in files if d["name"] == file_url_) + assert details["size"] == 10 + except Exception as ex: + print(ex) + raise + + def check_file_changed(file_url_: str): + details = filesystem.info(file_url_) assert details["size"] == 11 assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 160 bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config() - assert bucket_url.startswith(config.protocol) + # we do not add protocol to bucket_url (we need relative path) + assert bucket_url.startswith(config.protocol) or config.protocol == "file" filesystem, url = fsspec_from_config(config) if config.protocol != "file": assert bucket_url.endswith(url) # do a few file ops now = pendulum.now() filename = f"filesystem_common_{uniq_id()}" - file_url = posixpath.join(url, filename) + file_dir = posixpath.join(url, f"filesystem_common_dir_{uniq_id()}") + file_url = 
posixpath.join(file_dir, filename) try: + filesystem.mkdir(file_dir, create_parents=False) filesystem.pipe(file_url, b"test bytes") - check_file_exists() + check_file_exists(file_dir, file_url) filesystem.pipe(file_url, b"test bytes2") - check_file_changed() + check_file_changed(file_url) finally: filesystem.rm(file_url) + # s3 does not create folder with mkdir + if config.protocol != "s3": + filesystem.rmdir(file_dir) assert not filesystem.exists(file_url) with pytest.raises(FileNotFoundError): filesystem.info(file_url) @pytest.mark.parametrize("load_content", (True, False)) -def test_filesystem_dict(with_gdrive_buckets_env: str, load_content: bool) -> None: +@pytest.mark.parametrize("glob_filter", ("**", "**/*.csv", "*.txt", "met_csv/A803/*.csv")) +def test_filesystem_dict( + with_gdrive_buckets_env: str, load_content: bool, glob_filter: str +) -> None: bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config() # enable caches @@ -106,11 +119,8 @@ def test_filesystem_dict(with_gdrive_buckets_env: str, load_content: bool) -> No ).geturl() filesystem, _ = fsspec_from_config(config) # use glob to get data - try: - all_file_items = list(glob_files(filesystem, bucket_url)) - assert_sample_files(all_file_items, filesystem, config, load_content) - except NotImplementedError as ex: - pytest.skip(f"Skipping due to {str(ex)}") + all_file_items = list(glob_files(filesystem, bucket_url, glob_filter)) + assert_sample_files(all_file_items, filesystem, config, load_content, glob_filter) @pytest.mark.skipif("s3" not in ALL_FILESYSTEM_DRIVERS, reason="s3 destination not configured") diff --git a/tests/load/utils.py b/tests/load/utils.py index 74cd782a56..b0c9290970 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -5,6 +5,7 @@ from typing import Any, Iterator, List, Sequence, IO, Tuple, Optional, Dict, Union, Generator import shutil from pathlib import Path +from urllib.parse import urlparse from dataclasses import dataclass import dlt @@ -67,7 +68,9 @@ # Filter out buckets not in all filesystem drivers WITH_GDRIVE_BUCKETS = [GCS_BUCKET, AWS_BUCKET, FILE_BUCKET, MEMORY_BUCKET, AZ_BUCKET, GDRIVE_BUCKET] WITH_GDRIVE_BUCKETS = [ - bucket for bucket in WITH_GDRIVE_BUCKETS if bucket.split(":")[0] in ALL_FILESYSTEM_DRIVERS + bucket + for bucket in WITH_GDRIVE_BUCKETS + if (urlparse(bucket).scheme or "file") in ALL_FILESYSTEM_DRIVERS ] # temporary solution to include gdrive bucket in tests,
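
For reference, the new `FilesystemConfiguration` helpers introduced in this diff compose as follows. This is a minimal sketch (not part of the diff itself), assuming a POSIX system where `/var/local/data` exists and resolves to itself:

```py
from dlt.common.storages import FilesystemConfiguration

# native local paths have no uri scheme and are detected as local
assert FilesystemConfiguration.is_local_path("/var/local/data")
assert not FilesystemConfiguration.is_local_path("s3://dlt-ci-test-bucket/")

# make_file_uri expands the user dir, resolves the path and returns a normalized file:// uri
uri = FilesystemConfiguration.make_file_uri("/var/local/data")
assert uri == "file:///var/local/data"

# make_local_path converts a file:// uri back to a native path; on Windows it
# also handles UNC forms such as file://localhost/c$/a/b/c
assert FilesystemConfiguration.make_local_path(uri) == "/var/local/data"
```

This round trip is what the changes above rely on: `on_resolved` normalizes a local `bucket_url` to a `file://` uri once, and `glob_files` converts it back to a native path before running a Python `glob` against the local filesystem.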