-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhomopolymer_finder.py
122 lines (92 loc) · 4.05 KB
/
homopolymer_finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# coding=utf-8
# Import Libraries
import argparse
import sys
from collections import Counter
class Homopolymer:
# Constructor
def __init__(self, reference_fasta_file, min_homopolymer_length, max_outliers):
self.reference_fasta_file = reference_fasta_file
self.min_homopolymer_length = min_homopolymer_length
self.max_outliers = max_outliers
self.chromosome = ''
self.chr_array = []
self.homopolymer_positions = dict()
# Parse the fasta file
def parse_fasta(self):
with open(self.reference_fasta_file) as f1:
for line in f1:
line = line.rstrip('\n')
# Read Fasta header line
if line.startswith('>'):
# Print homopolymers for previous chromosome
self.find_homopolymers()
for key, value in sorted(self.homopolymer_positions.items(), key=lambda x: x[0]):
print("{}\t{}".format(self.chromosome, key))
# Reset containers
self.chr_array = []
self.homopolymer_positions = dict()
# Read new chromosome name
split_header_line = line.split(" ")
self.chromosome = split_header_line[0][1:]
else:
a = list(line)
self.chr_array += a
# Print homopolymers of the last chromosome
self.find_homopolymers()
for key, value in sorted(self.homopolymer_positions.items(), key=lambda x: x[0]):
print("{}\t{}".format(self.chromosome, key))
# Find homopolymer sequences in a chromosome
def find_homopolymers(self):
position = 1
current_seq = ''
# Parse sequence of chromosome
for i in self.chr_array:
# Store last n nucleotides of reference sequence
if len(current_seq) < self.min_homopolymer_length:
current_seq += i
else:
homopolymer = self.count_max_nucleotide(current_seq)
# If homopolymer, label all positions overlapping homopolymer in dictionary
if homopolymer == 1:
for pos in range(position - self.min_homopolymer_length, position):
self.homopolymer_positions[pos] = homopolymer
# Shift sequence by one nucleotide
current_seq = current_seq[1:]
current_seq += i
position += 1
# Identify the most common nucleotide in sequence and label homopolymers
def count_max_nucleotide(self, sequence):
# Ignore sequences with Ns
if 'N' in sequence:
return 0
# Count most common sequence
wc = Counter(sequence)
most_common_base = wc.most_common(1)[0][1]
# Check if most common nucleotide forms a homopolymer
if most_common_base >= (self.min_homopolymer_length - self.max_outliers):
return 1
else:
return 0
# Run homopolymer finder
def main():
# Read parameters
parser = argparse.ArgumentParser(description='Find all homopolymers of length n in reference sequence (fasta).')
parser.add_argument('-r', '--ref', type=str, help='Reference fasta file.', required=True)
parser.add_argument('-l', '--length', type=int, required=False, default=6,
help='Minimum length of homopolymers to report. Default = 6')
parser.add_argument('-o', '--outlier', type=int, required=False, default=0,
help='Maximum number of outliers (other nucleotides) in homopolymers. Default = 0')
try:
args = parser.parse_args()
except IOError as io:
print(io)
sys.exit('Error reading parameters.')
reference_fasta_file = args.ref
min_homopolymer_length = args.length
max_outliers = args.outlier
# Create Homopolymer object and run parse_fasta method
parser = Homopolymer(reference_fasta_file, min_homopolymer_length, max_outliers)
parser.parse_fasta()
# Run homopolymer finder
main()