Skip to content

Commit

Permalink
Parallelisation support for analyse (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
gowerc authored Oct 11, 2024
1 parent 0655323 commit 43639fc
Show file tree
Hide file tree
Showing 16 changed files with 760 additions and 220 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export(has_class)
export(impute)
export(locf)
export(longDataConstructor)
export(make_rbmi_cluster)
export(method_approxbayes)
export(method_bayes)
export(method_bmlmi)
Expand Down
144 changes: 132 additions & 12 deletions R/analyse.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@




#' Analyse Multiple Imputed Datasets
#'
#' @description
Expand Down Expand Up @@ -91,6 +88,70 @@
#' @param delta A `data.frame` containing the delta transformation to be applied to the imputed
#' datasets prior to running `fun`. See details.
#' @param ... Additional arguments passed onto `fun`.
#' @param ncores The number of parallel processes to use when running this function. Can also be a
#' cluster object created by [`make_rbmi_cluster()`]. See the parallelisation section below.
#' @param .validate Should `imputations` be checked to ensure it conforms to the required format
#' (default = `TRUE`)? A small performance increase can be gained by setting this to `FALSE` when
#' analysing a large number of samples.
#'
#' @section Parallelisation:
#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation.
#' Simply providing an integer will get rbmi to automatically spawn that many background processes
#' to parallelise across. If you are using a custom analysis function then you need to ensure
#' that any libraries or global objects required by your function are available in the
#' sub-processes. To do this you need to use the [`make_rbmi_cluster()`] function for example:
#' ```
#' my_custom_fun <- function(...) <some analysis code>
#' cl <- make_rbmi_cluster(
#' 4,
#' objects = list("my_custom_fun" = my_custom_fun),
#' packages = c("dplyr", "nlme")
#' )
#' analyse(
#' imputations = imputeObj,
#' fun = my_custom_fun,
#' ncores = cl
#' )
#' parallel::stopCluster(cl)
#' ```
#'
#' Note that there is significant overhead both with setting up the sub-processes and with
#' transferring data back-and-forth between the main process and the sub-processes. As such
#' parallelisation of the `analyse()` function tends to only be worth it when you have
#' `> 2000` samples generated by [`draws()`]. Conversely, using parallelisation when you have
#' fewer samples than this may lead to longer run times than just running it sequentially.
#'
#' It is important to note that the implementation of parallel processing within [`analyse()`] has
#' been optimised around the assumption that the parallel processes will be spawned on the same
#' machine and not a remote cluster. One such optimisation is that the required data is saved to
#' a temporary file on the local disk from which it is then read into each sub-process. This is
#' done to avoid the overhead of transferring the data over the network. Our assumption is that
#' if you are at the stage where you need to be parallelising your analysis over a remote cluster
#' then you would likely be better off parallelising across multiple rbmi runs rather than within
#' a single rbmi run.
#'
#' Finally, if you are doing a tipping point analysis you can get a reasonable performance
#' improvement by re-using the cluster between each call to `analyse()` e.g.
#' ```
#' cl <- make_rbmi_cluster(4)
#' ana_1 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_1,
#' ncores = cl
#' )
#' ana_2 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_2,
#' ncores = cl
#' )
#' ana_3 <- analyse(
#' imputations = imputeObj,
#' delta = delta_plan_3,
#' ncores = cl
#' )
#' parallel::stopCluster(cl)
#' ```
#'
#' @examples
#' \dontrun{
#' vars <- set_vars(
Expand Down Expand Up @@ -119,9 +180,16 @@
#' )
#' }
#' @export
analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
analyse <- function(
imputations,
fun = ancova,
delta = NULL,
...,
ncores = 1,
.validate = TRUE
) {

validate(imputations)
if (.validate) validate(imputations)

assert_that(
is.function(fun),
Expand All @@ -135,7 +203,7 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {

vars <- imputations$data$vars

devnull <- lapply(imputations$imputations, function(x) validate(x))
if (.validate) devnull <- lapply(imputations$imputations, function(x) validate(x))

if (!is.null(delta)) {
expected_vars <- c(
Expand All @@ -152,14 +220,66 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
)
}

results <- lapply(
imputations$imputations,
function(x, ...) {
dat2 <- extract_imputed_df(x, imputations$data, delta)
fun(dat2, ...)
# Mangle name to avoid any conflicts with user defined objects if running in a cluster
..rbmi..analysis..imputations <- imputations
..rbmi..analysis..delta <- delta
..rbmi..analysis..fun <- fun
objects <- list(
..rbmi..analysis..imputations = ..rbmi..analysis..imputations,
..rbmi..analysis..delta = ..rbmi..analysis..delta,
..rbmi..analysis..fun = ..rbmi..analysis..fun
)

cl <- make_rbmi_cluster(ncores)

if (is(cl, "cluster")) {
..rbmi..analysis..data..path <- tempfile()
saveRDS(objects, file = ..rbmi..analysis..data..path, compress = FALSE)
devnull <- parallel::clusterExport(cl, "..rbmi..analysis..data..path", environment())
devnull <- parallel::clusterEvalQ(
cl,
{
..rbmi..analysis..objects <- readRDS(..rbmi..analysis..data..path)
list2env(..rbmi..analysis..objects, envir = environment())
}
)
}

# If the user provided the clusters object directly then do not close it on completion
if (!is(ncores, "cluster")) {
on.exit(
{ if (!is.null(cl)) parallel::stopCluster(cl) },
add = TRUE,
after = FALSE
)
}

# Chunk up requests for significant speed improvement when running in parallel
number_of_cores <- ifelse(is.null(cl), 1, length(cl))
indexes <- seq_along(imputations$imputations)
indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)

results <- par_lapply(
cl,
function(indicies, ...) {
inner_fun <- function(idx, ...) {
dat2 <- extract_imputed_df(
..rbmi..analysis..imputations$imputations[[idx]],
..rbmi..analysis..imputations$data,
..rbmi..analysis..delta
)
..rbmi..analysis..fun(dat2, ...)
}
lapply(indicies, inner_fun, ...)
},
indexes_split,
...
)
) |>
unlist(recursive = FALSE, use.names = FALSE)

# Re-order to ensure results are returned in same order as imputations
results <- results[order(unlist(indexes_split, use.names = FALSE))]
names(results) <- NULL

fun_name <- deparse(substitute(fun))
if (length(fun_name) > 1) {
Expand Down
30 changes: 23 additions & 7 deletions R/draws.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
#' [method_approxbayes()], [method_condmean()] or [method_bmlmi()].
#' It specifies the multiple imputation methodology to be used. See details.
#' @param ncores A single numeric specifying the number of cores to use in creating the draws object.
#' Note that this parameter is ignored for [method_bayes()] (Default = 1).
#' Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster object
#' generated by [`make_rbmi_cluster()`].
#' @param quiet Logical, if `TRUE` will suppress printing of progress information that is printed to
#' the console.
#'
Expand Down Expand Up @@ -345,22 +346,39 @@ get_draws_mle <- function(
}


cl <- get_cluster(ncores)
mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method)
cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method))

# If the user provided the clusters object directly then do not close it on completion
if (!is(ncores, "cluster")){
on.exit(
{ if (!is.null(cl)) parallel::stopCluster(cl) },
add = TRUE,
after = FALSE
)
}

# Encapsulate arguments into a single function on `ids` and handle parallelisation
par_get_mmrm_sample <- function(ids) {
par_lapply(
cl,
get_mmrm_sample,
ids,
longdata = longdata,
method = method
)
}

samples <- list()
n_failed_samples <- 0
logger <- progressLogger$new(n_target_samples, quiet = quiet)

while (length(samples) < n_target_samples) {
ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples)))
new_samples <- mmrm_sample(ids)
new_samples <- par_get_mmrm_sample(ids)
isfailure <- vapply(new_samples, function(x) x$failed, logical(1))
new_samples_keep <- new_samples[!isfailure]
n_failed_samples <- n_failed_samples + sum(isfailure)
if (n_failed_samples > failure_limit) {
if (!is.null(cl)) parallel::stopCluster(cl)
if (!is.null(method$type)) {
if (method$type == "jackknife") {
ids_fail <- ids[isfailure][[1]]
Expand All @@ -376,8 +394,6 @@ get_draws_mle <- function(
samples <- append(samples, new_samples_keep)
}

if (!is.null(cl)) parallel::stopCluster(cl)

assert_that(
length(samples) == n_target_samples,
msg = "Incorrect number of samples were produced"
Expand Down
Loading

0 comments on commit 43639fc

Please sign in to comment.