-
Notifications
You must be signed in to change notification settings - Fork 33
/
bdevice.py
209 lines (179 loc) · 8.09 KB
/
bdevice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# bdevice.py Hardware-agnostic base classes.
# BlockDevice Base class for general block devices e.g. EEPROM, FRAM.
# FlashDevice Base class for generic Flash memory (subclass of BlockDevice).
# Documentation in BASE_CLASSES.md
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2019-2024 Peter Hinch
from micropython import const
class BlockDevice:
def __init__(self, nbits, nchips, chip_size):
self._c_bytes = chip_size # Size of chip in bytes
self._a_bytes = chip_size * nchips # Size of array
self._nbits = nbits # Block size in bits
self._block_size = 2 ** nbits
self._rwbuf = bytearray(1)
def __len__(self):
return self._a_bytes
def __setitem__(self, addr, value):
if isinstance(addr, slice):
return self._wslice(addr, value)
self._rwbuf[0] = value
self.readwrite(addr, self._rwbuf, False)
def __getitem__(self, addr):
if isinstance(addr, slice):
return self._rslice(addr)
return self.readwrite(addr, self._rwbuf, True)[0]
# Handle special cases of a slice. Always return a pair of positive indices.
def _do_slice(self, addr):
if not (addr.step is None or addr.step == 1):
raise NotImplementedError("only slices with step=1 (aka None) are supported")
start = addr.start if addr.start is not None else 0
stop = addr.stop if addr.stop is not None else self._a_bytes
start = start if start >= 0 else self._a_bytes + start
stop = stop if stop >= 0 else self._a_bytes + stop
return start, stop
def _wslice(self, addr, value):
start, stop = self._do_slice(addr)
try:
if len(value) == (stop - start):
res = self.readwrite(start, value, False)
else:
raise RuntimeError("Slice must have same length as data")
except TypeError:
raise RuntimeError("Can only assign bytes/bytearray to a slice")
return res
def _rslice(self, addr):
start, stop = self._do_slice(addr)
buf = bytearray(stop - start)
return self.readwrite(start, buf, True)
# IOCTL protocol.
def sync(self): # Nothing to do for unbuffered devices. Subclass overrides.
return
def readblocks(self, blocknum, buf, offset=0):
self.readwrite(offset + (blocknum << self._nbits), buf, True)
def writeblocks(self, blocknum, buf, offset=0):
self.readwrite(offset + (blocknum << self._nbits), buf, False)
# https://docs.micropython.org/en/latest/library/os.html#os.AbstractBlockDev.ioctl
def ioctl(self, op, arg): # ioctl calls: see extmod/vfs.h
if op == 3: # SYNCHRONISE
self.sync()
return
if op == 4: # BP_IOCTL_SEC_COUNT
return self._a_bytes >> self._nbits
if op == 5: # BP_IOCTL_SEC_SIZE
return self._block_size
if op == 6: # Ignore ERASE because handled by driver.
return 0
# Hardware agnostic base class for EEPROM arrays
class EepromDevice(BlockDevice):
def __init__(self, nbits, nchips, chip_size, page_size, verbose):
super().__init__(nbits, nchips, chip_size)
# Handle page size arg
if page_size not in (None, 16, 32, 64, 128, 256):
raise ValueError(f"Invalid page size: {page_size}")
self._set_pagesize(page_size) # Set page size
verbose and print("Page size:", self._page_size)
def _psize(self, ps): # Set page size and page mask
self._page_size = ps
self._page_mask = ~(ps - 1)
def get_page_size(self): # For test script
return self._page_size
# Measuring page size should not be done in production code. See docs.
def _set_pagesize(self, page_size):
if page_size is None: # Measure it.
self._psize(16) # Conservative
old = self[:129] # Save old contents (nonvolatile!)
self._psize(256) # Ambitious
r = (16, 32, 64, 128) # Legal page sizes + 256
for x in r:
self[x] = 255 # Write single bytes, don't invoke page write
self[0:129] = b"\0" * 129 # Zero 129 bytes attempting to use 256 byte pages
try:
ps = next(z for z in r if self[z])
except StopIteration:
ps = 256
self._psize(ps)
self[:129] = old
else: # Validated page_size was supplied
self._psize(page_size)
# Hardware agnostic base class for flash memory.
_RDBUFSIZE = const(32) # Size of read buffer for erasure test
class FlashDevice(BlockDevice):
def __init__(self, nbits, nchips, chip_size, sec_size):
super().__init__(nbits, nchips, chip_size)
self.sec_size = sec_size
self._cache_mask = sec_size - 1 # For 4K sector size: 0xfff
self._fmask = self._cache_mask ^ 0x3FFFFFFF # 4K -> 0x3ffff000
self._buf = bytearray(_RDBUFSIZE)
self._mvbuf = memoryview(self._buf)
self._cache = bytearray(sec_size) # Cache always contains one sector
self._mvd = memoryview(self._cache)
self._acache = 0 # Address in chip of byte 0 of current cached sector.
# A newly cached sector, or one which has been flushed, will be clean,
# so .sync() will do nothing. If cache is modified, dirty will be set.
self._dirty = False
def read(self, addr, mvb):
nbytes = len(mvb)
next_sec = self._acache + self.sec_size # Start of next sector
if addr >= next_sec or addr + nbytes <= self._acache:
self.rdchip(addr, mvb) # No data is cached: just read from device
else:
# Some of address range is cached
boff = 0 # Offset into buf
if addr < self._acache: # Read data prior to cache from chip
nr = self._acache - addr
self.rdchip(addr, mvb[:nr])
addr = self._acache # Start of cached data
nbytes -= nr
boff += nr
# addr now >= self._acache: read from cache.
sa = addr - self._acache # Offset into cache
nr = min(nbytes, self._acache + self.sec_size - addr) # No of bytes to read from cache
mvb[boff : boff + nr] = self._mvd[sa : sa + nr]
if nbytes - nr: # Get any remaining data from chip
self.rdchip(addr + nr, mvb[boff + nr :])
return mvb
def sync(self):
if self._dirty:
self.flush(self._mvd, self._acache) # Write out old data
self._dirty = False
return 0
# Performance enhancement: if cache intersects address range, update it first.
# Currently in this case it would be written twice. This may be rare.
def write(self, addr, mvb):
nbytes = len(mvb)
acache = self._acache
boff = 0 # Offset into buf.
while nbytes:
if (addr & self._fmask) != acache:
self.sync() # Erase sector and write out old data
self._fill_cache(addr) # Cache sector which includes addr
offs = addr & self._cache_mask # Offset into cache
npage = min(nbytes, self.sec_size - offs) # No. of bytes in current sector
self._mvd[offs : offs + npage] = mvb[boff : boff + npage]
self._dirty = True # Cache contents do not match those of chip
nbytes -= npage
boff += npage
addr += npage
return mvb
# Cache the sector which contains a given byte addresss. Save sector
# start address.
def _fill_cache(self, addr):
addr &= self._fmask
self.rdchip(addr, self._mvd)
self._acache = addr
self._dirty = False
def initialise(self):
self._fill_cache(0)
# Return True if a sector is erased.
def is_empty(self, addr, ev=0xFF):
mvb = self._mvbuf
erased = True
nbufs = self.sec_size // _RDBUFSIZE # Read buffers per sector
for _ in range(nbufs):
self.rdchip(addr, mvb)
if any(True for x in mvb if x != ev):
erased = False
break
addr += _RDBUFSIZE
return erased