-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathslurm_p2g.R
94 lines (79 loc) · 1.88 KB
/
slurm_p2g.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
library(parallel)
# Command-line help/usage text. It is handed to docopt() in the entry-point
# block at the bottom of this file, which parses the command line against it.
# NOTE(review): the layout of this string is significant to docopt — keep the
# "Usage:" and "Options:" sections intact when editing.
doc <- "Convert pythia per-pixel output files to GGCMI NetCDF files, using SLURM
Usage:
slurm_p2g.R <config_file>
slurm_p2g.R (-h | --help)
Options:
-h --help This message.
<config_file> JSON configuration file to run.
Note:
All variables available in pythia2ggcmi.R as automatically run.
"
#' Check whether a parsed configuration is acceptable.
#'
#' Placeholder implementation: the configuration is not inspected and every
#' input is accepted.
#'
#' @param config Parsed configuration object (currently unused).
#' @return Always `TRUE`.
isValid <- function(config) {
  TRUE
}
#' Build the job queue for a configuration.
#'
#' Placeholder implementation: the configuration is not inspected and an
#' empty queue (`NULL`, which is what `c()` evaluates to) is returned.
#'
#' @param config Parsed configuration object (currently unused).
#' @return `NULL`.
generateQueue <- function(config) {
  NULL
}
#' Split a queue into its first element and the remainder.
#'
#' The input is not modified; callers must store the returned `queue`
#' element to advance the queue.
#'
#' @param queue Vector of pending items.
#' @return A list with `value` (the result of `queue[1]`, which is `NA` for an
#'   empty atomic vector) and `queue` (everything after the first element).
dequeue <- function(queue) {
  first_item <- queue[1]
  remaining <- queue[-1]
  list(value = first_item, queue = remaining)
}
#' Dummy workload: pause, then return the square of the input.
#'
#' @param t Number of seconds to sleep; also the value that is squared.
#' @return `t * t`.
.testCase <- function(t) {
  Sys.sleep(t)
  squared <- t * t
  squared
}
#' Run the external `sleep` command for the given duration.
#'
#' @param t Duration passed to `sleep`; converted to text with `paste0()`.
#' @return Whatever `system2()` returns for the command.
.execute <- function(t) {
  sleep_args <- paste0(t)
  # Keep the system2() call as the final expression so the function's value
  # is exactly the value of that call.
  system2("sleep", args = sleep_args)
}
#' Run queued jobs with a bounded number of concurrent child processes.
#'
#' Every queue entry is passed to `.execute()` inside a forked child started
#' with `parallel::mcparallel()`. While entries remain, free slots are filled
#' from the front of the queue and finished children are detected by polling
#' `parallel::mccollect()`. Once the queue is empty the function blocks until
#' all remaining children have finished.
#'
#' @param slot.length Maximum number of jobs running at the same time. It is
#'   truncated to an integer and must be at least 1; otherwise no job could
#'   ever start and the queue would never drain.
#' @param queue Vector of job arguments, consumed front to back with
#'   `dequeue()`. Defaults to the demo workload of 30 random whole-number
#'   durations between 10 and 40.
#' @param poll.timeout Seconds to wait for a finished child on each poll.
#'   Waiting here (instead of polling with a zero timeout) keeps the parent
#'   from spinning on a CPU core while all slots are busy.
#' @return `TRUE` once every launched job has been collected.
runtime <- function(slot.length = 4,
                    queue = floor(runif(n = 30, min = 10, max = 41)),
                    poll.timeout = 1) {
  n_slots <- as.integer(slot.length)
  if (length(n_slots) != 1L || is.na(n_slots) || n_slots < 1L) {
    stop("slot.length must be a single number >= 1", call. = FALSE)
  }

  # PIDs (as character) of the children currently occupying a slot.
  running <- character(0)

  print(queue)
  print("----------")

  while (length(queue) > 0) {
    # Fill every free slot from the front of the queue.
    while (length(running) < n_slots && length(queue) > 0) {
      next_job <- dequeue(queue)
      queue <- next_job$queue
      job <- mcparallel(.execute(next_job$value))
      running <- c(running, paste0(job$pid))
    }

    # Non-blocking collect: returns NULL when nothing finished within the
    # timeout, otherwise a list named by the PIDs that reported.
    finished <- mccollect(wait = FALSE, timeout = poll.timeout)
    if (!is.null(finished)) {
      print(queue)
      # Release the slots held by the children that just reported.
      running <- setdiff(running, names(finished))
    }
  }

  print("Queue drained. Waiting for final runs.")
  # Blocking collect of whatever is still running.
  mccollect()
  return(TRUE)
}
# Command-line entry point. sys.nframe() is 0 only when this code is evaluated
# outside of any function call, so the block is skipped when the file is
# evaluated from within a function call.
# NOTE(review): this block only parses arguments and loads/validates the
# configuration; nothing here starts any work yet — confirm that is intended.
if (sys.nframe() == 0) {
library(docopt)
# Parse the command line against the usage text stored in `doc`.
argv <- docopt(doc)
# Abort early when the configuration path does not exist.
if (!file.exists(argv$config_file)) {
stop(sprintf("%s not found.", argv$config_file))
}
library(jsonlite)
# Read the JSON configuration file into an R object.
config <- read_json(argv$config_file)
# Reject configurations that fail isValid().
if (!isValid(config)) {
stop(sprintf("%s is not a valid JSON configuration", argv$config_file))
}
}