forked from XiaoLabJHU/GRN_Inference
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGRN_inference_and_AUPR_calc.py
206 lines (163 loc) · 7.82 KB
/
GRN_inference_and_AUPR_calc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
"""
AUPR calc for various GRN inference algorithms on DREAM3/4 data using GeneNetWeaver generated data.
Calculate MI->Inference->AUPR
Written by: Lior Shachaf
2021-08-30
"""
import os
import numpy as np
import Building_MI_matrices
import Precision_Recall_module as PR_module
def get_topology_list(test=False):
"""
Get a list of topologies in the current directory.
Parameters:
test (bool): If True, only include topologies containing "50". Default is False.
Returns:
list: A list of topologies (directories) in the current directory.
"""
topology_list = []
for topology in os.listdir():
if test and "50" not in topology:
continue
if os.path.isdir(topology):
topology_list.append(topology)
return topology_list
def get_replicate_list(file, test=False):
"""
Get a list of replicates in the current directory that contain the specified file.
Parameters:
file (str): The file name to check for in each replicate directory.
test (bool): If True, only include the "rep_1" replicate. Default is False.
Returns:
list: A list of replicates (directories) in the current directory that contain the specified file.
"""
replicate_list = []
for replicate in os.listdir():
if test and replicate != "rep_1":
continue
if os.path.isdir(replicate) and os.path.exists(os.path.join(replicate, file)):
replicate_list.append(replicate)
return replicate_list
def get_network_details(topology):
"""
Get the details of a network from its topology.
Parameters:
topology (str): The topology of the network.
Returns:
tuple: A tuple containing the network size and network type.
"""
if "-" in topology:
network_size = topology.split("-")[0].replace("InSilicoSize", '')
network_type = topology.split("-")[1]
elif "_" in topology:
network_size = topology.split("_")[1].replace("size", '')
network_type = topology.split("_")[2]
else:
network_size = None
network_type = None
return network_size, network_type
def process_grn(mi_est, infer_algo, bins_or_neighbors, k_max, file):
"""
Process the Gene Regulatory Network (GRN) by calculating mutual information (MI) and total correlation (TC).
This function imports gene expression data, calculates MI2 (and optionally TC), and saves the results to files.
It handles different mutual information estimators and inference algorithms.
Parameters:
mi_est (str): Mutual information estimator ("Shannon", "Miller-Madow", "KSG", "KL").
infer_algo (str): Inference algorithm to use.
bins_or_neighbors (int): Number of bins or neighbors.
k_max (int): Maximum number of neighbors for k-nearest neighbors (k-NN) methods.
file (str): Path to the input gene expression data file.
Raises:
ValueError: If an invalid mutual information estimator is provided.
Returns:
None
"""
# kNN MI2 and TC are calculated for all k=1..k_max, so no need to calculate again for every k
mi_est_string = {
"Shannon": f"FB{bins_or_neighbors}_Shan",
"Miller-Madow": f"FB{bins_or_neighbors}_MM",
"KSG": f"KNN{k_max}_KSG",
"KL": f"KNN{k_max}_KL"
}.get(mi_est, None)
if mi_est_string is None:
raise ValueError(f"Invalid mutual information estimator: {mi_est}")
# Importing gene expression data file generated by GeneNetWeaver.
input1_data_array = Building_MI_matrices.import_and_clean_input_file(file)
# Calc MI2 (and optional TC) and save to file
MI2_filename = f"{file[:-4]}_MI2_{mi_est_string}.mat"
if MI2_filename not in ''.join(os.listdir()):
print(f"Building {MI2_filename} file")
Building_MI_matrices.mi2_matrix_build(file, input1_data_array, bins_or_neighbors, mi_est)
if grn_dimensions[infer_algo] == 3:
TC_filename = f"{file[:-4]}_TC_{mi_est_string}.mat"
if TC_filename not in ''.join(os.listdir()):
print(f"Building {TC_filename} file")
Building_MI_matrices.tc_matrix_build(file, input1_data_array, bins_or_neighbors, mi_est)
# For KNN we calculate MI quantities for k=1..k_max, but we write a DB file per k
if mi_est in ["KSG", "KL"]:
mi_est_string = f"KNN{bins_or_neighbors}_{mi_est}"
# Write DB for MI2 (and optional TC) and save to file
MI2_DB_filename = f"{file[:-4]}_MI2_{mi_est_string}.dat"
if MI2_DB_filename not in ''.join(os.listdir()):
print(f"Building {MI2_DB_filename} file")
Building_MI_matrices.generate_mi_db_output(MI2_filename, bins_or_neighbors, mi_est)
if grn_dimensions[infer_algo] == 3:
TC_DB_filename = f"{file[:-4]}_MI2andTC_{mi_est_string}.dat"
if TC_DB_filename not in ''.join(os.listdir()):
print(f"Building {TC_DB_filename} file") # Debug
Building_MI_matrices.generate_mi_db_output(
TC_filename, bins_or_neighbors, mi_est, include_tc=True)
# Constants
binning_rule_list = ["Sqrt", "range"] # or "Sturges"
binning_rule = binning_rule_list[1]
mi_est_list = ["Shannon", "Miller-Madow", "KSG", "KL"]
mi_est = mi_est_list[-1]
infer_algo_list = ["RL", "CLRvMinet", "CMIA_CLR_vKSG", "ARACNE", "SA_CLR_v2"]
infer_algo = infer_algo_list[1]
grn_dimensions = {"RL": 2, "CLRvMinet": 2, "CMIA_CLR_vKSG": 3, "ARACNE": 2, "SA_CLR_v2": 3}
expression_data_type = "SS"
dataset = "dream3" # "dream4"
# Move to data folder
path_to_data = os.path.abspath(f"../DATA/{dataset}/")
os.chdir(path_to_data)
# listing all in silico networks
topology_list = get_topology_list()
# Main part - iterating over all networks and all replicates
for top_counter, topology in enumerate(topology_list):
os.chdir(f"{topology}")
file = f"{topology}_{expression_data_type}_all.tsv"
replicate_list = get_replicate_list(file)
for rep_counter, replicate in enumerate(replicate_list):
os.chdir(f"./{replicate}")
print(topology, replicate, file) # Debug
# calculating number of bins to use based on some rule (for binning methods)
if mi_est in ["Shannon", "Miller-Madow"]:
min_range, max_range, interval = Building_MI_matrices.calc_bins_num(file, binning_rule,
min_range=12, max_range=14)
elif mi_est in ["KSG", "KL"]:
min_range, max_range, interval = 1, 3, 1
# open AUPR output file for summary statistics
output_filename = f"AUPR_{topology}_{expression_data_type}_{mi_est}_{infer_algo}.dat"
if output_filename in ''.join(os.listdir()) and os.path.getsize(output_filename):
print("AUPR output file already exists & file isn't empty")
else: # file is empty or doesn't exist
output_file = open(output_filename, "w")
AUPR_array = np.zeros(len(range(min_range, max_range + 1, interval)), dtype=float)
for idx, bins_or_neighbors in enumerate(range(max_range, min_range - 1, -interval)):
process_grn(mi_est=mi_est, infer_algo=infer_algo, bins_or_neighbors=bins_or_neighbors,
k_max=max_range, file=file)
# PR & AUPR
mi_est_full = [bins_or_neighbors, mi_est]
AUPR_array[idx] = PR_module.AUPR_calc(PR_module.PR_per_infer_algo(
topology, expression_data_type, mi_est_full, infer_algo))
# writing output file for summary statistics
aupr_value = AUPR_array[idx]
network_size, network_type = get_network_details(topology)
output_file.write(
f"{aupr_value:.3f},{network_size},{network_type},{expression_data_type},"
f"{mi_est},{bins_or_neighbors},{infer_algo},{replicate}\n")
output_file.close()
print("Done", replicate, file)
os.chdir('../')
os.chdir('../')