Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsync: Modeled after fsspec.generic.rsync this uses two fs explicitly #1403

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions fsspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,101 @@ def atomic_write(path: str, mode: str = "wb"):
raise
else:
os.replace(fn, path)


def rsync(
source,
destination,
source_fs: AbstractFileSystem,
target_fs: AbstractFileSystem,
delete_missing=False,
source_field="size",
dest_field="size",
update_cond="different",
inst_kwargs=None,
) -> None:
"""Sync files between two directory trees

(experimental)

Parameters
----------
source: str
Root of the directory tree to take files from. This must be a directory, but
do not include any terminating "/" character
destination: str
Root path to copy into. The contents of this location should be
identical to the contents of ``source`` when done. This will be made a
directory, and the terminal "/" should not be included.
delete_missing: bool
If there are paths in the destination that don't exist in the
source and this is True, delete them. Otherwise, leave them alone.
source_field: str | callable
If ``update_field`` is "different", this is the key in the info
of source files to consider for difference. Maybe a function of the
info dict.
dest_field: str | callable
If ``update_field`` is "different", this is the key in the info
of destination files to consider for difference. May be a function of
the info dict.
update_cond: "different"|"always"|"never"
If "always", every file is copied, regardless of whether it exists in
the destination. If "never", files that exist in the destination are
not copied again. If "different" (default), only copy if the info
fields given by ``source_field`` and ``dest_field`` (usually "size")
are different. Other comparisons may be added in the future.
inst_kwargs: dict|None
If ``fs`` is None, use this set of keyword arguments to make a
GenericFileSystem instance
fs: GenericFileSystem|None
Instance to use if explicitly given. The instance defines how to
to make downstream file system instances from paths.
"""
logger = logging.getLogger("fsspec.rsync")
source = source_fs._strip_protocol(source)
destination = target_fs._strip_protocol(destination)
allfiles = source_fs.find(source, withdirs=True, detail=True)
if not source_fs.isdir(source):
raise ValueError("Can only rsync on a directory")
otherfiles = target_fs.find(destination, withdirs=True, detail=True)
dirs = [
a
for a, v in allfiles.items()
if v["type"] == "directory" and a.replace(source, destination) not in otherfiles
]
logger.debug(f"{len(dirs)} directories to create")
if dirs:
for dirn in dirs:
target_fs.makedirs(dirn.replace(source, destination), exist_ok=True)
allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
logger.debug(f"{len(allfiles)} files to consider for copy")
to_delete = [
o
for o, v in otherfiles.items()
if o.replace(destination, source) not in allfiles and v["type"] == "file"
]
for k, v in allfiles.copy().items():
otherfile = k.replace(source, destination)
if otherfile in otherfiles:
if update_cond == "always":
allfiles[k] = otherfile
elif update_cond == "different":
inf1 = source_field(v) if callable(source_field) else v[source_field]
v2 = otherfiles[otherfile]
inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
if inf1 != inf2:
# details mismatch, make copy
allfiles[k] = otherfile
else:
# details match, don't copy
allfiles.pop(k)
else:
# file not in target yet
allfiles[k] = otherfile
logger.debug(f"{len(allfiles)} files to copy")
if allfiles:
source_files, target_files = zip(*allfiles.items())
source_fs.cp(source_files, target_files, **kwargs)
logger.debug(f"{len(to_delete)} files to delete")
if delete_missing:
target_fs.rm(to_delete)