Skip to content

Commit

Permalink
Use data from initial request to populate metadata object
Browse files Browse the repository at this point in the history
  • Loading branch information
rbiseck3 committed Oct 2, 2024
1 parent 02a5b1f commit 6b100cf
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 96 deletions.
29 changes: 28 additions & 1 deletion unstructured_ingest/v2/processes/connectors/fsspec/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

from dataclasses import dataclass, field
from pathlib import Path
from time import time
from typing import Any, Generator, Optional

from pydantic import Field, Secret

from unstructured_ingest.utils.dep_check import requires_dependencies
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
from unstructured_ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
SourceRegistryEntry,
Expand Down Expand Up @@ -107,6 +108,32 @@ def sterilize_info(self, path) -> dict:
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
    # Thin override that defers entirely to the base FsspecIndexer.run;
    # present only so this connector re-exports the generator signature.
    return super().run(**kwargs)

def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    """Build source metadata from a raw Azure (adlfs) listing entry.

    Args:
        file_data: detail dict from the fsspec listing. Reads "name"
            (required), and optionally "creation_time", "last_modified",
            "size", "etag". Timestamps are assumed to be datetime
            objects — TODO confirm against adlfs.

    Returns:
        FileDataSourceMetadata populated from the listing entry.
    """
    path = file_data["name"]
    # Walrus guards handle a key that is present but None, which the
    # previous `"key" in file_data` check let through to an
    # AttributeError on `.timestamp()`.
    date_created = None
    if created := file_data.get("creation_time"):
        # str(...) for consistency with the other fsspec connectors
        # (dropbox/s3), which store epoch timestamps as strings.
        date_created = str(created.timestamp())
    date_modified = None
    if modified := file_data.get("last_modified"):
        date_modified = str(modified.timestamp())

    record_locator = {
        "protocol": self.index_config.protocol,
        "remote_file_path": self.index_config.remote_url,
    }
    return FileDataSourceMetadata(
        date_created=date_created,
        date_modified=date_modified,
        date_processed=str(time()),
        version=file_data.get("etag"),
        url=f"{self.index_config.protocol}://{path}",
        record_locator=record_locator,
        # .get() already returns None on a missing key; the previous
        # conditional expression was redundant.
        filesize_bytes=file_data.get("size"),
    )


class AzureDownloaderConfig(FsspecDownloaderConfig):
pass
Expand Down
30 changes: 29 additions & 1 deletion unstructured_ingest/v2/processes/connectors/fsspec/box.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

from dataclasses import dataclass, field
from pathlib import Path
from time import time
from typing import Any, Generator, Optional

from dateutil import parser
from pydantic import Field, Secret

from unstructured_ingest.utils.dep_check import requires_dependencies
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
from unstructured_ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
SourceRegistryEntry,
Expand Down Expand Up @@ -73,6 +75,32 @@ def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
def precheck(self) -> None:
    # Defers to the base fsspec precheck, which validates the connection
    # and raises SourceConnectionError on failure.
    super().precheck()

def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    """Build source metadata from a raw Box listing entry.

    Args:
        file_data: detail dict from the fsspec listing. Reads "name"
            (required), and optionally "created_at", "modified_at"
            (date strings parsed with dateutil), "size", "id".

    Returns:
        FileDataSourceMetadata populated from the listing entry.
    """
    path = file_data["name"]
    date_created = None
    date_modified = None
    if created_at_str := file_data.get("created_at"):
        # str(...) for consistency with the other fsspec connectors
        # (dropbox/s3), which store epoch timestamps as strings.
        date_created = str(parser.parse(created_at_str).timestamp())
    if modified_at_str := file_data.get("modified_at"):
        date_modified = str(parser.parse(modified_at_str).timestamp())

    record_locator = {
        "protocol": self.index_config.protocol,
        "remote_file_path": self.index_config.remote_url,
    }
    return FileDataSourceMetadata(
        date_created=date_created,
        date_modified=date_modified,
        date_processed=str(time()),
        # Box exposes no content hash in the listing; the file id is the
        # closest stable version marker available here.
        version=file_data.get("id"),
        url=f"{self.index_config.protocol}://{path}",
        record_locator=record_locator,
        # .get() already returns None on a missing key; the previous
        # conditional expression was redundant.
        filesize_bytes=file_data.get("size"),
    )


class BoxDownloaderConfig(FsspecDownloaderConfig):
pass
Expand Down
33 changes: 32 additions & 1 deletion unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

from dataclasses import dataclass, field
from pathlib import Path
from time import time
from typing import Any, Generator, Optional

from pydantic import Field, Secret

