Parallelisation support for analyse #435

Merged (12 commits) on Oct 11, 2024
NAMESPACE (1 change: 1 addition & 0 deletions)
@@ -58,6 +58,7 @@ export(has_class)
export(impute)
export(locf)
export(longDataConstructor)
export(make_rbmi_cluster)
export(method_approxbayes)
export(method_bayes)
export(method_bmlmi)
R/analyse.R (144 changes: 132 additions & 12 deletions)
@@ -1,7 +1,4 @@




#' Analyse Multiple Imputed Datasets
#'
#' @description
@@ -91,6 +88,70 @@
#' @param delta A `data.frame` containing the delta transformation to be applied to the imputed
#' datasets prior to running `fun`. See details.
#' @param ... Additional arguments passed onto `fun`.
#' @param ncores The number of parallel processes to use when running this function. Can also be a
#' cluster object created by [`make_rbmi_cluster()`]. See the parallelisation section below.
#' @param .validate Should `imputations` be checked to ensure it conforms to the required format
#' (default = `TRUE`)? A small performance increase can be gained by setting this to `FALSE`
#' when analysing a large number of samples.
#'
#' @section Parallelisation:
#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation.
#' Providing an integer will cause rbmi to automatically spawn that many background processes
#' to parallelise across. If you are using a custom analysis function, you need to ensure
#' that any libraries or global objects required by your function are available in the
#' sub-processes. To do this, use the [`make_rbmi_cluster()`] function, for example:
#' ```
#' my_custom_fun <- function(...) <some analysis code>
#' cl <- make_rbmi_cluster(
#'     4,
#'     objects = list("my_custom_fun" = my_custom_fun),
#'     packages = c("dplyr", "nlme")
#' )
#' analyse(
#'     imputations = imputeObj,
#'     fun = my_custom_fun,
#'     ncores = cl
#' )
#' parallel::stopCluster(cl)
#' ```
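#'
#' If you are using one of the built-in analysis functions (such as the default [`ancova()`])
#' no extra objects or packages need to be exported to the sub-processes, so, as a minimal
#' sketch, a plain integer is all that is required:
#' ```
#' analyse(
#'     imputations = imputeObj,
#'     fun = ancova,
#'     ncores = 4
#' )
#' ```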
#'
#' Note that there is significant overhead both in setting up the sub-processes and in
#' transferring data back and forth between the main process and the sub-processes. As such,
#' parallelisation of the `analyse()` function tends to only be worth it when you have
#' `> 2000` samples generated by [`draws()`]. Conversely, using parallelisation when your samples
#' number fewer than this may lead to longer run times than just running sequentially.
#'
#' It is important to note that the implementation of parallel processing within [`analyse()`] has
#' been optimised around the assumption that the parallel processes will be spawned on the same
#' machine and not a remote cluster. One such optimisation is that the required data is saved to
#' a temporary file on the local disk from which it is then read into each sub-process. This is
#' done to avoid the overhead of transferring the data over the network. Our assumption is that
#' if you are at the stage where you need to be parallelising your analysis over a remote cluster
#' then you would likely be better off parallelising across multiple rbmi runs rather than within
#' a single rbmi run.
#'
#' Finally, if you are doing a tipping point analysis, you can get a reasonable performance
#' improvement by re-using the cluster between each call to `analyse()`, e.g.
#' ```
#' cl <- make_rbmi_cluster(4)
#' ana_1 <- analyse(
#'     imputations = imputeObj,
#'     delta = delta_plan_1,
#'     ncores = cl
#' )
#' ana_2 <- analyse(
#'     imputations = imputeObj,
#'     delta = delta_plan_2,
#'     ncores = cl
#' )
#' ana_3 <- analyse(
#'     imputations = imputeObj,
#'     delta = delta_plan_3,
#'     ncores = cl
#' )
#' parallel::stopCluster(cl)
#' ```
#'
#' @examples
#' \dontrun{
#' vars <- set_vars(
@@ -119,9 +180,16 @@
#' )
#' }
#' @export
analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
analyse <- function(
    imputations,
    fun = ancova,
    delta = NULL,
    ...,
    ncores = 1,
    .validate = TRUE
) {

    validate(imputations)
    if (.validate) validate(imputations)

    assert_that(
        is.function(fun),
@@ -135,7 +203,7 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {

    vars <- imputations$data$vars

    devnull <- lapply(imputations$imputations, function(x) validate(x))
    if (.validate) devnull <- lapply(imputations$imputations, function(x) validate(x))

    if (!is.null(delta)) {
        expected_vars <- c(
@@ -152,14 +220,66 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
        )
    }

    results <- lapply(
        imputations$imputations,
        function(x, ...) {
            dat2 <- extract_imputed_df(x, imputations$data, delta)
            fun(dat2, ...)
    # Mangle names to avoid any conflicts with user-defined objects when running in a cluster
    ..rbmi..analysis..imputations <- imputations
    ..rbmi..analysis..delta <- delta
    ..rbmi..analysis..fun <- fun
    objects <- list(
        ..rbmi..analysis..imputations = ..rbmi..analysis..imputations,
        ..rbmi..analysis..delta = ..rbmi..analysis..delta,
        ..rbmi..analysis..fun = ..rbmi..analysis..fun
    )

    cl <- make_rbmi_cluster(ncores)

    if (is(cl, "cluster")) {
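        # Ship the required objects to the workers via a temporary file on local disk
        # (rather than over the socket connection) and read them back in each sub-process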
        ..rbmi..analysis..data..path <- tempfile()
        saveRDS(objects, file = ..rbmi..analysis..data..path, compress = FALSE)
        devnull <- parallel::clusterExport(cl, "..rbmi..analysis..data..path", environment())
        devnull <- parallel::clusterEvalQ(
            cl,
            {
                ..rbmi..analysis..objects <- readRDS(..rbmi..analysis..data..path)
                list2env(..rbmi..analysis..objects, envir = environment())
            }
        )
    }

    # If the user provided the cluster object directly then do not close it on completion
    if (!is(ncores, "cluster")) {
        on.exit(
            { if (!is.null(cl)) parallel::stopCluster(cl) },
            add = TRUE,
            after = FALSE
        )
    }

    # Chunk up the requests for a significant speed improvement when running in parallel
    number_of_cores <- ifelse(is.null(cl), 1, length(cl))
    indexes <- seq_along(imputations$imputations)
    indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)
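    # e.g. 10 imputations across 3 workers are chunked as
    # list("1" = c(3, 6, 9), "2" = c(1, 4, 7, 10), "3" = c(2, 5, 8))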

    results <- par_lapply(
        cl,
        function(indices, ...) {
            inner_fun <- function(idx, ...) {
                dat2 <- extract_imputed_df(
                    ..rbmi..analysis..imputations$imputations[[idx]],
                    ..rbmi..analysis..imputations$data,
                    ..rbmi..analysis..delta
                )
                ..rbmi..analysis..fun(dat2, ...)
            }
            lapply(indices, inner_fun, ...)
        },
        indexes_split,
        ...
    ) |>
        unlist(recursive = FALSE, use.names = FALSE)

    # Re-order to ensure results are returned in same order as imputations
    results <- results[order(unlist(indexes_split, use.names = FALSE))]
    names(results) <- NULL

    fun_name <- deparse(substitute(fun))
    if (length(fun_name) > 1) {
R/draws.R (30 changes: 23 additions & 7 deletions)
@@ -26,7 +26,8 @@
#' [method_approxbayes()], [method_condmean()] or [method_bmlmi()].
#' It specifies the multiple imputation methodology to be used. See details.
#' @param ncores A single numeric specifying the number of cores to use in creating the draws object.
#' Note that this parameter is ignored for [method_bayes()] (Default = 1).
#' Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster object
#' generated by [`make_rbmi_cluster()`].
#' @param quiet Logical, if `TRUE` will suppress the printing of progress information to the
#' console.
#'
@@ -345,22 +346,39 @@ get_draws_mle <- function(
}


    cl <- get_cluster(ncores)
    mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method)
    cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method))

    # If the user provided the cluster object directly then do not close it on completion
    if (!is(ncores, "cluster")) {
        on.exit(
            { if (!is.null(cl)) parallel::stopCluster(cl) },
            add = TRUE,
            after = FALSE
        )
    }

    # Encapsulate the arguments into a single function of `ids` and handle parallelisation
    par_get_mmrm_sample <- function(ids) {
        par_lapply(
            cl,
            get_mmrm_sample,
            ids,
            longdata = longdata,
            method = method
        )
    }

    samples <- list()
    n_failed_samples <- 0
    logger <- progressLogger$new(n_target_samples, quiet = quiet)

    while (length(samples) < n_target_samples) {
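        # Fetch at most `ncores` sample ids so each worker fits one model per round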
        ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples)))
        new_samples <- mmrm_sample(ids)
        new_samples <- par_get_mmrm_sample(ids)
        isfailure <- vapply(new_samples, function(x) x$failed, logical(1))
        new_samples_keep <- new_samples[!isfailure]
        n_failed_samples <- n_failed_samples + sum(isfailure)
        if (n_failed_samples > failure_limit) {
            if (!is.null(cl)) parallel::stopCluster(cl)
            if (!is.null(method$type)) {
                if (method$type == "jackknife") {
                    ids_fail <- ids[isfailure][[1]]
@@ -376,8 +394,6 @@ get_draws_mle <- function(
        samples <- append(samples, new_samples_keep)
    }

    if (!is.null(cl)) parallel::stopCluster(cl)

    assert_that(
        length(samples) == n_target_samples,
        msg = "Incorrect number of samples were produced"