Skip to content

Commit

Permalink
Merge pull request #118 from tjgalvin/copycontext
Browse files Browse the repository at this point in the history
Create Copy/Temporary context managers
  • Loading branch information
tjgalvin authored Jun 19, 2024
2 parents 34e464c + 9bd7f77 commit f95bbd9
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
- Added new masking modes
- More Option types available in the template configuration
- Tarball of files and linmos fields

- Added context managers to help with MEMDIR / LOCALDIR like variables
-
## Pre 0.2.3

Everything chsnges all the time
104 changes: 104 additions & 0 deletions flint/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import List, Optional, Tuple, Union

Expand All @@ -24,6 +25,109 @@
# that only opens the FITS file once and places things into common field names.


@contextmanager
def hold_then_move_into(
hold_directory: Path, move_directory: Path, delete_hold_on_exist: bool = True
) -> Path:
"""Create a temporary directory such that anything within it one the
exit of the context manager is copied over to `move_directory`.
Args:
hold_directory (Path): Location of directory to temporarily base work from
move_directory (Path): Final directort location to move items into
delete_hold_on_exist (bool, optional): Whether `hold_directory` is deleted on exit of the context. Defaults to True.
Returns:
Path: Path to the temporary folder
Yields:
Iterator[Path]: Path to the temporary folder
"""
# TODO: except extra files and folders to copy into `hold_directory` that are
# also placed back on exit
hold_directory = Path(hold_directory)
move_directory = Path(move_directory)

if hold_directory == move_directory:
yield move_directory
else:
for directory in (hold_directory, move_directory):
if directory.exists():
assert directory.is_dir()
else:
directory.mkdir(parents=True)

assert all([d.is_dir() for d in (hold_directory, move_directory)])

yield hold_directory

for file_or_folder in hold_directory.glob("*"):
logger.info(f"Moving {file_or_folder=} to {move_directory=}")
shutil.move(str(file_or_folder), move_directory)

if delete_hold_on_exist:
remove_files_folders(hold_directory)


@contextmanager
def temporarily_move_into(
subject: Path, temporary_directory: Optional[Path] = None
) -> Path:
"""Given a file or folder, temporarily copy it into the path specified
by `temporary_directory` for the duration of the context manager. Upon
exit the original copy, specified by `subject`, is removed and replaced
by the copy within `temporary_directory`.
`temporary_directory` will be created internally, and an error will be
raised if it exists.
If `temporary_directory` describes a nested path only the lowest directory
is removed.
If `temporary_directory` is None the `subject` path is returned and there
is no copying and deleting performed.
Args:
subject (Path): The file or folder to temporarily move
temporary_directory (Optional[Path], optional): The temporary directory to work with. If none the subject path is returned. Defaults to None.
Yields:
Path: The path to the temporary object
"""
subject = Path(subject)
temporary_directory = Path(temporary_directory) if temporary_directory else None

if temporary_directory is None:
yield subject
else:
temporary_directory.mkdir(parents=True, exist_ok=True)
assert (
temporary_directory.is_dir()
), f"{temporary_directory=} exists and is not a folder"

output_item = temporary_directory / subject.name
assert not output_item.exists(), f"{output_item=} already exists! "

logger.info(f"Moving {subject=} to {output_item=}")

if subject.is_dir():
logger.info(f"{subject=} is a directory, recursively copying")
copy_directory(
input_directory=subject, output_directory=output_item.absolute()
)
else:
shutil.copy(subject, output_item)

yield output_item

logger.info(f"Moving {output_item} back to {subject=}")
remove_files_folders(subject)
shutil.move(output_item, subject)

logger.info(f"Removing {temporary_directory=}")
shutil.rmtree(temporary_directory)


