diff --git a/logot/_match.py b/logot/_match.py index 2416947c..4e0694ee 100644 --- a/logot/_match.py +++ b/logot/_match.py @@ -39,8 +39,8 @@ } -def _match_regex(pattern: re.Pattern[str], value: str) -> bool: - return pattern.fullmatch(value) is not None +def _match_regex(pattern: re.Pattern[str], msg: str) -> bool: + return pattern.fullmatch(msg) is not None def compile_matcher(pattern: str) -> Matcher: diff --git a/logot/_util.py b/logot/_util.py index d6d298f1..0ec89239 100644 --- a/logot/_util.py +++ b/logot/_util.py @@ -1,23 +1,19 @@ from __future__ import annotations import logging -from typing import NewType -# An integer log level corresponding to a registered log level. -LevelNo = NewType("LevelNo", int) - -def to_levelno(level: int | str) -> LevelNo: +def to_levelno(level: int | str) -> int: # Handle `int` level. if isinstance(level, int): if logging.getLevelName(level).startswith("Level "): raise ValueError(f"Unknown level: {level!r}") - return LevelNo(level) + return level # Handle `str` level. if isinstance(level, str): levelno = logging.getLevelName(level) if not isinstance(levelno, int): raise ValueError(f"Unknown level: {level!r}") - return LevelNo(levelno) + return levelno # Fail on other types. raise TypeError(f"Invalid level: {level!r}") diff --git a/logot/logged.py b/logot/logged.py new file mode 100644 index 00000000..4a7834ca --- /dev/null +++ b/logot/logged.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod + +from logot._match import compile_matcher +from logot._util import to_levelno + + +class Logged(ABC): + __slots__ = () + + def __rshift__(self, log: Logged) -> Logged: + return _OrderedAllLogged.from_compose(self, log) + + def __and__(self, log: Logged) -> Logged: + return _UnorderedAllLogged.from_compose(self, log) + + def __or__(self, log: Logged) -> Logged: + return _AnyLogged.from_compose(self, log) + + def __str__(self) -> str: + return self._str(indent="") + + @abstractmethod + def __eq__(self, other: object) -> bool: + raise NotImplementedError + + @abstractmethod + def __repr__(self) -> str: + raise NotImplementedError + + @abstractmethod + def _reduce(self, record: logging.LogRecord) -> Logged | None: + raise NotImplementedError + + @abstractmethod + def _str(self, *, indent: str) -> str: + raise NotImplementedError + + +class _LogRecordLogged(Logged): + __slots__ = ("_levelno", "_msg", "_matcher") + + def __init__(self, levelno: int, msg: str) -> None: + self._levelno = levelno + self._msg = msg + self._matcher = compile_matcher(msg) + + def __eq__(self, other: object) -> bool: + return isinstance(other, _LogRecordLogged) and other._levelno == self._levelno and other._msg == self._msg + + def __repr__(self) -> str: + return f"log({logging.getLevelName(self._levelno)!r}, {self._msg!r})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + if self._levelno == record.levelno and self._matcher(record.getMessage()): + return None + return self + + def _str(self, *, indent: str) -> str: + return f"[{logging.getLevelName(self._levelno)}] {self._msg}" + + +def log(level: int | str, msg: str) -> Logged: + return _LogRecordLogged(to_levelno(level), msg) + + +def debug(msg: str) -> Logged: + return _LogRecordLogged(logging.DEBUG, msg) + + +def info(msg: str) -> Logged: + return _LogRecordLogged(logging.INFO, msg) + + +def warning(msg: str) -> Logged: + return _LogRecordLogged(logging.WARNING, msg) + + +def error(msg: str) -> Logged: + return _LogRecordLogged(logging.ERROR, msg) + + +def critical(msg: str) -> Logged: + return _LogRecordLogged(logging.CRITICAL, msg) + + +class _ComposedLogged(Logged): + __slots__ = ("_logs",) + + def __init__(self, logs: tuple[Logged, ...]) -> None: + assert len(logs) > 1, "Unreachable" + self._logs = logs + + def __eq__(self, other: object) -> bool: + return isinstance(other, self.__class__) and other._logs == self._logs + + @classmethod + def from_compose(cls, log_a: Logged, log_b: Logged) -> Logged: + # If possible, flatten nested logs of the same type. + if isinstance(log_a, cls): + if isinstance(log_b, cls): + return cls((*log_a._logs, *log_b._logs)) + return cls((*log_a._logs, log_b)) + if isinstance(log_b, cls): + return cls((log_a, *log_b._logs)) + # Wrap the logs without flattening. + return cls((log_a, log_b)) + + @classmethod + def from_reduce(cls, logs: tuple[Logged, ...]) -> Logged | None: + assert logs, "Unreachable" + # If there is a single log, do not wrap it. + if len(logs) == 1: + return logs[0] + # Wrap the logs. + return cls(logs) + + +class _OrderedAllLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' >> '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + log = self._logs[0] + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return _OrderedAllLogged.from_reduce(self._logs[1:]) + # Handle partial reduction. + if reduced_log is not log: + return _OrderedAllLogged((reduced_log, *self._logs[1:])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + return f"\n{indent}".join(log._str(indent=indent) for log in self._logs) + + +class _UnorderedAllLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' & '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + for n, log in enumerate(self._logs): + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return _UnorderedAllLogged.from_reduce((*self._logs[:n], *self._logs[n + 1 :])) + # Handle partial reduction. + if reduced_log is not log: + return _UnorderedAllLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + nested_indent = indent + " " + logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) + return f"Unordered:{logs_str}" + + +class _AnyLogged(_ComposedLogged): + __slots__ = () + + def __repr__(self) -> str: + return f"({' | '.join(map(repr, self._logs))})" + + def _reduce(self, record: logging.LogRecord) -> Logged | None: + for n, log in enumerate(self._logs): + reduced_log = log._reduce(record) + # Handle full reduction. + if reduced_log is None: + return None + # Handle partial reduction. + if reduced_log is not log: + return _AnyLogged((*self._logs[:n], reduced_log, *self._logs[n + 1 :])) + # Handle no reduction. + return self + + def _str(self, *, indent: str) -> str: + nested_indent = indent + " " + logs_str = "".join(f"\n{indent}- {log._str(indent=nested_indent)}" for log in self._logs) + return f"Any:{logs_str}" diff --git a/poetry.lock b/poetry.lock index f73e96e5..73d9afa7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -297,13 +297,13 @@ sphinx-basic-ng = "*" [[package]] name = "hypothesis" -version = "6.96.1" +version = "6.96.2" description = "A library for property-based testing" optional = false python-versions = ">=3.8" files = [ - {file = "hypothesis-6.96.1-py3-none-any.whl", hash = "sha256:848ea0952f0bdfd02eac59e41b03f1cbba8fa2cffeffa8db328bbd6cfe159974"}, - {file = "hypothesis-6.96.1.tar.gz", hash = "sha256:955a57e56be4607c81c17ca53e594af54aadeed91e07b88bb7f84e8208ea7739"}, + {file = "hypothesis-6.96.2-py3-none-any.whl", hash = "sha256:34b5fb4c487f159083fa5db186b4357b42a27e597795e9cbccead8cde0242060"}, + {file = "hypothesis-6.96.2.tar.gz", hash = "sha256:524a0ac22c8dfff640f21f496b85ee193a470e8570ab7707b8e3bfccd7da34a6"}, ] [package.dependencies] diff --git a/pyproject.toml b/pyproject.toml index 24abf2ec..379317fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "logot" -version = "0.0.1a3" +version = "0.0.1a4" description = "Log-based testing" authors = ["Dave Hall "] license = "MIT" diff --git a/tests/test_logged.py b/tests/test_logged.py new file mode 100644 index 00000000..7f2464be --- /dev/null +++ b/tests/test_logged.py @@ -0,0 +1,300 @@ +from __future__ import annotations + +import logging + +from logot import logged + + +def record(level: int, msg: str) -> logging.LogRecord: + return logging.LogRecord(name="logot", level=level, pathname=__file__, lineno=0, msg=msg, args=(), exc_info=None) + + +def assert_reduce(log: logged.Logged | None, *records: logging.LogRecord) -> None: + for record in records: + # The `Logged` should not have been fully reduced. + assert log is not None + log = log._reduce(record) + # Once all log records are consumed, the `Logged` should have been fully-reduced. + assert log is None + + +def test_log_record_logged_eq_pass() -> None: + assert logged.info("foo bar") == logged.info("foo bar") + + +def test_log_record_logged_eq_fail() -> None: + # Different levels are not equal. + assert logged.info("foo bar") != logged.debug("foo bar") + # Different messages are not equal. + assert logged.info("foo bar") != logged.info("foo") + + +def test_log_record_logged_repr() -> None: + assert repr(logged.log(logging.DEBUG, "foo bar")) == "log('DEBUG', 'foo bar')" + assert repr(logged.debug("foo bar")) == "log('DEBUG', 'foo bar')" + assert repr(logged.info("foo bar")) == "log('INFO', 'foo bar')" + assert repr(logged.warning("foo bar")) == "log('WARNING', 'foo bar')" + assert repr(logged.error("foo bar")) == "log('ERROR', 'foo bar')" + assert repr(logged.critical("foo bar")) == "log('CRITICAL', 'foo bar')" + + +def test_log_record_logged_str() -> None: + assert str(logged.log(logging.DEBUG, "foo bar")) == "[DEBUG] foo bar" + assert str(logged.debug("foo bar")) == "[DEBUG] foo bar" + assert str(logged.info("foo bar")) == "[INFO] foo bar" + assert str(logged.warning("foo bar")) == "[WARNING] foo bar" + assert str(logged.error("foo bar")) == "[ERROR] foo bar" + assert str(logged.critical("foo bar")) == "[CRITICAL] foo bar" + + +def test_log_record_logged_reduce() -> None: + assert_reduce( + logged.info("foo bar"), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.DEBUG, "foo bar"), # Non-matching. + record(logging.INFO, "foo bar"), # Matching. + ) + + +def test_ordered_all_logged_eq_pass() -> None: + assert (logged.info("foo") >> logged.info("bar")) == (logged.info("foo") >> logged.info("bar")) + + +def test_ordered_all_logged_eq_fail() -> None: + # Different orderings are not equal. + assert (logged.info("foo") >> logged.info("bar")) != (logged.info("bar") >> logged.info("foo")) + # Different operators are not equal. + assert (logged.info("foo") >> logged.info("bar")) != (logged.info("foo") & logged.info("bar")) + + +def test_ordered_all_logged_repr() -> None: + # Composed `Logged` are flattened from the left. + assert ( + repr((logged.info("foo") >> logged.info("bar")) >> logged.info("baz")) + == "(log('INFO', 'foo') >> log('INFO', 'bar') >> log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the right. + assert ( + repr(logged.info("foo") >> (logged.info("bar") >> logged.info("baz"))) + == "(log('INFO', 'foo') >> log('INFO', 'bar') >> log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the left and right. + assert ( + repr((logged.info("foo") >> logged.info("bar")) >> (logged.info("baz") >> logged.info("bat"))) + == "(log('INFO', 'foo') >> log('INFO', 'bar') >> log('INFO', 'baz') >> log('INFO', 'bat'))" + ) + + +def test_ordered_all_logged_str() -> None: + assert str(logged.info("foo") >> logged.info("bar")) == "\n".join( + ( + "[INFO] foo", + "[INFO] bar", + ) + ) + # Indentation is sane with multiple nested composed `Logged`. + assert str( + (logged.info("foo1") & logged.info("foo2")) + >> ( + (logged.info("bar1a") | logged.info("bar1b")) + & ((logged.info("bar2a1") >> logged.info("bar2a2")) | (logged.info("bar2b1") >> logged.info("bar2b2"))) + ) + ) == "\n".join( + ( + "Unordered:", + "- [INFO] foo1", + "- [INFO] foo2", + "Unordered:", + "- Any:", + " - [INFO] bar1a", + " - [INFO] bar1b", + "- Any:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", + ) + ) + + +def test_ordered_all_logged_reduce() -> None: + assert_reduce( + logged.info("foo") >> logged.info("bar") >> logged.info("baz"), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "baz"), # Non-matching. + record(logging.INFO, "bar"), # Non-matching. + record(logging.INFO, "foo"), # Matching. + record(logging.INFO, "foo"), # Non-matching. + record(logging.INFO, "bar"), # Matching. + record(logging.INFO, "baz"), # Matching. + ) + assert_reduce( + (logged.info("foo1") & logged.info("foo2")) >> (logged.info("bar1") & logged.info("bar2")), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "bar2"), # Non-matching. + record(logging.INFO, "foo2"), # Matching. + record(logging.INFO, "bar1"), # Non-matching. + record(logging.INFO, "foo1"), # Matching. + record(logging.INFO, "bar2"), # Matching. + record(logging.INFO, "bar1"), # Matching. + ) + + +def test_unordered_all_logged_eq_pass() -> None: + assert (logged.info("foo") & logged.info("bar")) == (logged.info("foo") & logged.info("bar")) + + +def test_unordered_all_logged_eq_fail() -> None: + # Different orderings are not equal. + assert (logged.info("foo") & logged.info("bar")) != (logged.info("bar") & logged.info("foo")) + # Different operators are not equal. + assert (logged.info("foo") & logged.info("bar")) != (logged.info("foo") >> logged.info("bar")) + + +def test_unordered_all_logged_repr() -> None: + # Composed `Logged` are flattened from the left. + assert ( + repr((logged.info("foo") & logged.info("bar")) & logged.info("baz")) + == "(log('INFO', 'foo') & log('INFO', 'bar') & log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the right. + assert ( + repr(logged.info("foo") & (logged.info("bar") & logged.info("baz"))) + == "(log('INFO', 'foo') & log('INFO', 'bar') & log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the left and right. + assert ( + repr((logged.info("foo") & logged.info("bar")) & (logged.info("baz") & logged.info("bat"))) + == "(log('INFO', 'foo') & log('INFO', 'bar') & log('INFO', 'baz') & log('INFO', 'bat'))" + ) + + +def test_unordered_all_logged_str() -> None: + assert str(logged.info("foo") & logged.info("bar")) == "\n".join( + ( + "Unordered:", + "- [INFO] foo", + "- [INFO] bar", + ) + ) + # Indentation is sane with multiple nested composed `Logged`. + assert str( + (logged.info("foo1") >> logged.info("foo2")) + & ( + (logged.info("bar1a") | logged.info("bar1b")) + >> ((logged.info("bar2a1") >> logged.info("bar2a2")) | (logged.info("bar2b1") >> logged.info("bar2b2"))) + ) + ) == "\n".join( + ( + "Unordered:", + "- [INFO] foo1", + " [INFO] foo2", + "- Any:", + " - [INFO] bar1a", + " - [INFO] bar1b", + " Any:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", + ) + ) + + +def test_unordered_all_logged_reduce() -> None: + assert_reduce( + logged.info("foo") & logged.info("bar") & logged.info("baz"), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "baz"), # Matching. + record(logging.INFO, "baz"), # Non-matching. + record(logging.INFO, "bar"), # Matching. + record(logging.INFO, "foo"), # Matching. + ) + assert_reduce( + (logged.info("foo1") >> logged.info("foo2")) & (logged.info("bar1") >> logged.info("bar2")), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "bar2"), # Non-matching. + record(logging.INFO, "foo2"), # Non-matching. + record(logging.INFO, "bar1"), # Matching. + record(logging.INFO, "foo1"), # Matching. + record(logging.INFO, "foo2"), # Matching. + record(logging.INFO, "bar2"), # Matching. + ) + + +def test_any_logged_eq_pass() -> None: + assert (logged.info("foo") | logged.info("bar")) == (logged.info("foo") | logged.info("bar")) + + +def test_any_logged_eq_fail() -> None: + # Different orderings are not equal. + assert (logged.info("foo") | logged.info("bar")) != (logged.info("bar") | logged.info("foo")) + # Different operators are not equal. + assert (logged.info("foo") | logged.info("bar")) != (logged.info("foo") >> logged.info("bar")) + + +def test_any_logged_repr() -> None: + # Composed `Logged` are flattened from the left. + assert ( + repr((logged.info("foo") | logged.info("bar")) | logged.info("baz")) + == "(log('INFO', 'foo') | log('INFO', 'bar') | log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the right. + assert ( + repr(logged.info("foo") | (logged.info("bar") | logged.info("baz"))) + == "(log('INFO', 'foo') | log('INFO', 'bar') | log('INFO', 'baz'))" + ) + # Composed `Logged` are flattened from the left and right. + assert ( + repr((logged.info("foo") | logged.info("bar")) | (logged.info("baz") | logged.info("bat"))) + == "(log('INFO', 'foo') | log('INFO', 'bar') | log('INFO', 'baz') | log('INFO', 'bat'))" + ) + + +def test_any_logged_str() -> None: + assert str(logged.info("foo") | logged.info("bar")) == "\n".join( + ( + "Any:", + "- [INFO] foo", + "- [INFO] bar", + ) + ) + # Indentation is sane with multiple nested composed `Logged`. + assert str( + (logged.info("foo1") >> logged.info("foo2")) + | ( + (logged.info("bar1a") & logged.info("bar1b")) + >> ((logged.info("bar2a1") >> logged.info("bar2a2")) & (logged.info("bar2b1") >> logged.info("bar2b2"))) + ) + ) == "\n".join( + ( + "Any:", + "- [INFO] foo1", + " [INFO] foo2", + "- Unordered:", + " - [INFO] bar1a", + " - [INFO] bar1b", + " Unordered:", + " - [INFO] bar2a1", + " [INFO] bar2a2", + " - [INFO] bar2b1", + " [INFO] bar2b2", + ) + ) + + +def test_any_logged_reduce() -> None: + assert_reduce( + logged.info("foo") | logged.info("bar") | logged.info("baz"), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "bar"), # Matching. + ) + assert_reduce( + (logged.info("foo1") >> logged.info("foo2")) | (logged.info("bar1") >> logged.info("bar2")), + record(logging.INFO, "boom!"), # Non-matching. + record(logging.INFO, "bar2"), # Non-matching. + record(logging.INFO, "foo2"), # Non-matching. + record(logging.INFO, "bar1"), # Matching. + record(logging.INFO, "foo1"), # Matching. + record(logging.INFO, "foo2"), # Matching. + )