-
Notifications
You must be signed in to change notification settings - Fork 0
/
tegra_bootrom_util.py
executable file
·153 lines (130 loc) · 4.29 KB
/
tegra_bootrom_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
import binascii
import copy
from ctypes import *
import enum
import os
import signal
import sys
import threading
import time
# os.environ['PYUSB_DEBUG'] = 'debug'
import usb
from cmd2 import Cmd
USB_XFER_MAX = 0x1000
DEFAULT_VID = 0x0955
DEFAULT_PID = 0x7330
class cmd_type(enum.IntEnum):
CMD_TTY = 0x1
CMD_NOTIFY_REBOOT = 0x2
CMD_ACK = 0x3
class StructurePrettyPrint(LittleEndianStructure):
def __str__(self):
return "{}: {{{}}}".format(self.__class__.__name__,
", ".join(["{}: {}".format(field[0],
getattr(self,
field[0]))
for field in self._fields_]))
class StructureVariableSized(StructurePrettyPrint):
_variable_sized_ = []
def __new__(self, variable_sized=(), **kwargs):
def name_builder(name, variable_sized):
for variable_sized_field_name, variable_size in variable_sized:
name += variable_sized_field_name.title() + '[{0}]'.format(variable_size)
return name
local_fields = copy.deepcopy(self._fields_)
for variable_sized_field_name, variable_size in variable_sized:
match_type = None
for matching_field_name, matching_type in self._variable_sized_:
if variable_sized_field_name == matching_field_name:
match_type = matching_type
break
if match_type is None:
raise Exception
local_fields.append((variable_sized_field_name, match_type*variable_size),)
name = name_builder(self.__name__, variable_sized)
class BaseCtypesStruct(StructurePrettyPrint):
_fields_ = local_fields
_variable_sized_ = self._variable_sized_
classdef = BaseCtypesStruct
classdef.__name__ = name
return BaseCtypesStruct(**kwargs)
class cmd_hdr(StructurePrettyPrint):
_pack_ = 1
_fields_ = [('cmd_type', c_uint32),
('cmd_size', c_uint32)]
class cmd_tty(StructureVariableSized):
_pack_ = 1
_fields_ = [('hdr', cmd_hdr),]
_variable_sized_ = [('tty_buf', c_char)]
class cmd_notify_reboot(StructurePrettyPrint):
_pack_ = 1
_fields_ = [('hdr', cmd_hdr),
('status', c_int32)]
class RCMDevice:
# Default to the T30 RCM VID and PID.
DEFAULT_VID = 0x0955
DEFAULT_PID = 0x7330
def __init__(self, wait_for_device=False, vid=DEFAULT_VID, pid=DEFAULT_PID):
self.dev = usb.core.find(idVendor=vid, idProduct=pid)
if self.dev is None:
# ... and we're allowed to wait for one, wait indefinitely for one to appear...
if wait_for_device:
print("Waiting for a TegraRCM device to come online...")
while self.dev is None:
self.dev = usb.core.find(idVendor=vid, idProduct=pid)
# ... or bail out.
else:
raise IOError("No TegraRCM device found?")
def read(self, length=USB_XFER_MAX, timeout=0):
assert length <= USB_XFER_MAX
""" Reads data from the RCM protocol endpoint. """
return bytes(self.dev.read(0x81, length, timeout))
def write(self, data, timeout=3000):
"""
Writes a single RCM buffer, which should be USB_XFER_MAX long.
The last packet may be shorter, and should trigger a ZLP (e.g. not divisible by 512).
If it's not, send a ZLP.
"""
assert len(data) <= USB_XFER_MAX
return self.dev.write(0x01, data, timeout)
def send_ack(rcmdev):
ack = cmd_hdr()
ack.cmd_size = sizeof(ack)
ack.cmd_type = cmd_type.CMD_ACK
print('ack: {}'.format(ack))
rcmdev.write(bytes(ack))
def main():
rcmdev = RCMDevice()
while True:
recv_buf = rcmdev.read(sizeof(cmd_hdr))
print(recv_buf)
print(binascii.hexlify(recv_buf))
recv_cmd = cmd_hdr.from_buffer_copy(recv_buf)
print(recv_cmd)
print(recv_cmd.cmd_type)
print(recv_cmd.cmd_size)
if recv_cmd.cmd_type == cmd_type.CMD_TTY:
tty_buf_size = recv_cmd.cmd_size - sizeof(recv_cmd)
recv_cmd_tty_t = cmd_tty(variable_sized=(('tty_buf', tty_buf_size),),)
recv_buf += rcmdev.read(tty_buf_size)
recv_cmd_tty = recv_cmd_tty_t.__class__.from_buffer_copy(recv_buf)
print(recv_cmd_tty)
tty_str = recv_cmd_tty.tty_buf.decode('utf-8')
print(tty_str)
send_ack(rcmdev)
if recv_cmd.cmd_type == cmd_type.CMD_NOTIFY_REBOOT:
recv_cmd_notify_reboot = cmd_notify_reboot.from_buffer_copy(recv_buf)
print("rebooted! status: {:d}".format(recv_cmd_notify_reboot.status))
return 0
return 0
class Runner(threading.Thread):
def run(self):
os._exit(main())
def sigint_handler(signum, frame):
os._exit(1)
if __name__ == '__main__':
signal.signal(signal.SIGINT, sigint_handler)
Runner().start()
while True:
time.sleep(0.1)