def get_environment_variable(variable: str) -> Union[str, None]:
"""Get the value of an environment variable if it exists. If it does not
a None is returned.
Expand Down
129 changes: 129 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,138 @@
get_environment_variable,
get_packaged_resource_path,
get_pixels_per_beam,
hold_then_move_into,
temporarily_move_into,
)


def test_hold_then_move_same_folder(tmpdir):
a = Path(tmpdir) / "Captin"

with hold_then_move_into(hold_directory=a, move_directory=a) as example:
assert a == example


def test_hold_then_test_errors(tmpdir):
"""Make sure some basic error handling"""

a = Path(tmpdir) / "Jack.txt"
b = Path(tmpdir) / "Sparrow.txt"

a.touch()
b.touch()

with pytest.raises(AssertionError):
with hold_then_move_into(hold_directory=a, move_directory=b) as _:
logger.info("This will not be here")


def test_hold_then_move_into(tmpdir):
"""See whether the hold directory can have things dumped into it, then
moved into place on exit of the context manager"""

tmpdir = Path(tmpdir)

hold_directory = Path(tmpdir / "putthingshere")
move_directory = Path(tmpdir / "the/final/location")

assert all([not d.exists() for d in (hold_directory, move_directory)])
no_files = 45
with hold_then_move_into(
hold_directory=hold_directory, move_directory=move_directory
) as put_dir:
assert put_dir.exists()
for i in range(no_files):
file: Path = put_dir / f"some_file_{i}.txt"
file.write_text(f"This is a file {i}")

assert len(list(put_dir.glob("*"))) == no_files
assert move_directory.exists()
assert len(list(move_directory.glob("*"))) == 0

assert len(list(move_directory.glob("*"))) == no_files
assert not put_dir.exists()
assert not hold_directory.exists()


def test_temporarily_move_into_none(tmpdir):
"""Make sure that the temporary context manager returns the same path without
any deleting should the temporary directory be set to None"""

TEXT = "this is a test message"
source_test = Path(tmpdir) / "source_dir2/test.txt"
source_test.parent.mkdir()
source_test.touch()
assert source_test.read_text() == ""

with temporarily_move_into(
subject=source_test, temporary_directory=None
) as temp_file:
assert isinstance(temp_file, Path)
assert source_test.samefile(temp_file)
temp_file.write_text(TEXT)

assert source_test.read_text() == TEXT


def test_temporarily_move_into_with_directory(tmpdir):
"""See whether the temp move context manager behaves in a sane way using the
case where the subject is a directory"""
TEXT = "this is a test message"
source_test = Path(tmpdir) / "source_dir2/test.txt"
source_test.parent.mkdir()
source_test.touch()
assert source_test.read_text() == ""
source_parent = source_test.parent

temp_dir = Path(tmpdir) / "someotherdirforwdir"
assert not temp_dir.exists()

with temporarily_move_into(
subject=source_parent, temporary_directory=temp_dir
) as temp_parent:
assert isinstance(temp_parent, Path)
assert Path(temp_parent).exists()
assert Path(temp_parent).is_dir()
temp_file = Path(temp_parent) / "test.txt"
assert temp_file.read_text() == ""

temp_file.write_text(TEXT)
assert temp_file.read_text() == TEXT

assert source_test.read_text() == TEXT
assert not temp_file.exists()
assert not temp_dir.exists()


def test_temporarily_move_into(tmpdir):
"""See whether the temp move context manager behaves in a sane way"""
TEXT = "this is a test message"
source_test = Path(tmpdir) / "source_dir/test.txt"
source_test.parent.mkdir()
source_test.touch()
assert source_test.read_text() == ""

temp_dir = Path(tmpdir) / "someotherdir"
assert not temp_dir.exists()

with temporarily_move_into(
subject=source_test, temporary_directory=temp_dir
) as temp_file:
assert isinstance(temp_file, Path)
assert Path(temp_file).exists()
assert Path(temp_file).is_file()

assert temp_file.read_text() == ""

temp_file.write_text(TEXT)
assert temp_file.read_text() == TEXT

assert source_test.read_text() == TEXT
assert not temp_file.exists()
assert not temp_dir.exists()


@pytest.fixture(scope="session", autouse=True)
def set_env():
"""Set up variables for a specific test"""
Expand Down

0 comments on commit f95bbd9

Please sign in to comment.