from unstructured_ingest.utils.dep_check import requires_dependencies
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
from unstructured_ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
SourceRegistryEntry,
Expand Down Expand Up @@ -49,6 +50,36 @@ class DropboxIndexer(FsspecIndexer):
index_config: DropboxIndexerConfig
connector_type: str = CONNECTOR_TYPE

def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    """Build source metadata from a raw Dropbox listing entry.

    Args:
        file_data: detail dict from the fsspec listing. Reads "name"
            (required), and optionally "server_modified",
            "client_modified" (assumed datetime objects — TODO confirm
            against dropboxdrivefs), "size", "content_hash".

    Returns:
        FileDataSourceMetadata populated from the listing entry.
    """
    # Dropbox paths come back with a leading slash; strip it so the
    # constructed URL stays well-formed.
    path = file_data["name"].lstrip("/")
    date_created = None
    date_modified = None
    server_modified = file_data.get("server_modified")
    client_modified = file_data.get("client_modified")
    if server_modified and client_modified:
        # Treat the earlier timestamp as creation and the later one as
        # modification. The previous strict </> comparisons silently
        # dropped BOTH dates when the two timestamps were exactly equal.
        earlier, later = sorted((client_modified, server_modified))
        date_created = str(earlier.timestamp())
        date_modified = str(later.timestamp())

    record_locator = {
        "protocol": self.index_config.protocol,
        "remote_file_path": self.index_config.remote_url,
    }
    return FileDataSourceMetadata(
        date_created=date_created,
        date_modified=date_modified,
        date_processed=str(time()),
        version=file_data.get("content_hash"),
        url=f"{self.index_config.protocol}://{path}",
        record_locator=record_locator,
        # .get() already returns None on a missing key; the previous
        # conditional expression was redundant.
        filesize_bytes=file_data.get("size"),
    )

@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
def __post_init__(self):
# dropbox expects the path to start with a /
Expand Down
94 changes: 20 additions & 74 deletions unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from __future__ import annotations

import contextlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from time import time
from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar
from uuid import NAMESPACE_DNS, uuid5

Expand Down Expand Up @@ -113,103 +110,52 @@ def precheck(self) -> None:
logger.error(f"failed to validate connection: {e}", exc_info=True)
raise SourceConnectionError(f"failed to validate connection: {e}")

def get_file_data(self) -> list[dict[str, Any]]:
    """Return detail dicts for all regular, non-empty files under the configured path.

    Returns:
        List of fsspec info dicts (one per file), filtered to entries
        whose type is "file" and whose size is greater than zero.
    """
    if not self.index_config.recursive:
        # fs.ls does not walk directories. Directories listed in cloud
        # storage can show up as 0-byte "files"; the size filter below
        # drops them.
        files = self.fs.ls(self.index_config.path_without_protocol, detail=True)
    else:
        # fs.find recursively walks directories; with detail=True it
        # returns a mapping of path -> info dict.
        found = self.fs.find(
            self.index_config.path_without_protocol,
            detail=True,
        )
        files = list(found.values())
    # "size" and "type" are common keys across the cloud fsspec
    # implementations. Coalesce a missing/None size to 0 so the
    # comparison cannot raise TypeError.
    return [
        file
        for file in files
        if (file.get("size") or 0) > 0 and file.get("type") == "file"
    ]

try:
modified: Optional[Any] = self.fs.modified(path)
if modified:
if isinstance(modified, datetime):
date_modified = str(modified.timestamp())
else:
date_modified = str(modified)
except NotImplementedError:
pass
with contextlib.suppress(AttributeError):
file_size = self.fs.size(path)

version = self.fs.checksum(path)
metadata: dict[str, str] = {}
with contextlib.suppress(AttributeError):
metadata = self.fs.metadata(path)
record_locator = {
"protocol": self.index_config.protocol,
"remote_file_path": self.index_config.remote_url,
}
file_stat = self.fs.stat(path=path)
if file_id := file_stat.get("id"):
record_locator["file_id"] = file_id
if metadata:
record_locator["metadata"] = metadata
return FileDataSourceMetadata(
date_created=date_created,
date_modified=date_modified,
date_processed=str(time()),
version=str(version),
url=f"{self.index_config.protocol}://{path}",
record_locator=record_locator,
filesize_bytes=file_size,
)
def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    # Abstract hook: each concrete connector builds metadata from its own
    # listing-entry schema, so the base class cannot provide a default.
    raise NotImplementedError()

def sterilize_info(self, path) -> dict:
    """Fetch the fsspec info dict for *path* and return a sanitized copy."""
    return sterilize_dict(data=self.fs.info(path=path))

