Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelisation support for analyse #435

Merged
merged 12 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .lintr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
linters: with_defaults(
linters: linters_with_defaults(
line_length_linter(120),
object_name_linter = NULL
object_name_linter = NULL,
indentation_linter(indent = 4L)
)
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export(has_class)
export(impute)
export(locf)
export(longDataConstructor)
export(make_rbmi_cluster)
export(method_approxbayes)
export(method_bayes)
export(method_bmlmi)
Expand Down
113 changes: 106 additions & 7 deletions R/analyse.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,58 @@
#' @param delta A `data.frame` containing the delta transformation to be applied to the imputed
#' datasets prior to running `fun`. See details.
#' @param ... Additional arguments passed onto `fun`.
#' @param ncores The number of parallel processes to use when running this function. Can also be a
#' cluster object created by [`make_rbmi_cluster()`]. See the parallisation section below.
#'
#' @section Parallelisation:
#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation.
#' Simply providing an integer will get rbmi to automatically spawn that many background processes
#' to parallelise across. If you are using a custom analysis function then you need to ensure
#' that any libraries or global objects required by your function are available in the
#' sub-processes. To do this you need to use the [`make_rbmi_cluster()`] function for example:
#' ```
#' my_custom_fun <- function(...) <some analysis code>
#' cl <- make_rbmi_cluster(
#' 4,
#' objects = list("my_custom_fun" = my_custom_fun),
#' packages = c("dplyr", "nlme")
#' )
#' analyse(
#' imputations = imputeObj,
#' fun = my_custom_fun,
#' ncores = cl
#' )
#' parallel::stopCluster(cl)
#' ```
#'
#' Note that there is significant overhead both with setting up the sub-processes and with
#' transferring data back-and-forth between the main process and the sub-processes. As such
#' parallelisation of the `analyse()` function tends to only be worth it when you have
#' `> 2000` samples generated by [`draws()`]. Conversely using parallelisation if your samples
#' are smaller than this may lead to longer run times than just running it sequentially.
#'
#' Finally, if you are doing a tipping point analysis you can get a reasonable performance
#' improvement by re-using the cluster between each call to `analyse()` e.g.
#' ```
#' cl <- make_rbmi_cluster(4)
#' ana_1 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_1,
#' ncores = cl
#' )
#' ana_2 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_2,
#' ncores = cl
#' )
#' ana_3 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_3,
#' ncores = cl
#' )
#' parallel::clusterStop(cl)
#' ```
#'
#' @examples
#' \dontrun{
#' vars <- set_vars(
Expand Down Expand Up @@ -119,7 +171,13 @@
#' )
#' }
#' @export
analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
analyse <- function(
imputations,
fun = ancova,
delta = NULL,
...,
ncores = 1
) {

validate(imputations)

Expand Down Expand Up @@ -152,14 +210,55 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
)
}

results <- lapply(
imputations$imputations,
function(x, ...) {
dat2 <- extract_imputed_df(x, imputations$data, delta)
fun(dat2, ...)
# Mangle name to avoid any conflicts with user defined objects if running
# in a cluster
..rbmi..analysis..imputations <- imputations
..rbmi..analysis..delta <- delta
..rbmi..analysis..fun <- fun
cl <- make_rbmi_cluster(
ncores,
objects = list(
"..rbmi..analysis..imputations" = imputations,
"..rbmi..analysis..delta" = delta,
"..rbmi..analysis..fun" = fun
)
)

# If the user provided the clusters object directly then do not close it on completion
if (!is(ncores, "cluster")) {
on.exit(
{ if (!is.null(cl)) parallel::stopCluster(cl) },
add = TRUE,
after = FALSE
)
}

# Chunk up requests for significant speed improvement when running in parallel
number_of_cores <- ifelse(is.null(cl), 1, length(cl))
indexes <- seq_along(imputations$imputations)
indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)

results <- par_lapply(
cl,
function(indicies, ...) {
inner_fun <- function(idx, ...) {
dat2 <- extract_imputed_df(
..rbmi..analysis..imputations$imputations[[idx]],
..rbmi..analysis..imputations$data,
..rbmi..analysis..delta
)
..rbmi..analysis..fun(dat2, ...)
}
lapply(indicies, inner_fun, ...)
},
indexes_split,
...
)
) |>
unlist(recursive = FALSE, use.names = FALSE)

