diff --git a/dlt/common/storages/fsspec_filesystem.py b/dlt/common/storages/fsspec_filesystem.py index 5e003c66f6..036a86c225 100644 --- a/dlt/common/storages/fsspec_filesystem.py +++ b/dlt/common/storages/fsspec_filesystem.py @@ -49,21 +49,20 @@ class FileItem(TypedDict, total=False): file_content: Optional[bytes] -# Map of protocol to mtime resolver -# we only need to support a small finite set of protocols -MTIME_DISPATCH = { - "s3": lambda f: ensure_pendulum_datetime(f["LastModified"]), - "adl": lambda f: ensure_pendulum_datetime(f["LastModified"]), - "az": lambda f: ensure_pendulum_datetime(f["last_modified"]), - "gcs": lambda f: ensure_pendulum_datetime(f["updated"]), - "file": lambda f: ensure_pendulum_datetime(f["mtime"]), - "memory": lambda f: ensure_pendulum_datetime(f["created"]), - "gdrive": lambda f: ensure_pendulum_datetime(f["modifiedTime"]), +DEFAULT_MTIME_FIELD_NAME = "mtime" +MTIME_FIELD_NAMES = { + "file": "mtime", + "s3": "LastModified", + "adl": "LastModified", + "az": "last_modified", + "gcs": "updated", + "memory": "created", + "gdrive": "modifiedTime", } # Support aliases -MTIME_DISPATCH["gs"] = MTIME_DISPATCH["gcs"] -MTIME_DISPATCH["s3a"] = MTIME_DISPATCH["s3"] -MTIME_DISPATCH["abfs"] = MTIME_DISPATCH["az"] +MTIME_FIELD_NAMES["gs"] = MTIME_FIELD_NAMES["gcs"] +MTIME_FIELD_NAMES["s3a"] = MTIME_FIELD_NAMES["s3"] +MTIME_FIELD_NAMES["abfs"] = MTIME_FIELD_NAMES["az"] # Map of protocol to a filesystem type CREDENTIALS_DISPATCH: Dict[str, Callable[[FilesystemConfiguration], DictStrAny]] = { @@ -110,7 +109,7 @@ def register_implementation_in_fsspec(protocol: str) -> None: if protocol in known_implementations: return - if not protocol in CUSTOM_IMPLEMENTATIONS: + if protocol not in CUSTOM_IMPLEMENTATIONS: raise ValueError( f"Unknown protocol: '{protocol}' is not an fsspec known " "implementations nor a dlt custom implementations." 
@@ -304,6 +303,34 @@ def guess_mime_type(file_name: str) -> Sequence[str]:
     return type_
 
 
+def extract_mtime(file_metadata: Dict[str, Any], protocol: str = None) -> pendulum.DateTime:
+    """Extract the modification time from file listing metadata.
+
+    Args:
+        file_metadata (Dict[str, Any]): The file metadata.
+        protocol (str) [Optional]: The protocol. If not provided, None or not a known protocol,
+            then default field name `mtime` is tried. `mtime` is used for the "file" fsspec
+            implementation and our custom fsspec implementations.
+
+    Returns:
+        pendulum.DateTime: The modification time.
+
+    Raises:
+        KeyError: If the resolved field name is not found in the metadata. Current dlt use-cases
+            depend on a modified date. For example, transactional files, incremental destination
+            loading.
+    """
+    field_name = MTIME_FIELD_NAMES.get(protocol, DEFAULT_MTIME_FIELD_NAME)
+    try:
+        return ensure_pendulum_datetime(file_metadata[field_name])
+    except KeyError:
+        # Only mention the fallback when the protocol had no dedicated field mapping.
+        extra_message = ""
+        if protocol not in MTIME_FIELD_NAMES:
+            extra_message = f" {DEFAULT_MTIME_FIELD_NAME} was used by default."
+        raise KeyError(f"`{field_name}` not found in metadata.{extra_message}")
+
+
 def glob_files(
     fs_client: AbstractFileSystem, bucket_url: str, file_glob: str = "**"
 ) -> Iterator[FileItem]:
