-
Notifications
You must be signed in to change notification settings - Fork 4
/
passthru.py
96 lines (82 loc) · 2.78 KB
/
passthru.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# This module should be imported from REPL, not run from command line.
import socket
import uos
import network
import time
import esp,machine
class myserver:
def __init__(self,port=8999,name='myserver'):
self.__name__=name
self.port=port
self.listen_s = None
self.client_s = None
self.setup_conn(port)
print("Started server {} in normal mode".format(self.__name__))
def setup_conn(self,port):
self.listen_s = socket.socket()
self.listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", self.port)
addr = ai[0][4]
self.listen_s.bind(addr)
self.listen_s.listen(1)
self.listen_s.setsockopt(socket.SOL_SOCKET, 20, self.accept_conn)
for i in (network.AP_IF, network.STA_IF):
iface = network.WLAN(i)
if iface.active():
print("Server {} started on {}:{}".format(self.__name__,iface.ifconfig()[0], port))
def read_from_sock(self,sock):
print('myhandler',sock.read())
def write_to_socket_loop(self,sock):
pass
def check_ok(self):
return True
def accept_conn(self,listen_sock):
cl, remote_addr = listen_sock.accept()
if not self.check_ok():
cl.close()
return
print("\n{} connection from:".format(self.__name__, remote_addr))
self.client_s = cl
cl.setblocking(False)
cl.setsockopt(socket.SOL_SOCKET, 20, self.read_from_sock)
self.write_to_socket_loop(cl)
def close(self):
if self.client_s:
self.client_s.close()
if self.listen_s:
self.listen_s.close()
self.client_s=None
self.listen_s=None
class passthru(myserver):
def __init__(self,port=8999):
super().__init__(port=port,name='pymata')
esp.uart_nostdio(1)
self.gpio2=machine.Pin(2)
self.gpio2.init(machine.Pin.OUT)
self.sport=machine.UART(0,57600,timeout=0)
self.reset()
def reset (self):
# reset min pro gpio2 ---> (pro)RESET
self.gpio2.value(1)
self.gpio2.value(0)
self.gpio2.value(1)
def read_from_sock(self,sock):
samples=sock.read()
self.sport.write(samples)
def write_to_socket_loop(self,sock):
while True:
buf=self.sport.read()
if buf is not None:
sock.write(buf)
time.sleep_ms(1)
class repl(myserver):
def __init__(self,port=23):
super().__init__(port=port)
def read_from_sock(self,sock):
uos.dupterm_notify(sock)
def write_to_socket_loop(self,sock):
print('socket now connected to repl')
uos.dupterm(sock)
def close(self):
uos.dupterm(None)
super().close()