diff --git a/.codecov.yml b/.codecov.yml index 84a63c6..35cde5c 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -1,14 +1,4 @@ coverage: status: - project: - default: - # Commits pushed to main should not make the overall - # project coverage decrease by more than 1% - target: auto - threshold: 1% - patch: - default: - # Be tolerant on code coverage diff on PRs to limit - # noisy red coverage status on github PRs. - target: auto - threshold: 20% + project: off + patch: off diff --git a/belay/cli/__init__.py b/belay/cli/__init__.py index e69de29..52cc189 100644 --- a/belay/cli/__init__.py +++ b/belay/cli/__init__.py @@ -0,0 +1 @@ +from .main import app, exec, identify, info, run, sync diff --git a/belay/cli/main.py b/belay/cli/main.py index 577b68f..c4820f1 100644 --- a/belay/cli/main.py +++ b/belay/cli/main.py @@ -1,13 +1,14 @@ from functools import partial from pathlib import Path from time import sleep +from typing import List, Optional, Union import typer from rich.console import Console from rich.progress import Progress from typer import Argument, Option -import belay +from belay import Device Arg = partial(Argument, ..., show_default=False) Opt = partial(Option) @@ -16,6 +17,13 @@ state = {} console: Console +_help_port = ( + "Port (like /dev/ttyUSB0) or WebSocket (like ws://192.168.1.100) of device." +) +_help_password = ( # nosec + "Password for communication methods (like WebREPL) that require authentication." +) + @app.callback() def callback(silent: bool = False): @@ -31,40 +39,70 @@ def callback(silent: bool = False): @app.command() def sync( - port: str = Arg( - help="Port (like /dev/ttyUSB0) or WebSocket (like ws://192.168.1.100) of device." - ), - folder: Path = Arg(help="Path to folder to sync."), - password: str = Opt( - "", - help="Password for communication methods (like WebREPL) that require authentication.", + port: str = Arg(help=_help_port), + folder: Path = Arg(help="Path of local file or folder to sync."), + dst: str = Opt("/", help="Destination directory to unpack folder contents to."), + password: str = Opt("", help=_help_password), + keep: Optional[List[str]] = Opt(None, help="Files to keep."), + ignore: Optional[List[str]] = Opt(None, help="Files to ignore."), + mpy_cross_binary: Optional[Path] = Opt( + None, help="Compile py files with this executable." ), ): """Synchronize a folder to device.""" + # Typer issues: https://github.com/tiangolo/typer/issues/410 + keep = keep if keep else None + ignore = ignore if ignore else None + with Progress() as progress: task_id = progress.add_task("") progress_update = partial(progress.update, task_id) progress_update(description=f"Connecting to {port}") - device = belay.Device(port, password=password) + device = Device(port, password=password) progress_update(description=f"Connected to {port}.") - device.sync(folder, progress_update=progress_update) + device.sync( + folder, + dst=dst, + keep=keep, + ignore=ignore, + mpy_cross_binary=mpy_cross_binary, + progress_update=progress_update, + ) progress_update(description="Sync complete.") +@app.command() +def run( + port: str = Arg(help=_help_port), + file: Path = Arg(help="File to run on-device."), + password: str = Opt("", help=_help_password), +): + """Run file on-device.""" + device = Device(port, password=password) + content = file.read_text() + device(content) + + +@app.command() +def exec( + port: str = Arg(help=_help_port), + statement: str = Arg(help="Statement to execute on-device."), + password: str = Opt("", help=_help_password), +): + """Execute python statement on-device.""" + device = Device(port, password=password) + device(statement) + + @app.command() def info( - port: str = Arg( - help="Port (like /dev/ttyUSB0) or WebSocket (like ws://192.168.1.100) of device." - ), - password: str = Opt( - "", - help="Password for communication methods (like WebREPL) that require authentication.", - ), + port: str = Arg(help=_help_port), + password: str = Opt("", help=_help_password), ): """Display device firmware information.""" - device = belay.Device(port, password=password) + device = Device(port, password=password) version_str = "v" + ".".join(str(x) for x in device.implementation.version) print( f"{device.implementation.name} {version_str} - {device.implementation.platform}" @@ -74,14 +112,9 @@ def info( @app.command() def identify( - port: str = Arg( - help="Port (like /dev/ttyUSB0) or WebSocket (like ws://192.168.1.100) of device." - ), + port: str = Arg(help=_help_port), pin: int = Arg(help="GPIO pin to flash LED on."), - password: str = Opt( - "", - help="Password for communication methods (like WebREPL) that require authentication.", - ), + password: str = Opt("", help=_help_password), neopixel: bool = Option(False, help="Indicator is a neopixel."), ): """Display device firmware information and blink an LED.""" diff --git a/belay/device.py b/belay/device.py index 3dcfe9a..f5be4d0 100644 --- a/belay/device.py +++ b/belay/device.py @@ -3,6 +3,7 @@ import linecache import secrets import string +import subprocess # nosec import sys import tempfile from abc import ABC, abstractmethod @@ -12,6 +13,8 @@ from pathlib import Path from typing import Callable, Dict, Generator, List, Optional, Set, TextIO, Tuple, Union +import lox +from pathspec import PathSpec from serial import SerialException from . import snippets @@ -43,18 +46,19 @@ def _read_snippet(name): return pkg_resources.read_text(snippets, f"{name}.py") -def _local_hash_file(fn: str) -> int: - """Compute the FNV-1a 64-bit hash of a file.""" - h = 0xCBF29CE484222325 - size = 1 << 64 - with open(fn, "rb") as f: # noqa: PL123 +def _local_hash_file(fn: Union[str, Path]) -> int: + """Compute the FNV-1a 32-bit hash of a file.""" + fn = Path(fn) + h = 0x811C9DC5 + size = 1 << 32 + with fn.open("rb") as f: while True: data = f.read(65536) if not data: break for byte in data: h = h ^ byte - h = (h * 0x100000001B3) % size + h = (h * 0x01000193) % size return h @@ -62,6 +66,26 @@ def _random_python_identifier(n=16): return "_" + "".join(secrets.choice(_python_identifier_chars) for _ in range(n)) +class NotBelayResponse(Exception): + """Parsed response wasn't for Belay.""" + + +def _parse_belay_response(line): + if not line.startswith("_BELAY"): + raise NotBelayResponse + line = line[6:] + code, line = line[0], line[1:] + + if code == "R": + # Result + return ast.literal_eval(line) + elif code == "S": + # StopIteration + raise StopIteration + else: + raise ValueError(f'Received unknown code: "{code}"') + + class _Executer(ABC): def __init__(self, device): # To avoid Executer.__setattr__ raising an error @@ -266,6 +290,120 @@ def multi_executer(*args, **kwargs): return multi_executer +def _discover_files_dirs( + remote_dir: str, + local_file_or_folder: Path, + ignore: Optional[list] = None, +): + src_objects = [] + if local_file_or_folder.is_dir(): + if ignore is None: + ignore = [] + ignore_spec = PathSpec.from_lines("gitwildmatch", ignore) + for src_object in local_file_or_folder.rglob("*"): + if ignore_spec.match_file(str(src_object)): + continue + src_objects.append(src_object) + # Sort so that folder creation comes before file sending. + src_objects.sort() + + src_files, src_dirs = [], [] + for src_object in src_objects: + if src_object.is_dir(): + src_dirs.append(src_object) + else: + src_files.append(src_object) + dst_files = [ + remote_dir / src.relative_to(local_file_or_folder) for src in src_files + ] + else: + src_files = [local_file_or_folder] + src_dirs = [] + dst_files = [Path(remote_dir) / local_file_or_folder.name] + + return src_files, src_dirs, dst_files + + +def _preprocess_keep( + keep: Union[None, list, str, bool], + dst: str, +) -> list: + if keep is None: + if dst == "/": + keep = ["boot.py", "webrepl_cfg.py"] + else: + keep = [] + elif isinstance(keep, str): + keep = [keep] + elif isinstance(keep, (list, tuple)): + pass + elif isinstance(keep, bool): + keep = [] + else: + raise ValueError + keep = [str(dst / Path(x)) for x in keep] + return keep + + +def _preprocess_ignore(ignore: Union[None, str, list, tuple]) -> list: + if ignore is None: + ignore = ["*.pyc", "__pycache__", ".DS_Store", ".pytest_cache"] + elif isinstance(ignore, str): + ignore = [ignore] + elif isinstance(ignore, (list, tuple)): + ignore = list(ignore) + else: + raise ValueError + return ignore + + +def _preprocess_src_file( + tmp_dir: Union[str, Path], + src_file: Union[str, Path], + minify: bool, + mpy_cross_binary: Union[str, Path, None], +) -> Path: + tmp_dir = Path(tmp_dir) + src_file = Path(src_file) + + if src_file.is_absolute(): + transformed = tmp_dir / src_file.relative_to(tmp_dir.anchor) + else: + transformed = tmp_dir / src_file + transformed.parent.mkdir(parents=True, exist_ok=True) + + if src_file.suffix == ".py": + if mpy_cross_binary: + mpy_file = transformed.with_suffix(".mpy") + subprocess.check_output( # nosec + [mpy_cross_binary, "-o", mpy_file, src_file] + ) + return transformed + elif minify: + minified = minify_code(src_file.read_text()) + transformed.write_text(minified) + return transformed + + return src_file + + +@lox.thread(8) +def _preprocess_src_file_hash(*args, **kwargs): + src_file = _preprocess_src_file(*args, **kwargs) + src_hash = _local_hash_file(src_file) + return src_file, src_hash + + +def _generate_dst_dirs(dst, src, src_dirs) -> list: + dst_dirs = [str(dst / x.relative_to(src)) for x in src_dirs] + # Add all directories leading up to ``dst``. + dst_prefix_tokens = dst.split("/") + for i in range(2, len(dst_prefix_tokens) + (dst[-1] != "/")): + dst_dirs.append("/".join(dst_prefix_tokens[:i])) + dst_dirs.sort() + return dst_dirs + + @dataclass class Implementation: """Implementation dataclass detailing the device. @@ -369,7 +507,7 @@ def __call__( minify: bool = True, stream_out: TextIO = sys.stdout, record=True, - ) -> BelayReturn: + ): """Execute code on-device. Parameters @@ -411,18 +549,10 @@ def __call__( lines = res.split("\r\n") for line in lines[:-1]: - if line.startswith("_BELAY"): - line = line[6:] - code, line = line[0], line[1:] - - if code == "R": - # Result - return ast.literal_eval(line) - elif code == "S": - # StopIteration - raise StopIteration - else: - raise ValueError(f'Received unknown code: "{code}"') + try: + return _parse_belay_response(line) + except NotBelayResponse: + pass if stream_out: stream_out.write(line) @@ -431,35 +561,54 @@ def __call__( def sync( self, folder: Union[str, Path], + dst: str = "/", + keep: Union[None, list, str, bool] = None, + ignore: Union[None, list, str] = None, minify: bool = True, - keep: Union[None, list, str] = None, + mpy_cross_binary: Union[str, Path, None] = None, progress_update=None, ) -> None: - """Sync a local directory to the root of remote filesystem. + """Sync a local directory to the remote filesystem. For each local file, check the remote file's hash, and transfer if they differ. If a file/folder exists on the remote filesystem that doesn't exist in the local - folder, then delete it. + folder, then delete it (unless it's in ``keep``). Parameters ---------- folder: str, Path - Directory of files to sync to the root of the board's filesystem. + Single file or directory of files to sync to the root of the board's filesystem. + dst: str + Destination **directory** on device. + Defaults to unpacking ``folder`` to root. + keep: None | str | list | bool + Do NOT delete these file(s) on-device if not present in ``folder``. + If ``true``, don't delete any files on device. + If ``false``, delete all unsynced files (same as passing ``[]``). + If ``dst is None``, defaults to ``["boot.py", "webrepl_cfg.py"]``. + ignore: None | str | list + Git's wildmatch patterns to NOT sync to the device. + Defaults to ``["*.pyc", "__pycache__", ".DS_Store", ".pytest_cache"]``. minify: bool Minify python files prior to syncing. Defaults to ``True``. - keep: str or list - Do NOT delete these file(s) on-device if not present in ``folder``. - Defaults to ``["boot.py", "webrepl_cfg.py"]``. - progress: + mpy_cross_binary: Union[str, Path, None] + Path to mpy-cross binary. If provided, ``.py`` will automatically + be compiled. + Takes precedence over minifying. + progress_update: Partial for ``rich.progress.Progress.update(task_id,...)`` to update with sync status. """ folder = Path(folder).resolve() + dst = str(dst) + if not dst.startswith("/"): + raise ValueError('dst must start with "/"') + elif len(dst) > 1: + dst = dst.rstrip("/") + if not folder.exists(): raise ValueError(f'"{folder}" does not exist.') - if not folder.is_dir(): - raise ValueError(f'"{folder}" is not a directory.') # Create a list of all files and dirs (on-device). # This is so we know what to clean up after done syncing. @@ -469,26 +618,31 @@ def sync( # Remove the keep files from the on-device ``all_files`` set # so they don't get deleted. - if keep is None: - keep = ["boot.py", "webrepl_cfg.py"] - elif isinstance(keep, str): - keep = [keep] - keep = [x if x[0] == "/" else "/" + x for x in keep] + keep_all = folder.is_file() or keep is True + keep = _preprocess_keep(keep, dst) + ignore = _preprocess_ignore(ignore) - # Sort so that folder creation comes before file sending. - src_objects = sorted(folder.rglob("*")) - src_files, src_dirs = [], [] - for src_object in src_objects: - if src_object.is_dir(): - src_dirs.append(src_object) - else: - src_files.append(src_object) - dst_files = [f"/{src.relative_to(folder)}" for src in src_files] - dst_dirs = [f"/{src.relative_to(folder)}" for src in src_dirs] - dst_dirs.sort() - keep = [x for x in keep if x not in dst_files] - if dst_files + keep: - self(f"for x in {repr(dst_files + keep)}:\n all_files.discard(x)") + if keep_all: + # Don't build up the device of files, we won't be deleting anything + self("del __belay_fs") + else: + self(f"__belay_fs({repr(dst)}); all_dirs.sort(); del __belay_fs") + + src_files, src_dirs, dst_files = _discover_files_dirs(dst, folder, ignore) + + if mpy_cross_binary: + dst_files = [ + dst_file.with_suffix(".mpy") if dst_file.suffix == ".py" else dst_file + for dst_file in dst_files + ] + dst_files = [str(dst_file) for dst_file in dst_files] + dst_dirs = _generate_dst_dirs(dst, folder, src_dirs) + + if not keep_all: + # prevent keep duplicates in the concat'd file list + keep = [x for x in keep if x not in dst_files] + if dst_files + keep: + self(f"for x in {repr(dst_files + keep)}:\n all_files.discard(x)") # Try and make all remote dirs if dst_dirs: @@ -496,35 +650,40 @@ def sync( progress_update(description="Creating remote directories...") self(f"__belay_mkdirs({repr(dst_dirs)})") - # Get all remote hashes - if progress_update: - progress_update(description="Fetching remote hashes...") - dst_hashes = self(f"__belay_hfs({repr(dst_files)})") - - if len(dst_hashes) != len(dst_files): - raise Exception + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_dir = Path(tmp_dir) + for src_file in src_files: + # Pre-process files in another thread while we get remote hashes + _preprocess_src_file_hash.scatter( + tmp_dir, src_file, minify, mpy_cross_binary + ) - if progress_update: - progress_update(total=len(src_files)) - for src, dst, dst_hash in zip(src_files, dst_files, dst_hashes): + # Get all remote hashes if progress_update: - progress_update(description=f"Syncing: {dst[1:]}") + progress_update(description="Fetching remote hashes...") + dst_hashes = self(f"__belay_hfs({repr(dst_files)})") - with tempfile.TemporaryDirectory() as tmp_dir: - tmp_dir = Path(tmp_dir) + src_files_and_hashes = _preprocess_src_file_hash.gather() - if minify and src.suffix == ".py": - minified = minify_code(src.read_text()) - src = tmp_dir / src.name - src.write_text(minified) + if len(dst_hashes) != len(dst_files): + raise Exception - # All other files, just sync over. - src_hash = _local_hash_file(src) + puts = [] + for (src_file, src_hash), dst_file, dst_hash in zip( + src_files_and_hashes, dst_files, dst_hashes + ): if src_hash != dst_hash: - self._board.fs_put(src, dst) + puts.append((src_file, dst_file)) if progress_update: - progress_update(advance=1) + progress_update(total=len(puts)) + + for src_file, dst_file in puts: + if progress_update: + progress_update(description=f"Pushing: {dst_file[1:]}") + self._board.fs_put(src_file, dst_file) + if progress_update: + progress_update(advance=1) # Remove all the files and directories that did not exist in local filesystem. if progress_update: diff --git a/belay/snippets/sync_begin.py b/belay/snippets/sync_begin.py index 40caa23..9b32e82 100644 --- a/belay/snippets/sync_begin.py +++ b/belay/snippets/sync_begin.py @@ -1,22 +1,30 @@ # Creates and populates two set[str]: all_files, all_dirs import os -def __belay_hf(fn): - h = 0xcbf29ce484222325 - size = 1 << 64 +import micropython +@micropython.native +def __belay_hf(fn, buf): + # inherently is inherently modulo 32-bit arithmetic + @micropython.viper + def xor_mm(data, state: uint, prime: uint) -> uint: + for b in data: + state = uint((state ^ uint(b)) * prime) + return state + + h = 0x811c9dc5 try: - with open(fn, "rb") as f: - while True: - data = f.read(4096) - if not data: - break - for byte in data: - h = h ^ byte - h = (h * 0x100000001b3) % size + f = open(fn, "rb") + while True: + n = f.readinto(buf) + if n == 0: + break + h = xor_mm(buf[:n], h, 0x01000193) + f.close() except OSError: - return 0 + h = 0 return h def __belay_hfs(fns): - print("_BELAYR" + repr([__belay_hf(fn) for fn in fns])) + buf = memoryview(bytearray(4096)) + print("_BELAYR" + repr([__belay_hf(fn, buf) for fn in fns])) def __belay_mkdirs(fns): for fn in fns: try: @@ -24,17 +32,20 @@ def __belay_mkdirs(fns): except OSError: pass all_files, all_dirs = set(), [] -def __belay_fs(path=""): - for elem in os.listdir(path): - full_name = path + "/" + elem +def __belay_fs(path="/", check=True): + if not path: + path = "/" + elif not path.endswith("/"): + path += "/" + if check: try: - if os.stat(elem)[0] & 0x4000: # is_dir - all_dirs.append(full_name) - __belay_fs(full_name) - else: - all_files.add(full_name) + os.stat(path) except OSError: - pass -__belay_fs() -all_dirs.sort() -del __belay_fs + return + for elem in os.ilistdir(path): + full_name = path + elem[0] + if elem[1] & 0x4000: # is_dir + all_dirs.append(full_name) + __belay_fs(full_name, check=False) + else: + all_files.add(full_name) diff --git a/pyproject.toml b/pyproject.toml index fe0aea2..f425225 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,8 @@ belay = "belay.cli.main:app" python = "^3.8" pyserial = "^3.1" typer = {extras = ["all"], version = "^0.6"} +pathspec = "*" +lox = "^0.11.0" [tool.poetry.group.docs.dependencies] sphinx = "~4.5.0" @@ -40,6 +42,7 @@ pre_commit = "^2.16.0" pytest = "^7.1.2" pytest-cov = "^3.0.0" pytest-mock = "^3.7.0" +line_profiler = "^3.5.1" [tool.coverage.run] branch = true diff --git a/tests/cli/test_exec.py b/tests/cli/test_exec.py new file mode 100644 index 0000000..15c74c8 --- /dev/null +++ b/tests/cli/test_exec.py @@ -0,0 +1,5 @@ +def test_exec_basic(mocker, mock_device, cli_runner): + mock_device.patch("belay.cli.main.Device") + result = cli_runner("exec", "print('hello world')") + assert result.exit_code == 0 + mock_device.inst.assert_called_once_with("print('hello world')") diff --git a/tests/cli/test_identify.py b/tests/cli/test_identify.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cli/test_info.py b/tests/cli/test_info.py new file mode 100644 index 0000000..49c69ee --- /dev/null +++ b/tests/cli/test_info.py @@ -0,0 +1,8 @@ +def test_info_basic(mocker, mock_device, cli_runner, tmp_path): + mock_device.patch("belay.cli.main.Device") + mock_device.inst.implementation.name = "testingpython" + mock_device.inst.implementation.version = (4, 7, 9) + mock_device.inst.implementation.platform = "pytest" + result = cli_runner("info") + assert result.exit_code == 0 + assert result.output == "testingpython v4.7.9 - pytest\n" diff --git a/tests/cli/test_run.py b/tests/cli/test_run.py new file mode 100644 index 0000000..713022c --- /dev/null +++ b/tests/cli/test_run.py @@ -0,0 +1,7 @@ +def test_run_basic(mocker, mock_device, cli_runner, tmp_path): + mock_device.patch("belay.cli.main.Device") + py_file = tmp_path / "foo.py" + py_file.write_text("print('hello')\nprint('world')") + result = cli_runner("run", str(py_file)) + assert result.exit_code == 0 + mock_device.inst.assert_called_once_with("print('hello')\nprint('world')") diff --git a/tests/cli/test_sync.py b/tests/cli/test_sync.py new file mode 100644 index 0000000..25b7d3c --- /dev/null +++ b/tests/cli/test_sync.py @@ -0,0 +1,15 @@ +from pathlib import PosixPath + + +def test_sync_basic(mocker, mock_device, cli_runner): + mock_device.patch("belay.cli.main.Device") + result = cli_runner("sync", "foo") + assert result.exit_code == 0 + mock_device.inst.sync.assert_called_once_with( + PosixPath("foo"), + dst="/", + keep=None, + ignore=None, + mpy_cross_binary=None, + progress_update=mocker.ANY, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 6bbfbd4..ecae3d7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,43 @@ from distutils import dir_util +from functools import partial from pathlib import Path import pytest +from typer.testing import CliRunner + +from belay.cli import app + + +class MockDevice: + def __init__(self, mocker): + self.mocker = mocker + self.inst = mocker.MagicMock() + self.cls = None + + def patch(self, target: str): + self.cls = self.mocker.patch(target, return_value=self.inst) + + def cls_assert_common(self): + self.cls.assert_called_once_with("/dev/ttyUSB0", password="password") + + +@pytest.fixture +def mock_device(mocker): + return MockDevice(mocker) + + +@pytest.fixture +def cli_runner(mock_device): + cli_runner = CliRunner() + + def run(cmd, *args): + result = cli_runner.invoke( + app, [cmd, "/dev/ttyUSB0", *args, "--password", "password"] + ) + mock_device.cls_assert_common() + return result + + return run @pytest.fixture diff --git a/tests/test_device.py b/tests/test_device.py index 984d11d..f95de0d 100644 --- a/tests/test_device.py +++ b/tests/test_device.py @@ -1,8 +1,7 @@ -from unittest.mock import call - import pytest import belay +import belay.device @pytest.fixture @@ -25,7 +24,7 @@ def mock_device(mock_pyboard): def test_device_init(mock_device): - pass + """Just checks if everything in ``__init__`` is called fine.""" def test_device_init_no_startup(mock_pyboard): @@ -127,76 +126,21 @@ def test_device_traceback_execute(mocker, mock_device, tmp_path): assert exc_info.value.args[0] == expected_msg -@pytest.fixture -def sync_path(tmp_path): - (tmp_path / "alpha.py").write_text("def alpha():\n pass") - (tmp_path / "bar.txt").write_text("bar contents") - (tmp_path / "foo.txt").write_text("foo contents") - (tmp_path / "folder1" / "folder1_1").mkdir(parents=True) - (tmp_path / "folder1" / "file1.txt").write_text("file1 contents") - (tmp_path / "folder1" / "folder1_1" / "file1_1.txt").write_text("file1_1 contents") - - return tmp_path - - -def test_device_sync_empty_remote(mocker, mock_device, sync_path): - payload = ("_BELAYR" + repr([b""] * 5) + "\r\n").encode("utf-8") - mock_device._board.exec = mocker.MagicMock(return_value=payload) - - mock_device.sync(sync_path) - - mock_device._board.exec.assert_has_calls( - [ - call( - "for x in['/alpha.py','/bar.txt','/folder1/file1.txt','/folder1/folder1_1/file1_1.txt','/foo.txt','/boot.py','/webrepl_cfg.py']:\n all_files.discard(x)" - ), - call("__belay_mkdirs(['/folder1','/folder1/folder1_1'])"), - call( - "__belay_hfs(['/alpha.py','/bar.txt','/folder1/file1.txt','/folder1/folder1_1/file1_1.txt','/foo.txt'])" - ), - ] - ) +def test_parse_belay_response_unknown(): + with pytest.raises(ValueError): + belay.device._parse_belay_response("_BELAYA") - mock_device._board.fs_put.assert_has_calls( - [ - call(sync_path / "bar.txt", "/bar.txt"), - call(sync_path / "folder1/file1.txt", "/folder1/file1.txt"), - call( - sync_path / "folder1/folder1_1/file1_1.txt", - "/folder1/folder1_1/file1_1.txt", - ), - call(sync_path / "foo.txt", "/foo.txt"), - ] - ) +def test_parse_belay_response_stop_iteration(): + with pytest.raises(StopIteration): + belay.device._parse_belay_response("_BELAYS") -def test_device_sync_partial_remote(mocker, mock_device, sync_path): - def __belay_hfs(fns): - out = [] - for fn in fns: - local_fn = sync_path / fn[1:] - if local_fn.stem.endswith("1"): - out.append(b"\x00") - else: - out.append(belay.device._local_hash_file(local_fn)) - return out - - def side_effect(cmd): - if not cmd.startswith("__belay_hfs"): - return b"" - nonlocal __belay_hfs - return ("_BELAYR" + repr(eval(cmd)) + "\r\n").encode("utf-8") - - mock_device._board.exec = mocker.MagicMock(side_effect=side_effect) - - mock_device.sync(sync_path) - - mock_device._board.fs_put.assert_has_calls( - [ - call(sync_path / "folder1/file1.txt", "/folder1/file1.txt"), - call( - sync_path / "folder1/folder1_1/file1_1.txt", - "/folder1/folder1_1/file1_1.txt", - ), - ] - ) + +def test_parse_belay_response_r(): + assert [1, 2, 3] == belay.device._parse_belay_response("_BELAYR[1,2,3]") + assert 1 == belay.device._parse_belay_response("_BELAYR1") + assert 1.23 == belay.device._parse_belay_response("_BELAYR1.23") + assert "a" == belay.device._parse_belay_response("_BELAYR'a'") + assert {1} == belay.device._parse_belay_response("_BELAYR{1}") + assert b"foo" == belay.device._parse_belay_response("_BELAYRb'foo'") + assert belay.device._parse_belay_response("_BELAYRFalse") is False diff --git a/tests/test_device_sync.py b/tests/test_device_sync.py new file mode 100644 index 0000000..93889c9 --- /dev/null +++ b/tests/test_device_sync.py @@ -0,0 +1,367 @@ +import os +from pathlib import Path, PosixPath +from unittest.mock import call + +import pytest + +import belay +import belay.device + + +def uint(x): + """For patching micropython.viper code for testing.""" + return x % (1 << 32) + + +def ilistdir(x): + for name in os.listdir(x): + stat = os.stat(x + "/" + name) # noqa: PL116 + yield (name, stat.st_mode, stat.st_ino) + + +@pytest.fixture +def mock_pyboard(mocker): + def mock_init(self, *args, **kwargs): + self.serial = None + + exec_side_effect = [b'_BELAYR("micropython", (1, 19, 1), "rp2")\r\n'] * 100 + + mocker.patch.object(belay.device.Pyboard, "__init__", mock_init) + mocker.patch("belay.device.Pyboard.enter_raw_repl", return_value=None) + mocker.patch("belay.device.Pyboard.exec", side_effect=exec_side_effect) + mocker.patch("belay.device.Pyboard.fs_put") + + +@pytest.fixture +def mock_device(mock_pyboard): + device = belay.Device() + return device + + +@pytest.fixture +def sync_path(tmp_path): + (tmp_path / "alpha.py").write_text("def alpha():\n pass") + (tmp_path / "bar.txt").write_text("bar contents") + (tmp_path / "foo.txt").write_text("foo contents") + (tmp_path / "folder1" / "folder1_1").mkdir(parents=True) + (tmp_path / "folder1" / "file1.txt").write_text("file1 contents") + (tmp_path / "folder1" / "folder1_1" / "file1_1.txt").write_text("file1_1 contents") + + return tmp_path + + +@pytest.fixture +def sync_begin(): + snippet = belay.device._read_snippet("sync_begin") + # Patch out micropython stuff + lines = snippet.split("\n") + lines = [x for x in lines if "micropython" not in x] + snippet = "\n".join(lines) + snippet = snippet.replace("os.ilistdir", "ilistdir") + exec(snippet, globals()) + + +def test_sync_local_belay_hf(sync_begin, tmp_path): + """Test local FNV-1a hash implementation. + + Test vector from: http://www.isthe.com/chongo/src/fnv/test_fnv.c + """ + f = tmp_path / "test_file" + f.write_text("foobar") + actual = belay.device._local_hash_file(f) + assert actual == 0xBF9CF968 + + +def test_sync_device_belay_hf(sync_begin, tmp_path): + """Test on-device FNV-1a hash implementation.""" + f = tmp_path / "test_file" + f.write_text("foobar") + buf = memoryview(bytearray(4096)) + actual = __belay_hf(str(f), buf) # noqa: F821 + assert actual == 0xBF9CF968 + + +def test_sync_device_belay_hfs(sync_begin, capsys, tmp_path): + fooba_file = tmp_path / "fooba_file" + fooba_file.write_text("fooba") + + foobar_file = tmp_path / "foobar_file" + foobar_file.write_text("foobar") + + return_value = __belay_hfs([str(fooba_file), str(foobar_file)]) # noqa: F821 + assert return_value is None + captured = capsys.readouterr() + assert captured.out == f"_BELAYR[{0x39aaa18a}, {0xbf9cf968}]\n" + + +def test_sync_device_belay_mkdirs(sync_begin, tmp_path): + paths = [ + tmp_path, + tmp_path / "foo1", + tmp_path / "foo1" / "foo2", + tmp_path / "bar1", + tmp_path / "bar1" / "bar2", + ] + paths = [str(x) for x in paths] + __belay_mkdirs(paths) # noqa: F821 + assert (tmp_path).is_dir() + assert (tmp_path / "foo1").is_dir() + assert (tmp_path / "foo1" / "foo2").is_dir() + assert (tmp_path / "bar1").is_dir() + assert (tmp_path / "bar1" / "bar2").is_dir() + + +def test_sync_device_belay_fs_basic(sync_begin, tmp_path): + expected_files = { + tmp_path / "foo1" / "foo2" / "foo_file.py", + tmp_path / "bar1" / "bar2" / "bar_file.py", + } + expected_dirs = { + tmp_path / "foo1", + tmp_path / "foo1" / "foo2", + tmp_path / "bar1", + tmp_path / "bar1" / "bar2", + } + for f in expected_files: + f.parent.mkdir(parents=True, exist_ok=True) + f.touch() + + __belay_fs(str(tmp_path)) # noqa: F821 + _all_files = {Path(x) for x in all_files} # noqa: F821 + _all_dirs = {Path(x) for x in all_dirs} # noqa: F821 + + assert _all_files == expected_files + assert _all_dirs == expected_dirs + + +def test_sync_device_belay_fs_does_not_exist(sync_begin, tmp_path): + non_existing_dir = tmp_path / "does_not_exist" + __belay_fs(str(non_existing_dir)) # noqa: F821 + assert all_files == set() # noqa: F821 + assert all_dirs == [] # noqa: F821 + + +def test_device_sync_empty_remote(mocker, mock_device, sync_path): + payload = ("_BELAYR" + repr([b""] * 5) + "\r\n").encode("utf-8") + mock_device._board.exec = mocker.MagicMock(return_value=payload) + + mock_device.sync(sync_path) + + mock_device._board.exec.assert_has_calls( + [ + call( + "for x in['/alpha.py','/bar.txt','/folder1/file1.txt','/folder1/folder1_1/file1_1.txt','/foo.txt','/boot.py','/webrepl_cfg.py']:\n all_files.discard(x)" + ), + call("__belay_mkdirs(['/folder1','/folder1/folder1_1'])"), + call( + "__belay_hfs(['/alpha.py','/bar.txt','/folder1/file1.txt','/folder1/folder1_1/file1_1.txt','/foo.txt'])" + ), + ] + ) + + mock_device._board.fs_put.assert_has_calls( + [ + call(sync_path / "bar.txt", "/bar.txt"), + call(sync_path / "folder1/file1.txt", "/folder1/file1.txt"), + call( + sync_path / "folder1/folder1_1/file1_1.txt", + "/folder1/folder1_1/file1_1.txt", + ), + call(sync_path / "foo.txt", "/foo.txt"), + ] + ) + + +def test_device_sync_partial_remote(mocker, mock_device, sync_path): + def __belay_hfs(fns): + out = [] + for fn in fns: + local_fn = sync_path / fn[1:] + if local_fn.stem.endswith("1"): + out.append(b"\x00") + else: + out.append(belay.device._local_hash_file(local_fn)) + return out + + def side_effect(cmd): + if not cmd.startswith("__belay_hfs"): + return b"" + nonlocal __belay_hfs + return ("_BELAYR" + repr(eval(cmd)) + "\r\n").encode("utf-8") + + mock_device._board.exec = mocker.MagicMock(side_effect=side_effect) + + mock_device.sync(sync_path) + + mock_device._board.fs_put.assert_has_calls( + [ + call(sync_path / "folder1/file1.txt", "/folder1/file1.txt"), + call( + sync_path / "folder1/folder1_1/file1_1.txt", + "/folder1/folder1_1/file1_1.txt", + ), + ] + ) + + +def test_discover_files_dirs_dir(tmp_path): + (tmp_path / "file1.ext").touch() + (tmp_path / "file2.ext").touch() + (tmp_path / "folder1").mkdir() + (tmp_path / "folder1" / "file3.ext").touch() + + remote_dir = "/foo/bar" + src_files, src_dirs, dst_files = belay.device._discover_files_dirs( + remote_dir=remote_dir, + local_file_or_folder=tmp_path, + ) + + src_files = [x.relative_to(tmp_path) for x in src_files] + src_dirs = [x.relative_to(tmp_path) for x in src_dirs] + assert src_files == [ + PosixPath("file1.ext"), + PosixPath("file2.ext"), + PosixPath("folder1/file3.ext"), + ] + assert src_dirs == [PosixPath("folder1")] + assert dst_files == [ + PosixPath("/foo/bar/file1.ext"), + PosixPath("/foo/bar/file2.ext"), + PosixPath("/foo/bar/folder1/file3.ext"), + ] + + +def test_discover_files_dirs_dir_ignore(tmp_path): + (tmp_path / "file1.ext").touch() + (tmp_path / "file2.pyc").touch() + (tmp_path / "folder1").mkdir() + (tmp_path / "folder1" / "file3.ext").touch() + + remote_dir = "/foo/bar" + src_files, src_dirs, dst_files = belay.device._discover_files_dirs( + remote_dir=remote_dir, + local_file_or_folder=tmp_path, + ignore=["*.pyc"], + ) + + src_files = [x.relative_to(tmp_path) for x in src_files] + src_dirs = [x.relative_to(tmp_path) for x in src_dirs] + assert src_files == [ + PosixPath("file1.ext"), + PosixPath("folder1/file3.ext"), + ] + assert src_dirs == [PosixPath("folder1")] + assert dst_files == [ + PosixPath("/foo/bar/file1.ext"), + PosixPath("/foo/bar/folder1/file3.ext"), + ] + + +def test_discover_files_dirs_empty(tmp_path): + remote_dir = "/foo/bar" + src_files, src_dirs, dst_files = belay.device._discover_files_dirs( + remote_dir=remote_dir, + local_file_or_folder=tmp_path, + ) + + assert src_files == [] + assert src_dirs == [] + assert dst_files == [] + + +def test_discover_files_dirs_single_file(tmp_path): + single_file = tmp_path / "file1.ext" + single_file.touch() + + remote_dir = "/foo/bar" + src_files, src_dirs, dst_files = belay.device._discover_files_dirs( + remote_dir=remote_dir, + local_file_or_folder=single_file, + ) + + src_files = [x.relative_to(tmp_path) for x in src_files] + src_dirs = [x.relative_to(tmp_path) for x in src_dirs] + assert src_files == [PosixPath("file1.ext")] + assert src_dirs == [] + assert dst_files == [PosixPath("/foo/bar/file1.ext")] + + +def test_preprocess_keep_none_root(): + actual = belay.device._preprocess_keep(None, "/") + assert actual == ["/boot.py", "/webrepl_cfg.py"] + + +def test_preprocess_keep_none_nonroot(): + actual = belay.device._preprocess_keep(None, "/foo") + assert actual == [] + + +def test_preprocess_keep_list(): + actual = belay.device._preprocess_keep(["foo"], "/") + assert actual == ["/foo"] + + +def test_preprocess_keep_str(): + actual = belay.device._preprocess_keep("foo", "/") + assert actual == ["/foo"] + + +def test_preprocess_keep_bool_true(): + actual = belay.device._preprocess_keep(True, "/") + assert actual == [] + + +def test_preprocess_keep_bool_false(): + actual = belay.device._preprocess_keep(False, "/") + assert actual == [] + + +def test_preprocess_ignore_none(): + actual = belay.device._preprocess_ignore(None) + assert actual == ["*.pyc", "__pycache__", ".DS_Store", ".pytest_cache"] + + +def test_preprocess_ignore_list(): + actual = belay.device._preprocess_ignore(["foo", "bar"]) + assert actual == ["foo", "bar"] + + +def test_preprocess_ignore_str(): + actual = belay.device._preprocess_ignore("foo") + assert actual == ["foo"] + + +def test_preprocess_src_file_default_py(tmp_path): + actual = belay.device._preprocess_src_file(tmp_path, "foo/bar/baz.py", False, None) + assert actual == Path("foo/bar/baz.py") + + +def test_preprocess_src_file_default_generic(tmp_path): + actual = belay.device._preprocess_src_file( + tmp_path, "foo/bar/baz.generic", False, None + ) + assert actual == Path("foo/bar/baz.generic") + + +def test_generate_dst_dirs(): + dst = "/foo/bar" + src = Path("/bloop/bleep") + src_dirs = [ + src / "dir1", + src / "dir1" / "dir1_1", + src / "dir1" / "dir1_2", + src / "dir2", + src / "dir2" / "dir2_1", + src / "dir2" / "dir2_2", + ] + dst_dirs = belay.device._generate_dst_dirs(dst, src, src_dirs) + assert dst_dirs == [ + "/foo", + "/foo/bar", + "/foo/bar/dir1", + "/foo/bar/dir1/dir1_1", + "/foo/bar/dir1/dir1_2", + "/foo/bar/dir2", + "/foo/bar/dir2/dir2_1", + "/foo/bar/dir2/dir2_2", + ]