@@ -350,12 +377,14 @@ def glob_files(
             path=posixpath.join(bucket_url_parsed.path, file_name)
         ).geturl()
 
+        modification_date = extract_mtime(md, bucket_url_parsed.scheme)
+
         mime_type, encoding = guess_mime_type(file_name)
         yield FileItem(
             file_name=file_name,
             file_url=file_url,
             mime_type=mime_type,
             encoding=encoding,
-            modification_date=MTIME_DISPATCH[bucket_url_parsed.scheme](md),
+            modification_date=modification_date,
             size_in_bytes=int(md["size"]),
         )
diff --git a/dlt/common/storages/transactional_file.py b/dlt/common/storages/transactional_file.py
index e5ee220904..0fe9c310cf 100644
--- a/dlt/common/storages/transactional_file.py
+++ b/dlt/common/storages/transactional_file.py
@@ -16,7 +16,7 @@
 import fsspec
 
 from dlt.common.pendulum import pendulum, timedelta
-from dlt.common.storages.fsspec_filesystem import MTIME_DISPATCH
+from dlt.common.storages.fsspec_filesystem import extract_mtime
 
 
 def lock_id(k: int = 4) -> str:
@@ -56,8 +56,7 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None:
             path: The path to lock.
             fs: The fsspec file system.
         """
-        proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol
-        self.extract_mtime = MTIME_DISPATCH.get(proto, MTIME_DISPATCH["file"])
+        self._proto = fs.protocol[0] if isinstance(fs.protocol, (list, tuple)) else fs.protocol
 
         parsed_path = Path(path)
         if not parsed_path.is_absolute():
@@ -65,7 +64,7 @@ def __init__(self, path: str, fs: fsspec.AbstractFileSystem) -> None:
                 f"{path} is not absolute. Please pass only absolute paths to TransactionalFile"
             )
         self.path = path
-        if proto == "file":
+        if self._proto == "file":
             # standardize path separator to POSIX. fsspec always uses POSIX. Windows may use either.
self.path = parsed_path.as_posix() @@ -103,7 +102,7 @@ def _sync_locks(self) -> t.List[str]: if not name.startswith(self.lock_prefix): continue # Purge stale locks - mtime = self.extract_mtime(lock) + mtime = extract_mtime(lock, self._proto) if now - mtime > timedelta(seconds=TransactionalFile.LOCK_TTL_SECONDS): try: # Janitors can race, so we ignore errors self._fs.rm(name) diff --git a/tests/common/storages/implementations/test_gitpythonfs.py b/tests/common/storages/implementations/test_gitpythonfs.py index a1f05a07ef..31a4f76118 100644 --- a/tests/common/storages/implementations/test_gitpythonfs.py +++ b/tests/common/storages/implementations/test_gitpythonfs.py @@ -173,7 +173,7 @@ def test_ls_file_details(repo_fixture: Iterator[Any]) -> None: assert isinstance( details["mode"], str ), "Should be a string representation of octal, without the 0o prefix." - assert isinstance(details["committed_date"], int) + assert isinstance(details["mtime"], int) def test_git_refs(repo_fixture: Iterator[Any]) -> None: diff --git a/tests/load/filesystem/test_filesystem_common.py b/tests/load/filesystem/test_filesystem_common.py index 1987d1274d..fd4592d897 100644 --- a/tests/load/filesystem/test_filesystem_common.py +++ b/tests/load/filesystem/test_filesystem_common.py @@ -13,7 +13,7 @@ from dlt.common.storages import fsspec_from_config, FilesystemConfiguration from dlt.common.storages.fsspec_filesystem import ( register_implementation_in_fsspec, - MTIME_DISPATCH, + extract_mtime, glob_files, ) from dlt.common.utils import uniq_id @@ -96,7 +96,7 @@ def check_file_exists(): def check_file_changed(): details = filesystem.info(file_url) assert details["size"] == 11 - assert (MTIME_DISPATCH[config.protocol](details) - now).seconds < 60 + assert (extract_mtime(details, config.protocol) - now).seconds < 60 bucket_url = os.environ["DESTINATION__FILESYSTEM__BUCKET_URL"] config = get_config()