-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCallConsensus.py
64 lines (53 loc) · 1.92 KB
/
CallConsensus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from Bio import SeqIO, AlignIO
from Bio.SeqIO import SeqRecord
from Bio.Seq import Seq
from sys import argv, stderr
from re import compile
from collections import defaultdict
from os import path, makedirs
from subprocess import Popen
from collections import Counter
aligner = compile("FastML-[^/]*")
tcoffee = (
"t_coffee -seq {dir}/{node}.fasta -outfile {dir}/{node}.out.aln -newtree {dir}/{node}.tree"
)
def get_consensus(aln_file):
aln = AlignIO.read(aln_file, "clustal")
out_bases = []
default_seq = [rec for rec in aln if rec.id.endswith("tcoffee")][0]
for base in range(len(aln[0])):
bases = Counter(aln[:, base])
if len(bases) == 1:
out_bases.append(bases.most_common(1)[0][0])
continue
best, second = bases.most_common(2)
if best[1] > second[1]:
out_bases.append(best[0])
else:
print("Ambiguous base: {} {} {}".format(aln_file, base, bases), file=stderr)
out_bases.append(default_seq[base])
return "".join(out_bases)
if __name__ == "__main__":
out_fname = argv[1]
outdir = path.dirname(out_fname)
in_fnames = argv[2:]
all_seqs = defaultdict(list)
for fname in in_fnames:
prog = aligner.findall(fname)[0]
for rec in SeqIO.parse(fname, "fasta"):
old_id = rec.id
rec.id = rec.id + "_" + prog
all_seqs[old_id].append(rec)
tempout = path.join(outdir, "tempseqs")
makedirs(tempout, exist_ok=True)
jobs = []
for node, seqs in all_seqs.items():
SeqIO.write(seqs, path.join(tempout, node + ".fasta"), "fasta")
jobs.append(Popen(tcoffee.format(node=node, dir=tempout).split()))
for job in jobs:
job.wait()
out_recs = []
for node in all_seqs:
seq = get_consensus(path.join(tempout, node + ".out.aln"))
out_recs.append(SeqRecord(seq=Seq(seq), id=node))
SeqIO.write(out_recs, out_fname, "fasta")