From 85f40b0d10e366427afac7413b3ca13d6a7beac9 Mon Sep 17 00:00:00 2001 From: gowerc Date: Thu, 3 Oct 2024 16:50:01 +0100 Subject: [PATCH 01/11] initial --- .lintr | 5 +- NAMESPACE | 1 + R/analyse.R | 107 ++++++++++++++++++++-- R/draws.R | 34 ++++--- R/parallel.R | 140 ++++++++++++++++++----------- man/analyse.Rd | 56 +++++++++++- man/draws.Rd | 3 +- man/encap_get_mmrm_sample.Rd | 25 ------ man/get_cluster.Rd | 19 ---- man/make_rbmi_cluster.Rd | 44 +++++++++ man/par_lapply.Rd | 21 +++++ tests/testthat/helper-misc.R | 16 ++++ tests/testthat/test-analyse.R | 157 ++++++++++++++++++++++++++++++++- tests/testthat/test-parallel.R | 64 ++++++++++++++ 14 files changed, 572 insertions(+), 120 deletions(-) delete mode 100644 man/encap_get_mmrm_sample.Rd delete mode 100644 man/get_cluster.Rd create mode 100644 man/make_rbmi_cluster.Rd create mode 100644 man/par_lapply.Rd diff --git a/.lintr b/.lintr index 4c353cbfe..42708d85c 100644 --- a/.lintr +++ b/.lintr @@ -1,4 +1,5 @@ -linters: with_defaults( +linters: linters_with_defaults( line_length_linter(120), - object_name_linter = NULL + object_name_linter = NULL, + indentation_linter(indent = 4L) ) diff --git a/NAMESPACE b/NAMESPACE index 3b477fab8..514f587ca 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -58,6 +58,7 @@ export(has_class) export(impute) export(locf) export(longDataConstructor) +export(make_rbmi_cluster) export(method_approxbayes) export(method_bayes) export(method_bmlmi) diff --git a/R/analyse.R b/R/analyse.R index b06cac7ff..2119ef64b 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -91,6 +91,58 @@ #' @param delta A `data.frame` containing the delta transformation to be applied to the imputed #' datasets prior to running `fun`. See details. #' @param ... Additional arguments passed onto `fun`. +#' @param ncores The number of parallel processes to use when running this function. Can also be a +#' cluster object created by [`make_rbmi_cluster()`]. See the parallisation section below. 
+#' +#' @section Parallelisation: +#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation. +#' Simply providing an integer will get rbmi to automatically spawn that many background processes +#' to parallelise across. If you are using a custom analysis function then you need to ensure +#' that any libraries or global objects required by your function are available in the +#' sub-processes. To do this you need to use the [`make_rbmi_cluster()`] function for example: +#' ``` +#' my_custom_fun <- function(...) +#' cl <- make_rbmi_cluster( +#' 4, +#' objects = list("my_custom_fun" = my_custom_fun), +#' packages = c("dplyr", "nlme") +#' ) +#' analyse( +#' imputations = imputeObj, +#' fun = my_custom_fun, +#' ncores = cl +#' ) +#' parallel::stopCluster(cl) +#' ``` +#' +#' Note that there is significant overhead both with setting up the sub-processes and with +#' transferring data back-and-forth between the main process and the sub-processes. As such +#' parallelisation of the `analyse()` function tends to only be worth it when you have +#' `> 2000` samples generated by [`draws()`]. Conversely using parallelisation if your samples +#' are smaller than this may lead to longer run times than just running it sequentially. +#' +#' Finally, if you are doing a tipping point analysis you can get a reasonable performance +#' improvement by re-using the cluster between each call to `analyse()` e.g. +#' ``` +#' cl <- make_rbmi_cluster(4) +#' ana_1 <- analyse( +#' imputations = imputeObj, +#' delta = delta_plan_1, +#' ncores = cl +#' ) +#' ana_2 <- analyse( +#' imputations = imputeObj, +#' delta = delta_plan_2, +#' ncores = cl +#' ) +#' ana_3 <- analyse( +#' imputations = imputeObj, +#' delta = delta_plan_3, +#' ncores = cl +#' ) +#' parallel::clusterStop(cl) +#' ``` +#' #' @examples #' \dontrun{ #' vars <- set_vars( @@ -119,7 +171,13 @@ #' ) #' } #' @export -analyse <- function(imputations, fun = ancova, delta = NULL, ...) 
{ +analyse <- function( + imputations, + fun = ancova, + delta = NULL, + ..., + ncores = 1 +) { validate(imputations) @@ -152,14 +210,49 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) { ) } - results <- lapply( - imputations$imputations, - function(x, ...) { - dat2 <- extract_imputed_df(x, imputations$data, delta) - fun(dat2, ...) + # Mangle name to avoid any conflicts with user defined objects if running + # in a cluster + ..rbmi..analysis..fun <- fun + cl <- make_rbmi_cluster( + ncores, + objects = list( + "imputations" = imputations, + "delta" = delta, + "..rbmi..analysis..fun" = fun + ) + ) + + # If the user provided the clusters object directly then do not close it on completion + if (!is(ncores, "cluster")) { + on.exit( + { if (!is.null(cl)) parallel::stopCluster(cl) }, + add = TRUE, + after = FALSE + ) + } + + # Chunk up requests for significant speed improvement when running in parallel + number_of_cores <- ifelse(is.null(cl), 1, length(cl)) + indexes <- seq_along(imputations$imputations) + indexes_split <- split(indexes, (indexes %% number_of_cores) + 1) + + results <- par_lapply( + cl, + function(indicies, ...) { + inner_fun <- function(idx, ...) { + dat2 <- extract_imputed_df(imputations$imputations[[idx]], imputations$data, delta) + ..rbmi..analysis..fun(dat2, ...) + } + lapply(indicies, inner_fun, ...) }, + indexes_split, ... - ) + ) |> + unlist(recursive = FALSE, use.names = FALSE) + + # Reorder + results <- results[order(unlist(indexes_split, use.names = FALSE))] + names(results) <- NULL fun_name <- deparse(substitute(fun)) if (length(fun_name) > 1) { diff --git a/R/draws.R b/R/draws.R index 9c8f8afb0..aafe2c7dc 100644 --- a/R/draws.R +++ b/R/draws.R @@ -26,7 +26,8 @@ #' [method_approxbayes()], [method_condmean()] or [method_bmlmi()]. #' It specifies the multiple imputation methodology to be used. See details. #' @param ncores A single numeric specifying the number of cores to use in creating the draws object. 
-#' Note that this parameter is ignored for [method_bayes()] (Default = 1). +#' Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster object +#' generated by [`make_rbmi_cluster()`] #' @param quiet Logical, if `TRUE` will suppress printing of progress information that is printed to #' the console. #' @@ -342,14 +343,28 @@ get_draws_mle <- function( } - cl <- get_cluster(ncores) - mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method) + cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method)) + # If the user provided the clusters object directly then do not close it on completion + if (!is(ncores, "cluster")){ + on.exit( + { if (!is.null(cl)) parallel::stopCluster(cl) }, + add = TRUE, + after = FALSE + ) + } - # browser() - # get_mmrm_sample - # mmrm_sample(ids) - # clusterEvalQ(cl, fit_mmrm) + + # Encapsulate arguments into a single function on `ids` and handle parallelisation + par_get_mmrm_sample <- function(ids) { + par_lapply( + cl, + get_mmrm_sample, + ids, + longdata = longdata, + method = method + ) + } samples <- list() n_failed_samples <- 0 @@ -357,12 +372,11 @@ get_draws_mle <- function( while (length(samples) < n_target_samples) { ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples))) - new_samples <- mmrm_sample(ids) + new_samples <- par_get_mmrm_sample(ids) isfailure <- vapply(new_samples, function(x) x$failed, logical(1)) new_samples_keep <- new_samples[!isfailure] n_failed_samples <- n_failed_samples + sum(isfailure) if (n_failed_samples > failure_limit) { - if (!is.null(cl)) parallel::stopCluster(cl) if (!is.null(method$type)) { if(method$type == "jackknife"){ ids_fail <- ids[isfailure][[1]] @@ -378,8 +392,6 @@ get_draws_mle <- function( samples <- append(samples, new_samples_keep) } - if (!is.null(cl)) parallel::stopCluster(cl) - assert_that( length(samples) == n_target_samples, msg = "Incorrect number of samples were produced" diff --git a/R/parallel.R 
b/R/parallel.R index 5e64db973..d4769ae60 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -1,34 +1,89 @@ -#' Create cluster +#' Create a rbmi ready cluster #' -#' @param ncores Number of parallel processes to use +#' @param ncores Number of parallel processes to use or an existing cluster to make use of +#' @param objects a named list of objects to export into the sub-processes +#' @param packages a character vector of libraries to load in the sub-processes #' -#' If `ncores` is `1` this function will return NULL -#' This function spawns a PSOCK cluster. -#' Ensures that `rbmi` and `assert_that` have been loaded -#' on the sub-processes +#' This function is a wrapper around `parallel::makePSOCKcluster()` but takes +#' care of configuring rbmi to be used in the sub-processes as well as loading +#' user defined objects and libraries and setting the seed for reproducibility. #' -get_cluster <- function(ncores = 1) { - if (ncores == 1) { +#' If `ncores` is `1` this function will return `NULL`. +#' +#' If `ncores` is a cluster created via `parallel::makeCluster()` then this function +#' just takes care of inserting the relevant rbmi objects into the existing cluster. 
+#' +#' @examples +#' \dontrun{ +#' # Basic usage +#' make_rbmi_cluster(5) +#' +#' # User objects + libraries +#' VALUE <- 5 +#' myfun <- function(x) { +#' x + day(VALUE) # From lubridate::day() +#' } +#' make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate")) +#' +#' # Using a already created cluster +#' cl <- parallel::makeCluster(5) +#' make_rbmi_cluster(cl) +#' } +#' @export +make_rbmi_cluster <- function(ncores = 1, objects = NULL, packages = NULL) { + + if (is.numeric(ncores) && ncores == 1) { return(NULL) + } else if (is.numeric(ncores)) { + cl <- parallel::makePSOCKcluster(ncores) + } else if (is(ncores, "cluster")) { + cl <- ncores + } else { + stop(sprintf( + "`ncores` has unsupported class of: %s", + paste(class(ncores), collapse = ", ") + )) + } + + # Load user defined objects into the globalname space + if (!is.null(objects) && length(objects)) { + export_env <- list2env(objects) + parallel::clusterExport(cl, names(objects), export_env) } - cl <- parallel::makePSOCKcluster( - ncores + # Load user defined packages + packages <- if (is.null(packages)) { + # TODO - can't remember why this is needed; need to look into + "assertthat" + } else { + c(packages, "assertthat") + } + # Remove attempts to load rbmi as this will be covered later + packages <- grep("^rbmi$", packages, value = TRUE, invert = TRUE) + devnull <- parallel::clusterCall( + cl, + function(pkgs) lapply(pkgs, function(x) library(x, character.only = TRUE)), + as.list(packages) ) - devnull <- parallel::clusterEvalQ(cl, { - library(assertthat) - }) + # Ensure reproducibility + parallel::clusterSetRNGStream(cl, sample.int(1)) + # If user has previously configured rbmi sub-processes then early exit + exported_rbmi <- unlist(parallel::clusterEvalQ(cl, exists("..exported..parallel..rbmi"))) + if (all(exported_rbmi)) { + return(cl) + } + + # Ensure that exported and unexported objects are all directly accessible + # from the globalenv in the sub-processes if 
(is_in_rbmi_development()) { devnull <- parallel::clusterEvalQ(cl, pkgload::load_all()) } else { devnull <- parallel::clusterEvalQ( cl, { - # Here we "export" both exported and non-exported functions - # from the package to the global environment of our subprocesses .namespace <- getNamespace("rbmi") for (.nsfun in ls(.namespace)) { assign(.nsfun, get(.nsfun, envir = .namespace)) @@ -36,6 +91,12 @@ get_cluster <- function(ncores = 1) { } ) } + + # Set variable to signify rbmi has been configured + devnull <- parallel::clusterEvalQ(cl, { + ..exported..parallel..rbmi <- TRUE + }) + return(cl) } @@ -65,46 +126,19 @@ is_in_rbmi_development <- function() { -#' Encapsulate get_mmrm_sample -#' -#' Function creates a new wrapper function around [get_mmrm_sample()] -#' so that the arguments of [get_mmrm_sample()] are enclosed within -#' the new function. This makes running parallel and single process -#' calls to the function smoother. In particular this function takes care -#' of exporting the arguments if required to parallel process in a cluster -#' -#' @seealso [get_cluster()] for more documentation on the function inputs +#' Parallelise Lapply #' -#' @param cl Either a cluster from [get_cluster()] or `NULL` -#' @param longdata A longdata object from `longDataConstructor$new()` -#' @param method A method object -encap_get_mmrm_sample <- function(cl, longdata, method) { - fun <- function(ids) { - get_mmrm_sample( - ids = ids, - longdata = longdata, - method = method - ) - } - lfun <- function(ids) { - lapply(ids, fun) - } - +#' Simple wrapper around `lapply` and [`parallel::clusterApplyLB`] to abstract away +#' the logic of deciding which one to use +#' @param cl Cluster created by [`parallel::makeCluster()`] or `NULL` +#' @param fun Function to be run +#' @param x object to be looped over +#' @param ... extra arguements passed to `fun` +par_lapply <- function(cl, fun, x, ...) 
{ if (is.null(cl)) { - return(lfun) - } - - parallel::clusterExport( - cl = cl, - varlist = c("longdata", "method"), - envir = environment() - ) - - lfun <- function(ids) { - parallel::clusterApplyLB(cl, ids, fun) + return(lapply(x, fun, ...)) + } else { + return(parallel::clusterApplyLB(cl, x, fun, ...)) } - - return(lfun) } - diff --git a/man/analyse.Rd b/man/analyse.Rd index 41d9bf9b1..c4aa41147 100644 --- a/man/analyse.Rd +++ b/man/analyse.Rd @@ -4,7 +4,7 @@ \alias{analyse} \title{Analyse Multiple Imputed Datasets} \usage{ -analyse(imputations, fun = ancova, delta = NULL, ...) +analyse(imputations, fun = ancova, delta = NULL, ..., ncores = 1) } \arguments{ \item{imputations}{An \code{imputations} object as created by \code{\link[=impute]{impute()}}.} @@ -15,6 +15,9 @@ analyse(imputations, fun = ancova, delta = NULL, ...) datasets prior to running \code{fun}. See details.} \item{...}{Additional arguments passed onto \code{fun}.} + +\item{ncores}{The number of parallel processes to use when running this function. Can also be a +cluster object created by \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}}. See the parallisation section below.} } \description{ This function takes multiple imputed datasets (as generated by @@ -93,6 +96,57 @@ use the helper function \code{\link[=delta_template]{delta_template()}} to defin this provides utility variables such as \code{is_missing} which can be used to identify exactly which visits have been imputed. } +\section{Parallelisation}{ + +To speed up the evaluation of \code{analyse()} you can use the \code{ncores} argument to enable parallelisation. +Simply providing an integer will get rbmi to automatically spawn that many background processes +to parallelise across. If you are using a custom analysis function then you need to ensure +that any libraries or global objects required by your function are available in the +sub-processes. 
To do this you need to use the \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}} function for example: + +\if{html}{\out{
}}\preformatted{my_custom_fun <- function(...) +cl <- make_rbmi_cluster( + 4, + objects = list("my_custom_fun" = my_custom_fun), + packages = c("dplyr", "nlme") +) +analyse( + imputations = imputeObj, + fun = my_custom_fun, + ncores = cl +) +parallel::stopCluster(cl) +}\if{html}{\out{
}} + +Note that there is significant overhead both with setting up the sub-processes and with +transferring data back-and-forth between the main process and the sub-processes. As such +parallelisation of the \code{analyse()} function tends to only be worth it when you have +\verb{> 2000} samples generated by \code{\link[=draws]{draws()}}. Conversely using parallelisation if your samples +are smaller than this may lead to longer run times than just running it sequentially. + +Finally, if you are doing a tipping point analysis you can get a reasonable performance +improvement by re-using the cluster between each call to \code{analyse()} e.g. + +\if{html}{\out{
}}\preformatted{cl <- make_rbmi_cluster(4) +ana_1 <- analyse( + imputations = imputeObj, + delta = delta_plan_1, + ncores = cl +) +ana_2 <- analyse( + imputations = imputeObj, + delta = delta_plan_2, + ncores = cl +) +ana_3 <- analyse( + imputations = imputeObj, + delta = delta_plan_3, + ncores = cl +) +parallel::stopCluster(cl) +}\if{html}{\out{
}} +} + \examples{ \dontrun{ vars <- set_vars( diff --git a/man/draws.Rd b/man/draws.Rd index bd9bf52f4..32db53e1d 100644 --- a/man/draws.Rd +++ b/man/draws.Rd @@ -31,7 +31,8 @@ to the ICEs and the imputation strategies. See details.} It specifies the multiple imputation methodology to be used. See details.} \item{ncores}{A single numeric specifying the number of cores to use in creating the draws object. -Note that this parameter is ignored for \code{\link[=method_bayes]{method_bayes()}} (Default = 1).} +Note that this parameter is ignored for \code{\link[=method_bayes]{method_bayes()}} (Default = 1). Can also be a cluster object +generated by \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}}} \item{quiet}{Logical, if \code{TRUE} will suppress printing of progress information that is printed to the console.} diff --git a/man/encap_get_mmrm_sample.Rd b/man/encap_get_mmrm_sample.Rd deleted file mode 100644 index 881795574..000000000 --- a/man/encap_get_mmrm_sample.Rd +++ /dev/null @@ -1,25 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/parallel.R -\name{encap_get_mmrm_sample} -\alias{encap_get_mmrm_sample} -\title{Encapsulate get_mmrm_sample} -\usage{ -encap_get_mmrm_sample(cl, longdata, method) -} -\arguments{ -\item{cl}{Either a cluster from \code{\link[=get_cluster]{get_cluster()}} or \code{NULL}} - -\item{longdata}{A longdata object from \code{longDataConstructor$new()}} - -\item{method}{A method object} -} -\description{ -Function creates a new wrapper function around \code{\link[=get_mmrm_sample]{get_mmrm_sample()}} -so that the arguments of \code{\link[=get_mmrm_sample]{get_mmrm_sample()}} are enclosed within -the new function. This makes running parallel and single process -calls to the function smoother. 
In particular this function takes care -of exporting the arguments if required to parallel process in a cluster -} -\seealso{ -\code{\link[=get_cluster]{get_cluster()}} for more documentation on the function inputs -} diff --git a/man/get_cluster.Rd b/man/get_cluster.Rd deleted file mode 100644 index 43773e663..000000000 --- a/man/get_cluster.Rd +++ /dev/null @@ -1,19 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/parallel.R -\name{get_cluster} -\alias{get_cluster} -\title{Create cluster} -\usage{ -get_cluster(ncores = 1) -} -\arguments{ -\item{ncores}{Number of parallel processes to use - -If \code{ncores} is \code{1} this function will return NULL -This function spawns a PSOCK cluster. -Ensures that \code{rbmi} and \code{assert_that} have been loaded -on the sub-processes} -} -\description{ -Create cluster -} diff --git a/man/make_rbmi_cluster.Rd b/man/make_rbmi_cluster.Rd new file mode 100644 index 000000000..4b7c79bfa --- /dev/null +++ b/man/make_rbmi_cluster.Rd @@ -0,0 +1,44 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/parallel.R +\name{make_rbmi_cluster} +\alias{make_rbmi_cluster} +\title{Create a rbmi ready cluster} +\usage{ +make_rbmi_cluster(ncores = 1, objects = NULL, packages = NULL) +} +\arguments{ +\item{ncores}{Number of parallel processes to use or an existing cluster to make use of} + +\item{objects}{a named list of objects to export into the sub-processes} + +\item{packages}{a character vector of libraries to load in the sub-processes + +This function is a wrapper around \code{parallel::makePSOCKcluster()} but takes +care of configuring rbmi to be used in the sub-processes as well as loading +user defined objects and libraries and setting the seed for reproducibility. + +If \code{ncores} is \code{1} this function will return \code{NULL}. 
+ +If \code{ncores} is a cluster created via \code{parallel::makeCluster()} then this function +just takes care of inserting the relevant rbmi objects into the existing cluster.} +} +\description{ +Create a rbmi ready cluster +} +\examples{ +\dontrun{ +# Basic usage +make_rbmi_cluster(5) + +# User objects + libraries +VALUE <- 5 +myfun <- function(x) { + x + day(VALUE) # From lubridate::day() +} +make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate")) + +# Using a already created cluster +cl <- parallel::makeCluster(5) +make_rbmi_cluster(cl) +} +} diff --git a/man/par_lapply.Rd b/man/par_lapply.Rd new file mode 100644 index 000000000..95b4255bc --- /dev/null +++ b/man/par_lapply.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/parallel.R +\name{par_lapply} +\alias{par_lapply} +\title{Parallelise Lapply} +\usage{ +par_lapply(cl, fun, x, ...) +} +\arguments{ +\item{cl}{Cluster created by \code{\link[parallel:makeCluster]{parallel::makeCluster()}} or \code{NULL}} + +\item{fun}{Function to be run} + +\item{x}{object to be looped over} + +\item{...}{extra arguements passed to \code{fun}} +} +\description{ +Simple wrapper around \code{lapply} and \code{\link[parallel:clusterApply]{parallel::clusterApplyLB}} to abstract away +the logic of deciding which one to use +} diff --git a/tests/testthat/helper-misc.R b/tests/testthat/helper-misc.R index 5d0638a0e..6253d08e4 100644 --- a/tests/testthat/helper-misc.R +++ b/tests/testthat/helper-misc.R @@ -22,6 +22,22 @@ strip_names <- function(x) { } +is_cluster_closed <- function(cl) { + if (is.null(cl)) { + return(TRUE) + } + if (!is(cl, "cluster")) { + stop("`cl` is not a cluster object") + } + result <- tryCatch({ + clusterCall(cl1, function() Sys.info()) + FALSE + }, error = function(e) { + TRUE + }) +} + + trunctate <- function(x, n) { floor(x * 10 ^ n) / 10 ^ n } diff --git a/tests/testthat/test-analyse.R b/tests/testthat/test-analyse.R index 
900659bc6..335d692cd 100644 --- a/tests/testthat/test-analyse.R +++ b/tests/testthat/test-analyse.R @@ -4,7 +4,7 @@ suppressPackageStartupMessages({ }) -test_that("basic constructions of `analysis` work as expected",{ +test_that("basic constructions of `analysis` work as expected", { oldopt <- getOption("warnPartialMatchDollar") options(warnPartialMatchDollar = TRUE) @@ -244,3 +244,158 @@ test_that("incorrect constructions of as_analysis fail", { expect_true(validate(x)) }) + + + + + +test_that("Parallisation works with analyse and produces identical results", { + set.seed(4642) + sigma <- as_vcov( + c(2, 1, 0.7, 1.5), + c(0.5, 0.3, 0.2, 0.3, 0.5, 0.4) + ) + dat <- get_sim_data(200, sigma, trt = 8) %>% + mutate(outcome = if_else(rbinom(n(), 1, 0.3) == 1 & group == "A", NA_real_, outcome)) + + dat_ice <- dat %>% + group_by(id) %>% + arrange(id, visit) %>% + filter(is.na(outcome)) %>% + slice(1) %>% + ungroup() %>% + select(id, visit) %>% + mutate(strategy = "JR") + + vars <- set_vars( + outcome = "outcome", + group = "group", + strategy = "strategy", + subjid = "id", + visit = "visit", + covariates = c("age", "sex", "visit * group") + ) + + set.seed(984) + drawobj <- draws( + data = dat, + data_ice = dat_ice, + vars = vars, + method = method_condmean(n_samples = 6, type = "bootstrap"), + quiet = TRUE + ) + + imputeobj <- impute( + draws = drawobj, + references = c("A" = "B", "B" = "B") + ) + + + # + # Here we set up a bunch of different analysis objects using different + # of parallelisation methods and different dat_delta objects + # + + + ### Delta 1 + + dat_delta_1 <- delta_template(imputations = imputeobj) %>% + mutate(delta = is_missing * 5) + + vars2 <- vars + vars2$covariates <- c("age", "sex") + + anaobj_d1_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1 + ) + + anaobj_d1_t2 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1, + ncores = 3 + ) + + var <- 20 + inner_fun <- function(...) 
{ + x <- day(var) # lubridate::day + rbmi::ancova(...) + } + outer_fun <- function(...) { + inner_fun(...) + } + + cl <- make_rbmi_cluster(3, objects = list(var = var, inner_fun = inner_fun), "lubridate") + anaobj_d1_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1, + ncores = cl + ) + + + ### Delta 2 + + dat_delta_2 <- delta_template(imputations = imputeobj) %>% + mutate(delta = is_missing * 50) + + anaobj_d2_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_2 + ) + anaobj_d2_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_2, + ncores = cl + ) + + + ### Delta 3 (no delta) + + anaobj_d3_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2 + ) + anaobj_d3_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + ncores = cl + ) + + ## Check for internal consistency + expect_equal(anaobj_d1_t1, anaobj_d1_t2) + expect_equal(anaobj_d1_t1, anaobj_d1_t3) + expect_equal(anaobj_d1_t2, anaobj_d1_t3) + + expect_equal(anaobj_d2_t1, anaobj_d2_t3) + + expect_equal(anaobj_d3_t1, anaobj_d3_t3) + + + ## Check that they differ (as different deltas have been used) + ## Main thing is sanity checking that the embedded delta + ## in the parallel processes hasn't lingered and impacted + ## future results + + # First assert consistency + expect_true(identical(anaobj_d1_t1$results, anaobj_d1_t3$results)) + expect_true(identical(anaobj_d2_t1$results, anaobj_d2_t3$results)) + expect_true(identical(anaobj_d3_t1$results, anaobj_d3_t3$results)) + + # The ensure they are different + expect_false(identical(anaobj_d1_t1$results, anaobj_d2_t1$results)) + expect_false(identical(anaobj_d1_t1$results, anaobj_d3_t1$results)) + expect_false(identical(anaobj_d2_t1$results, anaobj_d3_t1$results)) + +}) diff --git a/tests/testthat/test-parallel.R b/tests/testthat/test-parallel.R index 9eea49c84..7131160ef 100644 --- a/tests/testthat/test-parallel.R +++ 
b/tests/testthat/test-parallel.R @@ -184,3 +184,67 @@ test_that("Basic parallisation works as expected", { # ncores = 2 # ) # }) + + + + +test_that("Creation and management of user defined clusters works as expected", { + + # Setup a function to be run on the parallel cluster that requires + # global objects (namely the `inner_fun` and environment `e`) as well + # as a handful of packages to be loaded + e <- new.env() + e$x <- 20 + inner_fun <- function(x) { + local_env <- e + temp1 <- e$x + 0 + temp2 <- rnorm(2) + temp3 <- dplyr::as_tibble(dplyr::starwars) # Explicit namespace + temp4 <- day(x) # lubridate::day() + e$x + } + outer_fun <- function(x) { + temp1 <- inner_fun(2) + temp2 <- anova.lme %>% invisible() # nlme::anova.lme() + temp3 <- iris + temp1 + x + 10 + } + + # Check that function can be run (e.g. all elements are correctly exported) + set.seed(1223) + cl1 <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme")) + res_1_a <- parallel::clusterCall(cl1, rnorm, 200) + res_1_b <- parallel::clusterApplyLB(cl1, c(4, 5), outer_fun) + expect_equal(res_1_b, list(34, 35)) + + # Test that re-using an existing cluster is quick to load + time <- time_it({ + set.seed(1223) + cl2 <- make_rbmi_cluster(cl1, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme")) + }) + expect_true(as.numeric(time) <= 2) + + # Should produce identical results as before + res_2_a <- parallel::clusterCall(cl2, rnorm, 200) + res_2_b <- parallel::clusterApplyLB(cl2, c(4, 5), outer_fun) + expect_equal(res_2_a, res_1_a) + expect_equal(res_2_b, res_1_b) + + # Both clusters should be closed as they are points to the same thing + parallel::stopCluster(cl2) + expect_true(is_cluster_closed(cl1)) + expect_true(is_cluster_closed(cl2)) + + + # Check that seed ensures reproducibility + set.seed(1223) + cl <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme")) + res_3_a <- parallel::clusterCall(cl1, rnorm, 200) + expect_equal(res_3_a, res_1_a) 
+ parallel::stopCluster(cl) +}) + + + + + From 14a4a187678fc124d3b1638c886d2322e9f570c1 Mon Sep 17 00:00:00 2001 From: gowerc Date: Thu, 3 Oct 2024 17:05:58 +0100 Subject: [PATCH 02/11] name mangling --- R/analyse.R | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index 2119ef64b..644be5a11 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -212,12 +212,14 @@ analyse <- function( # Mangle name to avoid any conflicts with user defined objects if running # in a cluster + ..rbmi..analysis..imputations <- imputations + ..rbmi..analysis..delta <- delta ..rbmi..analysis..fun <- fun cl <- make_rbmi_cluster( ncores, objects = list( - "imputations" = imputations, - "delta" = delta, + "..rbmi..analysis..imputations" = imputations, + "..rbmi..analysis..delta" = delta, "..rbmi..analysis..fun" = fun ) ) @@ -240,7 +242,11 @@ analyse <- function( cl, function(indicies, ...) { inner_fun <- function(idx, ...) { - dat2 <- extract_imputed_df(imputations$imputations[[idx]], imputations$data, delta) + dat2 <- extract_imputed_df( + ..rbmi..analysis..imputations$imputations[[idx]], + ..rbmi..analysis..imputations$data, + ..rbmi..analysis..delta + ) ..rbmi..analysis..fun(dat2, ...) } lapply(indicies, inner_fun, ...) From 31e23c314b42c186e4d03f69c05866ba4244678c Mon Sep 17 00:00:00 2001 From: gowerc Date: Thu, 3 Oct 2024 17:10:12 +0100 Subject: [PATCH 03/11] update vignette example --- vignettes/advanced.Rmd | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/vignettes/advanced.Rmd b/vignettes/advanced.Rmd index 43368b093..743b71488 100644 --- a/vignettes/advanced.Rmd +++ b/vignettes/advanced.Rmd @@ -491,9 +491,12 @@ The same approach can be used to implement a tipping point analysis. 
Here, we ap ```{r fig.width=5, fig.height=4} -perform_tipp_analysis <- function(delta_control, delta_intervention) { - # Derive delta offset based on control and intervention specific deltas + + +perform_tipp_analysis <- function(delta_control, delta_intervention, cl) { + + # Derive delta offset based on control and intervention specific deltas delta_df <- delta_df_init %>% mutate( delta_ctl = (group == "Control") * is_missing * delta_control, @@ -506,6 +509,7 @@ perform_tipp_analysis <- function(delta_control, delta_intervention) { fun = compare_change_lastvisit, vars = vars_an, delta = delta_df, + ncores = cl ) pool_delta <- as.data.frame(pool(ana_delta)) @@ -525,10 +529,12 @@ tipp_frame_grid <- expand.grid( ) %>% as_tibble() +# parallelise to speed up computation +cl <- make_rbmi_cluster(2) tipp_frame <- tipp_frame_grid %>% mutate( - results_list = map2(delta_control, delta_intervention, perform_tipp_analysis), + results_list = map2(delta_control, delta_intervention, perform_tipp_analysis, cl = cl), trt_effect_6 = map_dbl(results_list, "trt_effect_6"), pval_6 = map_dbl(results_list, "pval_6") ) %>% @@ -542,6 +548,9 @@ tipp_frame <- tipp_frame_grid %>% ) ) +# Close cluster when done with it +parallel::stopCluster(cl) + # Show delta values which lead to non-significant analysis results tipp_frame %>% filter(pval_6 >= 0.05) From 61c228226aea9b28a8b9c15b3eb9116ce041caad Mon Sep 17 00:00:00 2001 From: gowerc Date: Thu, 3 Oct 2024 17:27:09 +0100 Subject: [PATCH 04/11] updated vignette --- vignettes/advanced.html | 135 +++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/vignettes/advanced.html b/vignettes/advanced.html index d0a958b43..e55b843a4 100644 --- a/vignettes/advanced.html +++ b/vignettes/advanced.html @@ -714,7 +714,7 @@

6 Custom imputation strategies#> pars <- list(mu = mu, sigma = sigma) #> return(pars) #> } -#> <bytecode: 0x7ff37e6af218> +#> <bytecode: 0x7ff5f61e00e0> #> <environment: namespace:rbmi>

To further illustrate this for a simple example, assume that a new strategy is to be implemented as follows: - The marginal mean of the imputation distribution is equal to the marginal mean trajectory for the subject according to their assigned group and covariates up to the ICE. @@ -941,70 +941,79 @@

8.1 Simple delta adjustments and #> lsm_alt_6 7.225 0.725 5.793 8.656 <0.001 #> --------------------------------------------------

The same approach can be used to implement a tipping point analysis. Here, we apply different delta-adjustments to imputed data from the control and the intervention group, respectively. Assume that delta-adjustments by less then -5 points or by more than +15 points are considered implausible from a clinical perspective. Therefore, we vary the delta-values in each group between -5 to +15 points to investigate which delta combinations lead to a “tipping” of the primary analysis result, defined here as an analysis p-value \(\geq 0.05\).

-
perform_tipp_analysis <- function(delta_control, delta_intervention) {
+

 
-    # Derive delta offset based on control and intervention specific deltas    
-    delta_df <-  delta_df_init %>%
-        mutate(
-            delta_ctl = (group == "Control") * is_missing * delta_control,
-            delta_int = (group == "Intervention") * is_missing * delta_intervention,
-            delta = delta_ctl + delta_int
-        )
-
-    ana_delta <- analyse(
-        impute_obj_CIR,
-        fun = compare_change_lastvisit,
-        vars = vars_an,
-        delta = delta_df,
-    )
-
-    pool_delta <- as.data.frame(pool(ana_delta))
-
-    list(
-        trt_effect_6 = pool_delta[["est"]],
-        pval_6 = pool_delta[["pval"]]
-    )
-}
-
-# Get initial delta template
-delta_df_init <- delta_template(impute_obj_CIR)
-
-tipp_frame_grid <- expand.grid(
-    delta_control = seq(-5, 15, by = 2),
-    delta_intervention = seq(-5, 15, by = 2)
-) %>%
-    as_tibble()
-
-
-tipp_frame <- tipp_frame_grid %>%
-    mutate(
-        results_list = map2(delta_control, delta_intervention, perform_tipp_analysis),
-        trt_effect_6 = map_dbl(results_list, "trt_effect_6"),
-        pval_6 = map_dbl(results_list, "pval_6")
-    ) %>%
-    select(-results_list) %>%
+
+perform_tipp_analysis <- function(delta_control, delta_intervention, cl) {
+
+    # Derive delta offset based on control and intervention specific deltas
+    delta_df <-  delta_df_init %>%
+        mutate(
+            delta_ctl = (group == "Control") * is_missing * delta_control,
+            delta_int = (group == "Intervention") * is_missing * delta_intervention,
+            delta = delta_ctl + delta_int
+        )
+
+    ana_delta <- analyse(
+        impute_obj_CIR,
+        fun = compare_change_lastvisit,
+        vars = vars_an,
+        delta = delta_df,
+        ncores = cl
+    )
+
+    pool_delta <- as.data.frame(pool(ana_delta))
+
+    list(
+        trt_effect_6 = pool_delta[["est"]],
+        pval_6 = pool_delta[["pval"]]
+    )
+}
+
+# Get initial delta template
+delta_df_init <- delta_template(impute_obj_CIR)
+
+tipp_frame_grid <- expand.grid(
+    delta_control = seq(-5, 15, by = 2),
+    delta_intervention = seq(-5, 15, by = 2)
+) %>%
+    as_tibble()
+
+# parallelise to speed up computation
+cl <- make_rbmi_cluster(2)
+
+tipp_frame <- tipp_frame_grid %>%
     mutate(
-        pval = cut(
-            pval_6,
-            c(0, 0.001, 0.01, 0.05, 0.2, 1),
-            right = FALSE,
-            labels = c("<0.001", "0.001 - <0.01", "0.01- <0.05", "0.05 - <0.20", ">= 0.20")
-        )
-    )
-
-# Show delta values which lead to non-significant analysis results
-tipp_frame %>%
-    filter(pval_6 >= 0.05)
-#> # A tibble: 3 × 5
-#>   delta_control delta_intervention trt_effect_6 pval_6 pval        
-#>           <dbl>              <dbl>        <dbl>  <dbl> <fct>       
-#> 1            -5                 15        -1.99 0.0935 0.05 - <0.20
-#> 2            -3                 15        -2.15 0.0704 0.05 - <0.20
-#> 3            -1                 15        -2.31 0.0527 0.05 - <0.20
-
-ggplot(tipp_frame, aes(delta_control, delta_intervention, fill = pval)) +
-    geom_raster() +
-    scale_fill_manual(values = c("darkgreen", "lightgreen", "lightyellow", "orange", "red"))
+ results_list = map2(delta_control, delta_intervention, perform_tipp_analysis, cl = cl), + trt_effect_6 = map_dbl(results_list, "trt_effect_6"), + pval_6 = map_dbl(results_list, "pval_6") + ) %>% + select(-results_list) %>% + mutate( + pval = cut( + pval_6, + c(0, 0.001, 0.01, 0.05, 0.2, 1), + right = FALSE, + labels = c("<0.001", "0.001 - <0.01", "0.01- <0.05", "0.05 - <0.20", ">= 0.20") + ) + ) + +# Close cluster when done with it +parallel::stopCluster(cl) + +# Show delta values which lead to non-significant analysis results +tipp_frame %>% + filter(pval_6 >= 0.05) +#> # A tibble: 3 × 5 +#> delta_control delta_intervention trt_effect_6 pval_6 pval +#> <dbl> <dbl> <dbl> <dbl> <fct> +#> 1 -5 15 -1.99 0.0935 0.05 - <0.20 +#> 2 -3 15 -2.15 0.0704 0.05 - <0.20 +#> 3 -1 15 -2.31 0.0527 0.05 - <0.20 + +ggplot(tipp_frame, aes(delta_control, delta_intervention, fill = pval)) + + geom_raster() + + scale_fill_manual(values = c("darkgreen", "lightgreen", "lightyellow", "orange", "red"))

According to this analysis, the significant test result from the primary analysis under CIR could only be tipped to a non-significant result for rather extreme delta-adjustments. Please note that for a real analysis it is recommended to use a smaller step size in the grid than what has been used here.

From f91163dd3436c5b9ddb5c08f3df7bd84b8e78e98 Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 4 Oct 2024 12:41:06 +0100 Subject: [PATCH 05/11] progress --- R/analyse.R | 87 ++++++++++++++++++++++++++++++------ R/parallel.R | 16 +++++-- tests/testthat/helper-misc.R | 2 +- 3 files changed, 87 insertions(+), 18 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index 644be5a11..40d114f05 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -1,5 +1,13 @@ +time_it <- function(expr){ + start <- Sys.time() + expr + stop <- Sys.time() + as.numeric(difftime(stop, start, units = "secs")) +} + + #' Analyse Multiple Imputed Datasets @@ -179,6 +187,7 @@ analyse <- function( ncores = 1 ) { +xx_startup <- system.time({ validate(imputations) assert_that( @@ -209,20 +218,51 @@ analyse <- function( ) ) } +}) +print(sprintf("STARTUP = %f", xx_startup[[3]])) + +xx_data_prep <- system.time({ + + # ..rbmi..analysis..wrapup <- if (is(ncores, "cluster")) { + # function(x, idx, folder) { + # path <- file.path( + # folder, + # paste0("result_", sprintf("%012d", idx), ".Rds") + # ) + # #saveRDS(x, path) + # return(TRUE) + # } + # } else { + # function(x, ...) 
{ + # return(x) + # } + # } # Mangle name to avoid any conflicts with user defined objects if running # in a cluster - ..rbmi..analysis..imputations <- imputations - ..rbmi..analysis..delta <- delta - ..rbmi..analysis..fun <- fun - cl <- make_rbmi_cluster( - ncores, - objects = list( - "..rbmi..analysis..imputations" = imputations, - "..rbmi..analysis..delta" = delta, - "..rbmi..analysis..fun" = fun - ) + objects <- list( + ..rbmi..analysis..temppath = tempfile(), + ..rbmi..analysis..imputations = imputations, + ..rbmi..analysis..delta = delta, + ..rbmi..analysis..fun = fun + #..rbmi..analysis..wrapup = ..rbmi..analysis..wrapup ) + list2env(objects, envir = environment()) + cl <- make_rbmi_cluster(ncores) + if (is(ncores, "cluster")) { + ..rbmi..analysis..data..path <- file.path(tempdir(), "rbmi_data.Rds") + qs2::qs_save(objects, file = ..rbmi..analysis..data..path, compress = FALSE) + devnull <- parallel::clusterExport(cl, "..rbmi..analysis..data..path", environment()) + devnull <- parallel::clusterEvalQ( + cl, + { + ..rbmi..analysis..objects <- qs2::qs_read(..rbmi..analysis..data..path) + list2env(..rbmi..analysis..objects, envir = environment()) + } + ) + } + + dir.create(..rbmi..analysis..temppath, showWarnings = FALSE) # If the user provided the clusters object directly then do not close it on completion if (!is(ncores, "cluster")) { @@ -232,15 +272,18 @@ analyse <- function( after = FALSE ) } +}) +print(sprintf("DATA PREP = %f", xx_data_prep[[3]])) +xx_processing <- system.time({ # Chunk up requests for significant speed improvement when running in parallel number_of_cores <- ifelse(is.null(cl), 1, length(cl)) indexes <- seq_along(imputations$imputations) indexes_split <- split(indexes, (indexes %% number_of_cores) + 1) - results <- par_lapply( + results <- par_map2( cl, - function(indicies, ...) { + function(indicies, pid, ...) { inner_fun <- function(idx, ...) 
{ dat2 <- extract_imputed_df( ..rbmi..analysis..imputations$imputations[[idx]], @@ -249,14 +292,28 @@ analyse <- function( ) ..rbmi..analysis..fun(dat2, ...) } - lapply(indicies, inner_fun, ...) + res <- lapply(indicies, inner_fun, ...) + #..rbmi..analysis..wrapup(res, pid, ..rbmi..analysis..temppath) + res }, indexes_split, + seq_along(indexes_split), ... ) |> unlist(recursive = FALSE, use.names = FALSE) - # Reorder + +}) +print(sprintf("PROCESSING = %f", xx_processing[[3]])) + +xx_reaping <- system.time({ + # if (is(cl, "cluster")) { + # results <- lapply( + # list.files(..rbmi..analysis..temppath, full.names = TRUE), + # readRDS + # ) |> + # unlist(recursive = FALSE, use.names = FALSE) + # } results <- results[order(unlist(indexes_split, use.names = FALSE))] names(results) <- NULL @@ -275,6 +332,8 @@ analyse <- function( method = imputations$method ) validate(ret) +}) +print(sprintf("REAPING = %f", xx_reaping[[3]])) return(ret) } diff --git a/R/parallel.R b/R/parallel.R index d4769ae60..a1bacf7fc 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -135,10 +135,20 @@ is_in_rbmi_development <- function() { #' @param x object to be looped over #' @param ... extra arguements passed to `fun` par_lapply <- function(cl, fun, x, ...) { - if (is.null(cl)) { - return(lapply(x, fun, ...)) + result <- if (is.null(cl)) { + lapply(x, fun, ...) } else { - return(parallel::clusterApplyLB(cl, x, fun, ...)) + parallel::clusterApplyLB(cl, x, fun, ...) } + return(result) +} + +par_map2 <- function(cl, fun, x, y, ...) 
{ + result <- if (is.null(cl)) { + mapply(fun, x, y, MoreArgs = list(...), SIMPLIFY = FALSE) + } else { + parallel::clusterMap(cl, fun, x, y, MoreArgs = list(...), SIMPLIFY = FALSE) + } + return(result) } diff --git a/tests/testthat/helper-misc.R b/tests/testthat/helper-misc.R index 6253d08e4..71bbce9e0 100644 --- a/tests/testthat/helper-misc.R +++ b/tests/testthat/helper-misc.R @@ -30,7 +30,7 @@ is_cluster_closed <- function(cl) { stop("`cl` is not a cluster object") } result <- tryCatch({ - clusterCall(cl1, function() Sys.info()) + parallel::clusterCall(cl, function() Sys.info()) FALSE }, error = function(e) { TRUE From 22bccaee4cddefb88651cbf41b95e797b02e4b4d Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 4 Oct 2024 14:57:21 +0100 Subject: [PATCH 06/11] progress --- R/analyse.R | 86 +++++++++++------------------------------------------ 1 file changed, 18 insertions(+), 68 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index 40d114f05..f47318f91 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -1,15 +1,4 @@ - -time_it <- function(expr){ - start <- Sys.time() - expr - stop <- Sys.time() - as.numeric(difftime(stop, start, units = "secs")) -} - - - - #' Analyse Multiple Imputed Datasets #' #' @description @@ -101,6 +90,9 @@ time_it <- function(expr){ #' @param ... Additional arguments passed onto `fun`. #' @param ncores The number of parallel processes to use when running this function. Can also be a #' cluster object created by [`make_rbmi_cluster()`]. See the parallisation section below. +#' @param .validate Should `inputations` be checked to ensure it conforms to the required format +#' (default = `TRUE`) ? Can gain a small performance increase if this is set to `FALSE` when +#' analysing a large number of samples. #' #' @section Parallelisation: #' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation. 
@@ -184,11 +176,11 @@ analyse <- function( fun = ancova, delta = NULL, ..., - ncores = 1 + ncores = 1, + .validate = TRUE ) { -xx_startup <- system.time({ - validate(imputations) + if (.validate) validate(imputations) assert_that( is.function(fun), @@ -202,7 +194,7 @@ xx_startup <- system.time({ vars <- imputations$data$vars - devnull <- lapply(imputations$imputations, function(x) validate(x)) + if (.validate) devnull <- lapply(imputations$imputations, function(x) validate(x)) if (!is.null(delta)) { expected_vars <- c( @@ -218,52 +210,30 @@ xx_startup <- system.time({ ) ) } -}) -print(sprintf("STARTUP = %f", xx_startup[[3]])) - -xx_data_prep <- system.time({ - - # ..rbmi..analysis..wrapup <- if (is(ncores, "cluster")) { - # function(x, idx, folder) { - # path <- file.path( - # folder, - # paste0("result_", sprintf("%012d", idx), ".Rds") - # ) - # #saveRDS(x, path) - # return(TRUE) - # } - # } else { - # function(x, ...) { - # return(x) - # } - # } - - # Mangle name to avoid any conflicts with user defined objects if running - # in a cluster + + # Mangle name to avoid any conflicts with user defined objects if running in a cluster objects <- list( - ..rbmi..analysis..temppath = tempfile(), ..rbmi..analysis..imputations = imputations, ..rbmi..analysis..delta = delta, ..rbmi..analysis..fun = fun - #..rbmi..analysis..wrapup = ..rbmi..analysis..wrapup ) list2env(objects, envir = environment()) + cl <- make_rbmi_cluster(ncores) - if (is(ncores, "cluster")) { - ..rbmi..analysis..data..path <- file.path(tempdir(), "rbmi_data.Rds") - qs2::qs_save(objects, file = ..rbmi..analysis..data..path, compress = FALSE) + + if (is(cl, "cluster")) { + ..rbmi..analysis..data..path <- tempfile() + saveRDS(objects, file = ..rbmi..analysis..data..path, compress = FALSE) devnull <- parallel::clusterExport(cl, "..rbmi..analysis..data..path", environment()) devnull <- parallel::clusterEvalQ( cl, { - ..rbmi..analysis..objects <- qs2::qs_read(..rbmi..analysis..data..path) + 
..rbmi..analysis..objects <- readRDS(..rbmi..analysis..data..path) list2env(..rbmi..analysis..objects, envir = environment()) } ) } - dir.create(..rbmi..analysis..temppath, showWarnings = FALSE) - # If the user provided the clusters object directly then do not close it on completion if (!is(ncores, "cluster")) { on.exit( @@ -272,18 +242,15 @@ xx_data_prep <- system.time({ after = FALSE ) } -}) -print(sprintf("DATA PREP = %f", xx_data_prep[[3]])) -xx_processing <- system.time({ # Chunk up requests for significant speed improvement when running in parallel number_of_cores <- ifelse(is.null(cl), 1, length(cl)) indexes <- seq_along(imputations$imputations) indexes_split <- split(indexes, (indexes %% number_of_cores) + 1) - results <- par_map2( + results <- par_lapply( cl, - function(indicies, pid, ...) { + function(indicies, ...) { inner_fun <- function(idx, ...) { dat2 <- extract_imputed_df( ..rbmi..analysis..imputations$imputations[[idx]], @@ -292,28 +259,13 @@ xx_processing <- system.time({ ) ..rbmi..analysis..fun(dat2, ...) } - res <- lapply(indicies, inner_fun, ...) - #..rbmi..analysis..wrapup(res, pid, ..rbmi..analysis..temppath) - res + lapply(indicies, inner_fun, ...) }, indexes_split, - seq_along(indexes_split), ... 
) |> unlist(recursive = FALSE, use.names = FALSE) - -}) -print(sprintf("PROCESSING = %f", xx_processing[[3]])) - -xx_reaping <- system.time({ - # if (is(cl, "cluster")) { - # results <- lapply( - # list.files(..rbmi..analysis..temppath, full.names = TRUE), - # readRDS - # ) |> - # unlist(recursive = FALSE, use.names = FALSE) - # } results <- results[order(unlist(indexes_split, use.names = FALSE))] names(results) <- NULL @@ -332,8 +284,6 @@ xx_reaping <- system.time({ method = imputations$method ) validate(ret) -}) -print(sprintf("REAPING = %f", xx_reaping[[3]])) return(ret) } From c1a8922e5a8ebacaea980d736057fea8af9f5e45 Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 4 Oct 2024 15:00:44 +0100 Subject: [PATCH 07/11] export objects via disk --- R/analyse.R | 1 + R/parallel.R | 10 ---------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index f47318f91..5b136b315 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -266,6 +266,7 @@ analyse <- function( ) |> unlist(recursive = FALSE, use.names = FALSE) + # Re-order to ensure results are returned in same order as imputations results <- results[order(unlist(indexes_split, use.names = FALSE))] names(results) <- NULL diff --git a/R/parallel.R b/R/parallel.R index a1bacf7fc..5a5b31eeb 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -142,13 +142,3 @@ par_lapply <- function(cl, fun, x, ...) { } return(result) } - -par_map2 <- function(cl, fun, x, y, ...) 
{ - result <- if (is.null(cl)) { - mapply(fun, x, y, MoreArgs = list(...), SIMPLIFY = FALSE) - } else { - parallel::clusterMap(cl, fun, x, y, MoreArgs = list(...), SIMPLIFY = FALSE) - } - return(result) -} - From 9eab9c5f01ac14ae463e925edaa9f8c14c254279 Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 4 Oct 2024 15:02:41 +0100 Subject: [PATCH 08/11] added missing rd file --- man/analyse.Rd | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/man/analyse.Rd b/man/analyse.Rd index c4aa41147..0a8feffb3 100644 --- a/man/analyse.Rd +++ b/man/analyse.Rd @@ -4,7 +4,14 @@ \alias{analyse} \title{Analyse Multiple Imputed Datasets} \usage{ -analyse(imputations, fun = ancova, delta = NULL, ..., ncores = 1) +analyse( + imputations, + fun = ancova, + delta = NULL, + ..., + ncores = 1, + .validate = TRUE +) } \arguments{ \item{imputations}{An \code{imputations} object as created by \code{\link[=impute]{impute()}}.} @@ -18,6 +25,10 @@ datasets prior to running \code{fun}. See details.} \item{ncores}{The number of parallel processes to use when running this function. Can also be a cluster object created by \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}}. See the parallisation section below.} + +\item{.validate}{Should \code{inputations} be checked to ensure it conforms to the required format +(default = \code{TRUE}) ? 
Can gain a small performance increase if this is set to \code{FALSE} when +analysing a large number of samples.} } \description{ This function takes multiple imputed datasets (as generated by From 28f2cff9775b9bef8b305f7d0b8fbe68918fef96 Mon Sep 17 00:00:00 2001 From: Craig Gower-Page Date: Fri, 4 Oct 2024 15:05:35 +0100 Subject: [PATCH 09/11] Apply suggestions from code review Co-authored-by: Isaac Gravestock <83659704+gravesti@users.noreply.github.com> Signed-off-by: Craig Gower-Page --- R/parallel.R | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/R/parallel.R b/R/parallel.R index 5a5b31eeb..5b6b6bb19 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -53,14 +53,9 @@ make_rbmi_cluster <- function(ncores = 1, objects = NULL, packages = NULL) { } # Load user defined packages - packages <- if (is.null(packages)) { - # TODO - can't remember why this is needed; need to look into - "assertthat" - } else { - c(packages, "assertthat") - } + packages <- c(packages, "assertthat") # Remove attempts to load rbmi as this will be covered later - packages <- grep("^rbmi$", packages, value = TRUE, invert = TRUE) + packages <- setdiff(packages, "rbmi") devnull <- parallel::clusterCall( cl, function(pkgs) lapply(pkgs, function(x) library(x, character.only = TRUE)), From a226aea1ef2328ca7272ea00a8db69963c6e96c5 Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 11 Oct 2024 16:29:16 +0100 Subject: [PATCH 10/11] update warning --- R/analyse.R | 9 +++++++++ man/analyse.Rd | 9 +++++++++ tests/testthat/test-analyse.R | 4 ++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index 5b136b315..06c9d00c9 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -121,6 +121,15 @@ #' `> 2000` samples generated by [`draws()`]. Conversely using parallelisation if your samples #' are smaller than this may lead to longer run times than just running it sequentially. 
#' +#' It is important to note that the implementation of parallel processing within [`analyse()`] has +#' been optimised around the assumption that the parallel processes will be spawned on the same +#' machine and not a remote cluster. One such optimisation is that the required data is saved to +#' a temporary file on the local disk from which it is then read into each sub-process. This is +#' done to avoid the overhead of transferring the data over the network. Our assumption is that +#' if you are at the stage where you need to be parallelising your analysis over a remote cluster +#' then you would likely be better off parallelising across multiple rbmi runs rather than within +#' a single rbmi run. +#' #' Finally, if you are doing a tipping point analysis you can get a reasonable performance #' improvement by re-using the cluster between each call to `analyse()` e.g. #' ``` diff --git a/man/analyse.Rd b/man/analyse.Rd index 0a8feffb3..86152c3c7 100644 --- a/man/analyse.Rd +++ b/man/analyse.Rd @@ -135,6 +135,15 @@ parallelisation of the \code{analyse()} function tends to only be worth it when \verb{> 2000} samples generated by \code{\link[=draws]{draws()}}. Conversely using parallelisation if your samples are smaller than this may lead to longer run times than just running it sequentially. +It is important to note that the implementation of parallel processing within \code{\link[=analyse]{analyse()}} has +been optimised around the assumption that the parallel processes will be spawned on the same +machine and not a remote cluster. One such optimisation is that the required data is saved to +a temporary file on the local disk from which it is then read into each sub-process. This is +done to avoid the overhead of transferring the data over the network. 
Our assumption is that +if you are at the stage where you need to be parallelising your analysis over a remote cluster +then you would likely be better off parallelising across multiple rbmi runs rather than within +a single rbmi run. + Finally, if you are doing a tipping point analysis you can get a reasonable performance improvement by re-using the cluster between each call to \code{analyse()} e.g. diff --git a/tests/testthat/test-analyse.R b/tests/testthat/test-analyse.R index 335d692cd..6d5920410 100644 --- a/tests/testthat/test-analyse.R +++ b/tests/testthat/test-analyse.R @@ -317,7 +317,7 @@ test_that("Parallisation works with analyse and produces identical results", { fun = rbmi::ancova, vars = vars2, delta = dat_delta_1, - ncores = 3 + ncores = 2 ) var <- 20 @@ -329,7 +329,7 @@ test_that("Parallisation works with analyse and produces identical results", { inner_fun(...) } - cl <- make_rbmi_cluster(3, objects = list(var = var, inner_fun = inner_fun), "lubridate") + cl <- make_rbmi_cluster(2, objects = list(var = var, inner_fun = inner_fun), "lubridate") anaobj_d1_t3 <- analyse( imputeobj, fun = rbmi::ancova, From c0ff3bf14b7cca4dc82373365fb0a78410a51dc0 Mon Sep 17 00:00:00 2001 From: gowerc Date: Fri, 11 Oct 2024 16:53:14 +0100 Subject: [PATCH 11/11] fix cran note --- R/analyse.R | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/R/analyse.R b/R/analyse.R index 06c9d00c9..fbf281a58 100644 --- a/R/analyse.R +++ b/R/analyse.R @@ -221,12 +221,14 @@ analyse <- function( } # Mangle name to avoid any conflicts with user defined objects if running in a cluster + ..rbmi..analysis..imputations <- imputations + ..rbmi..analysis..delta <- delta + ..rbmi..analysis..fun <- fun objects <- list( - ..rbmi..analysis..imputations = imputations, - ..rbmi..analysis..delta = delta, - ..rbmi..analysis..fun = fun + ..rbmi..analysis..imputations = ..rbmi..analysis..imputations, + ..rbmi..analysis..delta = ..rbmi..analysis..delta, + ..rbmi..analysis..fun = 
..rbmi..analysis..fun ) - list2env(objects, envir = environment()) cl <- make_rbmi_cluster(ncores)