-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathstatus.py
211 lines (168 loc) · 7.65 KB
/
status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""
Used to check that the example_fastapi program works as expected
in a deployed supervisor.
"""
import logging
from typing import Any
from aiohttp import ClientResponseError, ClientSession
from aiohttp.web_exceptions import HTTPBadGateway, HTTPInternalServerError, HTTPOk
from aleph_message.models import ItemHash
from aleph.vm.conf import settings
logger = logging.getLogger(__name__)
def assemble_vm_url(vm_id: ItemHash) -> str:
    """Build the base URL under which the supervisor exposes the given VM.

    The host and port are read from ``settings.SUPERVISOR_HOST`` and
    ``settings.SUPERVISOR_PORT``; the VM ID is appended under ``/vm/``.
    The result carries no trailing slash, so callers append their own suffix.
    """
    url = f"http://{settings.SUPERVISOR_HOST}:{settings.SUPERVISOR_PORT}/vm/{vm_id}"
    return url
async def get_json_from_vm(session: ClientSession, vm_id: ItemHash, suffix: str) -> Any:
    """Issue a GET request to a locally running VM and return the decoded JSON body.

    ``suffix`` is appended verbatim to the VM base URL (e.g. ``"/"`` or ``"/dns"``).
    ``raise_for_status()`` is called on the response, so an error HTTP status
    surfaces as an exception instead of a payload.
    """
    target = f"{assemble_vm_url(vm_id)}{suffix}"
    async with session.get(target) as response:
        # Fail loudly on HTTP errors before trying to decode the body.
        response.raise_for_status()
        payload = await response.json()
        return payload
async def post_to_vm(session: ClientSession, vm_id: ItemHash, suffix: str, data: Any = None) -> Any:
    """Issue a POST request to a locally running VM and return the decoded JSON body.

    ``data`` is sent as the JSON body of the request and ``suffix`` is appended
    verbatim to the VM base URL. ``raise_for_status()`` is called on the
    response, so an error HTTP status surfaces as an exception.
    """
    target = f"{assemble_vm_url(vm_id)}{suffix}"
    async with session.post(target, json=data) as response:
        # Fail loudly on HTTP errors before trying to decode the body.
        response.raise_for_status()
        payload = await response.json()
        return payload
async def check_index(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the index page of the VM is working.

    Returns True only when the index payload has ``"Example"`` equal to
    ``"example_fastapi"``. An HTTP error, a missing key or a different value
    all yield False.
    """
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/")
    except ClientResponseError:
        return False
    # Explicit comparison instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    return result.get("Example") == "example_fastapi"