# Reorder
results <- results[order(unlist(indexes_split, use.names = FALSE))]
names(results) <- NULL

fun_name <- deparse(substitute(fun))
if (length(fun_name) > 1) {
Expand Down
34 changes: 23 additions & 11 deletions R/draws.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
#' [method_approxbayes()], [method_condmean()] or [method_bmlmi()].
#' It specifies the multiple imputation methodology to be used. See details.
#' @param ncores A single numeric specifying the number of cores to use in creating the draws object.
#' Note that this parameter is ignored for [method_bayes()] (Default = 1).
#' Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster object
#' generated by [`make_rbmi_cluster()`]
#' @param quiet Logical, if `TRUE` will suppress printing of progress information that is printed to
#' the console.
#'
Expand Down Expand Up @@ -342,27 +343,40 @@ get_draws_mle <- function(
}


cl <- get_cluster(ncores)
mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method)
cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method))

# If the user provided the clusters object directly then do not close it on completion
if (!is(ncores, "cluster")){
on.exit(
{ if (!is.null(cl)) parallel::stopCluster(cl) },
add = TRUE,
after = FALSE
)
}

# browser()
# get_mmrm_sample
# mmrm_sample(ids)
# clusterEvalQ(cl, fit_mmrm)

# Encapsulate arguments into a single function on `ids` and handle parallelisation
par_get_mmrm_sample <- function(ids) {
par_lapply(
cl,
get_mmrm_sample,
ids,
longdata = longdata,
method = method
)
}

samples <- list()
n_failed_samples <- 0
logger <- progressLogger$new(n_target_samples, quiet = quiet)

