diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 224d0f37b..9f22d8add 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -46,9 +46,7 @@ jobs:
           python-version: "${{ matrix.python-version }}"
           cache: "poetry"
 
-      - run: poetry env use "${{ matrix.python-version }}"
-
-      - run: poetry install --extras "django sqlalchemy"
+      - run: poetry install --all-extras
 
       - name: Run tests
         run: scripts/tests
@@ -75,7 +73,7 @@ jobs:
 
       - uses: actions/setup-python@v5
         with:
-          python-version: "3.8" # Important for importlib_metadata
+          python-version: "3.8"
           cache: "poetry"
 
       - name: Install dependencies
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9a132c5c3..7051beef2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -41,7 +41,6 @@ repos:
           - croniter==3.0.3
           - django-stubs==5.0.4
           - django==4.2.15
-          - importlib-metadata==8.2.0
           - importlib-resources==6.4.2
           - psycopg2-binary==2.9.9
           - psycopg[pool]==3.2.1
diff --git a/.readthedocs.yml b/.readthedocs.yml
index 75c91a136..19de0a4c5 100644
--- a/.readthedocs.yml
+++ b/.readthedocs.yml
@@ -8,7 +8,7 @@ version: 2
 build:
   os: "ubuntu-22.04"
   tools:
-    python: "3.10"
+    python: "latest"
   jobs:
     post_create_environment:
       - python -m pip install poetry
diff --git a/dev-env b/dev-env
index b374a2a74..4543c2ffd 100755
--- a/dev-env
+++ b/dev-env
@@ -61,6 +61,6 @@ echo "We've gone ahead and set up a few additional commands for you:"
 echo "- htmlcov: Opens the test coverage results in your browser"
 echo "- htmldoc: Opens the locally built sphinx documentation in your browser"
 echo "- lint: Run code formatters & linters"
-echo "- docs: Build doc"
+echo "- docs: Build the docs (note: requires 'poetry install --with docs', which needs a recent Python)"
 echo ""
 echo 'Quit the poetry shell with the command `deactivate`'
diff --git a/docs/conf.py b/docs/conf.py
index 397deceb2..d85b9409e 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -39,7 +39,6 @@
     "myst_parser",
     "sphinx.ext.napoleon",
     "sphinx.ext.autodoc",
-    "sphinx_autodoc_typehints",
     "sphinxcontrib.programoutput",
     "sphinx_github_changelog",
     "sphinx_copybutton",
@@ -99,6 +98,13 @@
 html_favicon = "favicon.ico"
 
+# -- Options for sphinx.ext.autodoc ------------------------------------------
+
+autodoc_typehints = "both"
+autodoc_type_aliases = {
+    "JSONDict": "procrastinate.types.JSONDict",
+}
+
 # -- Options for sphinx_github_changelog ---------------------------------
 
 sphinx_github_changelog_token = os.environ.get("CHANGELOG_GITHUB_TOKEN")
diff --git a/docs/reference.rst b/docs/reference.rst
index dc6302e0f..a0e9f60f3 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -7,33 +7,26 @@ App
 
 .. autoclass:: procrastinate.App
     :members: open, open_async, task, run_worker, run_worker_async,
        configure_task,
        from_path, add_tasks_from, add_task_alias, with_connector,
        periodic,
+       tasks, job_manager
 
 Connectors
 ----------
 
 .. autoclass:: procrastinate.PsycopgConnector
-    :members:
-    :exclude-members: open_async, close_async
 
 .. autoclass:: procrastinate.SyncPsycopgConnector
-    :members:
-    :exclude-members: open, close
 
 .. autoclass:: procrastinate.contrib.aiopg.AiopgConnector
-    :members:
-    :exclude-members: open_async, close_async
 
 .. autoclass:: procrastinate.contrib.psycopg2.Psycopg2Connector
-    :members:
-    :exclude-members: open, close
 
 .. autoclass:: procrastinate.testing.InMemoryConnector
-    :members: reset
-
+    :members: reset, jobs
 
 Tasks
 -----
 
 .. autoclass:: procrastinate.tasks.Task
-    :members: defer, defer_async, configure
+    :members: defer, defer_async, configure, name, aliases, retry_strategy,
+       pass_context, queue, lock, queueing_lock
 
 When tasks are created with argument ``pass_context``, they are provided a
 `JobContext` argument:
 
@@ -62,6 +55,7 @@ Jobs
 ----
 
 .. autoclass:: procrastinate.jobs.Job
+    :members:
 
 
 Retry strategies
diff --git a/poetry.lock b/poetry.lock
index f79cc5100..2abc5d3d4 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "aiopg"
@@ -1458,25 +1458,6 @@ docs = ["sphinxcontrib-websupport"]
 lint = ["docutils-stubs", "flake8 (>=3.5.0)", "flake8-simplify", "isort", "mypy (>=0.990)", "ruff", "sphinx-lint", "types-requests"]
 test = ["cython", "filelock", "html5lib", "pytest (>=4.6)"]
 
-[[package]]
-name = "sphinx-autodoc-typehints"
-version = "2.0.1"
-description = "Type hints (PEP 484) support for the Sphinx autodoc extension"
-optional = false
-python-versions = ">=3.8"
-files = [
-    {file = "sphinx_autodoc_typehints-2.0.1-py3-none-any.whl", hash = "sha256:f73ae89b43a799e587e39266672c1075b2ef783aeb382d3ebed77c38a3fc0149"},
-    {file = "sphinx_autodoc_typehints-2.0.1.tar.gz", hash = "sha256:60ed1e3b2c970acc0aa6e877be42d48029a9faec7378a17838716cacd8c10b12"},
-]
-
-[package.dependencies]
-sphinx = ">=7.1.2"
-
-[package.extras]
-docs = ["furo (>=2024.1.29)"]
-numpy = ["nptyping (>=2.5)"]
-testing = ["covdefaults (>=2.3)", "coverage (>=7.4.2)", "diff-cover (>=8.0.3)", "pytest (>=8.0.1)", "pytest-cov (>=4.1)", "sphobjinv (>=2.3.1)", "typing-extensions (>=4.9)"]
-
 [[package]]
 name = "sphinx-basic-ng"
 version = "1.0.0b2"
@@ -1841,4 +1822,4 @@ sqlalchemy = ["sqlalchemy"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8"
-content-hash = "19c0fde5a638a2fad417755825b0925a1252d3ff052255b9f212829b567e456c"
+content-hash = "ac5a99fe7f7c8531669219ac47284b5b483282a953c12d3b997c92a373d241ae"
diff --git a/procrastinate/app.py b/procrastinate/app.py
index 13955a957..a29ee6880 100644
--- a/procrastinate/app.py
+++ b/procrastinate/app.py
@@ -23,14 +23,6 @@ class App(blueprints.Blueprint):
     and use it to decorate your tasks with `App.task`.
 
     You can run a worker with `App.run_worker`.
-
-    Attributes
-    ----------
-    tasks : ``Dict[str, tasks.Task]``
-        The mapping of all tasks known by the app. Only procrastinate is expected to
-        make changes to this mapping.
-    job_manager : `manager.JobManager`
-        The `JobManager` linked to the application
     """
 
     @classmethod
@@ -95,7 +87,10 @@ def __init__(
         self.worker_defaults = worker_defaults or {}
         self.periodic_defaults = periodic_defaults or {}
 
-        self.job_manager = manager.JobManager(connector=self.connector)
+        #: The :py:class:`~manager.JobManager` linked to the application
+        self.job_manager: manager.JobManager = manager.JobManager(
+            connector=self.connector
+        )
 
         self._register_builtin_tasks()
 
@@ -113,7 +108,7 @@ def with_connector(self, connector: connector_module.BaseConnector) -> App:
 
         Returns
         -------
-        `App`
+        :
            A new compatible app.
         """
         app = App(
@@ -137,7 +132,7 @@ def replace_connector(
         connector :
             The new connector to use.
 
-        Returns
+        Yields
         -------
         `App`
            A new compatible app.
@@ -174,7 +169,7 @@ def configure_task(
 
         Parameters
         ----------
-        name : str
+        name:
            Name of the task.
            If not explicitly defined, this will be the dotted path to the
            task (``my.module.my_task``)
 
@@ -183,7 +178,7 @@ def configure_task(
 
         Returns
         -------
-        ``jobs.JobDeferrer``
+        :
            Launch ``.defer(**task_kwargs)`` on this object to defer your job.
         """
         from procrastinate import tasks
 
@@ -227,32 +222,32 @@ async def run_worker_async(self, **kwargs) -> None:
 
         Parameters
         ----------
-        queues : ``Optional[Iterable[str]]``
+        queues: ``Optional[Iterable[str]]``
            List of queues to listen to, or None to listen to every queue
            (defaults to ``None``).
-        wait : ``bool``
+        wait: ``bool``
            If False, the worker will terminate as soon as it has caught up with
            the queues. If True, the worker will work until it is stopped by a
            signal (``ctrl+c``, ``SIGINT``, ``SIGTERM``) (defaults to ``True``).
-        concurrency : ``int``
+        concurrency: ``int``
            Indicates how many asynchronous jobs the worker can run in parallel.
            Do not use concurrency if you have synchronous blocking tasks.
            See `howto/production/concurrency` (defaults to ``1``).
-        name : ``Optional[str]``
+        name: ``Optional[str]``
            Name of the worker. Will be passed in the `JobContext` and used in
            the logs (defaults to ``None`` which will result in the worker named
            ``worker``).
-        timeout : ``float``
+        timeout: ``float``
            Indicates the maximum duration (in seconds) the worker waits between
            each database job poll. Raising this parameter can lower the rate at
            which the worker makes queries to the database for requesting jobs.
            (defaults to 5.0)
-        listen_notify : ``bool``
+        listen_notify: ``bool``
            If ``True``, the worker will dedicate a connection from the pool to
            listening to database events, notifying of newly available jobs.
            If ``False``, the worker will just poll the database periodically
            (see ``timeout``). (defaults to ``True``)
-        delete_jobs : ``str``
+        delete_jobs: ``str``
            If ``always``, the worker will automatically delete all jobs on completion.
            If ``successful`` the worker will only delete successful jobs.
            If ``never``, the worker will keep the jobs in the database.
diff --git a/procrastinate/blueprints.py b/procrastinate/blueprints.py
index 0304cb42e..5f737d3b4 100644
--- a/procrastinate/blueprints.py
+++ b/procrastinate/blueprints.py
@@ -69,6 +69,8 @@ def my_task():
     """
 
     def __init__(self) -> None:
+        #: The mapping of all tasks known by the app. Only procrastinate is
+        #: expected to make changes to this mapping.
         self.tasks: dict[str, Task] = {}
         self.periodic_registry = periodic.PeriodicRegistry()
         self._check_stack()
diff --git a/procrastinate/builtin_tasks.py b/procrastinate/builtin_tasks.py
index c31afd5ef..422cc37c4 100644
--- a/procrastinate/builtin_tasks.py
+++ b/procrastinate/builtin_tasks.py
@@ -26,13 +26,13 @@ async def remove_old_jobs(
     queue :
        The name of the queue in which jobs will be deleted. If not specified,
        the task will delete jobs from all queues.
-    remove_error : ``Optional[bool]``
+    remove_error:
        By default only successful jobs will be removed. When this parameter is
        True, failed jobs will also be deleted.
-    remove_cancelled : ``Optional[bool]``
+    remove_cancelled:
        By default only successful jobs will be removed. When this parameter is
        True, cancelled jobs will also be deleted.
-    remove_aborted : ``Optional[bool]``
+    remove_aborted:
        By default only successful jobs will be removed. When this parameter is
        True, aborted jobs will also be deleted.
""" diff --git a/procrastinate/contrib/aiopg/aiopg_connector.py b/procrastinate/contrib/aiopg/aiopg_connector.py index 40d4b8a22..c3ef93976 100644 --- a/procrastinate/contrib/aiopg/aiopg_connector.py +++ b/procrastinate/contrib/aiopg/aiopg_connector.py @@ -98,35 +98,35 @@ def __init__( Parameters ---------- - json_dumps : + json_dumps: The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the `psycopg2 doc`_. - json_loads : + json_loads: The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the `psycopg2 doc`_. Unused if the pool is externally created and set into the connector through the ``App.open_async`` method. - dsn : ``Optional[str]`` + dsn: ``Optional[str]`` Passed to aiopg. Default is "" instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain local socket file. - enable_json : ``bool`` + enable_json: ``bool`` Passed to aiopg. Default is False instead of True to avoid messing with the global state. enable_hstore: ``bool`` Passed to aiopg. Default is False instead of True to avoid messing with the global state. - enable_uuid : ``bool`` + enable_uuid: ``bool`` Passed to aiopg. Default is False instead of True to avoid messing with the global state. - cursor_factory : ``psycopg2.extensions.cursor`` + cursor_factory: ``psycopg2.extensions.cursor`` Passed to aiopg. Default is ``psycopg2.extras.RealDictCursor`` instead of standard cursor. There is no identified use case for changing this. - maxsize : ``int`` + maxsize: ``int`` Passed to aiopg. If value is 1, then listen/notify feature will be deactivated. - minsize : ``int`` + minsize: ``int`` Passed to aiopg. Initial connections are not opened when the connector is created, but at first use of the pool. """ diff --git a/procrastinate/contrib/django/django_connector.py b/procrastinate/contrib/django/django_connector.py index 42564d5cf..909c7ac8b 100644 --- a/procrastinate/contrib/django/django_connector.py +++ b/procrastinate/contrib/django/django_connector.py @@ -156,7 +156,7 @@ def get_worker_connector(self) -> connector.BaseAsyncConnector: Returns ------- - ``procrastinate.contrib.aiopg.AiopgConnector`` or ``procrastinate.contrib.psycopg3.PsycopgConnector`` + : A connector that can be used in a worker """ alias = settings.settings.DATABASE_ALIAS diff --git a/procrastinate/contrib/django/utils.py b/procrastinate/contrib/django/utils.py index c7b279f0a..9def57551 100644 --- a/procrastinate/contrib/django/utils.py +++ b/procrastinate/contrib/django/utils.py @@ -20,7 +20,7 @@ def connector_params(alias: str = "default") -> dict[str, Any]: Returns ------- - ``Dict[str, Any]`` + : Provide these keyword arguments when instantiating your connector """ wrapper = connections[alias] diff --git a/procrastinate/contrib/psycopg2/psycopg2_connector.py b/procrastinate/contrib/psycopg2/psycopg2_connector.py index 5c6e6b64f..bb847e95d 100644 --- a/procrastinate/contrib/psycopg2/psycopg2_connector.py +++ b/procrastinate/contrib/psycopg2/psycopg2_connector.py @@ -83,23 +83,23 @@ def __init__( Parameters ---------- - json_dumps : + json_dumps: The JSON dumps function to use for serializing job arguments. Defaults to the function used by psycopg2. See the `psycopg2 doc`_. - json_loads : + json_loads: The JSON loads function to use for deserializing job arguments. Defaults to the function used by psycopg2. See the `psycopg2 doc`_. 
            Unused if the pool is externally created and set into the connector
            through the ``App.open`` method.
-        minconn : int
+        minconn: int
            Passed to psycopg2, default set to 1 (same as aiopg).
-        maxconn : int
+        maxconn: int
            Passed to psycopg2, default set to 10 (same as aiopg).
-        dsn : ``Optional[str]``
+        dsn: ``Optional[str]``
            Passed to psycopg2. Default is "" instead of None, which means if no
            argument is passed, it will connect to localhost:5432 instead of a
            Unix-domain local socket file.
-        cursor_factory : ``psycopg2.extensions.cursor``
+        cursor_factory: ``psycopg2.extensions.cursor``
            Passed to psycopg2. Default is ``psycopg2.extras.RealDictCursor``
            instead of standard cursor. There is no identified use case for changing
            this.
diff --git a/procrastinate/job_context.py b/procrastinate/job_context.py
index a35fe5582..ad93f4425 100644
--- a/procrastinate/job_context.py
+++ b/procrastinate/job_context.py
@@ -40,28 +40,19 @@ class JobContext:
     Execution context of a running job.
 
     In theory, all attributes are optional. In practice, in a task, they will
     always be set to their proper value.
-
-    Attributes
-    ----------
-    app : `App`
-        Procrastinate `App` running this job
-    worker_name : ``str``
-        Name of the worker (may be useful for logging)
-    worker_queues : ``Optional[Iterable[str]]``
-        Queues listened by this worker
-    worker_id : ``int```
-        In case there are multiple async sub-workers, this is the id of the sub-worker.
-    job : `Job`
-        Current `Job` instance
-    task : `Task`
-        Current `Task` instance
     """
 
+    #: Procrastinate `App` running this job
     app: app_module.App | None = None
+    #: Name of the worker (may be useful for logging)
     worker_name: str | None = None
+    #: Queues listened to by this worker
     worker_queues: Iterable[str] | None = None
+    #: In case there are multiple async sub-workers, this is the id of the sub-worker.
     worker_id: int | None = None
+    #: Corresponding :py:class:`~jobs.Job`
     job: jobs.Job | None = None
+    #: Corresponding :py:class:`~tasks.Task`
     task: tasks.Task | None = None
 
     job_result: JobResult = attr.ib(factory=JobResult)
     additional_context: dict = attr.ib(factory=dict)
diff --git a/procrastinate/jobs.py b/procrastinate/jobs.py
index bcd55a6d5..6a2560c01 100644
--- a/procrastinate/jobs.py
+++ b/procrastinate/jobs.py
@@ -48,42 +48,29 @@ class Job:
     """
     A job is the launching of a specific task with specific values for the
     keyword arguments.
-
-    Attributes
-    ----------
-    id :
-        Internal id uniquely identifying the job.
-    status :
-        Status of the job.
-    priority :
-        Priority of the job.
-    queue :
-        Queue name the job will be run in.
-    lock :
-        No two jobs with the same lock string can run simultaneously
-    queueing_lock :
-        No two jobs with the same queueing lock can be waiting in the queue.
-    task_name :
-        Name of the associated task.
-    task_kwargs :
-        Arguments used to call the task.
-    scheduled_at :
-        Date and time after which the job is expected to run.
-    attempts :
-        Number of times the job has been tried.
     """
 
+    #: Internal id uniquely identifying the job.
     id: int | None = None
+    #: Status of the job.
     status: str | None = None
+    #: Queue name the job will be run in.
     queue: str
+    #: Priority of the job.
     priority: int = DEFAULT_PRIORITY
+    #: No two jobs with the same lock string can run simultaneously
     lock: str | None
+    #: No two jobs with the same queueing lock can be waiting in the queue.
     queueing_lock: str | None
+    #: Name of the associated task.
     task_name: str
+    #: Arguments used to call the task.
     task_kwargs: types.JSONDict = attr.ib(factory=dict)
+    #: Date and time after which the job is expected to run.
     scheduled_at: datetime.datetime | None = attr.ib(
         default=None, validator=check_aware
     )
+    #: Number of times the job has been tried.
     attempts: int = 0
 
     @classmethod
diff --git a/procrastinate/manager.py b/procrastinate/manager.py
index 129b50962..d77261f17 100644
--- a/procrastinate/manager.py
+++ b/procrastinate/manager.py
@@ -29,11 +29,12 @@ async def defer_job_async(self, job: jobs.Job) -> jobs.Job:
 
         Parameters
         ----------
-        job : `jobs.Job`
+        job:
+            The job to defer
 
         Returns
         -------
-        `jobs.Job`
+        :
            A copy of the job instance with the id set.
         """
         # Make sure this code stays synchronized with .defer_job()
@@ -125,12 +126,12 @@ async def fetch_job(self, queues: Iterable[str] | None) -> jobs.Job | None:
 
         Parameters
         ----------
-        queues : ``Optional[Iterable[str]]``
+        queues:
            Filter by job queue names
 
         Returns
         -------
-        ``Optional[jobs.Job]``
+        :
            None if no suitable job was found. The job otherwise.
         """
 
@@ -156,17 +157,13 @@ async def get_stalled_jobs(
 
         Parameters
         ----------
-        nb_seconds : ``int``
+        nb_seconds:
            Only jobs that have been in ``doing`` state for longer than this will be
            returned
-        queue : ``Optional[str]``
+        queue:
            Filter by job queue name
-        task_name : ``Optional[str]``
+        task_name:
            Filter by job task name
-
-        Returns
-        -------
-        ``Iterable[jobs.Job]``
         """
         rows = await self.connector.execute_query_all_async(
             query=sql.queries["select_stalled_jobs"],
@@ -191,15 +188,15 @@ async def delete_old_jobs(
 
         Parameters
         ----------
-        nb_hours : ``int``
+        nb_hours:
            Consider jobs that have been in a final state for more than ``nb_hours``
-        queue : ``Optional[str]``
+        queue:
            Filter by job queue name
-        include_error : ``Optional[bool]``
+        include_error:
            If ``True``, also consider errored jobs. ``False`` by default
-        include_cancelled : ``Optional[bool]``
+        include_cancelled:
            If ``True``, also consider cancelled jobs. ``False`` by default.
-        include_aborted : ``Optional[bool]``
+        include_aborted:
            If ``True``, also consider aborted jobs. ``False`` by default.
         """
         # We only consider finished jobs by default
@@ -229,8 +226,8 @@ async def finish_job(
 
         Parameters
         ----------
-        job : `jobs.Job`
-        status : `jobs.Status`
+        job:
+        status:
            ``succeeded``, ``failed`` or ``aborted``
         """
         assert job.id  # TODO remove this
@@ -259,19 +256,19 @@ def cancel_job_by_id(
 
         Parameters
         ----------
-        job_id : ``int``
+        job_id:
            The id of the job to cancel
-        abort : ``bool``
+        abort:
            If True, a job in ``doing`` state will be marked as ``aborting``, but
            the task itself has to respect the abortion request. If False, only
            jobs in ``todo`` state will be set to ``cancelled`` and won't be
            processed by a worker anymore.
-        delete_job : ``bool``
+        delete_job:
            If True, the job will be deleted from the database after being cancelled.
            Does not affect the jobs that should be aborted.
 
         Returns
         -------
-        ``bool``
+        :
            If True, the job was cancelled (or its abortion was requested). If False,
            nothing was done: either there is no job with this id or it's not in a
            state where it may be cancelled (i.e. `todo` or `doing`)
@@ -297,19 +294,19 @@ async def cancel_job_by_id_async(
 
         Parameters
         ----------
-        job_id : ``int``
+        job_id:
            The id of the job to cancel
-        abort : ``bool``
+        abort:
            If True, a job in ``doing`` state will be marked as ``aborting``, but
            the task itself has to respect the abortion request. If False, only
            jobs in ``todo`` state will be set to ``cancelled`` and won't be
            processed by a worker anymore.
-        delete_job : ``bool``
+        delete_job:
            If True, the job will be deleted from the database after being cancelled.
            Does not affect the jobs that should be aborted.
 
         Returns
         -------
-        ``bool``
+        :
            If True, the job was cancelled (or its abortion was requested). If False,
            nothing was done: either there is no job with this id or it's not in a
            state where it may be cancelled (i.e. `todo` or `doing`)
@@ -333,12 +330,12 @@ def get_job_status(self, job_id: int) -> jobs.Status:
 
         Parameters
         ----------
-        job_id : ``int``
+        job_id:
            The id of the job to get the status of
 
         Returns
         -------
-        `jobs.Status`
+        :
         """
         result = self.connector.get_sync_connector().execute_query_one(
             query=sql.queries["get_job_status"], job_id=job_id
@@ -351,12 +348,12 @@ async def get_job_status_async(self, job_id: int) -> jobs.Status:
 
         Parameters
         ----------
-        job_id : ``int``
+        job_id:
            The id of the job to get the status of
 
         Returns
         -------
-        `jobs.Status`
+        :
         """
         result = await self.connector.execute_query_one_async(
             query=sql.queries["get_job_status"], job_id=job_id
@@ -376,18 +373,18 @@ async def retry_job(
 
         Parameters
         ----------
-        job : `jobs.Job`
-        retry_at : ``Optional[datetime.datetime]``
+        job:
+        retry_at:
            If set at present time or in the past, the job may be retried immediately.
            Otherwise, the job will be retried no sooner than this date & time.
            Should be timezone-aware (even if UTC). Defaults to present time.
-        priority : ``Optional[int]``
+        priority:
            If set, the job will be retried with this priority. If not set, the
            priority remains unchanged.
-        queue : ``Optional[int]``
+        queue:
            If set, the job will be retried on this queue. If not set, the queue
            remains unchanged.
-        lock : ``Optional[int]``
+        lock:
            If set, the job will be retried with this lock. If not set, the lock
            remains unchanged.
         """
@@ -413,18 +410,18 @@ async def retry_job_by_id_async(
 
         Parameters
         ----------
-        job_id : ``int``
-        retry_at : ``datetime.datetime``
+        job_id:
+        retry_at:
            If set at present time or in the past, the job may be retried immediately.
            Otherwise, the job will be retried no sooner than this date & time.
            Should be timezone-aware (even if UTC).
-        priority : ``Optional[int]``
+        priority:
            If set, the job will be retried with this priority. If not set, the
            priority remains unchanged.
-        queue : ``Optional[int]``
+        queue:
            If set, the job will be retried on this queue. If not set, the queue
            remains unchanged.
-        lock : ``Optional[int]``
+        lock:
            If set, the job will be retried with this lock. If not set, the lock
            remains unchanged.
         """
@@ -469,9 +466,9 @@ async def listen_for_jobs(
 
         Parameters
         ----------
-        event : ``asyncio.Event``
+        event:
            This event will be set each time a defer operation occurs
-        queues : ``Optional[Iterable[str]]``
+        queues:
            If ``None``, all defer operations will be considered. If an iterable of
            queue names is passed, only defer operations on those queues will be
            considered. Defaults to ``None``
@@ -487,7 +484,7 @@ async def check_connection_async(self) -> bool:
 
         Returns
         -------
-        ``bool``
+        :
            ``True`` if the table exists, ``False`` otherwise.
""" result = await self.connector.execute_query_one_async( @@ -518,22 +515,22 @@ async def list_jobs_async( Parameters ---------- - id : ``int`` + id: Filter by job ID - queue : ``str`` + queue: Filter by job queue name - task : ``str`` + task: Filter by job task name - status : ``str`` + status: Filter by job status (``todo``/``doing``/``succeeded``/``failed``) - lock : ``str`` + lock: Filter by job lock - queueing_lock : ``str`` + queueing_lock: Filter by job queueing_lock Returns ------- - ``Iterable[jobs.Job]`` + : """ rows = await self.connector.execute_query_all_async( query=sql.queries["list_jobs"], @@ -581,18 +578,18 @@ async def list_queues_async( Parameters ---------- - queue : ``str`` + queue: Filter by job queue name - task : ``str`` + task: Filter by job task name - status : ``str`` + status: Filter by job status (``todo``/``doing``/``succeeded``/``failed``) - lock : ``str`` + lock: Filter by job lock Returns ------- - ``List[Dict[str, Any]]`` + : A list of dictionaries representing queues stats (``name``, ``jobs_count``, ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, ``aborted``). @@ -661,18 +658,18 @@ async def list_tasks_async( Parameters ---------- - queue : ``str`` + queue: Filter by job queue name - task : ``str`` + task: Filter by job task name - status : ``str`` + status: Filter by job status (``todo``/``doing``/``succeeded``/``failed``) - lock : ``str`` + lock: Filter by job lock Returns ------- - ``List[Dict[str, Any]]`` + : A list of dictionaries representing tasks stats (``name``, ``jobs_count``, ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, ``aborted``). @@ -741,18 +738,18 @@ async def list_locks_async( Parameters ---------- - queue : ``str`` + queue: Filter by job queue name - task : ``str`` + task: Filter by job task name - status : ``str`` + status: Filter by job status (``todo``/``doing``/``succeeded``/``failed``) - lock : ``str`` + lock: Filter by job lock Returns ------- - ``List[Dict[str, Any]]`` + : A list of dictionaries representing locks stats (``name``, ``jobs_count``, ``todo``, ``doing``, ``succeeded``, ``failed``, ``cancelled``, ``aborting``, ``aborted``). diff --git a/procrastinate/retry.py b/procrastinate/retry.py index 827a614df..a07341666 100644 --- a/procrastinate/retry.py +++ b/procrastinate/retry.py @@ -53,20 +53,20 @@ def __init__( Parameters ---------- - retry_at : ``Optional[datetime.datetime]`` + retry_at: If set at present time or in the past, the job may be retried immediately. Otherwise, the job will be retried no sooner than this date & time. Should be timezone-aware (even if UTC). Defaults to present time. - retry_in : ``Optional[types.TimeDeltaParams]`` + retry_in: If set, the job will be retried after this duration. If not set, the job will be retried immediately. - priority : ``Optional[int]`` + priority: If set, the job will be retried with this priority. If not set, the priority remains unchanged. - queue : ``Optional[int]`` + queue: If set, the job will be retried on this queue. If not set, the queue remains unchanged. - lock : ``Optional[int]`` + lock: If set, the job will be retried with this lock. If not set, the lock remains unchanged. """ @@ -127,7 +127,7 @@ def get_schedule_in(self, *, exception: BaseException, attempts: int) -> int | N Returns ------- - ``Optional[int]`` + : If a job should not be retried, this function should return None. Otherwise, it should return the duration after which to schedule the new job run, *in seconds*. 
diff --git a/procrastinate/tasks.py b/procrastinate/tasks.py
index 55658a20b..7ae3afb68 100644
--- a/procrastinate/tasks.py
+++ b/procrastinate/tasks.py
@@ -66,31 +66,6 @@ class Task(Generic[P, Args]):
     """
     A task is a function that should be executed later. It is linked to a default
     queue, and expects keyword arguments.
-
-    Attributes
-    ----------
-    name : ``str``
-        Name of the task, usually the dotted path of the decorated function.
-    aliases : ``List[str]``
-        Additional names for the task.
-    retry_strategy : `RetryStrategy`
-        Value indicating the retry conditions in case of
-        :py:class:`procrastinate.jobs.Job` error.
-    pass_context : ``bool``
-        If ``True``, passes the task execution context as first positional argument on
-        :py:class:`procrastinate.jobs.Job` execution.
-    queue : ``str``
-        Default queue to send deferred jobs to. The queue can be overridden when a
-        job is deferred.
-    priority :
-        Default priority (an integer) of jobs that are deferred from this task.
-        Jobs with higher priority are run first. Priority can be positive or negative.
-        If no default priority is set then the default priority is 0.
-    lock : ``Optional[str]``
-        Default lock. The lock can be overridden when a job is deferred.
-    queueing_lock : ``Optional[str]``
-        Default queueing lock. The queuing lock can be overridden when a job is
-        deferred.
     """
 
     def __init__(
@@ -110,16 +85,33 @@ def __init__(
         lock: str | None = None,
         queueing_lock: str | None = None,
     ):
-        self.queue = queue
-        self.priority = priority
-        self.blueprint = blueprint
+        #: Default queue to send deferred jobs to. The queue can be overridden
+        #: when a job is deferred.
+        self.queue: str = queue
+        #: Default priority (an integer) of jobs that are deferred from this
+        #: task. Jobs with higher priority are run first. Priority can be
+        #: positive or negative. If no default priority is set then the default
+        #: priority is 0.
+        self.priority: int = priority
+        self.blueprint: blueprints.Blueprint = blueprint
         self.func: Callable[P] = func
-        self.aliases = aliases if aliases else []
-        self.retry_strategy = retry_module.get_retry_strategy(retry)
+        #: Additional names for the task.
+        self.aliases: list[str] = aliases if aliases else []
+        #: Value indicating the retry conditions in case of
+        #: :py:class:`procrastinate.jobs.Job` error.
+        self.retry_strategy: retry_module.BaseRetryStrategy | None = (
+            retry_module.get_retry_strategy(retry)
+        )
+        #: Name of the task, usually the dotted path of the decorated function.
         self.name: str = name if name else self.full_path
-        self.pass_context = pass_context
-        self.lock = lock
-        self.queueing_lock = queueing_lock
+        #: If ``True``, passes the task execution context as first positional
+        #: argument on :py:class:`procrastinate.jobs.Job` execution.
+        self.pass_context: bool = pass_context
+        #: Default lock. The lock can be overridden when a job is deferred.
+        self.lock: str | None = lock
+        #: Default queueing lock. The queueing lock can be overridden when a job
+        #: is deferred.
+        self.queueing_lock: str | None = queueing_lock
 
     def add_namespace(self, namespace: str) -> None:
         """
@@ -190,7 +182,7 @@ def configure(self, **options: Unpack[ConfigureTaskOptions]) -> jobs.JobDeferrer
 
         Returns
         -------
-        ``jobs.JobDeferrer``
+        :
            An object with a ``defer`` method, identical to `Task.defer`
 
         Raises
diff --git a/procrastinate/testing.py b/procrastinate/testing.py
index 4af80bd2c..2bf5e864f 100644
--- a/procrastinate/testing.py
+++ b/procrastinate/testing.py
@@ -22,15 +22,11 @@ class InMemoryConnector(connector.BaseAsyncConnector):
     """
 
     def __init__(self):
-        """
-        Attributes
-        ----------
-        jobs : ``Dict[int, Dict]``
-            Mapping of ``{<job id>: <job row>}``
-        """
         self.reset()
         self.reverse_queries = {value: key for key, value in sql.queries.items()}
         self.reverse_queries[schema.SchemaManager.get_schema()] = "apply_schema"
+        #: Mapping of ``{<job id>: <job row>}``
+        self.jobs: dict[int, JobRow] = {}
 
     def reset(self) -> None:
         """
diff --git a/pyproject.toml b/pyproject.toml
index d193c6295..3ac25d99b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,9 +31,8 @@ attrs = "*"
 contextlib2 = { version = "*", python = "<3.10" }
 croniter = "*"
 django = { version = ">=2.2", optional = true }
-importlib-metadata = { version = "*", python = "<3.8" }
 importlib-resources = { version = ">=1.4", python = "<3.9" }
-psycopg = { extras = ["pool"], version = "^3.1.13" }
+psycopg = { extras = ["pool"], version = "*" }
 psycopg2-binary = { version = "*", optional = true }
 python-dateutil = "*"
 sqlalchemy = { version = "^2.0", optional = true }
@@ -64,15 +63,15 @@ aiopg = "*"
 sqlalchemy = { extras = ["mypy"], version = "*" }
 psycopg2-binary = "*"
 psycopg = [
-    { version = "^3.1.13", extras = [
+    { version = "*", extras = [
        "binary",
        "pool",
     ], markers = "sys_platform != 'darwin' or platform_machine != 'arm64'" },
-    { version = "^3.1.13", extras = [
+    { version = "*", extras = [
        "binary",
        "pool",
     ], markers = "sys_platform == 'darwin' and platform_machine == 'arm64'", python = ">=3.10" },
-    { version = "^3.1.13", extras = [
+    { version = "*", extras = [
        "pool",
     ], markers = "sys_platform == 'darwin' and platform_machine == 'arm64'", python = "<3.10" },
 ]
@@ -90,11 +89,14 @@ migra = "*"
 # (pkg_resources).
 setuptools = { version = "*" }
 
+[tool.poetry.group.docs]
+optional = true
+
 [tool.poetry.group.docs.dependencies]
+
 django = ">=2.2"
 furo = "*"
 Sphinx = "*"
-sphinx-autodoc-typehints = "*"
 sphinx-copybutton = "*"
 sphinx-github-changelog = "*"
 sphinxcontrib-programoutput = "*"
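
Note on the overall docstring migration: with ``sphinx_autodoc_typehints`` removed, the new ``autodoc_typehints = "both"`` setting in docs/conf.py makes autodoc render types from annotations, which is why the numpydoc sections above drop their hand-written types and why attribute docs move to ``#:`` comments. A minimal sketch of the resulting convention; the ``Example`` class is hypothetical, and only ``procrastinate.types.JSONDict`` comes from this patch::

    from __future__ import annotations

    from procrastinate import types


    class Example:
        #: Attributes documented with ``#:`` comments are rendered by autodoc
        #: once they are listed in the ``:members:`` option.
        payload: types.JSONDict

        def defer(self, *, queue: str | None = None) -> int:
            """
            Defer something.

            Parameters
            ----------
            queue:
                Which queue to use (the type is taken from the annotation).

            Returns
            -------
            :
                The id of the deferred job.
            """
            return 0

Since the docs group is now optional in pyproject.toml, building the documentation locally requires opting in, e.g. ``poetry install --all-extras --with docs`` (both flags appear elsewhere in this patch).
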