async def check_lifespan(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the lifespan endpoint of the VM is working.

    Returns whether the ``"Lifespan"`` entry of the payload is exactly ``True``;
    an HTTP error response yields False.
    """
    try:
        payload: dict = await get_json_from_vm(session, vm_id, "/lifespan")
    except ClientResponseError:
        return False
    lifespan_flag = payload["Lifespan"]
    return lifespan_flag is True
async def check_environ(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the environ endpoint of the VM returns the expected environment variables.

    Returns True when every expected variable name is present in the payload,
    and False when one is missing or the endpoint answers with an HTTP error.
    """
    expected_variables = (
        "ALEPH_API_HOST",
        "ALEPH_API_UNIX_SOCKET",
        "ALEPH_REMOTE_CRYPTO_HOST",
        "ALEPH_REMOTE_CRYPTO_UNIX_SOCKET",
        "ALEPH_ADDRESS_TO_USE",
    )
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/environ")
    except ClientResponseError:
        return False
    # Explicit membership test instead of `assert`: assertions are removed
    # under `python -O`, and a failed one would raise instead of returning False.
    return all(name in result for name in expected_variables)
async def check_messages(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the messages endpoint of the VM returns a list of messages.

    The payload is expected to look like
    ``{"Messages": {"messages": [{"item_hash": ...}, ...]}}``.
    Returns False when that structure is incomplete, when the list is empty,
    or when the endpoint answers with an HTTP error.
    """
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/messages")
    except ClientResponseError:
        return False
    # Explicit checks instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    if "Messages" not in result or "messages" not in result["Messages"]:
        return False
    messages = result["Messages"]["messages"]
    # An empty list has no first message to inspect; treat it as a failed check
    # rather than letting the index lookup raise.
    if not messages:
        return False
    return "item_hash" in messages[0]
async def check_dns(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the DNS endpoint of the VM returns both IPv4 and IPv6 results.

    Returns True only when both the ``"ipv4"`` and ``"ipv6"`` entries of the
    payload are present and truthy. An HTTP error yields False.
    """
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/dns")
    except ClientResponseError:
        return False
    # Explicit truthiness test instead of `assert`: assertions are removed
    # under `python -O`, and a failed one would raise instead of returning False.
    return bool(result.get("ipv4")) and bool(result.get("ipv6"))
async def check_ipv4(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM has IPv4 connectivity.

    Returns True only when the ``"result"`` entry of the payload is exactly
    ``True``. A missing entry, any other value or an HTTP error yields False.
    """
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/ip/4")
    except ClientResponseError:
        return False
    # Explicit comparison instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    return result.get("result") is True
async def check_ipv6(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM has IPv6 connectivity.

    Returns True only when the ``"result"`` entry of the payload is exactly
    ``True`` and the payload also contains a ``"headers"`` entry. Anything
    else, including an HTTP error, yields False.
    """
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/ip/6")
    except ClientResponseError:
        return False
    # Explicit checks instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    return result.get("result") is True and "headers" in result
async def check_internet(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM has internet connectivity. This requires DNS, IP, HTTP and TLS to work.

    Returns True when the ``"result"`` entry of the payload equals the HTTP 200
    status code, and False otherwise (including on an HTTP error response).
    """
    try:
        payload: dict = await get_json_from_vm(session, vm_id, "/internet")
    except ClientResponseError:
        return False
    else:
        # The diagnostic VM returns HTTP 200 with {"result": False} when it cannot
        # connect to the internet; otherwise it forwards the return code of its
        # own test endpoint.
        forwarded_status = payload.get("result")
        return forwarded_status == HTTPOk.status_code
async def check_cache(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can set and get a value in its cache.

    Three requests are made in order, stopping at the first failure:
    setting key ``a`` to ``42`` must return ``True``, reading key ``a`` must
    return the string ``"42"``, and the key listing must contain ``"a"``.
    Returns False if any step fails or answers with an HTTP error.
    """
    try:
        # Explicit checks instead of `assert`: assertions are removed under
        # `python -O`, and a failed one would raise instead of returning False.
        set_reply: bool = await get_json_from_vm(session, vm_id, "/cache/set/a/42")
        if set_reply is not True:
            return False
        # The cached value is compared as a string, hence the `str` annotation.
        cached_value: str = await get_json_from_vm(session, vm_id, "/cache/get/a")
        if cached_value != "42":
            return False
        keys: list[str] = await get_json_from_vm(session, vm_id, "/cache/keys")
        return "a" in keys
    except ClientResponseError:
        return False
async def check_persistent_storage(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can set and get a value in its persistent storage.

    The ``/state/increment`` endpoint is called twice and the ``"counter"``
    value of the second reply must exceed that of the first by at least one.
    Returns False when it does not, when a reply carries no counter, or when
    the endpoint answers with an HTTP error.
    """
    try:
        first_reply: dict = await get_json_from_vm(session, vm_id, "/state/increment")
        second_reply: dict = await get_json_from_vm(session, vm_id, "/state/increment")
    except ClientResponseError:
        return False
    counter = first_reply.get("counter")
    counter_2 = second_reply.get("counter")
    if counter is None or counter_2 is None:
        return False
    # Use >= to handle potential concurrency.
    # Explicit comparison instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    return counter_2 >= counter + 1
async def check_error_raised(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can raise an error and return a traceback instead of crashing.

    Returns True when the ``/raise`` endpoint answers with HTTP 500 and the
    response text contains ``"Traceback"``.
    """
    raise_url = f"{assemble_vm_url(vm_id)}/raise"
    try:
        async with session.get(raise_url) as response:
            body = await response.text()
            if response.status != HTTPInternalServerError.status_code:
                return False
            return "Traceback" in body
    except ClientResponseError:
        return False
async def check_crash_and_restart(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that a crash in the VM would cause it to restart and work as expected.

    The ``/crash`` endpoint must answer with HTTP 502 (Bad Gateway). The index
    page is then requested and must report ``"Example"`` equal to
    ``"example_fastapi"``. Returns False if either step fails.
    """
    # Crash the VM init.
    vm_url = assemble_vm_url(vm_id)
    async with session.get(f"{vm_url}/crash") as resp:
        if resp.status != HTTPBadGateway.status_code:
            return False

    # Try loading the index page. A new execution should be created.
    try:
        result: dict = await get_json_from_vm(session, vm_id, "/")
    except ClientResponseError:
        return False
    # Explicit comparison instead of `assert`: assertions are removed under
    # `python -O`, and a failed one would raise instead of returning False.
    return result.get("Example") == "example_fastapi"
async def check_get_a_message(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can get a message from the aleph.im network.

    Returns whether the payload of ``/get_a_message`` contains ``"item_hash"``;
    an HTTP error response yields False.
    """
    try:
        payload: dict = await get_json_from_vm(session, vm_id, "/get_a_message")
    except ClientResponseError:
        return False
    has_item_hash = "item_hash" in payload
    return has_item_hash
async def check_post_a_message(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can post a message to the aleph.im network using a remote key present on the host.

    Returns whether the reply to the POST on ``/post_a_message`` contains
    ``"item_hash"``; an HTTP error response yields False.
    """
    try:
        payload: dict = await post_to_vm(session, vm_id, "/post_a_message")
    except ClientResponseError:
        return False
    has_item_hash = "item_hash" in payload
    return has_item_hash
async def check_sign_a_message(session: ClientSession, vm_id: ItemHash) -> bool:
    """Check that the VM can sign a message using a key local to the VM.

    Returns whether the reply to the POST on ``/sign_a_message`` contains
    ``"item_hash"``; an HTTP error response yields False.
    """
    try:
        payload: dict = await post_to_vm(session, vm_id, "/sign_a_message")
    except ClientResponseError:
        return False
    has_item_hash = "item_hash" in payload
    return has_item_hash