diff --git a/green/config.py b/green/config.py index cc3af0b..139e2b7 100644 --- a/green/config.py +++ b/green/config.py @@ -580,13 +580,13 @@ class ConfigFile: # pragma: no cover """ def __init__(self, filepath: pathlib.Path) -> None: - self._first = True - self._lines = filepath.read_text().splitlines(keepends=True) + self._first: bool = True + self._lines: list[str] = filepath.read_text().splitlines(keepends=True) - def __iter__(self): + def __iter__(self) -> ConfigFile: return self - def __next__(self): + def __next__(self) -> str: if self._first: self._first = False return "[green]\n" diff --git a/green/djangorunner.py b/green/djangorunner.py index c7b5f74..ee743dd 100644 --- a/green/djangorunner.py +++ b/green/djangorunner.py @@ -11,6 +11,7 @@ from __future__ import annotations from argparse import ArgumentParser, Namespace +import pathlib import os import sys from typing import Any, Final, Sequence @@ -23,10 +24,10 @@ # If we're not being run from an actual django project, set up django config os.environ.setdefault("DJANGO_SETTINGS_MODULE", "green.djangorunner") -BASE_DIR = os.path.dirname(os.path.dirname(__file__)) +BASE_DIR = pathlib.Path(__file__).absolute().parent.parent SECRET_KEY: Final[str] = ")9^_e(=cisybdt4m4+fs+_wb%d$!9mpcoy0um^alvx%gexj#jv" -DEBUG = True -TEMPLATE_DEBUG = True +DEBUG: bool = True +TEMPLATE_DEBUG: bool = True ALLOWED_HOSTS: Sequence[str] = [] INSTALLED_APPS: Final[Sequence[str]] = ( "django.contrib.admin", @@ -51,14 +52,14 @@ DATABASES: Final[dict[str, dict[str, str]]] = { "default": { "ENGINE": "django.db.backends.sqlite3", - "NAME": os.path.join(BASE_DIR, "db.sqlite3"), + "NAME": str(BASE_DIR / "db.sqlite3"), } } LANGUAGE_CODE: Final[str] = "en-us" TIME_ZONE: Final[str] = "UTC" -USE_I18N = True -USE_L10N = True -USE_TZ = True +USE_I18N: bool = True +USE_L10N: bool = True +USE_TZ: bool = True STATIC_URL: Final[str] = "/static/" # End of django fake config stuff @@ -75,7 +76,7 @@ def django_missing() -> None: from django.test.runner import DiscoverRunner class DjangoRunner(DiscoverRunner): - def __init__(self, verbose: int = -1, **kwargs): + def __init__(self, verbose: int = -1, **kwargs: Any): super().__init__(**kwargs) self.verbose = verbose self.loader = GreenTestLoader() diff --git a/green/junit.py b/green/junit.py index 8b735c2..d72770e 100644 --- a/green/junit.py +++ b/green/junit.py @@ -13,6 +13,7 @@ from green.result import GreenTestResult, ProtoTest, ProtoError from lxml.etree import _Element + # TODO: use NamedTuple for TestVerdict. TestVerdict: TypeAlias = Union[ Tuple[int, ProtoTest], Tuple[int, ProtoTest, Union[str, ProtoError]] ] @@ -111,7 +112,9 @@ def _suite_name(test) -> str: return f"{test.module}.{test.class_name}" @staticmethod - def _add_failures(collection: TestsCollection, test_results: GreenTestResult): + def _add_failures( + collection: TestsCollection, test_results: GreenTestResult + ) -> None: for each_test, failure in test_results.failures: key = JUnitXML._suite_name(each_test) if key not in collection: @@ -127,7 +130,9 @@ def _add_errors(collection: TestsCollection, test_results: GreenTestResult): collection[key].append((Verdict.ERROR, each_test, error)) @staticmethod - def _add_skipped_tests(collection: TestsCollection, test_results: GreenTestResult): + def _add_skipped_tests( + collection: TestsCollection, test_results: GreenTestResult + ) -> None: for each_test, reason in test_results.skipped: key = JUnitXML._suite_name(each_test) if key not in collection: @@ -160,16 +165,23 @@ def _convert_suite( return xml_suite @staticmethod - def _count_test_with_verdict(verdict: int, suite): + def _count_test_with_verdict(verdict: int, suite: list[TestVerdict]) -> int: return sum(1 for entry in suite if entry[0] == verdict) - def _convert_test(self, results, verdict, test, *details) -> _Element: + def _convert_test( + self, + results: GreenTestResult, + verdict: int, + test: ProtoTest, + *details: str | ProtoError, + ) -> _Element: xml_test = Element(JUnitDialect.TEST_CASE) xml_test.set(JUnitDialect.NAME, test.method_name) xml_test.set(JUnitDialect.CLASS_NAME, test.class_name) xml_test.set(JUnitDialect.TEST_TIME, test.test_time) - xml_verdict = self._convert_verdict(verdict, test, details) + error: str | ProtoError | None = details[0] if details else None + xml_verdict = self._convert_verdict(verdict, test, error) if xml_verdict is not None: xml_test.append(xml_verdict) @@ -185,21 +197,25 @@ def _convert_test(self, results, verdict, test, *details) -> _Element: return xml_test - def _convert_verdict(self, verdict: int, test, details) -> _Element | None: + # FIXME: test is not used. + def _convert_verdict( + self, verdict: int, test: ProtoTest, error_details: str | ProtoError | None + ) -> _Element | None: + message = str(error_details) if error_details else "" if verdict == Verdict.FAILED: failure = Element(JUnitDialect.FAILURE) - failure.text = str(details[0]) + failure.text = message return failure if verdict == Verdict.ERROR: error = Element(JUnitDialect.ERROR) - error.text = str(details[0]) + error.text = message return error if verdict == Verdict.SKIPPED: skipped = Element(JUnitDialect.SKIPPED) - skipped.text = str(details[0]) + skipped.text = message return skipped return None @staticmethod - def _suite_time(suite) -> float: + def _suite_time(suite: list[TestVerdict]) -> float: return sum(float(each_test.test_time) for verdict, each_test, *details in suite) diff --git a/green/loader.py b/green/loader.py index 4e6edab..c2ad134 100644 --- a/green/loader.py +++ b/green/loader.py @@ -12,7 +12,7 @@ import sys import unittest import traceback -from typing import Iterable, TYPE_CHECKING +from typing import Iterable, Type, TYPE_CHECKING, Union from green.output import debug from green import result @@ -21,6 +21,9 @@ if TYPE_CHECKING: from types import ModuleType from unittest import TestSuite, TestCase + from doctest import _DocTestSuite + + FlattenableTests = Union[TestSuite, _DocTestSuite, GreenTestSuite] python_file_pattern = re.compile(r"^[_a-z]\w*?\.py$", re.IGNORECASE) python_dir_pattern = re.compile(r"^[_a-z]\w*?$", re.IGNORECASE) @@ -33,12 +36,14 @@ class GreenTestLoader(unittest.TestLoader): TestSuite. """ - suiteClass = GreenTestSuite + suiteClass: Type[GreenTestSuite] = GreenTestSuite - def loadTestsFromTestCase(self, testCaseClass): + def loadTestsFromTestCase( + self, testCaseClass: Type[unittest.TestCase] + ) -> GreenTestSuite: debug(f"Examining test case {testCaseClass.__name__}", 3) - def filter_test_methods(attrname): + def filter_test_methods(attrname: str) -> bool: return ( attrname.startswith(self.testMethodPrefix) and callable(getattr(testCaseClass, attrname)) @@ -53,9 +58,9 @@ def filter_test_methods(attrname): if not test_case_names and hasattr(testCaseClass, "runTest"): test_case_names = ["runTest"] - return flattenTestSuite(map(testCaseClass, test_case_names)) + return flattenTestSuite(testCaseClass(name) for name in test_case_names) - def loadFromModuleFilename(self, filename: str): + def loadFromModuleFilename(self, filename: str) -> TestSuite: dotted_module, parent_dir = findDottedModuleAndParentDir(filename) # Adding the parent path of the module to the start of sys.path is # the closest we can get to an absolute import in Python that I can @@ -87,7 +92,7 @@ def testSkipped(self): dotted_module, filename, traceback.format_exc() ) - def testFailure(self): + def testFailure(self) -> None: raise ImportError(message) TestClass = type( @@ -104,15 +109,29 @@ def testFailure(self): # --- Find the tests inside the loaded module --- return self.loadTestsFromModule(loaded_module) - def loadTestsFromModule(self, module, pattern=None): + def loadTestsFromModule( # type: ignore[override] + self, module: ModuleType, *, pattern: str | None = None + ) -> GreenTestSuite: tests = super().loadTestsFromModule(module, pattern=pattern) return flattenTestSuite(tests, module) - def loadTestsFromName(self, name, module=None): + def loadTestsFromName( + self, name: str, module: ModuleType | None = None + ) -> GreenTestSuite: tests = super().loadTestsFromName(name, module) return flattenTestSuite(tests, module) - def discover(self, current_path, file_pattern="test*.py", top_level_dir=None): + # TODO: In unittest/loader.py this is not supposed to return None but it + # always returns self.suiteClass(tests). Maybe we should do the same by + # returning GreenTestSuite() but empty instead. It might be possible that + # this is what is triggering the failures when running tests with the + # @skipIf decorator with 3.12.1. + def discover( # type: ignore[override] + self, + current_path: str, + file_pattern: str = "test*.py", + top_level_dir: str | None = None, + ) -> GreenTestSuite | None: """ I take a path to a directory and discover all the tests inside files matching file_pattern. @@ -167,6 +186,9 @@ def discover(self, current_path, file_pattern="test*.py", top_level_dir=None): def loadTargets( self, targets: Iterable[str] | str, file_pattern: str = "test*.py" ) -> GreenTestSuite | None: + """ + Load the given test targets. This is green specific and not part of unittest.TestLoader. + """ # If a string was passed in, put it into a tuple. if isinstance(targets, str): targets = [targets] @@ -177,7 +199,7 @@ def loadTargets( target_dict[target] = True targets = target_dict.keys() - suites = [] + suites: list[GreenTestSuite] = [] for target in targets: suite = self.loadTarget(target, file_pattern) if not suite: @@ -193,11 +215,14 @@ def loadTargets( return flattenTestSuite(suites) if suites else None - def loadTarget(self, target, file_pattern="test*.py"): + def loadTarget( + self, target: str, file_pattern: str = "test*.py" + ) -> GreenTestSuite | None: + """ + Load the given test target. This is green specific and not part of unittest.TestLoader. + """ debug( - "Attempting to load target '{}' with file_pattern '{}'".format( - target, file_pattern - ) + f"Attempting to load target '{target}' with file_pattern '{file_pattern}'." ) # For a test loader, we want to always the current working directory to @@ -222,7 +247,7 @@ def loadTarget(self, target, file_pattern="test*.py"): if target and (target[0] != "."): try: filename = importlib.import_module(target).__file__ - if "__init__.py" in filename: + if filename and "__init__.py" in filename: pkg_in_path_dir = os.path.dirname(filename) except: pkg_in_path_dir = None @@ -261,13 +286,17 @@ def loadTarget(self, target, file_pattern="test*.py"): bare_file = target # some/file pyless_file = target + ".py" - for candidate in [bare_file, pyless_file]: + for candidate in (bare_file, pyless_file): if (candidate is None) or (not os.path.isfile(candidate)): continue need_cleanup = False cwd = os.getcwd() if cwd != sys.path[0]: need_cleanup = True + # TODO: look into how much larger we grow sys.path with each + # candidate. It is possible that we end up with a lot of + # duplicate entries that might make imports slower. + # This is because sys.path.remove(cwd) is not in a Finally block. sys.path.insert(0, cwd) try: dotted_path = target.replace(".py", "").replace(os.sep, ".") @@ -290,7 +319,7 @@ def loadTarget(self, target, file_pattern="test*.py"): "it.".format(dotted_path) ) - def testFailure(self): + def testFailure(self) -> None: raise ImportError(message) # pragma: no cover TestClass = type( @@ -300,6 +329,7 @@ def testFailure(self): ) return self.suiteClass((TestClass(dotted_path),)) if need_cleanup: + # TODO: this might need to be in a finally block. sys.path.remove(cwd) if tests and tests.countTestCases(): debug(f"Load method: FILE - {candidate}") @@ -338,13 +368,15 @@ def toProtoTestList( return test_list -def toParallelTargets(suite, targets: Iterable[str]) -> list[str]: +def toParallelTargets(suite: GreenTestSuite, targets: Iterable[str]) -> list[str]: """ Produce a list of targets which should be tested in parallel. For the most part, this will be a list of test modules. The exception is when a dotted name representing something more granular than a module was input (like an individual test case or test method). + + This is green specific and not part of unittest/loader.py. """ if isinstance(targets, str): # This should not happen, but mypy treats str as a valid sequence of strings. @@ -387,7 +419,7 @@ def toParallelTargets(suite, targets: Iterable[str]) -> list[str]: return parallel_targets -def getCompletions(target: list[str] | str): +def getCompletions(target: list[str] | str) -> str: # This option expects 0 or 1 targets if not isinstance(target, str): target = target[0] @@ -439,7 +471,7 @@ def getCompletions(target: list[str] | str): def isPackage(file_path: pathlib.Path) -> bool: """ - Determine whether or not a given path is a (sub)package or not. + Determine whether or not a given path is a (sub)package or not. Green specific. """ return file_path.is_dir() and (file_path / "__init__.py").is_file() @@ -452,6 +484,8 @@ def findDottedModuleAndParentDir(file_path: str | pathlib.Path) -> tuple[str, st For filepath '/a/b/c/d.py' where b is the package, ('b.c.d', '/a') would be returned. + + This is green specific and not part of unittest/loader.py. """ path = pathlib.Path(file_path) if not path.is_file(): @@ -466,7 +500,7 @@ def findDottedModuleAndParentDir(file_path: str | pathlib.Path) -> tuple[str, st return dotted_module, str(parent_dir) -def isTestCaseDisabled(test_case_class: TestCase, method_name: str): +def isTestCaseDisabled(test_case_class: Type[TestCase], method_name: str) -> bool: """ I check to see if a method on a TestCase has been disabled via nose's convention for disabling a TestCase. This makes it so that users can @@ -477,14 +511,16 @@ def isTestCaseDisabled(test_case_class: TestCase, method_name: str): def flattenTestSuite( - test_suite: list[TestSuite] | TestSuite, module: ModuleType | None = None + test_suite: Iterable[FlattenableTests | TestCase] | FlattenableTests | TestCase, + module: ModuleType | None = None, ) -> GreenTestSuite: """ Look for a `doctest_modules` list and attempt to add doctest tests to the - suite of tests that we are about to flatten. + suite of tests that we are about to flatten. Green specific. """ # todo: rename this function to something more appropriate. - suites = test_suite if isinstance(test_suite, list) else [test_suite] + suites: list[Iterable[FlattenableTests | TestCase] | FlattenableTests | TestCase] + suites = [test_suite] doctest_modules = getattr(module, "doctest_modules", ()) for doctest_module in doctest_modules: doc_suite = DocTestSuite(doctest_module) @@ -494,11 +530,14 @@ def flattenTestSuite( # Now extract all tests from the suite hierarchies and flatten them into a # single suite with all tests. - tests: list[TestCase | TestSuite] = [] + tests: list[TestSuite | GreenTestSuite | TestCase] = [] for suite in suites: # injected_module is not present in DocTestSuite. injected_module: str | None = getattr(suite, "injected_module", None) - for test in suite: + # We might have received an iterable of TestCase instances from loadTestsFromTestCase(). + # If this happens iterating over it should not be possible. This will + # require further investigation. + for test in suite: # type: ignore if injected_module: # For doctests, inject the test module name so we can later # grab it and use it to group the doctest output along with the diff --git a/green/output.py b/green/output.py index d0fe137..37f6dd9 100644 --- a/green/output.py +++ b/green/output.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Iterable, TYPE_CHECKING +from typing import Iterable, TextIO, Type, TYPE_CHECKING from colorama import Fore, Style from colorama.ansi import Cursor @@ -14,16 +14,15 @@ if TYPE_CHECKING: from colorama.ansitowin32 import StreamWrapper - from colorama.initialise import _TextIOT global debug_level debug_level = 0 -text_type = str +text_type: Type[str] = str unicode = None # so pyflakes stops complaining -def debug(message: str, level: int = 1): +def debug(message: str, level: int = 1) -> None: """ So we can tune how much debug output we get when we turn it on. """ @@ -36,7 +35,7 @@ class Colors: A class to centralize wrapping strings in terminal colors. """ - def __init__(self, termcolor: bool | None = None): + def __init__(self, termcolor: bool | None = None) -> None: """Initialize the Colors object. Args: @@ -124,13 +123,13 @@ class GreenStream: def __init__( self, - stream: _TextIOT, + stream: TextIO, override_appveyor: bool = False, disable_windows: bool = False, disable_unidecode: bool = False, ) -> None: self.disable_unidecode = disable_unidecode - self.stream: _TextIOT | StreamWrapper = stream + self.stream: TextIO | StreamWrapper = stream # Ironically, Windows CI platforms such as GitHub Actions and AppVeyor don't support windows # win32 system calls for colors, but it WILL interpret posix ansi escape codes! (The # opposite of an actual windows command prompt) diff --git a/green/process.py b/green/process.py index b5f31eb..febf2c1 100644 --- a/green/process.py +++ b/green/process.py @@ -1,14 +1,32 @@ +""" +Handle running unittests suites in parallel. +""" + from __future__ import annotations import logging import multiprocessing +import multiprocessing.pool +from multiprocessing.pool import MaybeEncodingError # type: ignore from multiprocessing.pool import Pool +from multiprocessing import util # type: ignore + import os import random import sys import tempfile import traceback -from typing import Type, TYPE_CHECKING, Union, Tuple, Callable, Iterable, Mapping, Any +from typing import ( + Type, + TYPE_CHECKING, + Union, + Tuple, + Callable, + Iterable, + Mapping, + Any, + TypeVar, +) import coverage @@ -19,12 +37,20 @@ if TYPE_CHECKING: from types import TracebackType from queue import Queue + + from multiprocessing.context import SpawnContext, SpawnProcess + from multiprocessing.pool import ApplyResult + from multiprocessing.queues import SimpleQueue + from green.suite import GreenTestSuite + from green.runner import InitializerOrFinalizer + from green.result import RunnableTestT ExcInfoType = Union[ Tuple[Type[BaseException], BaseException, TracebackType], Tuple[None, None, None], ] + _T = TypeVar("_T") # Super-useful debug function for finding problems in the subprocesses, and it @@ -51,7 +77,7 @@ class ProcessLogger: def __init__(self, callable: Callable) -> None: self.__callable = callable - def __call__(self, *args, **kwargs): + def __call__(self, *args, **kwargs) -> Any: try: return self.__callable(*args, **kwargs) except Exception: @@ -76,32 +102,33 @@ class LoggingDaemonlessPool(Pool): _wrap_exception: bool = True @staticmethod - def Process(ctx, *args, **kwargs): + def Process(ctx: SpawnContext, *args: Any, **kwargs: Any) -> SpawnProcess: return ctx.Process(daemon=False, *args, **kwargs) def apply_async( self, - func: Callable, + func: Callable[[Any, Any], _T], # should be the poolRunner method. args: Iterable = (), kwargs: Mapping[str, Any] | None = None, - callback=None, - error_callback=None, - ): + callback: Callable[[_T], Any] | None = None, + error_callback: Callable[[BaseException], Any] | None = None, + ) -> ApplyResult[_T]: if kwargs is None: kwargs = {} - return Pool.apply_async( - self, ProcessLogger(func), args, kwargs, callback, error_callback + return super().apply_async( + ProcessLogger(func), args, kwargs, callback, error_callback ) def __init__( self, - processes=None, - initializer=None, - initargs=(), - maxtasksperchild=None, - context=None, - finalizer=None, - finalargs=(), + processes: int | None = None, + initializer: Callable | None = None, + initargs: Iterable[Any] = (), + maxtasksperchild: int | None = None, + context: Any | None = None, + # Green specific: + finalizer: Callable | None = None, + finalargs: Iterable[Any] = (), ): self._finalizer = finalizer self._finalargs = finalargs @@ -125,19 +152,19 @@ def _repopulate_pool(self): @staticmethod def _repopulate_pool_static( - ctx, - Process, - processes, - pool, - inqueue, - outqueue, - initializer, - initargs, - maxtasksperchild, - wrap_exception, - finalizer, - finalargs, - ): + ctx: SpawnContext, + Process: Callable, # LoggingDaemonlessPool.Process + processes: int, + pool: list[Callable], # list of LoggingDaemonlessPool.Process + inqueue: SimpleQueue, + outqueue: SimpleQueue, + initializer: InitializerOrFinalizer, + initargs: tuple, + maxtasksperchild: int | None, + wrap_exception: bool, + finalizer: InitializerOrFinalizer, + finalargs: tuple, + ) -> None: """ Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. @@ -163,27 +190,28 @@ def _repopulate_pool_static( util.debug("added worker") -import multiprocessing.pool -from multiprocessing import util # type: ignore -from multiprocessing.pool import MaybeEncodingError # type: ignore - - def worker( - inqueue, - outqueue, - initializer=None, - initargs=(), - maxtasks=None, - wrap_exception=False, - finalizer=None, - finalargs=(), + inqueue: SimpleQueue, + outqueue: SimpleQueue, + initializer: InitializerOrFinalizer | None = None, + initargs: tuple = (), + maxtasks: int | None = None, + wrap_exception: bool = False, + finalizer: Callable | None = None, + finalargs: tuple = (), ): # pragma: no cover + # TODO: revisit this assert; these statements are skipped by the python + # compiler in optimized mode. assert maxtasks is None or (isinstance(maxtasks, int) and maxtasks > 0) put = outqueue.put get = inqueue.get - if hasattr(inqueue, "_writer"): - inqueue._writer.close() - outqueue._reader.close() + + writer = getattr(inqueue, "_writer", None) + if writer is not None: + writer.close() + reader = getattr(outqueue, "_reader", None) + if reader is not None: + reader.close() if initializer is not None: try: @@ -206,10 +234,12 @@ def worker( job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) - except Exception as e: + except Exception as result_error: if wrap_exception: - e = ExceptionWithTraceback(e, e.__traceback__) - result = (False, e) + result_error = ExceptionWithTraceback( + result_error, result_error.__traceback__ + ) + result = (False, result_error) try: put((job, i, result)) except Exception as e: @@ -237,8 +267,8 @@ def __str__(self) -> str: # Unmodified (see above) -class ExceptionWithTraceback: # pragma: no cover - def __init__(self, exc: BaseException, tb: TracebackType): +class ExceptionWithTraceback(Exception): # pragma: no cover + def __init__(self, exc: BaseException, tb: TracebackType | None): tb_lines = traceback.format_exception(type(exc), exc, tb) tb_text = "".join(tb_lines) self.exc = exc @@ -259,14 +289,13 @@ def rebuild_exc(exc: BaseException, tb: str): # pragma: no cover # ----------------------------------------------------------------------------- -# Fixme: `omit_patterns=[]` is a global mutable. def poolRunner( target: str, queue: Queue, coverage_number: int | None = None, - omit_patterns: str | Iterable[str] | None = [], + omit_patterns: str | Iterable[str] | None = None, cov_config_file: bool = True, -): # pragma: no cover +) -> None: # pragma: no cover """ I am the function that pool worker processes run. I run one unit test. @@ -281,7 +310,7 @@ def poolRunner( saved_tempdir = tempfile.tempdir tempfile.tempdir = tempfile.mkdtemp() - def raise_internal_failure(msg: str): + def raise_internal_failure(msg: str) -> None: err = sys.exc_info() t = ProtoTest() t.module = "green.loader" @@ -294,7 +323,7 @@ def raise_internal_failure(msg: str): queue.put(result) cleanup() - def cleanup(): + def cleanup() -> None: # Restore the state of the temp directory tempfile.tempdir = saved_tempdir queue.put(None) @@ -318,19 +347,19 @@ def cleanup(): # What to do each time an individual test is started already_sent = set() - def start_callback(test): + def start_callback(test: RunnableTestT) -> None: # Let the main process know what test we are starting - test = proto_test(test) - if test not in already_sent: - queue.put(test) - already_sent.add(test) + test_proto = proto_test(test) + if test_proto not in already_sent: + queue.put(test_proto) + already_sent.add(test_proto) - def finalize_callback(test_result): + def finalize_callback(test_result: ProtoTestResult) -> None: # Let the main process know what happened with the test run queue.put(test_result) result = ProtoTestResult(start_callback, finalize_callback) - test: GreenTestSuite | None = None + test: GreenTestSuite | None try: loader = GreenTestLoader() test = loader.loadTargets(target) @@ -376,24 +405,26 @@ def finalize_callback(test_result): # loadTargets() returned an object without a run() method, probably # None description = ( - 'Test loader returned an un-runnable object. Is "{}" ' + f'Test loader returned an un-runnable object. Is "{target}" ' "importable from your current location? Maybe you " "forgot an __init__.py in your directory? Unrunnable " - "object looks like: {} of type {} with dir {}".format( - target, str(test), type(test), dir(test) - ) + f"object looks like: {test} of type {type(test)} with dir {dir(test)}" ) no_run_error = (TypeError, TypeError(description), None) t = ProtoTest() - target_list = target.split(".") - t.module = ".".join(target_list[:-2]) if len(target_list) > 1 else target - t.class_name = target.split(".")[-2] if len(target_list) > 1 else "UnknownClass" t.description = description - t.method_name = ( - target.split(".")[-1] if len(target_list) > 1 else "unknown_method" - ) + target_list = target.split(".") + if len(target_list) > 1: + t.module = ".".join(target_list[:-2]) + t.class_name = target_list[-2] + t.method_name = target_list[-1] + else: + t.module = target + t.class_name = "UnknownClass" + t.method_name = "unknown_method" result.startTest(t) - result.addError(t, no_run_error) + # Ignoring that no_run_error traceback is None. + result.addError(t, no_run_error) # type: ignore[arg-type] result.stopTest(t) queue.put(result) diff --git a/green/result.py b/green/result.py index d86f5e6..1e9e605 100644 --- a/green/result.py +++ b/green/result.py @@ -12,10 +12,12 @@ from unittest.result import failfast from unittest import TestCase, TestSuite -from green.output import Colors, debug +from green.output import Colors, debug, GreenStream from green.version import pretty_version if TYPE_CHECKING: + from green.process import ExcInfoType + TestCaseT = Union["ProtoTest", TestCase, DocTestCase] RunnableTestT = Union[TestCaseT, TestSuite] @@ -32,7 +34,7 @@ def proto_test(test: RunnableTestT) -> ProtoTest: return ProtoTest(test) -def proto_error(err: list | tuple | ProtoError) -> ProtoError: +def proto_error(err: ExcInfoType | ProtoError) -> ProtoError: """ If err is a ProtoError, I just return it. Otherwise, I create a ProtoError out of err and return it. @@ -109,7 +111,7 @@ def __init__(self, test: TestCase | DocTestCase | TestSuite | None = None) -> No doc_segments.append(line) self.docstr_part = " ".join(doc_segments) - def __eq__(self, other) -> bool: + def __eq__(self, other: Any) -> bool: return self.__hash__() == other.__hash__() def __hash__(self) -> int: @@ -153,7 +155,7 @@ class ProtoError: and can pass between processes. """ - def __init__(self, err: list | tuple) -> None: + def __init__(self, err: ExcInfoType) -> None: self.traceback_lines = traceback.format_exception(*err) def __str__(self) -> str: @@ -165,15 +167,15 @@ class BaseTestResult: I am inherited by ProtoTestResult and GreenTestResult. """ - def __init__(self, stream, *, colors: Colors | None = None): - self.stdout_output: dict[ProtoTest, Any] = {} - self.stderr_errput: dict[ProtoTest, Any] = {} - self.stream = stream + def __init__(self, stream: GreenStream | None, *, colors: Colors | None = None): + self.stdout_output: dict[ProtoTest, str] = {} + self.stderr_errput: dict[ProtoTest, str] = {} + self.stream: GreenStream | None = stream self.colors: Colors = colors or Colors() # The collectedDurations list is new in Python 3.12. self.collectedDurations: list[tuple[str, float]] = [] - def recordStdout(self, test: TestCaseT, output): + def recordStdout(self, test: RunnableTestT, output): """ Called with stdout that the suite decided to capture so we can report the captured output somewhere. @@ -182,7 +184,7 @@ def recordStdout(self, test: TestCaseT, output): test = proto_test(test) self.stdout_output[test] = output - def recordStderr(self, test: TestCaseT, errput): + def recordStderr(self, test: RunnableTestT, errput): """ Called with stderr that the suite decided to capture so we can report the captured "errput" somewhere. @@ -199,13 +201,13 @@ def displayStdout(self, test: TestCaseT): """ test = proto_test(test) if test.dotted_name in self.stdout_output: + if self.stream is None: + raise ValueError("stream is None") colors = self.colors + captured = "Captured stdout" self.stream.write( - "\n{} for {}\n{}".format( - colors.yellow("Captured stdout"), - colors.bold(test.dotted_name), - self.stdout_output[test], - ) + f"\n{colors.yellow(captured)} for {colors.bold(test.dotted_name)}\n" + f"{self.stdout_output[test]}" ) del self.stdout_output[test] @@ -217,13 +219,13 @@ def displayStderr(self, test: TestCaseT): """ test = proto_test(test) if test.dotted_name in self.stderr_errput: + if self.stream is None: + raise ValueError("stream is None") colors = self.colors + captured = "Captured stderr" self.stream.write( - "\n{} for {}\n{}".format( - colors.yellow("Captured stderr"), - colors.bold(test.dotted_name), - self.stderr_errput[test], - ) + f"\n{colors.yellow(captured)} for {colors.bold(test.dotted_name)}\n" + f"{self.stderr_errput[test]}" ) del self.stderr_errput[test] @@ -255,7 +257,7 @@ class ProtoTestResult(BaseTestResult): def __init__( self, start_callback: Callable[[RunnableTestT], None] | None = None, - finalize_callback: Callable[[RunnableTestT], None] | None = None, + finalize_callback: Callable[[ProtoTestResult], None] | None = None, ) -> None: super().__init__(None, colors=None) self.start_callback = start_callback @@ -322,7 +324,7 @@ def __setstate__(self, state: dict[str, Any]) -> None: self.start_callback = None self.finalize_callback = None - def startTest(self, test: RunnableTestT): + def startTest(self, test: RunnableTestT) -> None: """ Called before each test runs. """ @@ -332,7 +334,7 @@ def startTest(self, test: RunnableTestT): if self.start_callback: self.start_callback(test) - def stopTest(self, test: RunnableTestT): + def stopTest(self, test: RunnableTestT) -> None: """ Called after each test runs. """ @@ -341,7 +343,7 @@ def stopTest(self, test: RunnableTestT): else: self.test_time = "0.0" - def finalize(self): + def finalize(self) -> None: """ I am here so that after the GreenTestSuite has had a chance to inject the captured stdout/stderr back into me, I can relay that through to @@ -352,43 +354,46 @@ def finalize(self): self.finalize_callback(self) self.finalize_callback_called = True - def addSuccess(self, test: TestCaseT): + def addSuccess(self, test: TestCaseT) -> None: """ Called when a test passed. """ self.passing.append(proto_test(test)) - def addError(self, test: RunnableTestT, err): + def addError(self, test: RunnableTestT, err: ProtoError | ExcInfoType) -> None: """ Called when a test raises an exception. """ self.errors.append((proto_test(test), proto_error(err))) - def addFailure(self, test: TestCaseT, err): + def addFailure(self, test: TestCaseT, err: ExcInfoType) -> None: """ Called when a test fails a unittest assertion. """ self.failures.append((proto_test(test), proto_error(err))) - def addSkip(self, test: TestCaseT, reason): + def addSkip(self, test: TestCaseT, reason: str) -> None: """ Called when a test is skipped. """ self.skipped.append((proto_test(test), reason)) - def addExpectedFailure(self, test: TestCaseT, err): + def addExpectedFailure(self, test: TestCaseT, err: ExcInfoType) -> None: """ Called when a test fails, and we expected the failure. """ self.expectedFailures.append((proto_test(test), proto_error(err))) - def addUnexpectedSuccess(self, test: TestCaseT): + def addUnexpectedSuccess(self, test: TestCaseT) -> None: """ Called when a test passed, but we expected a failure """ self.unexpectedSuccesses.append(proto_test(test)) - def addSubTest(self, test: TestCaseT, subtest, err): + # The _SubTest class is private and masked so we cannot easily type annotate. + def addSubTest( + self, test: TestCaseT, subtest: Any, err: ExcInfoType | None + ) -> None: """ Called at the end of a subtest no matter its result. @@ -397,7 +402,7 @@ def addSubTest(self, test: TestCaseT, subtest, err): separate test result. It's very meta. """ if err is not None: - if issubclass(err[0], test.failureException): + if err[0] is not None and issubclass(err[0], test.failureException): self.addFailure(subtest, err) else: self.addError(subtest, err) @@ -408,12 +413,13 @@ class GreenTestResult(BaseTestResult): Aggregates test results and outputs them to a stream. """ + stream: GreenStream last_class: str = "" last_module: str = "" first_text_output: str = "" shouldStop: bool = False - def __init__(self, args: argparse.Namespace, stream) -> None: + def __init__(self, args: argparse.Namespace, stream: GreenStream) -> None: super().__init__(stream, colors=Colors(args.termcolor)) self.args = args self.showAll: bool = args.verbose > 1 @@ -621,9 +627,9 @@ def stopTest(self, test: RunnableTestT) -> None: def _reportOutcome( self, test: RunnableTestT, - outcome_char, + outcome_char: str, color_func: Callable[[str], str], - err=None, + err: ProtoError | None = None, reason: str = "", ) -> None: self.testsRun += 1 @@ -651,7 +657,9 @@ def _reportOutcome( self.stream.write(color_func(outcome_char)) self.stream.flush() - def addSuccess(self, test: RunnableTestT, test_time=None): + def addSuccess( + self, test: RunnableTestT, test_time: float | str | None = None + ) -> None: """ Called when a test passed. """ @@ -662,20 +670,24 @@ def addSuccess(self, test: RunnableTestT, test_time=None): self._reportOutcome(test, ".", self.colors.passing) @failfast - def addError(self, test: RunnableTestT, err, test_time=None): + def addError( + self, test: RunnableTestT, err: ProtoError, test_time: float | str | None = None + ) -> None: """ Called when a test raises an exception. """ test = proto_test(test) if test_time: test.test_time = str(test_time) - err = proto_error(err) - self.errors.append((test, err)) - self.all_errors.append((test, self.colors.error, "Error", err)) - self._reportOutcome(test, "E", self.colors.error, err) + error = proto_error(err) + self.errors.append((test, error)) + self.all_errors.append((test, self.colors.error, "Error", error)) + self._reportOutcome(test, "E", self.colors.error, error) @failfast - def addFailure(self, test: RunnableTestT, err, test_time=None): + def addFailure( + self, test: RunnableTestT, err: ProtoError, test_time: float | str | None = None + ) -> None: """ Called when a test fails a unittest assertion. """ @@ -687,15 +699,16 @@ def addFailure(self, test: RunnableTestT, err, test_time=None): self.addSkip(test, reason) return - test = proto_test(test) + test_proto = proto_test(test) if test_time: - test.test_time = str(test_time) - err = proto_error(err) - self.failures.append((test, err)) - self.all_errors.append((test, self.colors.error, "Failure", err)) - self._reportOutcome(test, "F", self.colors.failing, err) + test_proto.test_time = str(test_time) + self.failures.append((test_proto, err)) + self.all_errors.append((test_proto, self.colors.error, "Failure", err)) + self._reportOutcome(test_proto, "F", self.colors.failing, err) - def addSkip(self, test: RunnableTestT, reason: str, test_time=None): + def addSkip( + self, test: RunnableTestT, reason: str, test_time: float | str | None = None + ) -> None: """ Called when a test is skipped. """ @@ -705,7 +718,9 @@ def addSkip(self, test: RunnableTestT, reason: str, test_time=None): self.skipped.append((test, reason)) self._reportOutcome(test, "s", self.colors.skipped, reason=reason) - def addExpectedFailure(self, test: RunnableTestT, err, test_time=None): + def addExpectedFailure( + self, test: RunnableTestT, err: ProtoError, test_time: float | str | None = None + ) -> None: """ Called when a test fails, and we expected the failure. """ @@ -716,7 +731,9 @@ def addExpectedFailure(self, test: RunnableTestT, err, test_time=None): self.expectedFailures.append((test, err)) self._reportOutcome(test, "x", self.colors.expectedFailure, err) - def addUnexpectedSuccess(self, test: RunnableTestT, test_time=None) -> None: + def addUnexpectedSuccess( + self, test: RunnableTestT, test_time: float | str | None = None + ) -> None: """ Called when a test passed, but we expected a failure. """ diff --git a/green/runner.py b/green/runner.py index ca6f1bd..82ebd15 100644 --- a/green/runner.py +++ b/green/runner.py @@ -1,9 +1,11 @@ +"""Running tests.""" + from __future__ import annotations import argparse import multiprocessing from sys import modules -from typing import TYPE_CHECKING +from typing import TextIO, TYPE_CHECKING from unittest.signals import registerResult, installHandler, removeResult import warnings @@ -27,11 +29,11 @@ class InitializerOrFinalizer: appropriate time. """ - def __init__(self, dotted_function): + def __init__(self, dotted_function: str) -> None: self.module_part = ".".join(dotted_function.split(".")[:-1]) self.function_part = ".".join(dotted_function.split(".")[-1:]) - def __call__(self, *args): + def __call__(self, *args) -> None: if not self.module_part: return try: @@ -58,14 +60,14 @@ def __call__(self, *args): def run( - suite, stream, args: argparse.Namespace, testing: bool = False + suite, stream: TextIO | GreenStream, args: argparse.Namespace, testing: bool = False ) -> GreenTestResult: """ Run the given test case or test suite with the specified arguments. Any args.stream passed in will be wrapped in a GreenStream """ - if not issubclass(GreenStream, type(stream)): + if not isinstance(stream, GreenStream): stream = GreenStream( stream, disable_windows=args.disable_windows, diff --git a/green/suite.py b/green/suite.py index 194202d..6b0917c 100644 --- a/green/suite.py +++ b/green/suite.py @@ -2,15 +2,21 @@ import argparse from fnmatch import fnmatch +from io import StringIO import sys +import unittest +from typing import Iterable, TYPE_CHECKING from unittest.suite import _call_if_exists, _DebugResult, _isnotsuite, TestSuite # type: ignore from unittest import util -import unittest -from io import StringIO from green.config import get_default_args from green.output import GreenStream +if TYPE_CHECKING: + from unittest.case import TestCase + from unittest.result import TestResult + from green.result import GreenTestResult, ProtoTestResult + class GreenTestSuite(TestSuite): """ @@ -20,9 +26,13 @@ class GreenTestSuite(TestSuite): 2) It adds Green-specific features (see customize()) """ - args = None + args: argparse.Namespace | None = None - def __init__(self, tests=(), args: argparse.Namespace | None = None) -> None: + def __init__( + self, + tests: Iterable[TestCase | TestSuite] = (), + args: argparse.Namespace | None = None, + ) -> None: # You should either set GreenTestSuite.args before instantiation, or # pass args into __init__ self._removed_tests = 0 @@ -32,20 +42,20 @@ def __init__(self, tests=(), args: argparse.Namespace | None = None) -> None: self.customize(args) super().__init__(tests) - def addTest(self, test) -> None: + def addTest(self, test: TestCase | TestSuite) -> None: """ Override default behavior with some green-specific behavior. """ - if ( - self.full_test_pattern - # test can actually be suites and things. Only tests have - # _testMethodName - and getattr(test, "_testMethodName", False) + if self.full_test_pattern: + # test can actually be suites and things. Only tests have _testMethodName. + method_name = getattr(test, "_testMethodName", None) # Fake test cases (generated for module import failures, for example) # do not start with 'test'. We still want to see those fake cases. - and test._testMethodName.startswith("test") - ): - if not fnmatch(test._testMethodName, self.full_test_pattern): + if ( + method_name + and method_name.startswith("test") + and not fnmatch(method_name, self.full_test_pattern) + ): return super().addTest(test) @@ -86,18 +96,20 @@ def countTestCases(self) -> int: cases += test.countTestCases() return cases - def _handleClassSetUp(self, test, result) -> None: + def _handleClassSetUp( + self, test: TestCase | TestSuite, result: ProtoTestResult + ) -> None: previousClass = getattr(result, "_previousTestClass", None) currentClass = test.__class__ if currentClass == previousClass: return - if result._moduleSetUpFailed: + if result._moduleSetUpFailed: # type: ignore[attr-defined] return if getattr(currentClass, "__unittest_skip__", False): return try: - currentClass._classSetupFailed = False + currentClass._classSetupFailed = False # type: ignore except TypeError: # test may actually be a function # so its class will be a builtin-type @@ -110,44 +122,46 @@ def _handleClassSetUp(self, test, result) -> None: setUpClass() # Upstream Python forgets to take SkipTest into account except unittest.case.SkipTest as e: - currentClass.__unittest_skip__ = True - currentClass.__unittest_skip_why__ = str(e) + currentClass.__unittest_skip__ = True # type: ignore + currentClass.__unittest_skip_why__ = str(e) # type: ignore # -- END of fix except Exception as e: if isinstance(result, _DebugResult): raise - currentClass._classSetupFailed = True + currentClass._classSetupFailed = True # type: ignore className = util.strclass(currentClass) self._createClassOrModuleLevelException( # type: ignore result, e, "setUpClass", className ) finally: _call_if_exists(result, "_restoreStdout") - if currentClass._classSetupFailed is True: - currentClass.doClassCleanups() - if currentClass.tearDown_exceptions: - for exc in currentClass.tearDown_exceptions: + if currentClass._classSetupFailed is True: # type: ignore + currentClass.doClassCleanups() # type: ignore + if currentClass.tearDown_exceptions: # type: ignore + for exc in currentClass.tearDown_exceptions: # type: ignore self._createClassOrModuleLevelException( # type: ignore result, exc[1], "setUpClass", className, info=exc ) - def run(self, result): + def run( # type: ignore[override] + self, result: ProtoTestResult, debug: bool = False + ) -> ProtoTestResult: """ Emulate unittest's behavior, with Green-specific changes. """ topLevel = False if getattr(result, "_testRunEntered", False) is False: - result._testRunEntered = topLevel = True + result._testRunEntered = topLevel = True # type: ignore for index, test in enumerate(self): if result.shouldStop: break if _isnotsuite(test): - self._tearDownPreviousClass(test, result) - self._handleModuleFixture(test, result) - self._handleClassSetUp(test, result) - result._previousTestClass = test.__class__ + self._tearDownPreviousClass(test, result) # type: ignore[attr-defined] + self._handleModuleFixture(test, result) # type: ignore[attr-defined] + self._handleClassSetUp(test, result) # type: ignore[attr-defined] + result._previousTestClass = test.__class__ # type: ignore[attr-defined] if getattr(test.__class__, "_classSetupFailed", False) or getattr( result, "_moduleSetUpFailed", False @@ -159,10 +173,10 @@ def run(self, result): captured_stderr = StringIO() saved_stdout = sys.stdout saved_stderr = sys.stderr - sys.stdout = GreenStream(captured_stdout) - sys.stderr = GreenStream(captured_stderr) + sys.stdout = GreenStream(captured_stdout) # type: ignore[assignment] + sys.stderr = GreenStream(captured_stderr) # type: ignore[assignment] - test(result) + test(result) # type: ignore[arg-type] if _isnotsuite(test): if not self.allow_stdout: @@ -188,9 +202,9 @@ def run(self, result): errors_before = len(result.errors) if topLevel: - self._tearDownPreviousClass(None, result) - self._handleModuleTearDown(result) - result._testRunEntered = False + self._tearDownPreviousClass(None, result) # type: ignore[attr-defined] + self._handleModuleTearDown(result) # type: ignore[attr-defined] + result._testRunEntered = False # type: ignore[attr-defined] # Special handling for class/module tear-down errors. startTest() and # finalize() both trigger communication between the subprocess and @@ -201,16 +215,17 @@ def run(self, result): result.errors[:-difference], result.errors[-difference:], ) - for test, err in new_errors: + for test_proto, err in new_errors: # test = ProtoTest() - test.module = result._previousTestClass.__module__ - test.class_name = result._previousTestClass.__name__ + previous_test_class = result._previousTestClass # type: ignore[attr-defined] + test_proto.module = previous_test_class.__module__ + test_proto.class_name = previous_test_class.__name__ # test.method_name = 'some method name' - test.is_class_or_module_teardown_error = True - test.name = "Error in class or module teardown" + test_proto.is_class_or_module_teardown_error = True + test_proto.name = "Error in class or module teardown" # test.docstr_part = 'docstr part' # error_holder.description - result.startTest(test) - result.addError(test, err) - result.stopTest(test) + result.startTest(test_proto) + result.addError(test_proto, err) + result.stopTest(test_proto) result.finalize() return result diff --git a/green/version.py b/green/version.py index c1c37b5..ff5a836 100644 --- a/green/version.py +++ b/green/version.py @@ -5,12 +5,12 @@ import coverage # pragma: no cover -__version__ = ( +__version__: str = ( (pathlib.Path(__file__).parent / "VERSION").read_text(encoding="utf-8").strip() ) # pragma: no cover -def pretty_version(): # pragma: no cover +def pretty_version() -> str: # pragma: no cover python_version = ".".join(str(x) for x in sys.version_info[0:3]) return ( f"Green {__version__}, Coverage {coverage.__version__}, Python {python_version}"