# log_manager.py — forked from SymbioticLab/Tiresias
import csv
import os
import time
class LogInfo(object):
    """Immutable-by-convention snapshot of cluster statistics for one log step.

    Pure data holder: every constructor argument is stored under a short
    attribute name that matches the column written by LogManager.
    """

    def __init__(self,
                 num_idle_nodes,
                 num_busy_nodes,
                 num_busy_gpus,
                 num_idle_gpus,
                 avg_gpu_utilization,
                 avg_gpu_memory_allocated,
                 avg_pending_time,
                 median_pending_time,
                 max_pending_time,
                 num_running_jobs,
                 num_queuing_jobs,
                 num_finish_jobs) -> None:
        # Node occupancy.
        self.idle_ns, self.busy_ns = num_idle_nodes, num_busy_nodes
        # GPU occupancy and averaged GPU metrics.
        self.busy_gs, self.idle_gs = num_busy_gpus, num_idle_gpus
        self.avg_g_utils = avg_gpu_utilization
        self.avg_g_mem = avg_gpu_memory_allocated
        # Queueing-delay statistics (average / median / worst case).
        self.avg_pending = avg_pending_time
        self.median_pending = median_pending_time
        self.max_pending = max_pending_time
        # Job population counters.
        self.num_running_jobs = num_running_jobs
        self.num_queuing_jobs = num_queuing_jobs
        self.num_finish_jobs = num_finish_jobs
class LogManager(object):
    """Creates and appends to the simulator's CSV logs.

    Logs written under ``log_path``:
      - cluster.csv : one row per scheduling step (see cluster_stats_header)
      - job.csv     : one row per finished job (see job_stats_header)
      - cpu/gpu/memory/network.csv : per-resource traces, only when the
        scheduling scheme is not 'count'.
    """

    def __init__(self, log_path, flags):
        """
        log_path: directory in which all CSV logs are created.
        flags: parsed options object; only ``flags.scheme`` is read here.
        """
        self.log_path = log_path
        self.flags = flags
        # The 'count' scheme only tracks cluster/job stats, so the
        # per-resource logs are skipped entirely.
        self.is_count = self.flags.scheme == 'count'
        self.cluster_stats_header = [
            'delta', 'num_idle_nodes', 'num_busy_nodes',
            'num_busy_gpus', 'num_idle_gpus', 'avg_gpu_utilization',
            'avg_gpu_memory_allocated', 'avg_pending_time',
            'median_pending_time',
            'max_pending_time',
            'num_running_jobs', 'num_queuing_jobs',
            'num_finish_jobs']
        self.job_stats_header = [
            'job_id',
            'num_gpu',
            'submit_time',
            'start_time',
            'end_time',
            'original_duration',
            'actual_duration',
            'jct',
            'preempt']

    def init(self, infrastructure):
        """Resolve all log file paths and (re)create the CSVs with headers.

        infrastructure: cluster description; only consulted for the
        per-resource logs (node count / total GPUs) when not in 'count' mode.
        """
        self.log_cluster = os.path.join(self.log_path, 'cluster.csv')
        self.log_job = os.path.join(self.log_path, 'job.csv')
        if not self.is_count:
            self.log_cpu = os.path.join(self.log_path, 'cpu.csv')
            self.log_gpu = os.path.join(self.log_path, 'gpu.csv')
            self.log_network = os.path.join(self.log_path, 'network.csv')
            self.log_mem = os.path.join(self.log_path, 'memory.csv')
        self._init_all_csv(infrastructure)

    def _init_all_csv(self, infrastructure):
        """Truncate every log file and write its header row.

        FIX: files were previously opened without context managers (leaked
        on a failed write) and without ``newline=''``, which the csv docs
        require to avoid blank rows on Windows.
        """
        # init cluster log
        with open(self.log_cluster, 'w', newline='') as cluster_log:
            csv.DictWriter(cluster_log, self.cluster_stats_header).writeheader()
        # init cpu, gpu, mem, network logs
        if not self.is_count:
            num_nodes = len(infrastructure.nodes)
            # 1. cpu: one column per node
            with open(self.log_cpu, 'w', newline='') as cpu_log:
                csv.writer(cpu_log).writerow(
                    ['time'] + ['cpu' + str(i) for i in range(num_nodes)])
            # 2. gpu: one column per GPU in the cluster
            with open(self.log_gpu, 'w', newline='') as gpu_log:
                csv.writer(gpu_log).writerow(
                    ['time'] + ['gpu' + str(i)
                                for i in range(infrastructure.get_total_gpus())])
            # 3. mem: summary percentiles only, not per-node
            with open(self.log_mem, 'w', newline='') as mem_log:
                csv.writer(mem_log).writerow(
                    ['time', 'max', '99th', '95th', 'med'])
            # 4. network: in/out column pair per node
            titles = ['time']
            for i in range(num_nodes):
                titles.append('in' + str(i))
                titles.append('out' + str(i))
            with open(self.log_network, 'w', newline='') as network_log:
                csv.writer(network_log).writerow(titles)
        # init jobs log
        with open(self.log_job, 'w', newline='') as job_log:
            csv.writer(job_log).writerow(self.job_stats_header)
        assert os.path.exists(self.log_cluster)

    def step_cluster(self, loginfo, delta):
        """Append one cluster-stats row for the step at simulated time delta.

        loginfo: a LogInfo-like object exposing the short attribute names.
        """
        with open(self.log_cluster, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.cluster_stats_header)
            writer.writerow({
                'delta': delta,
                'num_idle_nodes': loginfo.idle_ns,
                'num_busy_nodes': loginfo.busy_ns,
                'num_busy_gpus': loginfo.busy_gs,
                'num_idle_gpus': loginfo.idle_gs,
                'avg_gpu_utilization': loginfo.avg_g_utils,
                'avg_gpu_memory_allocated': loginfo.avg_g_mem,
                'avg_pending_time': loginfo.avg_pending,
                'median_pending_time': loginfo.median_pending,
                'max_pending_time': loginfo.max_pending,
                'num_running_jobs': loginfo.num_running_jobs,
                'num_queuing_jobs': loginfo.num_queuing_jobs,
                'num_finish_jobs': loginfo.num_finish_jobs
            })

    def jcts(self, finished_jobs):
        """Append one row per finished job to job.csv.

        finished_jobs: mapping of job id -> job object; each job must expose
        job_id, gpus, submit_time, start_time, end_time, duration,
        get_duration(), time_processed(), and migration_count.

        Raises ValueError when there are no finished jobs.
        FIX: the original used ``assert len(...) > 0, ValueError(...)`` —
        the ValueError instance was merely the assert *message*, and the
        whole check disappeared under ``python -O``. Also dropped the
        trailing ``f.flush(); time.sleep(1)``: closing the context manager
        already flushes, and the sleep added a second of dead latency.
        """
        if not finished_jobs:
            raise ValueError("No finished jobs")
        with open(self.log_job, 'a', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=self.job_stats_header)
            for k, j in finished_jobs.items():
                writer.writerow({
                    'job_id': j.job_id,
                    'num_gpu': j.gpus,
                    'submit_time': j.submit_time,
                    'start_time': j.start_time,
                    'end_time': j.end_time,
                    'original_duration': j.duration,
                    'actual_duration': j.get_duration(),
                    'jct': j.time_processed(),
                    'preempt': j.migration_count,
                })