while (length(samples) < n_target_samples) {
ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples)))
new_samples <- mmrm_sample(ids)
new_samples <- par_get_mmrm_sample(ids)
isfailure <- vapply(new_samples, function(x) x$failed, logical(1))
new_samples_keep <- new_samples[!isfailure]
n_failed_samples <- n_failed_samples + sum(isfailure)
if (n_failed_samples > failure_limit) {
if (!is.null(cl)) parallel::stopCluster(cl)
if (!is.null(method$type)) {
if(method$type == "jackknife"){
ids_fail <- ids[isfailure][[1]]
Expand All @@ -378,8 +392,6 @@ get_draws_mle <- function(
samples <- append(samples, new_samples_keep)
}

if (!is.null(cl)) parallel::stopCluster(cl)

assert_that(
length(samples) == n_target_samples,
msg = "Incorrect number of samples were produced"
Expand Down
140 changes: 87 additions & 53 deletions R/parallel.R
Original file line number Diff line number Diff line change
@@ -1,41 +1,102 @@

#' Create cluster
#' Create a rbmi ready cluster
#'
#' @param ncores Number of parallel processes to use
#' @param ncores Number of parallel processes to use or an existing cluster to make use of
#' @param objects a named list of objects to export into the sub-processes
#' @param packages a character vector of libraries to load in the sub-processes
#'
#' If `ncores` is `1` this function will return NULL
#' This function spawns a PSOCK cluster.
#' Ensures that `rbmi` and `assert_that` have been loaded
#' on the sub-processes
#' This function is a wrapper around `parallel::makePSOCKcluster()` but takes
#' care of configuring rbmi to be used in the sub-processes as well as loading
#' user defined objects and libraries and setting the seed for reproducibility.
#'
get_cluster <- function(ncores = 1) {
if (ncores == 1) {
#' If `ncores` is `1` this function will return `NULL`.
#'
#' If `ncores` is a cluster created via `parallel::makeCluster()` then this function
#' just takes care of inserting the relevant rbmi objects into the existing cluster.
#'
#' @examples
#' \dontrun{
#' # Basic usage
#' make_rbmi_cluster(5)
#'
#' # User objects + libraries
#' VALUE <- 5
#' myfun <- function(x) {
#' x + day(VALUE) # From lubridate::day()
#' }
#' make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate"))
#'
#' # Using a already created cluster
#' cl <- parallel::makeCluster(5)
#' make_rbmi_cluster(cl)
#' }
#' @export
make_rbmi_cluster <- function(ncores = 1, objects = NULL, packages = NULL) {

if (is.numeric(ncores) && ncores == 1) {
return(NULL)
} else if (is.numeric(ncores)) {
cl <- parallel::makePSOCKcluster(ncores)
} else if (is(ncores, "cluster")) {
cl <- ncores
} else {
stop(sprintf(
"`ncores` has unsupported class of: %s",
paste(class(ncores), collapse = ", ")
))
}

# Load user defined objects into the globalname space
if (!is.null(objects) && length(objects)) {
export_env <- list2env(objects)
parallel::clusterExport(cl, names(objects), export_env)
}

cl <- parallel::makePSOCKcluster(
ncores
# Load user defined packages
packages <- if (is.null(packages)) {
# TODO - can't remember why this is needed; need to look into
"assertthat"
} else {
c(packages, "assertthat")
}
gowerc marked this conversation as resolved.
Show resolved Hide resolved
# Remove attempts to load rbmi as this will be covered later
packages <- grep("^rbmi$", packages, value = TRUE, invert = TRUE)
gowerc marked this conversation as resolved.
Show resolved Hide resolved
devnull <- parallel::clusterCall(
cl,
function(pkgs) lapply(pkgs, function(x) library(x, character.only = TRUE)),
as.list(packages)
)

devnull <- parallel::clusterEvalQ(cl, {
library(assertthat)
})
# Ensure reproducibility
parallel::clusterSetRNGStream(cl, sample.int(1))

# If user has previously configured rbmi sub-processes then early exit
exported_rbmi <- unlist(parallel::clusterEvalQ(cl, exists("..exported..parallel..rbmi")))
if (all(exported_rbmi)) {
return(cl)
}

# Ensure that exported and unexported objects are all directly accessible
# from the globalenv in the sub-processes
if (is_in_rbmi_development()) {
devnull <- parallel::clusterEvalQ(cl, pkgload::load_all())
} else {
devnull <- parallel::clusterEvalQ(
cl,
{
# Here we "export" both exported and non-exported functions
# from the package to the global environment of our subprocesses
.namespace <- getNamespace("rbmi")
for (.nsfun in ls(.namespace)) {
assign(.nsfun, get(.nsfun, envir = .namespace))
}
}
)
}

# Set variable to signify rbmi has been configured
devnull <- parallel::clusterEvalQ(cl, {
..exported..parallel..rbmi <- TRUE
})

return(cl)
}

Expand Down Expand Up @@ -65,46 +126,19 @@ is_in_rbmi_development <- function() {



#' Encapsulate get_mmrm_sample
#'
#' Function creates a new wrapper function around [get_mmrm_sample()]
#' so that the arguments of [get_mmrm_sample()] are enclosed within
#' the new function. This makes running parallel and single process
#' calls to the function smoother. In particular this function takes care
#' of exporting the arguments if required to parallel process in a cluster
#'
#' @seealso [get_cluster()] for more documentation on the function inputs
#' Parallelise Lapply
#'
#' @param cl Either a cluster from [get_cluster()] or `NULL`
#' @param longdata A longdata object from `longDataConstructor$new()`
#' @param method A method object
encap_get_mmrm_sample <- function(cl, longdata, method) {
fun <- function(ids) {
get_mmrm_sample(
ids = ids,
longdata = longdata,
method = method
)
}
lfun <- function(ids) {
lapply(ids, fun)
}

#' Simple wrapper around `lapply` and [`parallel::clusterApplyLB`] to abstract away
#' the logic of deciding which one to use
#' @param cl Cluster created by [`parallel::makeCluster()`] or `NULL`
#' @param fun Function to be run
#' @param x object to be looped over
#' @param ... extra arguements passed to `fun`
par_lapply <- function(cl, fun, x, ...) {
if (is.null(cl)) {
return(lfun)
}

parallel::clusterExport(
cl = cl,
varlist = c("longdata", "method"),
envir = environment()
)

lfun <- function(ids) {
parallel::clusterApplyLB(cl, ids, fun)
return(lapply(x, fun, ...))
} else {
return(parallel::clusterApplyLB(cl, x, fun, ...))
}

return(lfun)
}


Loading
Loading