#! /usr/bin/python
import sys
from optparse import OptionParser
import datetime
import math
finish_event_list = []
subtree_max_parallel = {} #record the max_parallel_degree of each subtree of a DAG
origin_task_list = []
origin_task_children_list_dict = {} #each item in it is a <task_id, children_list>
origin_task_dependency_list_dict = {} #each item in it is a <task_id, dependency_list>
#each task record contains the task id, the task length, and the task dependencies
#keep tracking waiting_task_list; once a task is ready (all of its dependencies have finished), move it into ready_task_list
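#Input file formats, inferred from the two parsers below (one task per line):
#  full mode (initialize_struct_full):        <task_id> <task_length> [<dep_id>, <dep_id>, ...]
#  half/zero modes (initialize_struct_half):  <task_id> [<dep_id>, <dep_id>, ...]
#e.g. "5 12 [1, 2]" declares task 5 with length 12, depending on tasks 1 and 2.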
'''
Time Complexity: O(n), n is the number of tasks.
'''
def initialize_struct_full(filename):
    real_cost = 0
    with open(filename) as task_file:
        task_children_list_dict = {} #each item in it is a <task_id, children_list>
        task_dependency_list_dict = {} #each item in it is a <task_id, dependency_list>
        waiting_task_list = []
        task_length_dict = {}
        for line in task_file:
            first_space = line.find(' ', 0)
            task_id = line[:first_space]
            second_space = line.find(' ', first_space + 1)
            task_length = line[first_space + 1:second_space]
            real_cost += int(task_length)
            task_length_dict[task_id] = task_length
            right_brace = line.find(']', second_space + 1)
            task_dependency = line[second_space + 2:right_brace]
            task_dependency = task_dependency.replace(' ', '')
            task_dependency_list = task_dependency.split(',')
            if task_dependency_list == ['']:
                task_dependency_list = []
            task_dependency_list_dict[task_id] = task_dependency_list
            for dependency in task_dependency_list:
                if dependency in task_children_list_dict:
                    task_children_list_dict[dependency].append(task_id)
                else:
                    task_children_list_dict[dependency] = [task_id]
            waiting_task_list.append(task_id)
    return real_cost, task_children_list_dict, task_dependency_list_dict, waiting_task_list, task_length_dict
'''
Time Complexity: O(n), n is the number of tasks.
'''
def initialize_struct_half(filename):
    task_list = []
    with open(filename) as task_file:
        task_children_list_dict = {} #each item in it is a <task_id, children_list>
        task_dependency_list_dict = {} #each item in it is a <task_id, dependency_list>
        waiting_task_list = []
        for line in task_file:
            first_space = line.find(' ', 0)
            task_id = int(line[:first_space])
            task_list.append(task_id)
            right_brace = line.find(']', first_space + 1)
            task_dependency = line[first_space + 2:right_brace]
            task_dependency = task_dependency.replace(' ', '')
            task_dependency_list = task_dependency.split(',')
            if task_dependency_list == ['']:
                task_dependency_list = []
            else:
                task_dependency_list = [int(item) for item in task_dependency_list]
            task_dependency_list_dict[task_id] = task_dependency_list
            for dependency in task_dependency_list:
                if dependency in task_children_list_dict:
                    task_children_list_dict[dependency].append(task_id)
                else:
                    task_children_list_dict[dependency] = [task_id]
            waiting_task_list.append(task_id)
    #task_dependency_list_dict and task_children_list_dict uniquely define a workflow
    return task_dependency_list_dict, task_children_list_dict, task_list, waiting_task_list
def insert_finish_event_list(task_id, finish_time):
    global finish_event_list
    if len(finish_event_list) == 0:
        finish_event_list = [{str(finish_time): task_id}]
    else:
        pos = 0
        while len(finish_event_list) > pos and finish_time >= int(finish_event_list[pos].keys()[0]):
            pos = pos + 1
        finish_event_list.insert(pos, {str(finish_time): task_id})
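#Note: the linear scan above keeps finish_event_list sorted by finish time, so
#each insertion is O(n). A binary heap would make it O(log n); a minimal sketch,
#assuming the events were stored as (finish_time, task_id) tuples instead of
#one-entry dicts:
#  import heapq
#  heapq.heappush(finish_event_heap, (finish_time, task_id))
#  finish_time, task_id = heapq.heappop(finish_event_heap)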
'''
Check each task in waiting_task_list and move every ready task into ready_task_list;
then start every task in ready_task_list: move it into running_task_list and add its finish time to finish_event_list;
then jump the clock to the earliest entry in finish_event_list and remove every task finishing at that time from running_task_list and from its children's dependency lists.
Time Complexity: O(n), n is the number of tasks
'''
def schedule_tasks(task_children_list_dict, task_dependency_list_dict, waiting_task_list, task_length_dict):
    running_machines = 0
    current_time = 0
    max_used_machines = 0
    time_interval_endpoint_list = [0]
    time_interval_width_list = [] #the number of running tasks during the last time interval
    ready_task_list = []
    running_task_list = []
    #waiting_task_list is initialized to be the total task set; keep looping until every task has been started and the last running task has finished
    while waiting_task_list or finish_event_list:
        #Check each task in waiting_task_list and move every ready task into ready_task_list
        iterator = 0
        while iterator < len(waiting_task_list):
            task_id = waiting_task_list[iterator]
            if not task_dependency_list_dict[task_id]:
                ready_task_list.append(task_id)
            iterator = iterator + 1
        for item in ready_task_list:
            if item in waiting_task_list:
                waiting_task_list.remove(item)
        #Start every task in ready_task_list: move it into running_task_list and add its finish time to finish_event_list
        #print "\nAt time %d:" % current_time
        while len(ready_task_list) > 0:
            first_ready_task = ready_task_list.pop(0)
            #print "Start to run task %s, the task length is %s." % (first_ready_task, task_length_dict[first_ready_task])
            first_ready_task_length = task_length_dict[first_ready_task]
            insert_finish_event_list(first_ready_task, current_time + int(first_ready_task_length))
            running_task_list.append(first_ready_task)
            running_machines += 1
            if running_machines > max_used_machines:
                max_used_machines = running_machines
        # print "task_children_list_dict", task_children_list_dict
        # print "task_length_dict", task_length_dict
        # print "task_dependency_list_dict", task_dependency_list_dict
        # print "running_task_list", running_task_list
        # print "ready_task_list", ready_task_list
        # print "waiting_task_list", waiting_task_list
        # print "finish_event_list", finish_event_list
        #Jump the clock to the earliest entry in finish_event_list and remove every task finishing at that time from running_task_list and its children's dependency lists
        last_len_running_task_list = len(running_task_list)
        if len(finish_event_list) > 0:
            current_time = int(finish_event_list[0].keys()[0])
        next_finish_event_time = current_time
        while len(finish_event_list) > 0 and next_finish_event_time == current_time:
            first_finish_event = finish_event_list[0]
            finish_task_id = first_finish_event[str(current_time)]
            #print "Finish task %s at time %d" % (finish_task_id, current_time)
            running_task_list.remove(finish_task_id)
            running_machines -= 1
            finish_event_list.pop(0)
            if finish_task_id in task_children_list_dict:
                for item in task_children_list_dict[finish_task_id]:
                    task_dependency_list_dict[item].remove(finish_task_id)
            if len(finish_event_list) > 0:
                next_finish_event_time = int(finish_event_list[0].keys()[0])
            else:
                break
        time_interval_endpoint_list.append(current_time)
        time_interval_width_list.append(last_len_running_task_list)
    return (current_time, max_used_machines, time_interval_endpoint_list, time_interval_width_list)
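#Worked example: for the tasks "1 5 []", "2 3 [1]", "3 4 [1]", task 1 runs
#alone during [0, 5), then tasks 2 and 3 run together from t = 5, finishing at
#t = 8 and t = 9, so schedule_tasks returns a makespan of 9 with
#max_used_machines = 2 and interval widths [1, 2, 1].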
'''
Time Complexity: O(n), n is the number of tasks
'''
def full_process(filename):
    real_cost, task_children_list_dict, task_dependency_list_dict, waiting_task_list, task_length_dict = initialize_struct_full(filename)
    (execution_time, max_used_machines, time_interval_endpoint_list, time_interval_width_list) = schedule_tasks(task_children_list_dict, task_dependency_list_dict, waiting_task_list, task_length_dict)
    total_cost = max_used_machines * int(execution_time)
    idle_cost = total_cost - real_cost
    wastage = float(idle_cost) / total_cost
    print "Real cost (sum of task lengths): %d" % real_cost
    print "Max used machines: %d" % max_used_machines
    print "Execution time: %s" % execution_time
    print "Total cost (machine number * execution time): %d" % total_cost
    print "Idle cost (total cost - real cost): %d" % idle_cost
    print "Wastage (idle cost / total cost): %5.4f" % wastage
    time_interval_contribute_list = []
    for i in range(0, len(time_interval_width_list)):
        contribute = time_interval_width_list[i] * (time_interval_endpoint_list[i+1] - time_interval_endpoint_list[i])
        time_interval_contribute_list.append(contribute)
    # print "time_interval_endpoint_list", time_interval_endpoint_list
    # print "time_interval_width_list", time_interval_width_list
    # print "time_interval_contribute_list", time_interval_contribute_list
    optimized_machine_number = 0
    for i in range(0, len(time_interval_width_list)):
        optimized_machine_number += time_interval_width_list[i] * (time_interval_contribute_list[i] / float(real_cost))
    optimized_machine_number = int(math.ceil(optimized_machine_number))
    print "Optimized Machine Number: ", optimized_machine_number
'''
This function depends on the fact that task ids are assigned level by level, and that within a level the ids increase from left to right, so sorting by id gives a topological order.
Calculate the ancestors of each task from the smallest id to the largest id: the ancestors of a task are the union of each parent's ancestors plus the parents themselves.
Time Complexity: O(n), n is the number of tasks.
'''
def initialize_ancestor_list(task_dependency_list_dict):
    task_ancestors_list_dict = {}
    for key in sorted(task_dependency_list_dict.keys()):
        task_ancestors_list_dict[key] = []
        for item in task_dependency_list_dict[key]:
            if item not in task_ancestors_list_dict:
                task_ancestors_list_dict[item] = []
            #print "task_ancestors_list_dict[item]", task_ancestors_list_dict[item]
            task_ancestors_list_dict[key] += task_ancestors_list_dict[item]
            task_ancestors_list_dict[key].append(item)
        task_ancestors_list_dict[key] = list(set(task_ancestors_list_dict[key]))
    return task_ancestors_list_dict
'''
This function depends on the same id ordering as initialize_ancestor_list.
Calculate the descendants of each task from the largest id to the smallest id: the descendants of a task are the union of each child's descendants plus the children themselves.
Time Complexity: O(n), n is the number of tasks.
'''
def initialize_descendants_list(task_children_list_dict):
    task_descendants_list_dict = {}
    for key in sorted(task_children_list_dict.keys(), reverse=True):
        task_descendants_list_dict[key] = []
        for item in task_children_list_dict[key]:
            if item not in task_descendants_list_dict:
                task_descendants_list_dict[item] = [] #leaf tasks have no entry of their own yet
            task_descendants_list_dict[key] += task_descendants_list_dict[item]
            task_descendants_list_dict[key].append(item)
        task_descendants_list_dict[key] = list(set(task_descendants_list_dict[key]))
    return task_descendants_list_dict
'''The direct relatives of a task are the union of its ancestors and its descendants.
Time Complexity: O(n), n is the number of tasks.
'''
def initialize_direct_relatives_list(task_ancestors_list_dict, task_descendants_list_dict):
    task_direct_relatives_list_dict = {}
    for key in task_ancestors_list_dict:
        if key not in task_descendants_list_dict:
            task_descendants_list_dict[key] = []
        task_direct_relatives_list_dict[key] = list(set(task_ancestors_list_dict[key]) | set(task_descendants_list_dict[key]))
    return task_direct_relatives_list_dict
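#Two tasks can run at the same time exactly when neither is an ancestor of the
#other, i.e. when each is an indirect relative of the other; max_degree below
#searches for the largest such mutually independent task set.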
''' Calculate the maximal parallel degree of a workflow, i.e. the largest set of tasks that can run at the same time
'''
def max_degree(task_dependency_list_dict, task_children_list_dict, task_list, filename):
    global subtree_max_parallel
    #represent a subgraph by the comma-separated sequence of its sorted task ids
    #(the separator is needed: plain concatenation would map e.g. [1, 23] and [12, 3] to the same key)
    task_list_str = ','.join(str(key) for key in sorted(task_list))
    #if the max parallel degree of this subgraph has already been calculated and memoized in subtree_max_parallel, return the memoized value directly
    if task_list_str in subtree_max_parallel:
        value = subtree_max_parallel[task_list_str]
        #print "subtree_max_parallel_value: ", value, "for task_list: ", task_list, "***************"
        return value
    #calculate the ancestor list, the descendant list, and the direct relatives of each task
    task_ancestors_list_dict = initialize_ancestor_list(task_dependency_list_dict)
    task_descendants_list_dict = initialize_descendants_list(task_children_list_dict)
    task_direct_relatives_list_dict = initialize_direct_relatives_list(task_ancestors_list_dict, task_descendants_list_dict)
    # print "task_ancestors_list_dict: ", task_ancestors_list_dict
    # print "task_descendants_list_dict: ", task_descendants_list_dict
    # print "task_direct_relatives_list_dict", task_direct_relatives_list_dict
    max_parallel_tasks_dict = {} #track the maximal parallel task set of each task
    max_parallel_value = 0 #track the maximal number of parallel tasks
    #for each item in task_list, calculate its indirect_relatives_list
    for item in task_list:
        max_parallel_tasks_dict[item] = 0
        temp_list = task_list[:] #copy task_list into temp_list; they are two different lists with the same content
        indirect_relatives_list = list(set(temp_list) - set(task_direct_relatives_list_dict[item]))
        conflict_tasks_no = 0
        for item1 in sorted(indirect_relatives_list):
            temp_list1 = indirect_relatives_list[:]
            intersect_list = list(set(temp_list1) & set(task_direct_relatives_list_dict[item1]))
            if len(intersect_list) > 0:
                conflict_tasks_no += 1
                break
        if conflict_tasks_no == 0:
            max_parallel_tasks_dict[item] = len(indirect_relatives_list)
        else:
            #construct new task_dependency_list_dict, task_children_list_dict, task_list and recursively call max_degree
            temp_indirect_relatives_list = indirect_relatives_list[:]
            if item in temp_indirect_relatives_list:
                temp_indirect_relatives_list.remove(item)
            new_task_list = temp_indirect_relatives_list[:]
            new_task_dependency_list_dict, new_task_children_list_dict, origin_task_list, waiting_task_list = initialize_struct_half(filename)
            removed_task_list = list(set(origin_task_list) - set(new_task_list))
            #remove each task in removed_task_list from the new children and dependency dicts
            for key in removed_task_list:
                if key in new_task_children_list_dict:
                    del new_task_children_list_dict[key]
                if key in new_task_dependency_list_dict:
                    del new_task_dependency_list_dict[key]
            #iterate over copies of the lists, since removing from a list while iterating over it skips elements
            for key2 in new_task_children_list_dict:
                for item2 in new_task_children_list_dict[key2][:]:
                    if item2 in removed_task_list:
                        new_task_children_list_dict[key2].remove(item2)
            for key3 in new_task_dependency_list_dict:
                for item3 in new_task_dependency_list_dict[key3][:]:
                    if item3 in removed_task_list:
                        new_task_dependency_list_dict[key3].remove(item3)
            max_temp = 1 + max_degree(new_task_dependency_list_dict, new_task_children_list_dict, new_task_list, filename)
            if max_temp > max_parallel_tasks_dict[item]:
                max_parallel_tasks_dict[item] = max_temp
        if max_parallel_tasks_dict[item] > max_parallel_value:
            max_parallel_value = max_parallel_tasks_dict[item]
    subtree_max_parallel[task_list_str] = max_parallel_value
    print "max_parallel_value: ", max_parallel_value, "for task_list: ", task_list, "***************"
    return max_parallel_value
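#Note: although subtree_max_parallel memoizes one result per subgraph key, the
#recursion can still explore exponentially many subgraphs in the worst case,
#and initialize_struct_half re-parses the task file on every recursive call;
#this is why the mode is labeled zero_brute_force.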
'''
Time Complexity: O(n), n is the number of tasks
'''
def level_width(task_dependency_list_dict):
    level_dict = {}
    level = 1 #initialize the level number to 1
    task_num = len(task_dependency_list_dict)
    ready_task_list = []
    #add all the ready tasks (whose dependency list is empty) into ready_task_list
    for key in task_dependency_list_dict.keys():
        if not task_dependency_list_dict[key]:
            ready_task_list.append(key)
    while task_num > 0:
        #make ready_task_list the task set of the current level, remove those tasks from task_dependency_list_dict, then increase the level number
        level_dict[level] = ready_task_list
        task_num -= len(ready_task_list)
        for item in ready_task_list:
            del task_dependency_list_dict[item]
        ready_task_list = []
        for key in task_dependency_list_dict.keys():
            task_dependency_list_dict[key] = list(set(task_dependency_list_dict[key]) - set(level_dict[level]))
            if not task_dependency_list_dict[key]:
                ready_task_list.append(key)
        level += 1
    return level_dict
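#Worked example: for the diamond "1 []", "2 [1]", "3 [1]", "4 [2, 3]",
#level_width returns {1: [1], 2: [2, 3], 3: [4]}: a dag height of 3 with a
#widest level of 2 tasks.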
''' Using topological sort to schedule tasks
Time Complexity: O(n), n is the number of tasks
'''
def schedule_tasks_half(task_children_list_dict, task_dependency_list_dict, waiting_task_list):
    max_used_machines = 0
    ready_task_list = []
    iterator = 0
    #check each task in waiting_task_list, and move all the ready tasks into ready_task_list
    while iterator < len(waiting_task_list):
        task_id = waiting_task_list[iterator]
        if not task_dependency_list_dict[task_id]:
            ready_task_list.append(task_id)
        iterator = iterator + 1
    for item in ready_task_list:
        if item in waiting_task_list:
            waiting_task_list.remove(item)
    #maintain the maximal number of tasks running in parallel
    if len(ready_task_list) > max_used_machines:
        max_used_machines = len(ready_task_list)
    # print "ready_task_list", ready_task_list
    # print "waiting_task_list", waiting_task_list
    # print "max_used_machines", max_used_machines
    while ready_task_list:
        #remove the first task from ready_task_list and task_dependency_list_dict, and add all the newly ready tasks into ready_task_list
        first_ready_task = ready_task_list.pop(0)
        if first_ready_task not in task_children_list_dict:
            continue
        for item in task_children_list_dict[first_ready_task]:
            task_dependency_list_dict[item].remove(first_ready_task)
            if not task_dependency_list_dict[item]:
                ready_task_list.append(item)
                del task_dependency_list_dict[item]
                if item in waiting_task_list:
                    waiting_task_list.remove(item)
        if len(ready_task_list) > max_used_machines:
            max_used_machines = len(ready_task_list)
        # print "ready_task_list", ready_task_list
        # print "waiting_task_list", waiting_task_list
        # print "max_used_machines", max_used_machines
    return max_used_machines
def half_process(filename):
    origin_task_dependency_list_dict, task_children_list_dict, task_list, waiting_task_list = initialize_struct_half(filename)
    #print origin_task_dependency_list_dict, task_children_list_dict, task_list
    task_dependency_list_dict = dict(origin_task_dependency_list_dict)
    level_dict = level_width(task_dependency_list_dict) #level_width empties its copy of the dependency dict, so rebuild it below
    task_dependency_list_dict = dict(origin_task_dependency_list_dict)
    max_parallel_task_number = schedule_tasks_half(task_children_list_dict, task_dependency_list_dict, waiting_task_list)
    print "Optimized Machine Number (max parallel running tasks under topological sort): ", max_parallel_task_number
def zero_process(filename):
    task_dependency_list_dict, task_children_list_dict, task_list, waiting_task_list = initialize_struct_half(filename)
    level_dict = level_width(task_dependency_list_dict)
    average_dag_width = len(task_list) / float(len(level_dict))
    average_dag_width = int(math.ceil(average_dag_width))
    dag_width = 0
    #print level_dict
    for key in level_dict:
        print "level ", key, ' has ', len(level_dict[key]), 'tasks: ', level_dict[key]
    for key in level_dict:
        if len(level_dict[key]) > dag_width:
            dag_width = len(level_dict[key])
    print "total_task_number: ", len(task_list)
    print "dag_height: ", len(level_dict)
    print "average_dag_width: ", average_dag_width
    print "dag_width: ", dag_width
    print "Optimized Machine Number (max{dag_width, average_dag_width}): ", max(dag_width, average_dag_width)
def zero_brute_force_process(filename):
    task_dependency_list_dict, task_children_list_dict, task_list, waiting_task_list = initialize_struct_half(filename)
    global origin_task_dependency_list_dict
    origin_task_dependency_list_dict = dict(task_dependency_list_dict)
    global origin_task_children_list_dict
    origin_task_children_list_dict = dict(task_children_list_dict)
    global origin_task_list
    origin_task_list = task_list[:]
    max_parallel_tasks = max_degree(task_dependency_list_dict, task_children_list_dict, task_list, filename)
    print "Max parallel running task number: ", max_parallel_tasks
def main():
    parser = OptionParser(usage="usage: %prog --degree full|half|zero|zero_brute_force taskfile",
                          version="%prog 1.0")
    parser.add_option("-d", "--degree",
                      action="store",
                      help="The user's knowledge about each task length, which can be full, half, zero, or zero_brute_force.\nfull: The length of each task is known.\nhalf: The task lengths are unknown, but the tasks on the same level take similar time.\nzero: The task lengths are unknown, and the tasks on the same level may take different times.\nzero_brute_force: The task lengths are unknown, the tasks on the same level may take different times, and dag_evaluator returns the exact optimal machine number.")
    (options, args) = parser.parse_args()
    task_info = options.degree
    filename = sys.argv[-1]
    start_time = datetime.datetime.now()
    print "Starting time: ", start_time
    if task_info == 'full':
        full_process(filename)
    elif task_info == 'half':
        half_process(filename)
    elif task_info == 'zero':
        zero_process(filename)
    elif task_info == 'zero_brute_force':
        zero_brute_force_process(filename)
    else:
        sys.exit("Unknown degree option. The degree option can be full, half, zero, or zero_brute_force.")
    end_time = datetime.datetime.now()
    print "Finishing time: ", end_time
    time_diff = end_time - start_time
    print "dag_evaluator takes ", time_diff, " to evaluate the dag."
if __name__ == "__main__":
    main()
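#Example invocations (filenames are illustrative; the taskfile must match the
#input format of the chosen mode, see the parser notes near the top):
#  ./dag_evaluator --degree full tasks_full.txt
#  ./dag_evaluator --degree half tasks.txt
#  ./dag_evaluator --degree zero_brute_force tasks.txt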