This repository has been archived by the owner on Sep 22, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathHuffDec12.py
276 lines (241 loc) · 9.72 KB
/
HuffDec12.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import os, sys, struct, zlib
class Error(Exception):
    """Exception type raised by this module for table and decoding problems."""
def cwDec(w):
    """Convert a 16-bit encoded value into its string codeword.

    The encoded form is the codeword bits, left-aligned, followed by a
    single terminating '1' bit and zero padding. OR-ing in bit 16 keeps the
    leading zeros of the codeword when the value is rendered in binary.
    """
    rendered = format(0x10000 | w, "b")    # '1' guard + 16 value bits
    trimmed = rendered.rstrip("0")         # drop the zero padding
    return trimmed[1:-1]                   # drop guard bit and terminator
def cwEnc(cw):
    """Convert a string codeword into its 16-bit encoded value.

    A terminating '1' bit is appended to the codeword and the result is
    right-padded with '0' up to 16 binary digits before being parsed.
    """
    terminated = cw + "1"
    padding = "0" * (16 - len(terminated))  # empty when already >= 16 digits
    return int(terminated + padding, 2)
#***************************************************************************
#***************************************************************************
#***************************************************************************
def HuffTabReader_bin(ab):
    """Iterate over the records of a binary Huffman table.

    Each record is a little-endian header (16-bit encoded codeword, 8-bit
    value length) immediately followed by that many value bytes.

    Yields (codeword string, value length, value bytes) tuples.
    Raises AssertionError if the last record's value is truncated.
    """
    header = struct.Struct("<HB")
    pos, end = 0, len(ab)
    while pos < end:
        word, size = header.unpack_from(ab, pos)
        start = pos + header.size
        pos = start + size
        value = ab[start:pos]
        assert len(value) == size
        yield (cwDec(word), size, value)
def HuffTabPack_bin(d):
    """Serialize a codeword -> value mapping into the binary table format.

    Codewords are emitted in descending string order; entries whose value
    is None are skipped. Each record is a "<HB" header (encoded codeword,
    value length) followed by the value itself.
    """
    records = [
        struct.pack("<HB", cwEnc(cw), len(d[cw])) + d[cw]
        for cw in sorted(d, reverse=True)
        if d[cw] is not None
    ]
    return "".join(records)
#***************************************************************************
#***************************************************************************
#***************************************************************************
def HuffTabReader_text(ab):
    """Iterate over the entries of a text Huffman table.

    Every non-empty line holds a codeword and, optionally, a second field:
      * no second field    -> sequence length and value are both unknown;
      * field starts "??"  -> only the length is known (two chars per byte);
      * anything else      -> hex-encoded sequence value.

    Yields (codeword string, length or None, value or None) tuples.
    """
    for line in ab.splitlines():
        fields = line.split()
        if not fields:
            continue                       # Skip empty lines
        cw = fields[0]                     # String form of the codeword
        cb = v = None                      # Nothing known by default
        if len(fields) > 1:
            text = fields[1]
            if text.startswith("??"):      # Only the length is known
                cb = len(text) // 2
            else:                          # Value is known
                v = text.decode("hex")
                cb = len(v)
        yield (cw, cb, v)
def HuffTabPack_text(dLen, d, mode=None):
    """Render a Huffman table as text, one codeword per line.

    dLen maps codeword -> sequence length (or None), d maps codeword ->
    sequence value. mode is one of the HuffDecoder.DUMP_* constants and
    defaults to DUMP_ALL:
      * DUMP_KNOWN - only codewords with a known value;
      * DUMP_LEN   - codewords with a known length ("??" per unknown byte);
      * DUMP_ALL   - every codeword in dLen.

    Lines are ordered by descending codeword string and joined with "\n".
    """
    if mode is None:
        mode = HuffDecoder.DUMP_ALL
    needLen = mode in (HuffDecoder.DUMP_KNOWN, HuffDecoder.DUMP_LEN)
    needValue = HuffDecoder.DUMP_KNOWN == mode
    lines = []
    for cw in sorted(dLen, reverse=True):
        cb = dLen[cw]
        if needLen and cb is None:
            continue                       # Sequence length is not known
        value = d.get(cw)
        if value is not None:
            text = value.encode("hex").upper()
        elif needValue:
            continue                       # Sequence is not known
        elif cb is None:
            text = ""
        else:
            text = "??" * cb
        tabs = '\t' * (2 - len(cw) // 8)   # Align the value column
        lines.append("%s%s%s" % (cw, tabs, text))
    return "\n".join(lines)
#***************************************************************************
#***************************************************************************
#***************************************************************************
def HuffTab_extendLen(dLen, extShape=False):
    """Add missing codewords to a codeword -> length table, in place.

    The keys of dLen are bit-string codewords. Sorted in descending string
    order they are expected to be grouped by strictly increasing length.
    For every length, each codeword between the smallest listed one and the
    current end value is inserted with a length of None if absent. With
    extShape set, codewords below the smallest listed one (down to, but
    excluding, half of the next length's end value) are inserted as well.

    Args:
        dLen: dict mapping codeword string to sequence length or None;
            modified in place.
        extShape: also insert the codewords below the smallest listed one.

    Returns:
        A list with one entry per codeword length; each entry lists the
        values from the smallest listed codeword down to (excluding) half
        of the next length's end value, left-aligned to 16 bits.
        An empty table yields an empty list.

    Raises:
        AssertionError: if codeword length does not increase along the
            descending sort order.
    """
    shape = []
    if not dLen:
        return shape                        # Nothing to extend
    aCW = sorted(dLen, reverse=True)
    minBits, maxBits = len(aCW[0]), len(aCW[-1])
    aCW.append('0' * (maxBits + 1))         # Sentinel: longer than max
    nBits = minBits                         # Current length
    e = int(aCW[0], 2) | 1                  # End value for current length
    # range and // behave exactly like xrange and int / did on Python 2
    for o in range(1, len(aCW)):
        nextBits = len(aCW[o])
        if nextBits == nBits:
            continue                        # Run until length change
        assert nextBits > nBits             # Length must increase
        s = int(aCW[o - 1], 2)              # Start value for current length
        for i in range(s, e + 1):
            dLen.setdefault(bin(i)[2:].zfill(nBits), None)
        e = int(aCW[o], 2) | 1              # End value for next length
        stop = e // 2                       # Exclusive lower bound
        shape.append([x << (16 - nBits) for x in range(s, stop, -1)])
        if extShape:
            for i in range(s - 1, stop, -1):
                dLen.setdefault(bin(i)[2:].zfill(nBits), None)
        nBits = nextBits
    return shape
#***************************************************************************
#***************************************************************************
#***************************************************************************
class HuffNode(object):
    """Decoding-map entry describing one codeword (or codeword prefix).

    Attributes set for every node:
        cw    - string codeword value
        nBits - codeword length in bits, or None when no decoder is given
                and the actual length is therefore unknown
    Attributes set only when a decoder is given:
        cb    - sequence length looked up in the decoder's dLen (or None)
        av    - per-table sequence values looked up in the decoder's adTab
                (None where a table has no entry for this codeword)
    """

    def __init__(self, cw, hd):
        self.cw = cw
        if not hd:
            # No decoder supplied: actual length of the codeword is unknown
            self.nBits = None
            return
        self.nBits = len(cw)
        self.cb = hd.dLen.get(cw)
        self.av = [table.get(cw) for table in hd.adTab]
#***************************************************************************
#***************************************************************************
#***************************************************************************
class HuffDecoder(object):
    """Table-driven Huffman decoder for 4K-block compressed data.

    State built by the loaders:
        dLen  - dict: codeword string -> sequence length in bytes
        adTab - list of dicts (one per table): codeword string -> sequence
    State built by prepareMap():
        aMap  - list of 2**maxBits HuffNode entries indexed by the next
                maxBits input bits (first stream bit in the lowest position)
        mask  - (1 << maxBits) - 1
    """
    NAMES = ("Code", "Data")            # Base names of the text tables
    DUMP_KNOWN = 0                      # Dump codewords with known values
    DUMP_LEN = 1                        # Dump codewords with known lengths
    DUMP_ALL = 2                        # Dump every codeword
    dPrefix = {DUMP_KNOWN: "kno", DUMP_LEN: "len", DUMP_ALL: "all"}
    fmtInt = struct.Struct("<L")
    baseDir = os.path.split(__file__)[0]
    BLOCK_SIZE = 0x1000                 # 4K bytes

    def __init__(self, ver=None):
        """Load the tables and build the decoding map.

        The text tables ("<name>.txt", or "<name><ver>.txt" when ver is
        given) are tried first. If loading them fails for any ordinary
        reason, the raw-deflate compressed "huff11.bin" is used instead.
        """
        names = self.NAMES if ver is None else tuple("%s%d" % (n, ver) for n in self.NAMES)
        try:
            self.loadTables(names)      # Load from text version
            # To regenerate the compressed tables:
            # with open("huff11.bin", "wb") as fo: fo.write(zlib.compress(self.packTables(), 9)[2:-4])
        except Exception:
            # Deliberate best-effort fallback, but no longer a bare except:
            # KeyboardInterrupt and SystemExit now propagate.
            with open(os.path.join(self.baseDir, "huff11.bin"), "rb") as fi:
                self.unpackTables(zlib.decompress(fi.read(), -15))  # Load from compressed version
        self.prepareMap()

    def loadTable(self, items):
        """Merge one table of (codeword, length, value) items into the decoder.

        Lengths are recorded in self.dLen and must agree with any length
        already known for the same codeword. Known values are collected
        into a new dict that is appended to self.adTab.

        Raises:
            Error: on a codeword whose value was already defined in this
                table, or whose length contradicts the known one.
            AssertionError: if a value's size differs from its length.
        """
        d = {}
        for cw, cb, v in items:
            if cw in d:
                raise Error("Codeword %s already defined" % cw)
            if cb is None:
                continue
            cbKnown = self.dLen.get(cw, None)
            if cbKnown is None:
                self.dLen[cw] = cb
            elif cb != cbKnown:
                raise Error("Codeword %s sequence length %d != known %d" % (cw, cb, cbKnown))
            if v is None:
                continue
            assert len(v) == cb
            d[cw] = v                   # Remember value
        self.adTab.append(d)

    def unpackTables(self, ab):
        """Load all tables from the packed binary form produced by packTables().

        Layout: 32-bit little-endian table count, then for each table a
        32-bit little-endian byte size followed by that many bytes of
        binary table records.
        """
        n, = self.fmtInt.unpack_from(ab)
        o = self.fmtInt.size
        self.dLen, self.adTab = {}, []
        for _ in range(n):
            cb, = self.fmtInt.unpack_from(ab, o)
            o += self.fmtInt.size
            data = ab[o:o + cb]
            assert len(data) == cb
            o += cb
            self.loadTable(HuffTabReader_bin(data))

    def packTables(self):
        """Serialize all tables into the binary form read by unpackTables()."""
        r = [self.fmtInt.pack(len(self.adTab))]
        for d in self.adTab:
            ab = HuffTabPack_bin(d)
            r.append(self.fmtInt.pack(len(ab)) + ab)
        return b"".join(r)              # Same as "".join on Python 2

    def loadTables(self, names=None):
        """Load the text tables "<name>.txt" found next to this module."""
        if names is None:
            names = self.NAMES
        self.dLen, self.adTab = {}, []
        for name in names:
            with open(os.path.join(self.baseDir, "%s.txt" % name)) as fi:
                self.loadTable(HuffTabReader_text(fi.read()))

    def saveTables(self, mode=None, names=None):
        """Write each table as "<prefix><name>.txt" next to this module.

        mode is one of the DUMP_* constants (default DUMP_ALL) and selects
        both the file-name prefix and which codewords are written.
        """
        if mode is None:
            mode = self.DUMP_ALL
        if names is None:
            names = self.NAMES
        dLen = self.dLen.copy()
        HuffTab_extendLen(dLen)
        for name, d in zip(names, self.adTab):
            with open(os.path.join(self.baseDir, "%s%s.txt" % (self.dPrefix[mode], name)), "w") as fo:
                # Equivalent to `print >>fo, text` and also valid on Python 3
                fo.write(HuffTabPack_text(dLen, d, mode) + "\n")

    def propagateMap(self, node):
        """Store node in every map slot whose low bits match its codeword.

        The codeword is bit-reversed because the map is indexed with the
        first stream bit in the lowest position.
        """
        cw = node.cw
        for idx in range(int(cw[::-1], 2), len(self.aMap), 1 << len(cw)):
            assert self.aMap[idx] is None   # Codewords must not overlap
            self.aMap[idx] = node

    def prepareMap(self):
        """Build self.aMap and self.mask from self.dLen.

        For every codeword length, all values between the smallest listed
        codeword and the current end value get a node with known length.
        The values below it, down to (excluding) half of the next length's
        end value, get a node whose length is unknown. Finally every slot
        of the map is asserted to be filled.
        """
        aCW = sorted(self.dLen, reverse=True)
        minBits, maxBits = len(aCW[0]), len(aCW[-1])
        self.mask = (1 << maxBits) - 1
        self.aMap = [None] * (1 << maxBits)     # 2**maxBits map
        aCW.append('0' * (maxBits + 1))         # Sentinel: longer than max
        nBits = minBits                         # Current length
        e = int(aCW[0], 2) | 1                  # End value for current length
        for o in range(1, len(aCW)):
            nextBits = len(aCW[o])
            if nextBits == nBits:
                continue                        # Run until length change
            assert nextBits > nBits             # Length must increase
            s = int(aCW[o - 1], 2)              # Start value for current length
            for i in range(s, e + 1):
                cw = bin(i)[2:].zfill(nBits)
                self.propagateMap(HuffNode(cw, self))
            e = int(aCW[o], 2) | 1              # End value for next length
            for i in range(e // 2 + 1, s):      # Values with unknown codeword length
                cw = bin(i)[2:].zfill(nBits)
                self.propagateMap(HuffNode(cw, None))
            nBits = nextBits
        for v in self.aMap:
            assert v is not None

    def enumCW(self, ab):
        """Yield the HuffNode of each successive codeword in chunk ab.

        Iteration stops once the known sequence lengths of the yielded
        nodes add up to BLOCK_SIZE or more.

        Raises:
            Error: when the input reaches a codeword of unknown length.
            ValueError: when ab is empty.
        """
        # Bits of the chunk, most significant bit of each byte first; the
        # reversed string puts the first stream bit at the integer's bit 0.
        bits = "".join(format(b, "08b") for b in bytearray(ab))
        v = int(bits[::-1], 2)
        cb = 0
        while cb < self.BLOCK_SIZE:             # Block length
            node = self.aMap[v & self.mask]
            if node.nBits is None:
                raise Error("Unknown codeword %s* length" % node.cw)
            yield node
            v >>= node.nBits
            if node.cb is not None:
                cb += node.cb

    def decompressChunk(self, ab, iTab):
        """Decode one compressed chunk using value table number iTab.

        Raises:
            Error: when a codeword has no known sequence in that table.
        """
        r = []
        cb = 0
        for node in self.enumCW(ab):
            v = node.av[iTab]
            if v is None:
                raise Error("Unknown sequence for codeword %s in table #%d" % (node.cw, iTab))
            r.append(v)
            cb += len(v)
            if cb >= self.BLOCK_SIZE:
                break
        return b"".join(r)              # Same as "".join on Python 2

    def decompress(self, ab, length):
        """Decompress ab into length bytes (a multiple of BLOCK_SIZE).

        ab starts with one 32-bit little-endian entry per chunk. The low 30
        bits are the chunk's offset relative to the end of that array; of
        the top two bits, the lower is the "compressed" flag (required to
        be set) and the upper selects the value table.

        Raises:
            AssertionError: if length is not a multiple of BLOCK_SIZE, a
                chunk is not flagged as compressed, or a chunk does not
                decode to exactly BLOCK_SIZE bytes.
        """
        nChunks, left = divmod(length, self.BLOCK_SIZE)
        assert 0 == left
        aOpt, aOfs = [], []
        for entry in struct.unpack_from("<%dL" % nChunks, ab):
            opt, ofs = divmod(entry, 0x40000000)
            aOpt.append(opt)
            aOfs.append(ofs)
        base = nChunks * 4
        aOfs.append(len(ab) - base)             # End of the last chunk
        r = []
        for i, opt in enumerate(aOpt):
            iTab, bCompr = divmod(opt, 2)
            assert 1 == bCompr
            unpacked = self.decompressChunk(ab[base + aOfs[i]: base + aOfs[i + 1]], iTab)
            assert len(unpacked) == self.BLOCK_SIZE
            r.append(unpacked)
        return b"".join(r)              # Same as "".join on Python 2
#***************************************************************************
#***************************************************************************
#***************************************************************************
def main(argv):
    """Decompress every file named in argv[1:] into "<basename>.mod".

    Each input starts with a 32-bit little-endian chunk count; the rest of
    the file is handed to HuffDecoder.decompress() together with the
    expected output size (chunk count * BLOCK_SIZE).
    """
    # The constructor already loads the tables and builds the decoding map,
    # so no separate prepareMap() call is needed here.
    hd = HuffDecoder()
    # Developer utilities, kept for reference:
    # with open("huff11.bin", "wb") as fo: fo.write(zlib.compress(hd.packTables(), 9)[2:-4])
    # with open("all.bin", "wb") as fo: fo.write(hd.packTables())
    # for mode in (HuffDecoder.DUMP_KNOWN, HuffDecoder.DUMP_LEN, HuffDecoder.DUMP_ALL): hd.saveTables(mode)
    for fn in argv[1:]:
        with open(fn, "rb") as fi:
            ab = fi.read()
        nChunks, = struct.unpack_from("<L", ab)
        unpacked = hd.decompress(ab[4:], nChunks * hd.BLOCK_SIZE)
        with open(os.path.splitext(fn)[0] + ".mod", "wb") as fo:
            fo.write(unpacked)


if __name__ == "__main__":
    main(sys.argv)