Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix calling cleanup function in Worker.run() to avoid race condition #1455

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

detailyang
Copy link

Hello.

Found that minio encounters deadlocks when uploading large files in certain cases. Further investigation reveals that the thread pool can experience deadlocks under specific conditions.

The minimal reproduction code is as follows:

from threading import BoundedSemaphore, Thread
from queue import Queue
import time

class Worker(Thread):
    def __init__(self, tasks_queue, results_queue, exceptions_queue):
        Thread.__init__(self)
        self.tasks_queue = tasks_queue
        self.results_queue = results_queue
        self.exceptions_queue = exceptions_queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            task = self.tasks_queue.get()
            if not task:
                self.tasks_queue.task_done()
                break
            
            if self.exceptions_queue.empty():
                func, args, kargs, cleanup_func = task
                try:
                    result = func(*args, **kargs)
                    self.results_queue.put(result)
                except Exception as ex:
                    print(f"exception occurred: {ex}")
                    self.exceptions_queue.put(ex)
                finally:
                    cleanup_func()
            else:
                print("Skipping task due to previous exception")
            self.tasks_queue.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.results_queue = Queue()
        self.exceptions_queue = Queue()
        self.tasks_queue = Queue()
        self.sem = BoundedSemaphore(num_threads)
        self.num_threads = num_threads

    def add_task(self, func, *args, **kargs):
        print(f"Trying to acquire semaphore at {time.strftime('%H:%M:%S')}")
        self.sem.acquire()
        print(f"Semaphore acquired at {time.strftime('%H:%M:%S')}")
        cleanup_func = self.sem.release
        self.tasks_queue.put((func, args, kargs, cleanup_func))

    def start_parallel(self):
        for _ in range(self.num_threads):
            Worker(self.tasks_queue, self.results_queue, self.exceptions_queue)

    def result(self):
        for _ in range(self.num_threads):
            self.tasks_queue.put(None)
        self.tasks_queue.join()
        if not self.exceptions_queue.empty():
            raise self.exceptions_queue.get()
        return self.results_queue

def task_with_exception():
    raise ValueError("Task failed!")

def normal_task():
    time.sleep(1)
    return "Task completed"

if __name__ == "__main__":
    pool = ThreadPool(num_threads=1) 
    pool.start_parallel()

    for i in range(3):
        if i == 0:
            print(f"\nAdding task {i} (will raise exception)...")
            pool.add_task(task_with_exception)
        else:
            print(f"\nAdding task {i} (normal task)...")
            pool.add_task(normal_task)
        time.sleep(0.1) 
    
    try:
        results = pool.result()
        while not results.empty():
            print(results.get())
    except Exception as e:
        print(f"Final exception: {e}")

the main reason is that when there are exceptions in the exception queue, the worker does not release the semaphore, which leads to add_task being permanently blocked.

The fix is quite simple: ensure that the semaphore is released after each task is retrieved.

minio/helpers.py Show resolved Hide resolved
minio/helpers.py Outdated Show resolved Hide resolved
Copy link
Member

@balamurugana balamurugana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move line #826 before line #832

@detailyang
Copy link
Author

Move line #826 before line #832

Hello. @balamurugana.

I made a subtle adjustment to ensure cleanup_func is extracted from task to avoid used-before-assignment cleanup_func

minio/helpers.py Outdated Show resolved Hide resolved
minio/helpers.py Outdated Show resolved Hide resolved
@balamurugana balamurugana changed the title bugfix:avoid thread pool deadlock fix calling cleanup function in Worker.run() to avoid race condition Nov 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants