forked from swiseman/nn_coref
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtext_feats_to_hdf5_replacezero.py
297 lines (254 loc) · 10.1 KB
/
text_feats_to_hdf5_replacezero.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
import numpy as np
import h5py
import multiprocessing as mp
import argparse
def firstpass_worker(task_q, results_q):
"""
gets mention offsets
"""
while True:
d,line = task_q.get() # blocks
if d == -1:
return
ment_starts = []
fields = line.split("|")
ments = int(fields[0])
curr_idx = 2
for i in xrange(1,ments):
for j in xrange(i):
feats = fields[curr_idx].split(" ")
# index where next mention pair's features start
ment_starts.append(len(feats))
curr_idx += 1
curr_idx += 1
results_q.put((d,ment_starts))
def firstpass(fi,num_procs):
"""
establishes document and mention offsets
"""
task_q = mp.Queue(num_procs) # ensure can only put num_procs things on the q
results_q = mp.Queue() # no max size
# start worker procs
for i in xrange(num_procs):
mp.Process(target=firstpass_worker, args=(task_q,results_q)).start()
# assign each line to a worker
num_docs = 0
with open(fi) as f:
for i,line in enumerate(f):
print "assigning doc", i
task_q.put((i,line)) # blocks if no room in the Q
num_docs += 1
# done with the file, so tell workers they can die
for i in xrange(num_procs):
task_q.put((-1,None))
# close the task q
task_q.close()
# collect the results
ment_starts = [[] for i in xrange(num_docs)]
print "merging results..."
for i in xrange(num_docs):
d,doc_ment_starts = results_q.get()
ment_starts[d] = doc_ment_starts
doc_starts = [0]
[doc_starts.append(len(ment_starts[i])) for i in xrange(num_docs)]
# now that ment starts are in order we can put back together
all_ment_starts = [0]
[all_ment_starts.extend(ment_starts[i]) for i in xrange(num_docs)]
return np.cumsum(doc_starts).astype(np.int64), np.cumsum(all_ment_starts).astype(np.int64)
def doc_feats_worker(task_q, results_q):
"""
concatenates all of a document's (binary) features
"""
while True:
d,line = task_q.get() # blocks
if d == -1:
return
fields = line.split("|")
ments = int(fields[0])
doc_feats = []
curr_idx = 2
for i in xrange(1,ments):
for j in xrange(i):
feat_strs = fields[curr_idx].split(" ")
# fill in the features starting at offset
[doc_feats.append(int(fs)) for fs in feat_strs]
curr_idx += 1
curr_idx += 1
results_q.put((d,np.array(doc_feats,dtype=np.int64)))
# this guy will actually make the features
def merge_feats_worker(results_q, doc_starts, ment_starts, feat_pfx, replacement):
"""
puts documents' features in correct order and saves matrix in hd5 format
"""
num_docs = doc_starts.shape[0]-1
feats = np.zeros(ment_starts[-1], dtype=np.int64)
for i in xrange(num_docs):
print "got doc", i, "merging..."
d,nz = results_q.get()
nz[nz == 0] = replacement
feats[ment_starts[doc_starts[d]]:ment_starts[doc_starts[d+1]]] = nz
del nz
print "saving features in hdf5 format..."
h5fi = h5py.File('%s-feats.h5' % (feat_pfx),'w')
h5fi.create_dataset("feats",data=feats,compression="gzip")
h5fi.close()
def secondpass(fi, num_procs, doc_starts, ment_starts, feat_pfx, replacement):
task_q = mp.Queue(num_procs) # ensure can only put num_procs things on the q
results_q = mp.Queue()
# start worker procs
for i in xrange(num_procs):
mp.Process(target=doc_feats_worker, args=(task_q,results_q)).start()
fbp = mp.Process(target=merge_feats_worker, args=(results_q,doc_starts,ment_starts, feat_pfx, replacement))
fbp.start()
# assign each line to a worker
num_docs = 0
with open(fi) as f:
for i,line in enumerate(f):
print "filling in doc", i
task_q.put((i,line)) # blocks if no room in the Q
num_docs += 1
# done with the file, so tell workers they can die
print "tryina kill procs..."
for i in xrange(num_procs):
task_q.put((-1,None))
print "joining last guy..."
fbp.join()
task_q.close()
results_q.close()
####################### for parsing anaphoricity feature files #################
def na_merge_feats_worker(results_q, doc_starts, ment_starts, feat_pfx):
"""
puts documents' features in correct order and saves matrix in hd5 format
"""
num_docs = doc_starts.shape[0]-1
feats = np.zeros(ment_starts[-1], dtype=np.int32)
for i in xrange(num_docs):
print "got doc", i, "merging..."
d,nz = results_q.get()
feats[ment_starts[doc_starts[d]]:ment_starts[doc_starts[d+1]]] = nz
del nz
print "saving features in hdf5 format..."
h5fi = h5py.File('%s-feats.h5' % (feat_pfx),'w')
h5fi.create_dataset("feats",data=feats,compression="gzip")
h5fi.close()
def na_firstpass_worker(task_q, results_q):
"""
gets mention offsets
"""
while True:
d,line = task_q.get() # blocks
if d == -1:
return
ment_starts = []
fields = line.split("|")
ments = int(fields[0])
curr_idx = 1
for i in xrange(ments):
feats = fields[curr_idx].split(" ")
# index where next mention pair's features start
ment_starts.append(len(feats))
curr_idx += 1
results_q.put((d,ment_starts))
def na_firstpass(fi,num_procs):
"""
establishes document and mention offsets
"""
task_q = mp.Queue(num_procs) # ensure can only put num_procs things on the q
results_q = mp.Queue() # no max size
# start worker procs
for i in xrange(num_procs):
mp.Process(target=na_firstpass_worker, args=(task_q,results_q)).start()
# assign each line to a worker
num_docs = 0
with open(fi) as f:
for i,line in enumerate(f):
print "assigning doc", i
task_q.put((i,line)) # blocks if no room in the Q
num_docs += 1
# done with the file, so tell workers they can die
for i in xrange(num_procs):
task_q.put((-1,None))
# close the task q
task_q.close()
# collect the results
ment_starts = [[] for i in xrange(num_docs)]
print "merging results..."
for i in xrange(num_docs):
d,doc_ment_starts = results_q.get()
ment_starts[d] = doc_ment_starts
doc_starts = [0]
[doc_starts.append(len(ment_starts[i])) for i in xrange(num_docs)]
# now that ment starts are in order we can put back together
all_ment_starts = [0]
[all_ment_starts.extend(ment_starts[i]) for i in xrange(num_docs)]
return np.cumsum(doc_starts).astype(np.int32), np.cumsum(all_ment_starts).astype(np.int32)
def na_doc_feats_worker(task_q, results_q, replacement):
"""
concatenates all of a document's (binary) features
"""
while True:
d,line = task_q.get() # blocks
if d == -1:
return
fields = line.split("|")
ments = int(fields[0])
doc_feats = []
curr_idx = 1
for i in xrange(ments):
feat_strs = fields[curr_idx].split(" ")
# fill in the features starting at offset
[doc_feats.append(int(fs)) if fs != "0" else doc_feats.append(replacement) for fs in feat_strs]
curr_idx += 1
results_q.put((d,np.array(doc_feats,dtype=np.int32)))
def na_secondpass(fi, num_procs, doc_starts, ment_starts, feat_pfx, replacement):
task_q = mp.Queue(num_procs) # ensure can only put num_procs things on the q
results_q = mp.Queue()
# start worker procs
for i in xrange(num_procs):
mp.Process(target=na_doc_feats_worker, args=(task_q,results_q,replacement)).start()
fbp = mp.Process(target=na_merge_feats_worker, args=(results_q,doc_starts,ment_starts, feat_pfx))
fbp.start()
# assign each line to a worker
num_docs = 0
with open(fi) as f:
for i,line in enumerate(f):
print "filling in doc", i
task_q.put((i,line)) # blocks if no room in the Q
num_docs += 1
# done with the file, so tell workers they can die
print "tryina kill procs..."
for i in xrange(num_procs):
task_q.put((-1,None))
print "joining last guy..."
fbp.join()
task_q.close()
results_q.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("feature_text_file", action="store", help="feature text file to convert to hdf5")
parser.add_argument("pfx", action="store", help="prefix generated hdf5 files with this")
parser.add_argument("mode", choices=("pw","ana"), action="store", help="pairwise or anaphoricity features in text file")
parser.add_argument("-n","--num_procs", type=int, action="store", default=3, dest="num_procs", help="number of additional processes to spawn")
parser.add_argument("-r","--replacement", type=int, action="store", default=-1, dest="replacement", help="replacement feature for zeros")
args = parser.parse_args()
if args.mode == "pw":
feat_pfx = args.pfx + "-pw"
else:
feat_pfx = args.pfx + "-na"
if args.mode == "pw":
doc_starts, ment_starts = firstpass(args.feature_text_file, args.num_procs)
else:
doc_starts, ment_starts = na_firstpass(args.feature_text_file, args.num_procs)
print "saving doc_starts and ment_starts in hdf5 format..."
h5fi = h5py.File('%s-offsets.h5' % (feat_pfx),'w')
h5fi.create_dataset("doc_starts",data=doc_starts)
h5fi.create_dataset("ment_starts",data=ment_starts)
h5fi.close()
print "will now fill in the", ment_starts[-1], "features..."
if args.mode == "pw":
# subtract 1 b/c we'll spawn an additional guy to consume worker output
secondpass(args.feature_text_file, args.num_procs-1, doc_starts, ment_starts, feat_pfx, args.replacement)
else:
na_secondpass(args.feature_text_file, args.num_procs-1, doc_starts, ment_starts, feat_pfx, args.replacement)
if __name__ == "__main__":
main()