-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnode_connector.py
31 lines (27 loc) · 954 Bytes
/
node_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import json
import logging
import requests
class NodeConnector:
def __init__(self, url):
self.url = url
def test_connection(self):
logging.info("Testing connection to the node...")
method = "web3_clientVersion"
payload = {
"jsonrpc": "2.0",
"method": method,
"params": [],
"id": 1,
}
response = self.send(json.dumps(payload, separators=(',', ':')))
if response:
logging.info(f"Connection successful!\nClient Version: {response['result']}")
else:
logging.error("Connection failed! Please ensure your node is running and accessible.")
exit(1)
def send(self, data):
logging.info("Sending to node...")
headers = {'Content-type': 'application/json'}
response = requests.post(self.url, data=data, headers=headers)
logging.info("OK")
return response.json()