-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path d_process_dispatcher.py
68 lines (47 loc) · 1.87 KB
/
d_process_dispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Distributed Processing Module
__author__ = 'Sandesh'
import glob
import pickle
import subprocess
from itertools import product

from celery import Celery

from CustomClasses.ATree import *
def manage_process_task():
    """Dispatch one Celery ``process`` task per chromosome FASTA file,
    sum the task results, and trigger a remote sync script when done.

    Requires the module-global ``app`` (the Celery instance created in
    the ``__main__`` guard) to be defined before this is called.
    """
    async_results = []
    print("Acquiring Chromosome Data")
    for name in glob.glob('GenomeDataset/Chromosomes/*.fa'):
        print("Throwing " + name + " at Cluster")
        async_results.append(app.send_task("d_process_task.process", args=(name,)))
    total = 0
    for res in async_results:
        # BUG FIX: the original tested ``res.ready`` — the bound method
        # object, which is always truthy — instead of calling it.
        # ``get()`` blocks until the task completes, so calling it
        # directly preserves the effective behavior (wait for every
        # result) without the misleading dead check.
        total += res.get()
    print(total)
    # Pull processed output back from the worker node via SSH.
    subprocess.call(["ssh", "[email protected]", "./d_sync.sh"])
def manage_unique_pattern_generation_task(depth):
    """Dispatch one ``unique_pattern_generation`` task per pattern length
    in ``1 .. 2*depth - 1`` and print the collected results.

    Args:
        depth: tree depth; tasks are dispatched for lengths up to
            ``2 * depth - 1`` (``range`` upper bound is exclusive).

    Requires the module-global ``app`` Celery instance.
    """
    unique_patterns = []
    for length in range(1, 2 * depth):
        # BUG FIX: the original tested ``key.ready`` — the bound method,
        # always truthy — instead of calling it. ``get()`` blocks until
        # the task finishes, so no result is silently skipped. The
        # one-element list + inner loop wrapper was also removed.
        task = app.send_task("d_process_task.unique_pattern_generation", args=(length,))
        unique_patterns.append(task.get())
    print(unique_patterns)
def analysis():
    """Load every pickled pattern tree under ``GenomeDataset/Processing``
    and tally occurrence counts for all A/T/G/C patterns of length 1-8.

    Returns:
        dict[str, list[int]]: mapping of pattern -> per-file counts, one
        entry per ``*pTree`` file (empty lists when no files exist).
        The original returned ``None``; returning the table is backward
        compatible and makes the result reusable by callers.
    """
    filelist = glob.glob('GenomeDataset/Processing/*pTree')
    # Every pattern over the DNA alphabet for lengths 1..8, sorted.
    pattern_list = sorted(
        "".join(chars) for i in range(1, 9) for chars in product("ATGC", repeat=i)
    )
    result = {pattern: [] for pattern in pattern_list}
    for infile in filelist:
        with open(infile, 'rb') as in_fh:
            # NOTE(review): pickle.load assumes these files are trusted,
            # locally produced artifacts — never unpickle untrusted data.
            new_tree = pickle.load(in_fh)
        for pattern in pattern_list:
            # Hoisted: the original called new_tree.count(pattern) twice
            # per pattern (once for the table, once for the print).
            count = new_tree.count(pattern)
            result[pattern].append(count)
            print("-->" + pattern + ": " + str(count))
    return result
if __name__ == '__main__':
    # Create the Celery app (Redis as both broker and result backend).
    # The dispatcher functions above reference ``app`` as a module
    # global. NOTE(review): because ``app`` is created inside the
    # __main__ guard, importing this module and calling those functions
    # directly would raise NameError — confirm whether ``app`` should be
    # defined at module level instead.
    app = Celery('d_process_task', broker='redis://192.168.6.4:6379/0', backend='redis://192.168.6.4:6379/0')
    # manage_unique_pattern_generation_task(4)
    manage_process_task()