[Proposal] Apply operand closure cleanup #3193
Comments
The function can't be executed correctly if the corresponding closure is missing. So why not put the entire UDF into storage? My suggestion about this proposal:
This comment is used to track related developments.
After discussion, the storage service on the supervisor will remain empty. As a result, some refinements to #3205 will follow soon.
This comment is used to track further developments.
This comment is used to track further directions
For function calls such as DataFrame.apply, DataFrameGroupby.apply, Series.apply, etc., the user passes in a custom function, i.e. a UDF. Mars serializes the custom function multiple times during graph building, scheduling and execution. If the custom function captures a large amount of data, such as a pandas DataFrame, this causes bottlenecks in scheduling and execution:
Although the serialization of UDFs can be cached by Mars's custom serialization (currently cached when calculating the chunk key/logic key in our inner codebase), if a UDF holds a large amount of data, the serialized form of a single Subtask may reach hundreds of megabytes. Repeatedly sending this data to the Worker main pool and sub pools during Subtask scheduling causes Supervisor bottlenecks and scheduling delays.
On the other hand, if a single Subtask object is too large, it also incurs a large lineage storage overhead, which can cause task failures when lineage is evicted.
Based on this, we propose a function closure cleaner at the Mars operator level: clean up the function closure in the tile phase, serialize the closure contents into Storage, and have the operator hold only a reference to the closure stored there. Before the operator is executed, the closure object is fetched from Storage, the UDF is restored, and then the calculation is performed.
Proposal
Function closure cleanup
The function's closure lives in its __closure__ attribute. We can extract the closure first, put it into storage, and then fetch the closure object from storage to restore the function at execution time. The pseudo code is as follows:
Note: If the function closure is relatively small, it can be directly inlined in the code.
Callable cleanup
The UDF passed in by the user may not be a function but an instance of a class that implements the __call__ method. In this case, the captured objects cannot be obtained through the __closure__ attribute, so special trimming of the callable object is required. If the object implements Python's __reduce__ or __getstate__ method, further processing is required.