
TaskVineExecutor: add new features #2809

Merged

merged 37 commits on Jul 21, 2023

Conversation

@tphung3 (Contributor) commented Jul 10, 2023

Description

This PR brings new updates to the TaskVineExecutor:

  • Refactor the constructor of TaskVineExecutor. This de-clutters the executor constructor and separates (almost all) configuration of the different components (the Executor interface that talks to the DataFlowKernel, the Manager process that manages tasks, and the Factory process that manages workers).
  • Add a new option to use the TaskVine factory as an alternative to a Parsl provider. The benefits of the TaskVine factory carry over to Parsl: it can handle a variable number of workers, dynamically size the worker pool according to the number of outstanding/pending tasks, ask for workers with an exact amount of resources (e.g., a worker with 8 cores and 16 GB of memory), and so on. Other features can be ported or added upon request, but the features in this PR should serve the core/main use well. The default factory spawns a local process like the local provider, but much faster (the factory runs as a separate process, while the provider shares CPU with the executor at startup). The default worker provider is still the Parsl provider for now.
  • Add a new keyword exec_mode to apps and introduce a new execution mode. The executor now supports regular execution mode, which uses the executor's serialization methods (i.e., exec_parsl_function.py and its friends), and python execution mode, which uses the TaskVine manager's serialization methods (in short, it works on the same principle as exec_parsl_function.py but uses cloudpickle and PythonTask, which are more native to TaskVine). The serverless feature (function service) was intended to be included here, but there seems to be a bug in TaskVine serverless tasks, so it is delayed for now. Apps that use TaskVineExecutor will have a signature along these lines:
@python_app
def f(x, call_specs={'cores':1,'memory':4000,'gpus':4,'exec_mode': 'serverless'}):
    ...

@python_app
def g(x, call_specs={'cores':4, 'exec_mode': 'python'}):
    ...

@bash_app
def h(x, call_specs={'exec_mode': 'regular'}):
    ...
  • Add support for automatic conda environment packaging. This eases the transition from local to remote application execution for users who manage software dependencies with the conda package manager. It works by packaging a local conda environment (the one the user is using), replicating that environment to workers, and running apps within it. From the user's perspective, one line of configuration supplying the conda environment name or path is enough for everything to work.
  • Many small fixes for little issues that arose while developing this PR. If needed I'll list them here, but there are really a lot, and I want to publish these changes first for discussion.

Fixes # (issue)
N/A

Type of change

  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code maintenance/cleanup

@benclifford (Collaborator) commented Jul 11, 2023

mypy doesn't pass at the moment (commit a922c7a):

parsl/executors/taskvine/executor.py:50: error: Skipping analyzing "autopep8": module is installed, but missing library stubs or py.typed marker [import]

requirements.txt Outdated
@@ -12,3 +12,4 @@ requests
paramiko
psutil>=5.5.1
setproctitle
autopep8
Collaborator
what is this for?

Collaborator

I found where it was used - sorry.

Maybe consider adding taskvine-specific dependencies to setup.py, in extras_require with a taskvine target: so that users would install like this:

pip install 'parsl[taskvine]'
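For illustration, an `extras_require` entry along the suggested lines might look like this. This is a hypothetical sketch: the `ndcctools` requirement name is an assumption for illustration, not taken from the PR.

```python
# Hypothetical sketch of an extras_require mapping for setup.py.
# The "ndcctools" distribution name is assumed for illustration only.
extras_require = {
    "taskvine": ["ndcctools"],
}

# Passing this as setup(..., extras_require=extras_require) would let
# users install the optional dependencies with:
#   pip install 'parsl[taskvine]'
```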

Contributor Author

autopep8 is no longer needed and is removed.

parsl/app/app.py Outdated
@@ -58,8 +58,8 @@ def __init__(self, func, data_flow_kernel=None, executors='all', cache=False, ig
self.kwargs['stderr'] = params['stderr'].default
if 'walltime' in params:
self.kwargs['walltime'] = params['walltime'].default
if 'parsl_resource_specification' in params:
self.kwargs['parsl_resource_specification'] = params['parsl_resource_specification'].default
if 'parsl_call_specs' in params:
@benclifford (Collaborator), Jul 11, 2023

don't change core parsl! this is a breaking change for WQ users, not just the taskvine in-development work, and would need some more serious justification

Contributor Author

Change reverted. It's just that a better variable name would let more information flow from the top down.

logger.error("Ignoring the call specification. "
             "Parsl call specification is not supported in HighThroughput Executor. "
             "Please check WorkQueueExecutor or TaskVineExecutor if call specification is needed.")
raise UnsupportedFeatureError('call specification', 'HighThroughput Executor', 'WorkQueue Executor')
Collaborator

this exception is a bit icky: it's meant to direct you to use another executor - but it doesn't redirect you to either WQ or TaskVine in the exception object.

That's not too concerning but it's a bit ugly... I think the right thing here is that the UnsupportedFeatureError shouldn't be pointing people to WQ at all: in a modular, open expanding world, the High Throughput Executor code shouldn't be expected to know about other executors at all...

Contributor Author

I agree. UnsupportedFeatureError is now agnostic to target executors if needs be.


# If TaskVine factory is used, disable the Parsl provider
if use_factory:
    provider = LocalProvider(init_blocks=0, max_blocks=0)
Collaborator

have you tried using None here? ideally that would work here but maybe it doesn't...

@tphung3 (Contributor Author), Jul 11, 2023

Using None almost works, until the interpreter exits...

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/jason/work/parsl_taskvine/parsl/parsl/process_loggers.py", line 27, in wrapped
    r = func(*args, **kwargs)
  File "/home/jason/work/parsl_taskvine/parsl/parsl/dataflow/dflow.py", line 1205, in cleanup
    job_ids = executor.provider.resources.keys()
AttributeError: 'NoneType' object has no attribute 'resources'

An easy fix is to check for the existence of executor.provider; it has been incorporated in this PR. provider is now set to None.
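The guard described can be sketched like this. Note this is a simplified illustration with stand-in names, not the actual dflow.py cleanup code:

```python
# Simplified sketch of guarding against executors without a provider
# during cleanup (names are illustrative, not parsl's exact API).

class FakeExecutor:
    """Stand-in for an executor whose provider may be None."""
    def __init__(self, provider=None):
        self.provider = provider

def collect_job_ids(executors):
    job_ids = []
    for executor in executors:
        # Checking for a provider first avoids the AttributeError above.
        if executor.provider is not None:
            job_ids.extend(executor.provider.resources.keys())
    return job_ids

print(collect_job_ids([FakeExecutor()]))  # no crash: []
```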

Contributor Author

mypy doesn't allow assignment of None here, so I reverted to the dummy LocalProvider.

Collaborator

ok - I'll see if I can sort that out in a different PR...

Collaborator

I made PR #2825 to work on allowing this


# need source of function to send tasks as python or serverless tasks
if exec_mode == 'python' or exec_mode == 'serverless':
    func_src = autopep8.fix_code(inspect.getsource(func))
Collaborator

does this formatting make the code more executable? or is it prettifying for human consumption? (i can't tell from the autopep8 docs if this is going to change how the code runs?)

@tphung3 (Contributor Author), Jul 11, 2023

In cases where apps are functions indented inside a class or another function, like this:

def f():
    @python_app
    def g():
        ...

then inspecting function g with inspect.getsource(g) yields a string that is indented one level, like this:

@python_app
    def g():
        ...

This prevents attempts to exec the source code into a temporary namespace as the indentation level is one level to the right. autopep8.fix_code corrects this and aligns the function with its body properly.

Another way to address this case is to write a custom method that aligns function source code (detecting the indentation character, tabs or spaces, and its depth should be easy), but this is a bit hacky. Let me know what you think.
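For what it's worth, the standard library can handle the plain-indentation case without autopep8. A sketch of that "custom method" alternative using textwrap.dedent (this handles only a uniform leading indent, not the decorator line or mixed tabs/spaces):

```python
import inspect
import textwrap

def outer():
    # nested function: its source comes back indented one level
    def g():
        return 41 + 1
    return g

g = outer()
src = textwrap.dedent(inspect.getsource(g))  # strip the common leading indent
ns = {}
exec(src, ns)  # now compiles cleanly into a temporary namespace
assert ns["g"]() == 42
```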

Collaborator

mostly I think this PR should avoid anything to do with changing serialization away from parsl's defaults, and focus on solid improvements to the TaskVineExecutor.

Then, work on serialization in a different branch - it might even be that some techniques like this are more generally useful across all parsl executors, and the code should go into the parsl serialization system rather than the taskvine executor.

if self.source:
function_info = {"source code": inspect.getsource(parsl_fn),
if app_type == 'python':
function_info = {"source code": autopep8.fix_code(inspect.getsource(parsl_fn)),
@benclifford (Collaborator), Jul 11, 2023

I think in general, you shouldn't expect source code of a function to exist: sometimes callables passed to executors come from places other than raw source (for example, wrappers/closures in monitoring and file staging, or in some cases, interesting user generated callables)

That's also what a bash_app is: just some wrapped python code...

So I think this is generally the wrong thing to be doing by default.

Contributor Author

I agree that function source code is not a given. This is a bit concerning as the serverless task model in TaskVine assumes otherwise.

The default behavior now is to transfer source code only if a function object has source code, otherwise we fall back to pack_apply_message.

Collaborator

does that mean bash_apps don't work with serverless mode?

Contributor Author

With the current state of serverless tasks, yes. Serverless tasks use source code to compute a function hash, which is then used to put each function into its own staging directory.
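That scheme can be sketched roughly as follows. The names here are hypothetical illustrations, not the real TaskVine code:

```python
import hashlib
import inspect

def function_staging_name(func):
    # Hypothetical sketch: derive a per-function staging directory name
    # from a hash of the function's source. This is exactly what breaks
    # for callables without retrievable source (e.g. wrapped bash_apps):
    # inspect.getsource raises for them.
    src = inspect.getsource(func)
    return hashlib.sha256(src.encode()).hexdigest()[:16]

def example():
    return 1

print(function_staging_name(example))  # a stable 16-hex-char name
```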

Collaborator

ok, that feels like a more fundamental architectural problem which is going to cause trouble long term with people actually using this for real stuff...

Contributor Author

It seems that many parts of serverless tasks assume the existence of source code, not just computing a unique directory.

Contributor Author

Yes tough luck... I'll see if there's any quick fix to this later.

Collaborator

ok, so for this PR I recommend getting rid of the changes in core (choosing which decorator) that were made to support this source code stuff, because from the above it looks to me like they're to support a use-case (serverless) which isn't going to work properly...

then let's have a proper discussion about serverless, maybe involving other people - perhaps by email.

(alternatively, if you prefer, rather than removing stuff from this PR, you can open a few new PRs, each with individual changes from this PR that are less controversial to review - that would also provide a path to get much of this work merged, leaving the awkward stuff to think about later)

@benclifford (Collaborator)

on my laptop, pytest parsl/tests/ --config parsl/tests/configs/taskvine_ex.py against this taskvine version (from git)

../cctools/I/bin/vine_worker version 8.0.0 DEVELOPMENT (released 2023-06-19 12:12:49 +0000)
	Built by benc@parsl-dev-3-11-1364 on 2023-06-19 12:12:49 +0000
	System: Linux parsl-dev-3-11-1364 5.19.0-43-generic #44~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Mon May 22 13:39:36 UTC 2 x86_64 GNU/Linux
	Configuration: --prefix=/home/benc/parsl/src/cctools/I

fails with this error:

_______________________________________ test_check_importlib_file_function ________________________________________

    def test_check_importlib_file_function():
        helper_path = pathlib.Path(__file__).parent / "callables_helper.py"
        spec = importlib.util.spec_from_file_location("dynamically_loaded_module", helper_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        some_aux_func = module.some_aux_func
>       app(some_aux_func).result()

...

  File "/home/benc/parsl/src/parsl/parsl/dataflow/dflow.py", line 719, in launch_task
    exec_fu = executor.submit(executable, task_record['call_specs'], *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/benc/parsl/src/parsl/parsl/executors/taskvine/executor.py", line 392, in submit
    self._serialize_function(call_specs['app_type'], function_file, func, args, kwargs)
  File "/home/benc/parsl/src/parsl/parsl/executors/taskvine/executor.py", line 486, in _serialize_function
    pickle.dump(function_info, f_out)
_pickle.PicklingError: Can't pickle <function some_aux_func at 0x7f7d53d26160>: import of module 'dynamically_loaded_module' failed

I think that's coming from the code in _serialize_function that tries to avoid using parsl's regular serialization mechanisms and do something different... that then breaks.

Maybe we can dig in a bit what isn't working with the parsl default serialization in this implementation.
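The failure is inherent to plain pickle's by-reference function serialization. A minimal reproduction independent of parsl:

```python
import pickle
import types

# Build a module that exists only in memory, like one loaded via
# importlib from a file path without being registered in sys.modules.
mod = types.ModuleType("dynamically_loaded_module")
exec("def some_aux_func():\n    return 1", mod.__dict__)

# Plain pickle stores functions as a module+name reference, and pickling
# verifies the module is importable - which fails here, just as in the
# traceback above.
try:
    pickle.dumps(mod.some_aux_func)
except pickle.PicklingError as e:
    print("PicklingError:", e)
```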

If I comment out all the source based stuff in taskvine/executor.py and use pack_apply_message's parsl serialization, many pytest tests pass ok: these ones do not (some of them even hang with no error report):

test_callables.py: test_check_importlib_file_function and test_check_importlib_file_function_partial

tests/test_staging/test_staging_ftp.py: test_staging_ftp

@benclifford (Collaborator) commented Jul 11, 2023

@tphung3 I reviewed enough to identify two major concerns:

  • breaking the publicly facing parsl_resource_specification API, and
  • stuff that concerns me about function serialization, which I have written about above and as per-line comments.

I haven't reviewed the other pieces yet and probably won't until we deal with the above two points.

@benclifford (Collaborator)

to fix that autopep8 mypy error: I think it gets fixed if you add autopep8 into mypy.ini near the end, like these packages:

[mypy-oauth_ssh.*]
ignore_missing_imports = True

[mypy-flux.*]
ignore_missing_imports = True

[mypy-setproctitle.*]
ignore_missing_imports = True
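Following that pattern, the new section would presumably be (assuming the same blanket ignore suffices for autopep8):

```ini
[mypy-autopep8.*]
ignore_missing_imports = True
```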

@benclifford (Collaborator)

ok for that race condition:

get all the process starts before the thread start, like this:

@@ -266,13 +266,15 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
 
         # Begin work
         self.submit_process.start()
-        self.collector_thread.start()
 
         # Run worker scaler either with Parsl provider or TaskVine factory
         if self.use_factory:
             self.factory_process.start()
         else:
             self.initialize_scaling()
+
+        self.collector_thread.start()

This doesn't fix the race condition, but reduces the likelihood of the hang - it doesn't happen at all on my laptop after an hour of looping, for example.

The broader story:

Python programs cannot really use 1) multiprocessing via the fork launch method and 2) threading together in one program: forking a multithreaded process is massively unsafe. See this python issue python/cpython#84559 and a plan to switch Python to use spawn by default.

In general, Parsl should move away from fork based multiprocessing. However there are some subtleties there which make it non-trivial, which is why other parts of parsl still use it - and why I don't think you could use spawn here either.

Here's an issue tracking this #2343

So I can't, in the presence of our other bad examples, insist on not using multiprocessing fork here... and that's why I suggest the swap of thread and multiprocessing above.

The race condition that happens around here, when I've seen it before, is that the thread does a lot of stuff as it starts which takes locks... and then those held locks are immediately forked into the new process, which never unlocks them.

Getting that process forking done before any thready-stuff reduces the chances of that.

@@ -150,6 +150,7 @@ def __call__(self, *args, **kwargs):
executors=self.executors,
cache=self.cache,
ignore_for_cache=self.ignore_for_cache,
app_kwargs=invocation_kwargs)
app_kwargs=invocation_kwargs,
app_type='bash')
Collaborator

This app_type annotation is not used any more, right? (and I hope to persuade you it will never be needed...)

Contributor Author

That's right. I'll revert that change some time today.

msg = executor.create_monitoring_info(new_status)
logger.debug("Sending message {} to hub from DFK".format(msg))
self.monitoring.send(MessageType.BLOCK_INFO, msg)
if executor.provider:
Collaborator

I'm going to move this into a separate PR because it isn't directly task vine related. Parsl shutdown has been pretty fragile, so I'd like to keep this as an explicit commit in our history. You don't need to do anything to make that happen.

@tphung3 (Contributor Author) commented Jul 20, 2023

If all is good I'd like to add some documentation for TaskVine. Is there a command that lints documentation? The PR currently fails at that step.

@benclifford (Collaborator)

@tphung3 usually I build the documentation on my dev machine. Adding the docs extras requirement will get the dependencies (e.g. something like `pip install -e .[docs]`) and then:

cd docs/
make html

will build the html docs and put them in docs/_build/html

and this:

make SPHINXOPTS=-W html

will run the more aggressive build that CI does, that fails harder so you can see what it got angry about.

"Parsl resource specification is not supported in HighThroughput Executor. "
"Please check WorkQueueExecutor if resource specification is needed.")
raise UnsupportedFeatureError('resource specification', 'HighThroughput Executor', 'WorkQueue Executor')
if 'app_type' in resource_specification and len(resource_specification) > 1:
Collaborator

this can go back to the original test now, right?

Contributor Author

Change reverted.


# guess the host name if the project name is not given
if not self.manager_config.project_name:
    self.manager_config.address = socket.gethostname()
Collaborator

there is a module parsl.addresses which provides various methods for working out hostnames, intended for use with the address parameter of the high throughput executor. This is because we haven't found one method that works everywhere. That experience makes me think that eventually that might be needed here too (although that doesn't need to block this PR being merged - it's a note for the future)

executor_task_id = vine_id_to_executor_task_id[str(t.id)]
logger.debug("Completed TaskVine task {}, executor task {}".format(t.id, executor_task_id))
result_file = result_file_of_task_id.pop(executor_task_id)
logger.debug('Found a task!')
Collaborator

don't use ! surprise in log messages - if it's a surprise it should be a WARNING

@@ -52,11 +52,10 @@ def submit(self, func, resource_specification, *args, **kwargs):
here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_

"""
if resource_specification:
if 'app_type' in resource_specification and len(resource_specification) > 1:
Collaborator

same comment about htex - this can go back to the original if condition?


required_resource_types = set(['cores', 'memory', 'disk'])
acceptable_resource_types = set(['cores', 'memory', 'disk', 'gpus', 'priority', 'running_time_min'])
acceptable_fields = set(['cores', 'memory', 'disk', 'gpus', 'priority', 'running_time_min', 'app_type'])
Collaborator

spurious app_type

from typing import Optional


# Configuration of a TaskVine factory
Collaborator

Maybe this should be the first sentence of the docstring so that it appears as the short description of the structure in docs? (and same for TaskVineManagerConfig)

Contributor Author

Yes, corrected.

@benclifford (Collaborator)

I realised how many different code paths there are through this code - it might be interesting (eg in a later PR) to put in some tests for various things that run just a single task rather than the full test suite, but for a wide selection of different configuration options.

@tphung3 (Contributor Author) commented Jul 21, 2023

Looks like CI is stuck at generating taskvine stubs in /docs/stubs. Is this automatable?

@benclifford (Collaborator)

Looking at that error briefly, I think it's because it can't import ndcctools - because that isn't on the python path by default (and why there's an explicit path set for the taskvine and work queue pytests).

You might try adding an explicit python path onto the doc build in .github/workflows/ci.yaml to tell it where to find the task vine code... I haven't tried whether that solves the problem because I'm away from my development environment.

@benclifford benclifford merged commit 41357c6 into Parsl:master Jul 21, 2023
4 checks passed
benclifford pushed a commit that referenced this pull request Aug 5, 2023
This PR is a follow-up to PR #2809 with the following changes:
- Add support for serverless execution mode for tasks. Users can now declare serverless tasks like this:
```
@python_app
def f(x, parsl_resource_specification={'exec_mode': 'serverless'}):
    ...
```
Under the hood, a library task is deployed to each worker and forks itself to execute an actual task/function call.
- Add support for full conda environment replication on worker nodes for both serverless and regular tasks. Given a conda environment, TaskVine will distribute and cache it efficiently, so a shared filesystem is not required. The main benefit lies in the assumption that a conda environment can be huge, and thousands of tasks asking the shared filesystem for the same environment directory at the same time is not very performant. TaskVine, on the other hand, keeps track of where files are and has an implicit throttling mechanism for data distribution so things don't go haywire.
- Add support for no conda environment replication, provided that the environment is properly set up on worker nodes. Turning on the `shared_fs` flag and adding some setup code to `init_command` turns off almost all of TaskVine's data-aware magic, and the workflow run will mostly use the shared filesystem instead (e.g., `init_command="conda run -p path/to/conda"`). This serves as the baseline behavior for TaskVineExecutor.
- Refactor `executor.py` into {`executor.py`, `manager.py`, and `factory.py`} as they are different components.
- Change API: `use_factory` to `worker_launch_method`.
- Some variable renaming and additional logging.