Priorities are ignored when stealing tasks #8992

Open

Dellson opened this issue Jan 21, 2025 · 5 comments

Dellson commented Jan 21, 2025

Describe the issue:

There are actually a few issues (two of them related to stealing), so let me go through the whole process.

I use the dask and distributed packages to run test automation in my project.
To speed up test execution (the tests cannot be run in parallel on a single environment), I spin up multiple VMs (dask workers), create a queue of tests (tasks), and assign them to workers until the tasks are depleted. Each task is unique and runs once unless it fails (then it is rerun).
This works perfectly, but I noticed that using priorities could speed up execution significantly.

I tried to use dask priorities but ultimately failed. Here are my findings (the first issue is not related to the topic but is a problem nevertheless):

  1. All of the tasks are consumed immediately by the first VM created; I would like to have no more than one task assigned to a VM at a time - this would ensure that priorities are balanced better.
  2. The Dask dashboard shows that priorities have been assigned correctly, but only tasks on the first VM are run according to priority. The priorities seem to be ignored on the second VM (the one that stole tasks from the first).
  3. The VM that stole the tasks grabs random tasks, not the highest-priority ones.

I would prefer assigning only one task per VM instead of stealing, but if I really were to rely on stealing, then it should grab the highest-priority tasks that are not being processed (right now it grabs random tasks).
To resolve the first problem, I set distributed.scheduler.worker-saturation to 1.0, but it didn't help; all of the tasks are still consumed at once.
I tried using fifo-timeout to deal with the second one, but it wasn't helpful either (I thought that fifo-timeout is taken into consideration when stealing).
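
For reference, the worker-saturation setting lives in Dask's configuration and has to be set before the scheduler/cluster is created; the snippet below is only a sketch of how it might be applied, and the work-stealing key is included purely to show that stealing can also be disabled outright:

import dask

# Sketch only: these keys must be set before the scheduler/cluster starts.
# worker-saturation is the setting mentioned above; work-stealing is shown
# just to illustrate that stealing can be disabled entirely.
dask.config.set({
    "distributed.scheduler.worker-saturation": 1.0,
    "distributed.scheduler.work-stealing": False,
})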

The whole process looks as follows:

  1. The agent (host) VM creates tasks (not dask tasks, but asyncio.create_task()) to create VMs from snapshots (every VM is identical). Please note that spinning up a VM and then setting it up takes several minutes, so the test tasks are created before the workers exist.
    1.1. Once a VM is set up, register it as a worker from the command line (nthreads=1).
  2. In the meantime, the agent schedules tests asynchronously:
    2.1. Create a list of futures.
    2.2. Append each test to the list of futures (client.submit(test)).
    2.3. Run as_completed to gather the futures; if there are failed tests, re-add them to the futures list (sketched after this list).
  3. If there are any existing workers, they start to pick up the tasks.
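
To make step 2 concrete, here is a minimal sketch of the submit/as_completed/retry loop; run_test, estimated_seconds, and the retry bookkeeping are illustrative placeholders, not the actual project code:

from dask.distributed import Client, as_completed

def run_tests(client: Client, tests):
    # 2.1/2.2: submit every test and remember which future belongs to which test
    # (run_test(test) is a placeholder that executes a single test on a worker)
    future_to_test = {}
    for test in tests:
        fut = client.submit(run_test, test, priority=test.estimated_seconds, pure=False)
        future_to_test[fut] = test

    # 2.3: gather results as they complete; failed tests are re-added to the pool
    seq = as_completed(list(future_to_test))
    for fut in seq:
        if fut.status == "error":
            test = future_to_test[fut]
            retry = client.submit(run_test, test, priority=test.estimated_seconds, pure=False)
            future_to_test[retry] = test
            seq.add(retry)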

With the current behaviour I haven't encountered a single run in which the lowest-priority task was executed last. My task lengths vary from 20 seconds to 3 minutes; the overall runtime can increase significantly if the tasks are ordered improperly.

Example:
I tested it on a small scale (8 tasks, 2 VMs).
The priorities are equal to the estimated task runtime in seconds (actual runtime is roughly priority + 40 seconds), which ensures that the slowest tasks have the highest priority.

The first worker consumed all of the tasks immediately.
[screenshot]

As soon as the second VM was created, it stole several tasks from the first worker.
[screenshot]

Then the workers kept stealing work from each other.
Tasks on the second VM were run in random order.
After the first task finished on the first VM, the remaining tasks (which had been re-stolen from the second VM) were run in the order they were stolen.
[screenshot]

Environment:

  • Dask version: 2024.12.1
  • Distributed version: 2024.12.1
  • Python version: 3.13.1
  • Operating System: Windows Server 2022 on both scheduler and workers
  • Install method: pip

Conclusion

All I want to achieve here is to make prioritization work.
I am not sure whether the first VM consuming all of the work is due to a flaw in my setup process; if that can be fixed, I could skip stealing altogether.
Still, priorities seem to be ignored when stealing tasks, which is a real issue.

@jacobtomlinson (Member) commented:

Are you able to put together some kind of reproducible code example so that we can explore this behaviour?


Dellson commented Jan 22, 2025

The code I use is quite convoluted. Give me some time to distil a simplified version that reproduces the problem.
Just be aware that the code sets up only the tasks and the scheduler, not the workers. The workers are separate VMs, and I set them up by running dask-worker {scheduler} --name {vm_name} --nthreads 1 --resources {os_version}=1,{location}=1,{db_name}=1 --local-directory {self.DASK_WORKER_OUTPUT} in a shell.


jacobtomlinson commented Jan 22, 2025

If this is a bug in the scheduling/stealing behaviour, you should be able to reproduce it with LocalCluster.


Dellson commented Jan 23, 2025

Hey, it took me a while, but I've got the code - it uses LocalCluster and reproduces the issue.

from dask.distributed import Client, LocalCluster
from dask.distributed import as_completed
import time

def my_func(t):
    time.sleep(t)
    return t

def run_cluster():
    cluster = LocalCluster(n_workers=1, resources={'WIN10':1, 'CLOUD':1, 'DB':1})
    client: Client = cluster.get_client()
    try:
        futures = []
        for i in range(20):
            futures.append(
                client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False, resources={'WIN10':1, 'CLOUD':1, 'DB':1}
            ))

        time.sleep(1)
        client.cluster.scale(2)

        for future, result in as_completed(futures, with_results=True):
            print(f'Result {result}')
    except:
        pass
    cluster.close()

if __name__ == '__main__':
    run_cluster()

The output:

Result 8
Result 32
Result 64
Result 1
Result 2
Result 4
Result 16
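
As a sanity check, the priorities the scheduler actually recorded can be dumped with client.run_on_scheduler; dump_priorities below is a hypothetical helper, not part of the reproducer:

def dump_priorities(dask_scheduler=None):
    # Report the scheduler-side priority tuple of every known task
    return {key: ts.priority for key, ts in dask_scheduler.tasks.items()}

# e.g. print(client.run_on_scheduler(dump_priorities))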


Dellson commented Jan 23, 2025

I also reproduced it using the CLI/Python (this is close to what I actually do in my project).
In this case, I noticed that resources cause the improper prioritization (removing the resources "fixes" the problem).

Failing sequence:

  1. Run dask scheduler --port 8786 --scheduler-file info.txt --dashboard-address :8787 --show
  2. Run python code:
import asyncio
import time

from dask.distributed import Client
from dask.distributed import as_completed


def my_func(t):
    print(f'sleep {t}')
    time.sleep(t)
    return t

async def main():
    client = await Client(address='tcp://10.249.224.32:8786', name="client", asynchronous=True, timeout=180)

    futures = []
    for i in range(1,21):
        futures.append(
            client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False, resources={'WIN10':1, 'CLOUD':1, 'DB':1}
        ))

    print('Ready for workers to register')

    async for _, result in as_completed(futures, with_results=True):
        print(f'Result {result}')
    client.close()

result = asyncio.run(main())
  3. Start the first worker: dask-worker $scheduler_address --name "worker-1" --resources "WIN10=1,CLOUD=1,DB=1" --nthreads 1
  4. Start the second worker: dask-worker $scheduler_address --name "worker-2" --resources "WIN10=1,CLOUD=1,DB=1" --nthreads 1

First worker output:

sleep 20
sleep 12
sleep 18
sleep 17
sleep 16
sleep 13
sleep 3
sleep 1
sleep 9
sleep 7

Second worker output:

sleep 15
sleep 19
sleep 4
sleep 14
sleep 8
sleep 10
sleep 11
sleep 2
sleep 6
sleep 5

However, it worked correctly when I removed the resources:

  1. Run dask scheduler --port 8786 --scheduler-file info.txt --dashboard-address :8787 --show
  2. Run python code:
import asyncio
import time

from dask.distributed import Client
from dask.distributed import as_completed


def my_func(t):
    print(f'sleep {t}')
    time.sleep(t)
    return t

async def main():
    client = await Client(address='tcp://10.249.224.32:8786', name="client", asynchronous=True, timeout=180)

    futures = []
    for i in range(1,21):
        futures.append(
            client.submit(my_func, i, key=f"my_func{i}", priority=i, pure=False#, resources={'WIN10':1, 'CLOUD':1, 'DB':1}
        ))

    print('Ready for workers to register')

    async for _, result in as_completed(futures, with_results=True):
        print(f'Result {result}')
    client.close()

result = asyncio.run(main())
  3. Start the first worker: dask-worker $scheduler_address --name "worker-1" --nthreads 1
  4. Start the second worker: dask-worker $scheduler_address --name "worker-2" --nthreads 1

Output:
First worker:

sleep 20
sleep 18
sleep 17
sleep 14
sleep 13
sleep 10
sleep 9
sleep 6
sleep 5
sleep 2
sleep 1

Second worker:

sleep 19
sleep 16
sleep 15
sleep 12
sleep 11
sleep 8
sleep 7
sleep 4
sleep 3
