Skip to content

Commit

Permalink
Resubscribe to topics and queues on failures
Browse files Browse the repository at this point in the history
  • Loading branch information
kalaspuff committed Nov 16, 2017
1 parent 167c9e2 commit 81437b9
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions tomodachi/transport/aws_sns_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,6 @@ async def subscribe_topics(cls: Any, topic_arn_list: List, queue_arn: str, queue

return subscription_arn_list

async def recreate_client(cls: Any, name: str, context: Dict) -> None:
if not cls.clients or not cls.clients.get(name):
cls.create_client(cls, name, context)
return

cls.clients[name] = None
cls.create_client(cls, name, context)

async def consume_queue(cls: Any, obj: Any, context: Dict, handler: Callable, queue_url: str) -> None:
if not cls.clients or not cls.clients.get('sqs'):
cls.create_client(cls, 'sqs', context)
Expand Down Expand Up @@ -500,6 +492,13 @@ async def _callback() -> None:
if is_disconnected:
is_disconnected = False
logging.getLogger('transport.aws_sns_sqs').warning('Reconnected - receiving messages')
try:
context['_aws_sns_sqs_subscribed'] = False
cls.topics = {}
func = await cls.subscribe(cls, obj, context)
await func()
except Exception:
pass
await asyncio.sleep(20)
continue
if isinstance(e, (asyncio.TimeoutError, aiohttp.client_exceptions.ClientConnectorError)):
Expand Down

0 comments on commit 81437b9

Please sign in to comment.