def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
    """Yield a FileData record for every file found by the indexer."""
    for file_data in self.get_file_data():
        # fsspec detail dicts expose the full path under "name" — the key
        # every connector's get_metadata reads; "key" is not populated.
        file_path = file_data["name"]
        # Note: we remove any remaining leading slashes (Box introduces
        # these) to get a valid relative path
        rel_path = file_path.replace(self.index_config.path_without_protocol, "").lstrip("/")

        additional_metadata = self.sterilize_info(path=file_path)
        additional_metadata["original_file_path"] = file_path
        yield FileData(
            identifier=str(uuid5(NAMESPACE_DNS, file_path)),
            connector_type=self.connector_type,
            source_identifiers=SourceIdentifiers(
                filename=Path(file_path).name,
                rel_path=rel_path or None,
                fullpath=file_path,
            ),
            # get_metadata now takes the raw listing entry (file_data: dict),
            # not a path string — calling it with `path=` would raise
            # TypeError at runtime.
            metadata=self.get_metadata(file_data=file_data),
            additional_metadata=additional_metadata,
        )

Expand Down
30 changes: 29 additions & 1 deletion unstructured_ingest/v2/processes/connectors/fsspec/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

from dataclasses import dataclass, field
from pathlib import Path
from time import time
from typing import Any, Generator, Optional, Union

from dateutil import parser
from pydantic import Field, Secret

from unstructured_ingest.utils.dep_check import requires_dependencies
from unstructured_ingest.utils.string_and_date_utils import json_to_dict
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
from unstructured_ingest.v2.processes.connector_registry import (
DestinationRegistryEntry,
SourceRegistryEntry,
Expand Down Expand Up @@ -106,6 +108,32 @@ def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
def precheck(self) -> None:
    # Defers to the base fsspec precheck, which validates the connection
    # and raises SourceConnectionError on failure.
    super().precheck()

def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
    """Build source metadata from a raw GCS listing entry.

    Args:
        file_data: detail dict from the fsspec listing. Reads "name"
            (required), and optionally "updated", "timeCreated"
            (date strings parsed with dateutil), "size", "etag".

    Returns:
        FileDataSourceMetadata populated from the listing entry.
    """
    path = file_data["name"]
    date_created = None
    date_modified = None
    if modified_at_str := file_data.get("updated"):
        # str(...) for consistency with the other fsspec connectors
        # (dropbox/s3), which store epoch timestamps as strings.
        date_modified = str(parser.parse(modified_at_str).timestamp())
    # Fix: the GCS key is "timeCreated" — the previous lookup used
    # "timeCreated:" (stray trailing colon), so date_created was
    # always None.
    if created_at_str := file_data.get("timeCreated"):
        date_created = str(parser.parse(created_at_str).timestamp())

    record_locator = {
        "protocol": self.index_config.protocol,
        "remote_file_path": self.index_config.remote_url,
    }
    return FileDataSourceMetadata(
        date_created=date_created,
        date_modified=date_modified,
        date_processed=str(time()),
        version=file_data.get("etag"),
        url=f"{self.index_config.protocol}://{path}",
        record_locator=record_locator,
        # .get() already returns None on a missing key; the previous
        # conditional expression was redundant.
        filesize_bytes=file_data.get("size"),
    )


class GcsDownloaderConfig(FsspecDownloaderConfig):
pass
Expand Down
28 changes: 11 additions & 17 deletions unstructured_ingest/v2/processes/connectors/fsspec/s3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import contextlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from time import time
from typing import Any, Generator, Optional
Expand Down Expand Up @@ -80,27 +79,22 @@ class S3Indexer(FsspecIndexer):
index_config: S3IndexerConfig
connector_type: str = CONNECTOR_TYPE

def get_metadata(self, path: str) -> FileDataSourceMetadata:
def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
path = file_data["Key"]
date_created = None
date_modified = None
file_size = None
try:
modified: Optional[datetime] = self.fs.modified(path)
if modified:
date_created = str(modified.timestamp())
date_modified = str(modified.timestamp())
except NotImplementedError:
pass
with contextlib.suppress(AttributeError):
file_size = self.fs.size(path)
modified = file_data.get("LastModified")
if modified:
date_created = str(modified.timestamp())
date_modified = str(modified.timestamp())

file_size = file_data.get("size") if "size" in file_data else None
file_size = file_size or file_data.get("Size")

version = None
info: dict[str, Any] = self.fs.info(path)
if etag := info.get("ETag"):
version = str(etag).rstrip('"').lstrip('"')
version = file_data.get("ETag").rstrip('"').lstrip('"') if "ETag" in file_data else None
metadata: dict[str, str] = {}
with contextlib.suppress(AttributeError):
metadata = self.fs.metadata(path)
metadata = self.fs.metadata(path=path)
record_locator = {
"protocol": self.index_config.protocol,
"remote_file_path": self.index_config.remote_url,
Expand Down
Loading

0 comments on commit 6b100cf

Please sign in to comment.