Skip to content

Commit

Permalink
Update _close_connection to ensure connection is closed before joinin…
Browse files Browse the repository at this point in the history
…g the thread

Remove unused import per review
  • Loading branch information
NeonDaniel committed Jan 13, 2025
1 parent ab06e08 commit 2ad36b2
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion neon_mq_connector/consumers/select_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import threading
import time

from asyncio import Event, Lock, run
from asyncio import Event
from typing import Optional

import pika.exceptions
Expand Down Expand Up @@ -233,6 +233,13 @@ def _close_connection(self, mark_consumer_as_dead: bool = True):
raise TimeoutError(f"Timeout waiting for channel close. "
f"is_closed={self.channel.is_closed}")
LOG.info(f"Channel closed")

# Wait for the connection to close
waiter = threading.Event()
while not self.connection.is_closed:
waiter.wait(1)
LOG.info(f"Connection closed")

if self.connection:
self.connection.ioloop.stop()
# self.connection = None
Expand Down

0 comments on commit 2ad36b2

Please sign in to comment.