From 43639fc03878d9e8fa0a83a812b1387c94d08e5f Mon Sep 17 00:00:00 2001
From: Craig Gower-Page
Date: Fri, 11 Oct 2024 17:00:21 +0100
Subject: [PATCH] Parallelisation support for analyse (#435)

---
 NAMESPACE                      |   1 +
 R/analyse.R                    | 156 +++++++++++++++++++++++++++---
 R/draws.R                      |  30 +++++--
 R/parallel.R                   | 138 ++++++++++++++++-----------
 man/analyse.Rd                 |  88 +++++++++++++++-
 man/draws.Rd                   |   3 +-
 man/encap_get_mmrm_sample.Rd   |  25 ------
 man/get_cluster.Rd             |  19 ----
 man/make_rbmi_cluster.Rd       |  45 ++++++++++
 man/par_lapply.Rd              |  21 +++++
 tests/testthat/helper-misc.R   |  16 ++++
 tests/testthat/test-analyse.R  | 157 +++++++++++++++++++++++++++++++++
 tests/testthat/test-parallel.R |  85 ++++++++++++++++++
 vignettes/advanced.Rmd         |  15 +++-
 vignettes/advanced.html        | 135 +++++++++++++++------------
 vignettes/quickstart.html      |  73 +++++++--------
 16 files changed, 787 insertions(+), 220 deletions(-)
 delete mode 100644 man/encap_get_mmrm_sample.Rd
 delete mode 100644 man/get_cluster.Rd
 create mode 100644 man/make_rbmi_cluster.Rd
 create mode 100644 man/par_lapply.Rd

diff --git a/NAMESPACE b/NAMESPACE
index 8836268cf..2d8c204f1 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -58,6 +58,7 @@ export(has_class)
 export(impute)
 export(locf)
 export(longDataConstructor)
+export(make_rbmi_cluster)
 export(method_approxbayes)
 export(method_bayes)
 export(method_bmlmi)
diff --git a/R/analyse.R b/R/analyse.R
index b06cac7ff..fbf281a58 100644
--- a/R/analyse.R
+++ b/R/analyse.R
@@ -1,7 +1,4 @@
-
-
-
 #' Analyse Multiple Imputed Datasets
 #'
 #' @description
@@ -91,6 +88,82 @@
 #' @param delta A `data.frame` containing the delta transformation to be applied to the imputed
 #' datasets prior to running `fun`. See details.
 #' @param ... Additional arguments passed onto `fun`.
+#' @param ncores The number of parallel processes to use when running this function. Can also be a
+#' cluster object created by [`make_rbmi_cluster()`]. See the parallelisation section below.
+#' @param .validate Should `imputations` be checked to ensure it conforms to the required format
+#' (default = `TRUE`)? A small performance increase can be gained if this is set to `FALSE` when
+#' analysing a large number of samples.
+#'
+#' @section Parallelisation:
+#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation.
+#' Simply providing an integer will get rbmi to automatically spawn that many background processes
+#' to parallelise across. If you are using a custom analysis function then you need to ensure
+#' that any libraries or global objects required by your function are available in the
+#' sub-processes. To do this you need to use the [`make_rbmi_cluster()`] function, for example:
+#' ```
+#' my_custom_fun <- function(...) <your analysis code>
+#' cl <- make_rbmi_cluster(
+#'     4,
+#'     objects = list("my_custom_fun" = my_custom_fun),
+#'     packages = c("dplyr", "nlme")
+#' )
+#' analyse(
+#'     imputations = imputeObj,
+#'     fun = my_custom_fun,
+#'     ncores = cl
+#' )
+#' parallel::stopCluster(cl)
+#' ```
+#'
+#' Note that there is significant overhead both with setting up the sub-processes and with
+#' transferring data back-and-forth between the main process and the sub-processes. As such
+#' parallelisation of the `analyse()` function tends to only be worth it when you have
+#' `> 2000` samples generated by [`draws()`]. Conversely, using parallelisation if your samples
+#' are smaller than this may lead to longer run times than just running it sequentially.
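+#'
+#' If your sample count does merit it, a minimal sketch of parallelising the default
+#' `ancova` analysis (assuming `imputeObj` and `vars` as defined in the examples
+#' below) needs nothing more than an integer `ncores`:
+#' ```
+#' ana <- analyse(
+#'     imputations = imputeObj,
+#'     fun = ancova,
+#'     vars = vars,
+#'     ncores = 2
+#' )
+#' ```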
+#'
+#' It is important to note that the implementation of parallel processing within [`analyse()`] has
+#' been optimised around the assumption that the parallel processes will be spawned on the same
+#' machine and not a remote cluster. One such optimisation is that the required data is saved to
+#' a temporary file on the local disk from which it is then read into each sub-process. This is
+#' done to avoid the overhead of transferring the data over the network. Our assumption is that
+#' if you are at the stage where you need to be parallelising your analysis over a remote cluster
+#' then you would likely be better off parallelising across multiple rbmi runs rather than within
+#' a single rbmi run.
+#'
+#' Finally, if you are doing a tipping point analysis you can get a reasonable performance
+#' improvement by re-using the cluster between each call to `analyse()`, e.g.
+#' ```
+#' cl <- make_rbmi_cluster(4)
+#' ana_1 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_1,
+#'     ncores = cl
+#' )
+#' ana_2 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_2,
+#'     ncores = cl
+#' )
+#' ana_3 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_3,
+#'     ncores = cl
+#' )
+#' parallel::stopCluster(cl)
+#' ```
+#'
 #' @examples
 #' \dontrun{
 #' vars <- set_vars(
@@ -119,9 +192,16 @@
 #' )
 #' }
 #' @export
-analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
+analyse <- function(
+    imputations,
+    fun = ancova,
+    delta = NULL,
+    ...,
+    ncores = 1,
+    .validate = TRUE
+) {
 
-    validate(imputations)
+    if (.validate) validate(imputations)
 
     assert_that(
         is.function(fun),
@@ -135,7 +215,7 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
 
     vars <- imputations$data$vars
 
-    devnull <- lapply(imputations$imputations, function(x) validate(x))
+    if (.validate) devnull <- lapply(imputations$imputations, function(x) validate(x))
 
     if (!is.null(delta)) {
         expected_vars <- c(
@@ -152,14 +232,66 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
         )
     }
 
-    results <- lapply(
-        imputations$imputations,
-        function(x, ...) {
-            dat2 <- extract_imputed_df(x, imputations$data, delta)
-            fun(dat2, ...)
+    # Mangle name to avoid any conflicts with user defined objects if running in a cluster
+    ..rbmi..analysis..imputations <- imputations
+    ..rbmi..analysis..delta <- delta
+    ..rbmi..analysis..fun <- fun
+    objects <- list(
+        ..rbmi..analysis..imputations = ..rbmi..analysis..imputations,
+        ..rbmi..analysis..delta = ..rbmi..analysis..delta,
+        ..rbmi..analysis..fun = ..rbmi..analysis..fun
+    )
+
+    cl <- make_rbmi_cluster(ncores)
+
+    if (is(cl, "cluster")) {
+        ..rbmi..analysis..data..path <- tempfile()
+        saveRDS(objects, file = ..rbmi..analysis..data..path, compress = FALSE)
+        devnull <- parallel::clusterExport(cl, "..rbmi..analysis..data..path", environment())
+        devnull <- parallel::clusterEvalQ(
+            cl,
+            {
+                ..rbmi..analysis..objects <- readRDS(..rbmi..analysis..data..path)
+                list2env(..rbmi..analysis..objects, envir = environment())
+            }
+        )
+    }
+
+    # If the user provided the clusters object directly then do not close it on completion
+    if (!is(ncores, "cluster")) {
+        on.exit(
+            { if (!is.null(cl)) parallel::stopCluster(cl) },
+            add = TRUE,
+            after = FALSE
+        )
+    }
+
+    # Chunk up requests for significant speed improvement when running in parallel
+    number_of_cores <- ifelse(is.null(cl), 1, length(cl))
+    indexes <- seq_along(imputations$imputations)
+    indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)
+
+    results <- par_lapply(
+        cl,
+        function(indices, ...) {
+            inner_fun <- function(idx, ...) {
+                dat2 <- extract_imputed_df(
+                    ..rbmi..analysis..imputations$imputations[[idx]],
+                    ..rbmi..analysis..imputations$data,
+                    ..rbmi..analysis..delta
+                )
+                ..rbmi..analysis..fun(dat2, ...)
+            }
+            lapply(indices, inner_fun, ...)
         },
+        indexes_split,
         ...
-    )
+    ) |>
+        unlist(recursive = FALSE, use.names = FALSE)
+
+    # Re-order to ensure results are returned in the same order as imputations
+    results <- results[order(unlist(indexes_split, use.names = FALSE))]
 
     names(results) <- NULL
     fun_name <- deparse(substitute(fun))
     if (length(fun_name) > 1) {
diff --git a/R/draws.R b/R/draws.R
index a8ff6812a..db09db959 100644
--- a/R/draws.R
+++ b/R/draws.R
@@ -26,7 +26,8 @@
 #' [method_approxbayes()], [method_condmean()] or [method_bmlmi()].
 #' It specifies the multiple imputation methodology to be used. See details.
 #' @param ncores A single numeric specifying the number of cores to use in creating the draws object.
-#' Note that this parameter is ignored for [method_bayes()] (Default = 1).
+#' Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster object
+#' generated by [`make_rbmi_cluster()`].
 #' @param quiet Logical, if `TRUE` will suppress printing of progress information that is printed to
 #' the console.
#' @
@@ -345,9 +346,27 @@ get_draws_mle <- function(
     }
 
-    cl <- get_cluster(ncores)
-    mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method)
+    cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method))
 
+    # If the user provided the clusters object directly then do not close it on completion
+    if (!is(ncores, "cluster")) {
+        on.exit(
+            { if (!is.null(cl)) parallel::stopCluster(cl) },
+            add = TRUE,
+            after = FALSE
+        )
+    }
+
+    # Encapsulate arguments into a single function on `ids` and handle parallelisation
+    par_get_mmrm_sample <- function(ids) {
+        par_lapply(
+            cl,
+            get_mmrm_sample,
+            ids,
+            longdata = longdata,
+            method = method
+        )
+    }
 
     samples <- list()
     n_failed_samples <- 0
@@ -355,12 +374,11 @@ get_draws_mle <- function(
     while (length(samples) < n_target_samples) {
         ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples)))
-        new_samples <- mmrm_sample(ids)
+        new_samples <- par_get_mmrm_sample(ids)
         isfailure <- vapply(new_samples, function(x) x$failed, logical(1))
         new_samples_keep <- new_samples[!isfailure]
         n_failed_samples <- n_failed_samples + sum(isfailure)
         if (n_failed_samples > failure_limit) {
-            if (!is.null(cl)) parallel::stopCluster(cl)
             if (!is.null(method$type)) {
                 if (method$type == "jackknife") {
                     ids_fail <- ids[isfailure][[1]]
@@ -376,8 +394,6 @@
         samples <- append(samples, new_samples_keep)
     }
 
-    if (!is.null(cl)) parallel::stopCluster(cl)
-
     assert_that(
         length(samples) == n_target_samples,
         msg = "Incorrect number of samples were produced"
diff --git a/R/parallel.R b/R/parallel.R
index cd8e650a6..356cffd34 100644
--- a/R/parallel.R
+++ b/R/parallel.R
@@ -1,34 +1,85 @@
-#' Create cluster
+#' Create an rbmi ready cluster
 #'
-#' @param ncores Number of parallel processes to use
+#' @param ncores Number of parallel processes to use or an existing cluster to make use of
+#' @param objects a named list of objects to export into the sub-processes
+#' @param packages a character vector of libraries to load in the sub-processes
 #'
-#' If `ncores` is `1` this function will return NULL
-#' This function spawns a PSOCK cluster.
-#' Ensures that `rbmi` and `assert_that` have been loaded
-#' on the sub-processes
+#' @details
+#' This function is a wrapper around `parallel::makePSOCKcluster()` but takes
+#' care of configuring rbmi to be used in the sub-processes as well as loading
+#' user defined objects and libraries and setting the seed for reproducibility.
 #'
-get_cluster <- function(ncores = 1) {
-    if (ncores == 1) {
+#' If `ncores` is `1` this function will return `NULL`.
+#'
+#' If `ncores` is a cluster created via `parallel::makeCluster()` then this function
+#' just takes care of inserting the relevant rbmi objects into the existing cluster.
+#'
+#' @examples
+#' \dontrun{
+#' # Basic usage
+#' make_rbmi_cluster(5)
+#'
+#' # User objects + libraries
+#' VALUE <- 5
+#' myfun <- function(x) {
+#'     x + day(VALUE) # From lubridate::day()
+#' }
+#' make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate"))
+#'
+#' # Using an already created cluster
+#' cl <- parallel::makeCluster(5)
+#' make_rbmi_cluster(cl)
+#' }
+#' @export
+make_rbmi_cluster <- function(ncores = 1, objects = NULL, packages = NULL) {
+
+    if (is.numeric(ncores) && ncores == 1) {
         return(NULL)
+    } else if (is.numeric(ncores)) {
+        cl <- parallel::makePSOCKcluster(ncores)
+    } else if (is(ncores, "cluster")) {
+        cl <- ncores
+    } else {
+        stop(sprintf(
+            "`ncores` has unsupported class of: %s",
+            paste(class(ncores), collapse = ", ")
+        ))
+    }
+
+    # Load user defined objects into the global namespace
+    if (!is.null(objects) && length(objects)) {
+        export_env <- list2env(objects)
+        parallel::clusterExport(cl, names(objects), export_env)
     }
 
-    cl <- parallel::makePSOCKcluster(
-        ncores
+    # Load user defined packages
+    packages <- c(packages, "assertthat")
+    # Remove attempts to load rbmi as this will be covered later
+    packages <- setdiff(packages, "rbmi")
+    devnull <- parallel::clusterCall(
+        cl,
+        function(pkgs) lapply(pkgs, function(x) library(x, character.only = TRUE)),
+        as.list(packages)
     )
 
-    devnull <- parallel::clusterEvalQ(cl, {
-        library(assertthat)
-    })
+    # Ensure reproducibility by seeding the worker RNG streams from the master RNG
+    parallel::clusterSetRNGStream(cl, sample.int(.Machine$integer.max, 1))
 
+    # If user has previously configured rbmi sub-processes then early exit
+    exported_rbmi <- unlist(parallel::clusterEvalQ(cl, exists("..exported..parallel..rbmi")))
+    if (all(exported_rbmi)) {
+        return(cl)
+    }
+
+    # Ensure that exported and unexported objects are all directly accessible
+    # from the globalenv in the sub-processes
     if (is_in_rbmi_development()) {
        devnull <- parallel::clusterEvalQ(cl, pkgload::load_all())
     } else {
         devnull <- parallel::clusterEvalQ(
             cl,
             {
-                # Here we "export" both exported and non-exported functions
-                # from the package to the global environment of our subprocesses
                 .namespace <- getNamespace("rbmi")
                 for (.nsfun in ls(.namespace)) {
                     assign(.nsfun, get(.nsfun, envir = .namespace))
@@ -36,6 +87,12 @@ get_cluster <- function(ncores = 1) {
             }
         )
     }
+
+    # Set variable to signify rbmi has been configured
+    devnull <- parallel::clusterEvalQ(cl, {
+        ..exported..parallel..rbmi <- TRUE
+    })
+
     return(cl)
 }
 
@@ -65,44 +122,19 @@ is_in_rbmi_development <- function() {
 
 
 
-#' Encapsulate get_mmrm_sample
-#'
-#' Function creates a new wrapper function around [get_mmrm_sample()]
-#' so that the arguments of [get_mmrm_sample()] are enclosed within
-#' the new function. This makes running parallel and single process
-#' calls to the function smoother. In particular this function takes care
-#' of exporting the arguments if required to parallel process in a cluster
-#'
-#' @seealso [get_cluster()] for more documentation on the function inputs
+#' Parallelise Lapply
 #'
-#' @param cl Either a cluster from [get_cluster()] or `NULL`
-#' @param longdata A longdata object from `longDataConstructor$new()`
-#' @param method A method object
-encap_get_mmrm_sample <- function(cl, longdata, method) {
-    fun <- function(ids) {
-        get_mmrm_sample(
-            ids = ids,
-            longdata = longdata,
-            method = method
-        )
-    }
-    lfun <- function(ids) {
-        lapply(ids, fun)
-    }
-
-    if (is.null(cl)) {
-        return(lfun)
-    }
-
-    parallel::clusterExport(
-        cl = cl,
-        varlist = c("longdata", "method"),
-        envir = environment()
-    )
-
-    lfun <- function(ids) {
-        parallel::clusterApplyLB(cl, ids, fun)
+#' Simple wrapper around `lapply` and [`parallel::clusterApplyLB`] to abstract away
+#' the logic of deciding which one to use.
+#' @param cl Cluster created by [`parallel::makeCluster()`] or `NULL`
+#' @param fun Function to be run
+#' @param x object to be looped over
+#' @param ... extra arguments passed to `fun`
+par_lapply <- function(cl, fun, x, ...) {
+    result <- if (is.null(cl)) {
+        lapply(x, fun, ...)
+    } else {
+        parallel::clusterApplyLB(cl, x, fun, ...)
     }
-
-    return(lfun)
+    return(result)
 }
diff --git a/man/analyse.Rd b/man/analyse.Rd
index 41d9bf9b1..86152c3c7 100644
--- a/man/analyse.Rd
+++ b/man/analyse.Rd
@@ -4,7 +4,14 @@
 \alias{analyse}
 \title{Analyse Multiple Imputed Datasets}
 \usage{
-analyse(imputations, fun = ancova, delta = NULL, ...)
+analyse(
+  imputations,
+  fun = ancova,
+  delta = NULL,
+  ...,
+  ncores = 1,
+  .validate = TRUE
+)
 }
 \arguments{
 \item{imputations}{An \code{imputations} object as created by \code{\link[=impute]{impute()}}.}
@@ -15,6 +22,13 @@ analyse(imputations, fun = ancova, delta = NULL, ...)
 datasets prior to running \code{fun}. See details.}
 
 \item{...}{Additional arguments passed onto \code{fun}.}
+
+\item{ncores}{The number of parallel processes to use when running this function. Can also be a
+cluster object created by \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}}. See the parallelisation section below.}
+
+\item{.validate}{Should \code{imputations} be checked to ensure it conforms to the required format
+(default = \code{TRUE})? A small performance increase can be gained if this is set to \code{FALSE} when
+analysing a large number of samples.}
 }
 \description{
 This function takes multiple imputed datasets (as generated by
@@ -93,6 +107,78 @@ use the helper function \code{\link[=delta_template]{delta_template()}} to defin
 this provides utility variables such as \code{is_missing} which can be used to identify
 exactly which visits have been imputed.
 }
+\section{Parallelisation}{
+
+To speed up the evaluation of \code{analyse()} you can use the \code{ncores} argument to enable parallelisation.
+Simply providing an integer will get rbmi to automatically spawn that many background processes
+to parallelise across. If you are using a custom analysis function then you need to ensure
+that any libraries or global objects required by your function are available in the
+sub-processes. To do this you need to use the \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}} function, for example:
+
+\if{html}{\out{<div class="sourceCode r">}}\preformatted{my_custom_fun <- function(...) <your analysis code>
+cl <- make_rbmi_cluster(
+    4,
+    objects = list("my_custom_fun" = my_custom_fun),
+    packages = c("dplyr", "nlme")
+)
+analyse(
+    imputations = imputeObj,
+    fun = my_custom_fun,
+    ncores = cl
+)
+parallel::stopCluster(cl)
+}\if{html}{\out{</div>}}
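+
+By contrast, a minimal sketch of parallelising the default \code{ancova} analysis
+(assuming \code{imputeObj} and \code{vars} as defined in the examples below) needs
+nothing more than an integer \code{ncores}:
+
+\if{html}{\out{<div class="sourceCode r">}}\preformatted{ana <- analyse(
+    imputations = imputeObj,
+    fun = ancova,
+    vars = vars,
+    ncores = 2
+)
+}\if{html}{\out{</div>}}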
+
+Note that there is significant overhead both with setting up the sub-processes and with
+transferring data back-and-forth between the main process and the sub-processes. As such
+parallelisation of the \code{analyse()} function tends to only be worth it when you have
+\verb{> 2000} samples generated by \code{\link[=draws]{draws()}}. Conversely, using parallelisation if your samples
+are smaller than this may lead to longer run times than just running it sequentially.
+
+It is important to note that the implementation of parallel processing within \code{\link[=analyse]{analyse()}} has
+been optimised around the assumption that the parallel processes will be spawned on the same
+machine and not a remote cluster. One such optimisation is that the required data is saved to
+a temporary file on the local disk from which it is then read into each sub-process. This is
+done to avoid the overhead of transferring the data over the network. Our assumption is that
+if you are at the stage where you need to be parallelising your analysis over a remote cluster
+then you would likely be better off parallelising across multiple rbmi runs rather than within
+a single rbmi run.
+
+Finally, if you are doing a tipping point analysis you can get a reasonable performance
+improvement by re-using the cluster between each call to \code{analyse()}, e.g.
+
+\if{html}{\out{<div class="sourceCode r">}}\preformatted{cl <- make_rbmi_cluster(4)
+ana_1 <- analyse(
+    imputations = imputeObj,
+    delta = delta_plan_1,
+    ncores = cl
+)
+ana_2 <- analyse(
+    imputations = imputeObj,
+    delta = delta_plan_2,
+    ncores = cl
+)
+ana_3 <- analyse(
+    imputations = imputeObj,
+    delta = delta_plan_3,
+    ncores = cl
+)
+parallel::stopCluster(cl)
+}\if{html}{\out{</div>}}
+}
 
 \examples{
 \dontrun{
 vars <- set_vars(
diff --git a/man/draws.Rd b/man/draws.Rd
index df97fd7a4..a59fb4cce 100644
--- a/man/draws.Rd
+++ b/man/draws.Rd
@@ -31,7 +31,8 @@ to the ICEs and the imputation strategies. See details.}
 It specifies the multiple imputation methodology to be used. See details.}
 
 \item{ncores}{A single numeric specifying the number of cores to use in creating the draws object.
-Note that this parameter is ignored for \code{\link[=method_bayes]{method_bayes()}} (Default = 1).}
+Note that this parameter is ignored for \code{\link[=method_bayes]{method_bayes()}} (Default = 1). Can also be a cluster object
+generated by \code{\link[=make_rbmi_cluster]{make_rbmi_cluster()}}.}
 
 \item{quiet}{Logical, if \code{TRUE} will suppress printing of progress information that is printed to
 the console.}
diff --git a/man/encap_get_mmrm_sample.Rd b/man/encap_get_mmrm_sample.Rd
deleted file mode 100644
index 881795574..000000000
--- a/man/encap_get_mmrm_sample.Rd
+++ /dev/null
@@ -1,25 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/parallel.R
-\name{encap_get_mmrm_sample}
-\alias{encap_get_mmrm_sample}
-\title{Encapsulate get_mmrm_sample}
-\usage{
-encap_get_mmrm_sample(cl, longdata, method)
-}
-\arguments{
-\item{cl}{Either a cluster from \code{\link[=get_cluster]{get_cluster()}} or \code{NULL}}
-
-\item{longdata}{A longdata object from \code{longDataConstructor$new()}}
-
-\item{method}{A method object}
-}
-\description{
-Function creates a new wrapper function around \code{\link[=get_mmrm_sample]{get_mmrm_sample()}}
-so that the arguments of \code{\link[=get_mmrm_sample]{get_mmrm_sample()}} are enclosed within
-the new function. This makes running parallel and single process
-calls to the function smoother. In particular this function takes care
-of exporting the arguments if required to parallel process in a cluster
-}
-\seealso{
-\code{\link[=get_cluster]{get_cluster()}} for more documentation on the function inputs
-}
diff --git a/man/get_cluster.Rd b/man/get_cluster.Rd
deleted file mode 100644
index 43773e663..000000000
--- a/man/get_cluster.Rd
+++ /dev/null
@@ -1,19 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/parallel.R
-\name{get_cluster}
-\alias{get_cluster}
-\title{Create cluster}
-\usage{
-get_cluster(ncores = 1)
-}
-\arguments{
-\item{ncores}{Number of parallel processes to use
-
-If \code{ncores} is \code{1} this function will return NULL
-This function spawns a PSOCK cluster.
-Ensures that \code{rbmi} and \code{assert_that} have been loaded
-on the sub-processes}
-}
-\description{
-Create cluster
-}
diff --git a/man/make_rbmi_cluster.Rd b/man/make_rbmi_cluster.Rd
new file mode 100644
index 000000000..4b7c79bfa
--- /dev/null
+++ b/man/make_rbmi_cluster.Rd
@@ -0,0 +1,45 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/parallel.R
+\name{make_rbmi_cluster}
+\alias{make_rbmi_cluster}
+\title{Create an rbmi ready cluster}
+\usage{
+make_rbmi_cluster(ncores = 1, objects = NULL, packages = NULL)
+}
+\arguments{
+\item{ncores}{Number of parallel processes to use or an existing cluster to make use of}
+
+\item{objects}{a named list of objects to export into the sub-processes}
+
+\item{packages}{a character vector of libraries to load in the sub-processes}
+}
+\description{
+Create an rbmi ready cluster
+}
+\details{
+This function is a wrapper around \code{parallel::makePSOCKcluster()} but takes
+care of configuring rbmi to be used in the sub-processes as well as loading
+user defined objects and libraries and setting the seed for reproducibility.
+
+If \code{ncores} is \code{1} this function will return \code{NULL}.
+
+If \code{ncores} is a cluster created via \code{parallel::makeCluster()} then this function
+just takes care of inserting the relevant rbmi objects into the existing cluster.
+}
+\examples{
+\dontrun{
+# Basic usage
+make_rbmi_cluster(5)
+
+# User objects + libraries
+VALUE <- 5
+myfun <- function(x) {
+    x + day(VALUE) # From lubridate::day()
+}
+make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate"))
+
+# Using an already created cluster
+cl <- parallel::makeCluster(5)
+make_rbmi_cluster(cl)
+}
+}
diff --git a/man/par_lapply.Rd b/man/par_lapply.Rd
new file mode 100644
index 000000000..95b4255bc
--- /dev/null
+++ b/man/par_lapply.Rd
@@ -0,0 +1,21 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/parallel.R
+\name{par_lapply}
+\alias{par_lapply}
+\title{Parallelise Lapply}
+\usage{
+par_lapply(cl, fun, x, ...)
+} +\arguments{ +\item{cl}{Cluster created by \code{\link[parallel:makeCluster]{parallel::makeCluster()}} or \code{NULL}} + +\item{fun}{Function to be run} + +\item{x}{object to be looped over} + +\item{...}{extra arguements passed to \code{fun}} +} +\description{ +Simple wrapper around \code{lapply} and \code{\link[parallel:clusterApply]{parallel::clusterApplyLB}} to abstract away +the logic of deciding which one to use +} diff --git a/tests/testthat/helper-misc.R b/tests/testthat/helper-misc.R index a011d1c1b..261731ebf 100644 --- a/tests/testthat/helper-misc.R +++ b/tests/testthat/helper-misc.R @@ -22,6 +22,22 @@ strip_names <- function(x) { } +is_cluster_closed <- function(cl) { + if (is.null(cl)) { + return(TRUE) + } + if (!is(cl, "cluster")) { + stop("`cl` is not a cluster object") + } + result <- tryCatch({ + parallel::clusterCall(cl, function() Sys.info()) + FALSE + }, error = function(e) { + TRUE + }) +} + + trunctate <- function(x, n) { floor(x * 10 ^ n) / 10 ^ n } diff --git a/tests/testthat/test-analyse.R b/tests/testthat/test-analyse.R index 7220b6803..6d5920410 100644 --- a/tests/testthat/test-analyse.R +++ b/tests/testthat/test-analyse.R @@ -5,6 +5,7 @@ suppressPackageStartupMessages({ test_that("basic constructions of `analysis` work as expected", { + oldopt <- getOption("warnPartialMatchDollar") options(warnPartialMatchDollar = TRUE) @@ -243,3 +244,158 @@ test_that("incorrect constructions of as_analysis fail", { expect_true(validate(x)) }) + + + + + +test_that("Parallisation works with analyse and produces identical results", { + set.seed(4642) + sigma <- as_vcov( + c(2, 1, 0.7, 1.5), + c(0.5, 0.3, 0.2, 0.3, 0.5, 0.4) + ) + dat <- get_sim_data(200, sigma, trt = 8) %>% + mutate(outcome = if_else(rbinom(n(), 1, 0.3) == 1 & group == "A", NA_real_, outcome)) + + dat_ice <- dat %>% + group_by(id) %>% + arrange(id, visit) %>% + filter(is.na(outcome)) %>% + slice(1) %>% + ungroup() %>% + select(id, visit) %>% + mutate(strategy = "JR") + + vars <- set_vars( + outcome = "outcome", + group = "group", + strategy = "strategy", + subjid = "id", + visit = "visit", + covariates = c("age", "sex", "visit * group") + ) + + set.seed(984) + drawobj <- draws( + data = dat, + data_ice = dat_ice, + vars = vars, + method = method_condmean(n_samples = 6, type = "bootstrap"), + quiet = TRUE + ) + + imputeobj <- impute( + draws = drawobj, + references = c("A" = "B", "B" = "B") + ) + + + # + # Here we set up a bunch of different analysis objects using different + # of parallelisation methods and different dat_delta objects + # + + + ### Delta 1 + + dat_delta_1 <- delta_template(imputations = imputeobj) %>% + mutate(delta = is_missing * 5) + + vars2 <- vars + vars2$covariates <- c("age", "sex") + + anaobj_d1_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1 + ) + + anaobj_d1_t2 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1, + ncores = 2 + ) + + var <- 20 + inner_fun <- function(...) { + x <- day(var) # lubridate::day + rbmi::ancova(...) + } + outer_fun <- function(...) { + inner_fun(...) 
+ } + + cl <- make_rbmi_cluster(2, objects = list(var = var, inner_fun = inner_fun), "lubridate") + anaobj_d1_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_1, + ncores = cl + ) + + + ### Delta 2 + + dat_delta_2 <- delta_template(imputations = imputeobj) %>% + mutate(delta = is_missing * 50) + + anaobj_d2_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_2 + ) + anaobj_d2_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + delta = dat_delta_2, + ncores = cl + ) + + + ### Delta 3 (no delta) + + anaobj_d3_t1 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2 + ) + anaobj_d3_t3 <- analyse( + imputeobj, + fun = rbmi::ancova, + vars = vars2, + ncores = cl + ) + + ## Check for internal consistency + expect_equal(anaobj_d1_t1, anaobj_d1_t2) + expect_equal(anaobj_d1_t1, anaobj_d1_t3) + expect_equal(anaobj_d1_t2, anaobj_d1_t3) + + expect_equal(anaobj_d2_t1, anaobj_d2_t3) + + expect_equal(anaobj_d3_t1, anaobj_d3_t3) + + + ## Check that they differ (as different deltas have been used) + ## Main thing is sanity checking that the embedded delta + ## in the parallel processes hasn't lingered and impacted + ## future results + + # First assert consistency + expect_true(identical(anaobj_d1_t1$results, anaobj_d1_t3$results)) + expect_true(identical(anaobj_d2_t1$results, anaobj_d2_t3$results)) + expect_true(identical(anaobj_d3_t1$results, anaobj_d3_t3$results)) + + # The ensure they are different + expect_false(identical(anaobj_d1_t1$results, anaobj_d2_t1$results)) + expect_false(identical(anaobj_d1_t1$results, anaobj_d3_t1$results)) + expect_false(identical(anaobj_d2_t1$results, anaobj_d3_t1$results)) + +}) diff --git a/tests/testthat/test-parallel.R b/tests/testthat/test-parallel.R index fb5e0a84b..1b428bea7 100644 --- a/tests/testthat/test-parallel.R +++ b/tests/testthat/test-parallel.R @@ -157,3 +157,88 @@ test_that("Basic parallisation works as expected", { expect_equal(x1, x2, tolerance = 0.0001) }) + + +########################### +# +# Manual time testing +# +# method <- method_approxbayes(n_samples = 20) +# method <- method_condmean(n_samples = 80) +# method <- method_condmean(type = "jackknife") +# time_it({ +# results_2 <- draws( +# data = dat, +# data_ice = dat_ice, +# vars = vars, +# method = method, +# ncores = 1 +# ) +# }) +# time_it({ +# results_2 <- draws( +# data = dat, +# data_ice = dat_ice, +# vars = vars, +# method = method, +# ncores = 2 +# ) +# }) + + + + +test_that("Creation and management of user defined clusters works as expected", { + # Setup a function to be run on the parallel cluster that requires + # global objects (namely the `inner_fun` and environment `e`) as well + # as a handful of packages to be loaded + e <- new.env() + e$x <- 20 + inner_fun <- function(x) { + local_env <- e + temp1 <- e$x + 0 + temp2 <- rnorm(2) + temp3 <- dplyr::as_tibble(dplyr::starwars) # Explicit namespace + temp4 <- day(x) # lubridate::day() + e$x + } + outer_fun <- function(x) { + temp1 <- inner_fun(2) + temp2 <- anova.lme %>% invisible() # nlme::anova.lme() + temp3 <- iris + temp1 + x + 10 + } + + # Check that function can be run (e.g. 
+    set.seed(1223)
+    cl1 <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme"))
+    res_1_a <- parallel::clusterCall(cl1, rnorm, 200)
+    res_1_b <- parallel::clusterApplyLB(cl1, c(4, 5), outer_fun)
+    expect_equal(res_1_b, list(34, 35))
+
+    # Test that re-using an existing cluster is quick to load
+    time <- time_it({
+        set.seed(1223)
+        cl2 <- make_rbmi_cluster(cl1, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme"))
+    })
+    expect_true(as.numeric(time) <= 2)
+
+    # Should produce identical results as before
+    res_2_a <- parallel::clusterCall(cl2, rnorm, 200)
+    res_2_b <- parallel::clusterApplyLB(cl2, c(4, 5), outer_fun)
+    expect_equal(res_2_a, res_1_a)
+    expect_equal(res_2_b, res_1_b)
+
+    # Both clusters should be closed as they are pointers to the same thing
+    parallel::stopCluster(cl2)
+    expect_true(is_cluster_closed(cl1))
+    expect_true(is_cluster_closed(cl2))
+
+
+    # Check that seed ensures reproducibility
+    set.seed(1223)
+    cl <- make_rbmi_cluster(2, list(inner_fun = inner_fun, e = e), c("lubridate", "nlme"))
+    res_3_a <- parallel::clusterCall(cl, rnorm, 200)
+    expect_equal(res_3_a, res_1_a)
+    parallel::stopCluster(cl)
+})
diff --git a/vignettes/advanced.Rmd b/vignettes/advanced.Rmd
index 43368b093..743b71488 100644
--- a/vignettes/advanced.Rmd
+++ b/vignettes/advanced.Rmd
@@ -491,9 +491,12 @@ The same approach can be used to implement a tipping point analysis. Here, we ap
 
 ```{r fig.width=5, fig.height=4}
 
-perform_tipp_analysis <- function(delta_control, delta_intervention) {
 
-    # Derive delta offset based on control and intervention specific deltas
+
+perform_tipp_analysis <- function(delta_control, delta_intervention, cl) {
+
+    # Derive delta offset based on control and intervention specific deltas
     delta_df <-  delta_df_init %>%
         mutate(
             delta_ctl = (group == "Control") * is_missing * delta_control,
@@ -506,6 +509,7 @@ perform_tipp_analysis <- function(delta_control, delta_intervention) {
         fun = compare_change_lastvisit,
         vars = vars_an,
         delta = delta_df,
+        ncores = cl
     )
 
     pool_delta <- as.data.frame(pool(ana_delta))
@@ -525,10 +529,12 @@ tipp_frame_grid <- expand.grid(
 ) %>%
     as_tibble()
 
+# parallelise to speed up computation
+cl <- make_rbmi_cluster(2)
 
 tipp_frame <- tipp_frame_grid %>%
     mutate(
-        results_list = map2(delta_control, delta_intervention, perform_tipp_analysis),
+        results_list = map2(delta_control, delta_intervention, perform_tipp_analysis, cl = cl),
         trt_effect_6 = map_dbl(results_list, "trt_effect_6"),
         pval_6 = map_dbl(results_list, "pval_6")
     ) %>%
@@ -542,6 +548,9 @@ tipp_frame <- tipp_frame_grid %>%
         )
     )
 
+# Close cluster when done with it
+parallel::stopCluster(cl)
+
 # Show delta values which lead to non-significant analysis results
 tipp_frame %>%
     filter(pval_6 >= 0.05)
diff --git a/vignettes/advanced.html b/vignettes/advanced.html
index b00c23ed2..49fa9171b 100644
--- a/vignettes/advanced.html
+++ b/vignettes/advanced.html
@@ -714,7 +714,7 @@

6 Custom imputation strategies#> pars <- list(mu = mu, sigma = sigma) #> return(pars) #> } -#> <bytecode: 0x7f9e8123d938> +#> <bytecode: 0x7f7f2a7523a0> #> <environment: namespace:rbmi>

To further illustrate this for a simple example, assume that a new strategy is to be implemented as follows: - The marginal mean of the imputation distribution is equal to the marginal mean trajectory for the subject according to their assigned group and covariates up to the ICE. @@ -941,70 +941,79 @@

8.1 Simple delta adjustments and #> lsm_alt_6 7.225 0.725 5.793 8.656 <0.001 #> --------------------------------------------------

The same approach can be used to implement a tipping point analysis. Here, we apply different delta-adjustments to imputed data from the control and the intervention group, respectively. Assume that delta-adjustments by less then -5 points or by more than +15 points are considered implausible from a clinical perspective. Therefore, we vary the delta-values in each group between -5 to +15 points to investigate which delta combinations lead to a “tipping” of the primary analysis result, defined here as an analysis p-value \(\geq 0.05\).

-
perform_tipp_analysis <- function(delta_control, delta_intervention) {
+

 
-    # Derive delta offset based on control and intervention specific deltas    
-    delta_df <-  delta_df_init %>%
-        mutate(
-            delta_ctl = (group == "Control") * is_missing * delta_control,
-            delta_int = (group == "Intervention") * is_missing * delta_intervention,
-            delta = delta_ctl + delta_int
-        )
-
-    ana_delta <- analyse(
-        impute_obj_CIR,
-        fun = compare_change_lastvisit,
-        vars = vars_an,
-        delta = delta_df,
-    )
-
-    pool_delta <- as.data.frame(pool(ana_delta))
-
-    list(
-        trt_effect_6 = pool_delta[["est"]],
-        pval_6 = pool_delta[["pval"]]
-    )
-}
-
-# Get initial delta template
-delta_df_init <- delta_template(impute_obj_CIR)
-
-tipp_frame_grid <- expand.grid(
-    delta_control = seq(-5, 15, by = 2),
-    delta_intervention = seq(-5, 15, by = 2)
-) %>%
-    as_tibble()
-
-
-tipp_frame <- tipp_frame_grid %>%
-    mutate(
-        results_list = map2(delta_control, delta_intervention, perform_tipp_analysis),
-        trt_effect_6 = map_dbl(results_list, "trt_effect_6"),
-        pval_6 = map_dbl(results_list, "pval_6")
-    ) %>%
-    select(-results_list) %>%
+
+perform_tipp_analysis <- function(delta_control, delta_intervention, cl) {
+
+    # Derive delta offset based on control and intervention specific deltas
+    delta_df <-  delta_df_init %>%
+        mutate(
+            delta_ctl = (group == "Control") * is_missing * delta_control,
+            delta_int = (group == "Intervention") * is_missing * delta_intervention,
+            delta = delta_ctl + delta_int
+        )
+
+    ana_delta <- analyse(
+        impute_obj_CIR,
+        fun = compare_change_lastvisit,
+        vars = vars_an,
+        delta = delta_df,
+        ncores = cl
+    )
+
+    pool_delta <- as.data.frame(pool(ana_delta))
+
+    list(
+        trt_effect_6 = pool_delta[["est"]],
+        pval_6 = pool_delta[["pval"]]
+    )
+}
+
+# Get initial delta template
+delta_df_init <- delta_template(impute_obj_CIR)
+
+tipp_frame_grid <- expand.grid(
+    delta_control = seq(-5, 15, by = 2),
+    delta_intervention = seq(-5, 15, by = 2)
+) %>%
+    as_tibble()
+
+# parallelise to speed up computation
+cl <- make_rbmi_cluster(2)
+
+tipp_frame <- tipp_frame_grid %>%
     mutate(
-        pval = cut(
-            pval_6,
-            c(0, 0.001, 0.01, 0.05, 0.2, 1),
-            right = FALSE,
-            labels = c("<0.001", "0.001 - <0.01", "0.01- <0.05", "0.05 - <0.20", ">= 0.20")
-        )
-    )
-
-# Show delta values which lead to non-significant analysis results
-tipp_frame %>%
-    filter(pval_6 >= 0.05)
-#> # A tibble: 3 × 5
-#>   delta_control delta_intervention trt_effect_6 pval_6 pval        
-#>           <dbl>              <dbl>        <dbl>  <dbl> <fct>       
-#> 1            -5                 15        -1.99 0.0935 0.05 - <0.20
-#> 2            -3                 15        -2.15 0.0704 0.05 - <0.20
-#> 3            -1                 15        -2.31 0.0527 0.05 - <0.20
-
-ggplot(tipp_frame, aes(delta_control, delta_intervention, fill = pval)) +
-    geom_raster() +
-    scale_fill_manual(values = c("darkgreen", "lightgreen", "lightyellow", "orange", "red"))
+ results_list = map2(delta_control, delta_intervention, perform_tipp_analysis, cl = cl), + trt_effect_6 = map_dbl(results_list, "trt_effect_6"), + pval_6 = map_dbl(results_list, "pval_6") + ) %>% + select(-results_list) %>% + mutate( + pval = cut( + pval_6, + c(0, 0.001, 0.01, 0.05, 0.2, 1), + right = FALSE, + labels = c("<0.001", "0.001 - <0.01", "0.01- <0.05", "0.05 - <0.20", ">= 0.20") + ) + ) + +# Close cluster when done with it +parallel::stopCluster(cl) + +# Show delta values which lead to non-significant analysis results +tipp_frame %>% + filter(pval_6 >= 0.05) +#> # A tibble: 3 × 5 +#> delta_control delta_intervention trt_effect_6 pval_6 pval +#> <dbl> <dbl> <dbl> <dbl> <fct> +#> 1 -5 15 -1.99 0.0935 0.05 - <0.20 +#> 2 -3 15 -2.15 0.0704 0.05 - <0.20 +#> 3 -1 15 -2.31 0.0527 0.05 - <0.20 + +ggplot(tipp_frame, aes(delta_control, delta_intervention, fill = pval)) + + geom_raster() + + scale_fill_manual(values = c("darkgreen", "lightgreen", "lightyellow", "orange", "red"))

According to this analysis, the significant test result from the primary analysis under CIR could only be tipped to a non-significant result for rather extreme delta-adjustments. Please note that for a real analysis it is recommended to use a smaller step size in the grid than what has been used here.

diff --git a/vignettes/quickstart.html b/vignettes/quickstart.html index d22421ae8..d4accf55c 100644 --- a/vignettes/quickstart.html +++ b/vignettes/quickstart.html @@ -453,23 +453,24 @@

3 Draws

method = method, quiet = TRUE ) -#> Warning in fit_mcmc(designmat = model_df_scaled[, -1, drop = FALSE], outcome = model_df_scaled[, : The largest R-hat is 1.06, indicating chains have not mixed. -#> Running the chains for more iterations may help. See -#> https://mc-stan.org/misc/warnings.html#r-hat -drawObj -#> -#> Draws Object -#> ------------ -#> Number of Samples: 150 -#> Number of Failed Samples: 0 -#> Model Formula: CHANGE ~ 1 + THERAPY + VISIT + BASVAL * VISIT + THERAPY * VISIT -#> Imputation Type: random -#> Method: -#> name: Bayes -#> burn_in: 200 -#> burn_between: 5 -#> same_cov: TRUE -#> n_samples: 150 +#> recompiling to avoid crashing R session +#> Warning in fit_mcmc(designmat = model_df_scaled[, -1, drop = FALSE], outcome = model_df_scaled[, : The largest R-hat is 1.05, indicating chains have not mixed. +#> Running the chains for more iterations may help. See +#> https://mc-stan.org/misc/warnings.html#r-hat +drawObj +#> +#> Draws Object +#> ------------ +#> Number of Samples: 150 +#> Number of Failed Samples: 0 +#> Model Formula: CHANGE ~ 1 + THERAPY + VISIT + BASVAL * VISIT + THERAPY * VISIT +#> Imputation Type: random +#> Method: +#> name: Bayes +#> burn_in: 200 +#> burn_between: 5 +#> same_cov: TRUE +#> n_samples: 150

Note the use of set_vars() which specifies the names of the key variables within the dataset and the imputation model. Additionally, note that whilst vars$group and vars$visit are added as terms to the imputation model by default, their interaction is not, @@ -694,15 +695,15 @@

6 Pool

#> trt_4 -0.092 0.683 -1.439 1.256 0.893 #> lsm_ref_4 -1.616 0.486 -2.576 -0.656 0.001 #> lsm_alt_4 -1.708 0.475 -2.645 -0.77 <0.001 -#> trt_5 1.328 0.924 -0.497 3.153 0.153 -#> lsm_ref_5 -4.14 0.66 -5.443 -2.838 <0.001 -#> lsm_alt_5 -2.812 0.646 -4.088 -1.537 <0.001 -#> trt_6 1.939 1 -0.037 3.915 0.054 -#> lsm_ref_6 -6.085 0.718 -7.503 -4.667 <0.001 -#> lsm_alt_6 -4.146 0.699 -5.527 -2.766 <0.001 -#> trt_7 2.136 1.12 -0.078 4.35 0.058 -#> lsm_ref_7 -6.982 0.812 -8.587 -5.377 <0.001 -#> lsm_alt_7 -4.846 0.789 -6.405 -3.287 <0.001 +#> trt_5 1.334 0.926 -0.494 3.162 0.151 +#> lsm_ref_5 -4.151 0.661 -5.457 -2.846 <0.001 +#> lsm_alt_5 -2.817 0.648 -4.097 -1.537 <0.001 +#> trt_6 1.934 1 -0.042 3.91 0.055 +#> lsm_ref_6 -6.088 0.715 -7.501 -4.676 <0.001 +#> lsm_alt_6 -4.155 0.698 -5.533 -2.777 <0.001 +#> trt_7 2.177 1.138 -0.073 4.426 0.058 +#> lsm_ref_7 -7.002 0.827 -8.638 -5.367 <0.001 +#> lsm_alt_7 -4.826 0.783 -6.373 -3.278 <0.001 #> --------------------------------------------------

The table of values shown in the print message for poolObj can also be extracted using the as.data.frame() function:

as.data.frame(poolObj)
@@ -710,17 +711,17 @@ 

6 Pool

#> 1 trt_4 -0.09180645 0.6826279 -1.43949684 1.2558839 8.931772e-01 #> 2 lsm_ref_4 -1.61581996 0.4862316 -2.57577141 -0.6558685 1.093708e-03 #> 3 lsm_alt_4 -1.70762640 0.4749573 -2.64531931 -0.7699335 4.262148e-04 -#> 4 trt_5 1.32800107 0.9239991 -0.49687491 3.1528770 1.526144e-01 -#> 5 lsm_ref_5 -4.14031255 0.6595847 -5.44302381 -2.8376013 3.163421e-09 -#> 6 lsm_alt_5 -2.81231148 0.6459122 -4.08807336 -1.5365496 2.396574e-05 -#> 7 trt_6 1.93891419 1.0001460 -0.03694571 3.9147741 5.438468e-02 -#> 8 lsm_ref_6 -6.08530002 0.7176967 -7.50335519 -4.6672448 1.946811e-14 -#> 9 lsm_alt_6 -4.14638583 0.6985434 -5.52650481 -2.7662668 1.911219e-08 -#> 10 trt_7 2.13609482 1.1201125 -0.07781920 4.3500088 5.849971e-02 -#> 11 lsm_ref_7 -6.98181990 0.8117268 -8.58678186 -5.3768579 1.511791e-14 -#> 12 lsm_alt_7 -4.84572508 0.7885999 -6.40477980 -3.2866704 7.793320e-09
+#> 4 trt_5 1.33436741 0.9255820 -0.49370018 3.1624350 1.513737e-01 +#> 5 lsm_ref_5 -4.15140080 0.6608110 -5.45658456 -2.8462170 3.110832e-09 +#> 6 lsm_alt_5 -2.81703340 0.6478134 -4.09662455 -1.5374422 2.460646e-05 +#> 7 trt_6 1.93372720 1.0000881 -0.04213496 3.9095894 5.502527e-02 +#> 8 lsm_ref_6 -6.08849645 0.7148756 -7.50096014 -4.6760328 1.548722e-14 +#> 9 lsm_alt_6 -4.15476925 0.6975559 -5.53298494 -2.7765536 1.739819e-08 +#> 10 trt_7 2.17651184 1.1379189 -0.07335404 4.4263777 5.784145e-02 +#> 11 lsm_ref_7 -7.00205125 0.8268086 -8.63751537 -5.3665871 4.132364e-14 +#> 12 lsm_alt_7 -4.82553941 0.7830124 -6.37333317 -3.2777456 6.903910e-09

These outputs gives an estimated difference of -2.136 (95% CI -0.078 to 4.350) +2.177 (95% CI -0.073 to 4.426) between the two groups at the last visit with an associated p-value of 0.058.