forked from snake-flu/type_variants
-
Notifications
You must be signed in to change notification settings - Fork 0
/
type_variants.py
241 lines (183 loc) · 9.33 KB
/
type_variants.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
from Bio import SeqIO
import argparse
import sys
if sys.version_info[0] < 3:
raise Exception("Python 3 or a more recent version is required.")
def get_nuc_position_from_aa_description(cds, aa_pos):
"""
given a CDS (eg. S) and the number of an amino acid in it, get the
1-based start position of that codon in Wuhan-Hu-1 ref coordinates
nuc_pos is an integer which is 1-based start pos of codon
"""
# these coordinates are 1-based
CDS_dict = {"orf1ab": ((266, 13468), (13468, 21555)),
"orf1a": (266, 13468),
"orf1b": (13468, 21555),
"s": (21563, 25384),
"orf3a": (25393, 26220),
"e": (26245, 26472),
"m": (26523, 27191),
"orf6": (27202, 27387),
"orf7a": (27394, 27759),
"orf8": (27894, 28259),
"n": (28274, 29533),
"orf10" : (29558, 29674)}
if cds.lower() not in CDS_dict.keys():
sys.stderr.write("I don't know about cds: %s \n" % cds)
sys.stderr.write("please use one of: %s" % ",".join(CDS_dict.keys()))
sys.exit(1)
if cds.lower() == "orf1ab":
if aa_pos <= 4401:
parsed_cds = "orf1a"
else:
parsed_cds = "orf1b"
aa_pos = aa_pos - 4401
else:
parsed_cds = cds
cds_tuple = CDS_dict[parsed_cds.lower()]
nuc_pos = cds_tuple[0] + ((aa_pos - 1) * 3)
if nuc_pos > cds_tuple[1]:
sys.stderr.write("invalid amino acid position for cds %s : %d" % (cds, aa_pos))
sys.exit(1)
return nuc_pos
def parse_variants_in(csvfilehandle, refseq):
"""
read in a variants file and parse its contents and
return something sensible.
format of mutations csv is:
snp:T6954C
del:11288:9
aa:orf1ab:T1001I
returns variant_list which is a list of dicts of snps, aas and dels,
one dict per variant. format of subdict varies by variant type
"""
variant_list = []
with open(csvfilehandle, "r") as f:
for line in f:
l = line.split("#")[0].strip() # remove comments from the line
if len(l) > 0: # skip blank lines (or comment only lines)
lsplit = l.split(":")
if lsplit[0] == "snp":
type = lsplit[0]
ref_allele = lsplit[1][0]
ref_start = int(lsplit[1][1:-1])
alt_allele = lsplit[1][-1]
ref_allele_check = refseq[ref_start - 1]
if ref_allele != '?' and ref_allele != ref_allele_check:
sys.stderr.write("variants file says reference nucleotide at position %d is %s, but reference sequence has %s\n" %(ref_start, ref_allele, ref_allele_check))
sys.exit(1)
newsnprecord = {"type": type, "ref_start": ref_start, "ref_allele": ref_allele, "alt_allele": alt_allele}
variant_list.append(newsnprecord)
elif lsplit[0] == "aa":
type = lsplit[0]
cds = lsplit[1]
ref_allele = lsplit[2][0]
AA_pos = int(lsplit[2][1:-1])
alt_allele = lsplit[2][-1]
ref_start = get_nuc_position_from_aa_description(cds, AA_pos)
ref_allele_check = refseq[ref_start - 1:ref_start + 2].translate()
if ref_allele != '?' and ref_allele != ref_allele_check:
sys.stderr.write("variants file says reference amino acid in CDS %s at position %d is %s, but reference sequence has %s\n" %(cds, AA_pos, ref_allele, ref_allele_check))
sys.exit(1)
newaarecord = {"type": type, "cds": cds, "ref_start": ref_start, "ref_allele": ref_allele, "alt_allele": alt_allele}
variant_list.append(newaarecord)
elif lsplit[0] == "del":
length = int(lsplit[2])
newdelrecord = {"type": lsplit[0], "ref_start": int(lsplit[1]), "length": length, "ref_allele": refseq[int(lsplit[1]) - 1:int(lsplit[1]) + length - 1]}
variant_list.append(newdelrecord)
else:
sys.stderr.write("couldn't parse the following line in the config file: %s\n" % line)
sys.exit(1)
return(variant_list)
def type_variants(fasta_in, reference, variants_in, variants_out_handle, write_all_variants = False):
reference_seq = list(SeqIO.parse(reference, "fasta"))[0].seq
variant_list = parse_variants_in(variants_in, reference_seq)
variants_out = open(variants_out_handle, "w")
variants_out.write("query,ref_count,alt_count,other_count,fraction_alt")
if write_all_variants:
with open(variants_in, "r") as f:
for line in f:
variants_out.write(",")
variants_out.write(line.strip())
variants_out.write("\n")
with open(fasta_in, "r") as f:
for record in SeqIO.parse(f, "fasta"):
if len(record.seq) != 29903:
sys.stderr.write("The fasta record for %s isn't 29903bp long, is this fasta file aligned to Wuhan-Hu-1?\n" % record.id)
sys.exit(1)
ref_count = 0
alt_count = 0
oth_count = 0
alleles_list = []
for var in variant_list:
if var["type"] == "snp":
query_allele = record.seq.upper()[var["ref_start"] - 1]
if query_allele == var["ref_allele"]:
ref_count += 1
elif query_allele == var["alt_allele"]:
alt_count += 1
else:
oth_count += 1
alleles_list.append(query_allele)
if var["type"] == "aa":
try:
query_allele = record.seq.upper()[var["ref_start"] - 1:var["ref_start"] + 2].translate()
except:
oth_count += 1
alleles_list.append("X")
continue
if query_allele == var["ref_allele"]:
ref_count += 1
elif query_allele == var["alt_allele"]:
alt_count += 1
else:
oth_count += 1
alleles_list.append(str(query_allele))
if var["type"] == "del":
query_allele = record.seq.upper()[var["ref_start"] - 1:var["ref_start"] + var["length"] - 1]
if query_allele == var["ref_allele"]:
ref_count += 1
alleles_list.append("ref")
elif query_allele == "-" * var["length"]:
alt_count += 1
alleles_list.append("del")
else:
oth_count += 1
alleles_list.append("X")
variants_out.write(record.id + ",")
variants_out.write(str(ref_count) + ",")
variants_out.write(str(alt_count) + ",")
variants_out.write(str(oth_count) + ",")
variants_out.write(str(round(alt_count / (alt_count + ref_count + oth_count), 4)))
if write_all_variants:
variants_out.write(",")
variants_out.write(",".join(alleles_list))
variants_out.write("\n")
variants_out.close()
pass
def parse_args():
parser = argparse.ArgumentParser(description="""Type an alignment in Wuhan-Hu-1 coordinates for variants defined in a config file
If you have a consensus fasta file containing sequences that haven't been aligned to Wuhan-Hu-1, you can make an alignment to feed to this python script using minimap2, the latest version of gofasta and the reference fasta file:
minimap2 -a -x asm5 MN908947.fa unaligned.fasta | gofasta sam toMultiAlign --reference MN908947.fa > aligned.fasta
""",
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('--fasta-in', dest = 'fasta_in', help='alignment to type, in fasta format')
parser.add_argument('--variants-config', dest = 'variants_in', help="""config file containing variants to type. Format is like:
snp:T6954C
del:11288:9
aa:orf1ab:T1001I
snp and del positions are 1-based nucleotide position relative to the alignment
aa position is 1-based codon position relative to the cds
No header line or comment lines are permitted""")
parser.add_argument('--reference', help='Wuhan-Hu-1 in fasta format (for typing the reference allele at deletions and sanity checking the config file)')
parser.add_argument('--variants-out', dest = 'variants_out', help='csv file to write')
parser.add_argument('--append-genotypes', dest = 'append_genotypes', action = 'store_true', help='if invoked, write the genotype for each variant in the config file to the output')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
type_variants(fasta_in = args.fasta_in,
reference = args.reference,
variants_in = args.variants_in,
variants_out_handle = args.variants_out,
write_all_variants = args.append_genotypes)