-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathreactor.py
77 lines (61 loc) · 2.56 KB
/
reactor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import logging
from collections.abc import Coroutine
from aleph_message.models import AlephMessage
from aleph_message.models.execution.environment import Subscription
from aleph.vm.pool import VmPool
from aleph.vm.utils import create_task_log_exceptions
from .pubsub import PubSub
from .run import run_code_on_event
logger = logging.getLogger(__name__)
def is_equal_or_includes(value, compare_to) -> bool:
if isinstance(value, str):
return value == compare_to
elif isinstance(value, dict):
for subkey, subvalue in value.items():
if not hasattr(compare_to, subkey):
return False
if not is_equal_or_includes(subvalue, getattr(compare_to, subkey)):
return False
return True
else:
msg = "Unsupported value"
raise ValueError(msg)
def subscription_matches(subscription: Subscription, message: AlephMessage) -> bool:
if not subscription:
# Require at least one value to match
return False
for key, value in subscription.dict().items():
if not is_equal_or_includes(value, getattr(message, key)):
return False
return True
class Reactor:
pubsub: PubSub
pool: VmPool
listeners: list[AlephMessage]
def __init__(self, pubsub: PubSub, pool: VmPool):
self.pubsub = pubsub
self.pool = pool
self.listeners = []
async def trigger(self, message: AlephMessage):
coroutines: list[Coroutine] = []
for listener in self.listeners:
if not listener.content.on.message:
logger.warning(
r"Program with no subscription was registered in reactor listeners: {listener.item_hash}"
)
continue
for subscription in listener.content.on.message:
if subscription_matches(subscription, message):
vm_hash = listener.item_hash
event = message.json()
# Register the listener in the list of coroutines to run asynchronously:
coroutines.append(run_code_on_event(vm_hash, event, self.pubsub, pool=self.pool))
break
# Call all listeners asynchronously from the event loop:
for coroutine in coroutines:
create_task_log_exceptions(coroutine)
def register(self, message: AlephMessage):
if message.content.on.message:
self.listeners.append(message)
else:
logger.debug(f"Program with no subscription cannot be registered in reactor listeners: {message.item_hash}")