Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set/get progress #130

Merged
merged 18 commits into from
Jun 13, 2024
Merged

feat: set/get progress #130

merged 18 commits into from
Jun 13, 2024

Conversation

Sobes76rus
Copy link
Contributor

properly created PR instead of #121

@Sobes76rus
Copy link
Contributor Author

@s3rius Ok, i did it =)

Copy link
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi and thanks for your time and crating this PR. I have a questions related to generics. How can you use the _ProgressType? Because on client side there's no information about progress type that this task can possibly have.

You can add a parameter to the task decorator, to parameterize AsyncTaskiqDecoratedTask with progress type. Because now this generic adds no information for users about progress type of a task.

taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
taskiq/abc/result_backend.py Outdated Show resolved Hide resolved
taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
@Sobes76rus
Copy link
Contributor Author

For example

await def task(progress: ProgressTracker[int] = Depends()): 
    await progress.set_progress('progress', 123)

@codecov-commenter
Copy link

codecov-commenter commented May 12, 2023

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (d708468) 77.40% compared to head (8d3c27f) 77.67%.

Files Patch % Lines
taskiq/depends/progress_tracker.py 89.65% 3 Missing ⚠️
taskiq/brokers/inmemory_broker.py 88.88% 1 Missing ⚠️
taskiq/task.py 83.33% 1 Missing ⚠️

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #130      +/-   ##
===========================================
+ Coverage    77.40%   77.67%   +0.27%     
===========================================
  Files           60       61       +1     
  Lines         1748     1792      +44     
===========================================
+ Hits          1353     1392      +39     
- Misses         395      400       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Sobes76rus
Copy link
Contributor Author

Sobes76rus commented May 15, 2023

@s3rius Hello, I made the changes as you suggested. Could you please take a look at them

@Sobes76rus Sobes76rus requested a review from s3rius May 18, 2023 18:26
@Sobes76rus
Copy link
Contributor Author

@s3rius Hi! Sorry to be annoying, but could you give me some feedback please. =)

Copy link
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I really had no time lately.

The request looks good. But I have some questions, regarding types.

You introduced a TaskProgress which is generic over _ProgressType. So generic types are used by IDEs to help developers get information about types they're working with or to help programs cast some random structure to some specific type. The problem with _ProgressType is that it's not used anywhere. I guess you imagined usage like this:

from pydantic import BaseModel
from myproj.tkq import broker

class MyProgress(BaseModel):
	id: int
	name: str

async def my_task(ptrack: ProgressTracker[MyProgress] = Depends()):
	for i in range(100):
		ptrack.set_progress("heavy_calcs", MyProgress(id=i, name=f"name_{i}"))
		# Do some heavy work.

And it looks good and easy. But types work only inside of the task. Check out this code snippet:

	task = await my_task.kiq()
	progress = task.get_progress()
	progress.meta.<TAB> # Here you won't get any auto completion.

To solve this issue, you have to add new generic to AsyncTaskiqTask task and a way to set this generic. The easiest way is to add a parameter to task method of the AsyncBroker interface and add one overload that make use of this generic. A small example of how types should look like so people would get suggestions based on their types:

class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType, _ProgressType]):
    @overload
    async def kiq(
        self: "AsyncTaskiqDecoratedTask[_FuncParams, Coroutine[Any, Any, _T], _ProgressType]",
        *args: _FuncParams.args,
        **kwargs: _FuncParams.kwargs,
    ) -> AsyncTaskiqTask[_T, _ProgressType]:
        ...

class AsyncTaskiqTask(Generic[_Task[_ReturnType], _ProgressType]):
    async def get_progress(self) -> "Optional[TaskProgress[_ProgressType]]":
        """
        Get task progress.
        :return: task's progress.
        """
        return await self.result_backend.get_progress(self.task_id)

class AsyncBroker(ABC):
    @overload
    def task(
        self,
        task_name: Callable[_FuncParams, _ReturnType],
		progress_type: Optional[Type[_ProgressType]] = None,
        **labels: Any,
    ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType, _ProgressType]:  # pragma: no cover
        ...

taskiq/receiver/receiver.py Outdated Show resolved Hide resolved
taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
@Sobes76rus
Copy link
Contributor Author

@s3rius Hello there! I have resolved conflicts with latest version and make some changes. Could u pls take another look =)

Anton added 2 commits February 1, 2024 23:38
Fix Optional field with validate_default only performing one field validation
by @sydney-runkle in pydantic/pydantic-core#1002
@kice
Copy link

kice commented Feb 22, 2024

I was looking for a progress report feature to show a progress bar, and found this. This feature name should change to "Task state reporting"

My very one-sided opinion on this feature is: pulling states from worker instead of pushing to result backend.

In case of way too many tasks, I think state reporting will easily over overwhelmed result backend. This might be solved by using reporting to a broker instead of a result backend, or have a limiter built-in.

For me, I might report what percentage the task is finished, which I should report as often as possible; and if there is not that much states, it is not necessary to report.

Also, I think state reporting needs to be one-way communication, or it will confuse people: should I use "state reporting" to handle error and retry, or use a middleware.

Maybe the end goal is to make a worker monitoring for taskiq.

@Sobes76rus
Copy link
Contributor Author

@s3rius hello there! Could you please take another look at this request

Copy link
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the idea from here. And the implementation is actually good. But here's some things which I try to accomplish and currently had no luck.

I tried to make some mechanism to add type suggestions to progresses. Because i generally dislike working with raw dicts over python types. I will try to do more research on that later (after the PR is merged).

As for the PR, I think it's great. The only one thing which is missing for me is setting the progress for tasks inside the receiver. When task was started, when task was executed etc.

Also, you can show usage of retry status by updating the SimpleRetryMiddleware.

taskiq/depends/progress_tracker.py Outdated Show resolved Hide resolved
taskiq/abc/result_backend.py Show resolved Hide resolved
.github/workflows/test.yml Show resolved Hide resolved
@kyboi
Copy link

kyboi commented Apr 30, 2024

Is the idea of this PR just to know the overall progress of a task as started/failure/success/retry or can an actual status message be passed from the worker task and fetched from the backend/broker (e.g. progress percentage or stage to then be displayed to user)?

@s3rius
Copy link
Member

s3rius commented Apr 30, 2024

@kyboi, yes. Mainly for progress.

@Sobes76rus
Copy link
Contributor Author

@s3rius
I think it should be up to the user to decide how to use the progress tracker.
The overall status should be implemented with some ProgressMiddleware, not in the receiver. Or there should be a way to disable it.
In other cases, the user can pass custom progress messages using the ProgressTracker dependency.

@Sobes76rus
Copy link
Contributor Author

@kyboi
my intension was to use custom status messages.
overall status should be optional middleware

@s3rius
Copy link
Member

s3rius commented Apr 30, 2024

@Sobes76rus, I guess the progress middleware can be implemented in subsequent request, so it's fine to not set statuses here.

Copy link
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. Will merge a bit later.

@s3rius s3rius merged commit cdb431b into taskiq-python:develop Jun 13, 2024
23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants