
Parallelized map using mirai #1163

Open · wants to merge 22 commits into base: main
Conversation

@shikokuchuo (Contributor) commented Jan 22, 2025

@hadley this builds on the proof of concept you reviewed last week at shikokuchuo#1.

Adds a .parallel argument to every function that currently accepts a .progress argument, i.e.:

  • map() and variants
  • map2() and variants
  • pmap() and variants

There is a full test suite added at test-parallel.R, which runs parallel versions of all the tests from the map/map2/pmap test files, so I'm fairly confident that the behaviour is consistent.

This version relies on the current dev version of mirai. I made upstream changes to align behaviour, in particular making the error messages identical to those in the normal map cases. I'm ready to make another mirai release once this PR is merged.

I've added a couple of simple examples at the bottom of map() so it's hopefully clear to users what they should be doing.
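For readers following along, here is a minimal sketch of what the interface described above looks like (a sketch only, assuming the dev version of mirai and this branch of purrr; the exact examples added to `?map` may differ):

```r
library(purrr)
library(mirai)

# Launch 4 daemons (background worker processes) on the local machine.
daemons(4)

# The map is dispatched across the daemons and collected transparently.
map(1:100, \(x) x^2, .parallel = TRUE)

# Shut the daemons down when finished.
daemons(0)
```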

The full documentation is available as an Rd page at ?parallelization. I've added it this way, as opposed to a vignette, to mirror how .progress is documented. It also has the advantage of linking easily from function documentation. This contains:

  • An explanation of when to provide a list to .parallel
  • How to set daemons()
  • The with() method for daemons

Conceptually, this has been mostly an interface-mapping exercise, with purrr directly using the public exported functions mirai_map() and collect_mirai() from the mirai package. I've focused almost completely on consistency with the normal, non-parallel versions thus far.

I've resisted the urge to do any specific optimization for efficiency, as it's likely premature to do so. I'll just note here that optimization is likely to yield gains in future.

I welcome your review!

cc. @jcheng5 FYI.

closes #1162.

@hadley (Member) left a comment

This is looking great. Thanks so much for working on this!

R/map.R (resolved)
R/map.R (outdated, resolved)
#' As an example, the below is evaluated in parallel using 7 daemons:
#'
#' \preformatted{
#' with(daemons(7), {
Member:

The slightly more standard tidyverse way would be to provide with_daemons() and local_daemons(). That might be worth doing here. I'm not sure.

Member:

I thought having it work with with() is a nice touch :)

Contributor Author:

I was about to suggest just removing mention of with() in the purrr docs for now, but Lionel seems to like it!

It's not at all essential and it's probably more idiomatic just to set daemons at the top of your script for interactive work.

It was designed in the context of Shiny, as people tend not to know where to put the daemons() call (at the top level, within server(), etc.), so I introduced it to make this easy:

app <- shinyApp(ui, server)
with(daemons(4), runApp(app))

Contributor Author:

It's possible to implement with_daemons() and local_daemons(), but I prefer not to do it in the initial implementation so as not to overcomplicate things.

I think there's no harm leaving the with() method in the docs, but if you think it diverts attention etc. I have no issue with taking the section out.

#' @description
#' purrr's map functions have a `.parallel` argument to parallelize a map using
#' the \CRANpkg{mirai} package. This allows you to run computations in parallel
#' using more cores on your machine, or distributed over the network.
Member:

I think it would be worth including a little advice here about when parallelisation is likely to save you time — i.e. talk a little about roughly what the overhead associated with parallelisation is.

@lionel- (Member) left a comment

Great to see progress on this front!

I'm not familiar with mirai so I mainly have a high level comment about potentially using carrier for the treatment of functions.

Comment on lines 20 to 29
#' If `.f` is a user-defined function, then ideally everything required by the
#' function is passed in as arguments (a pure function). Any constant arguments
#' in this case can be supplied through `...` in the usual way.
#'
#' However, if the function references an object that is not supplied as an
#' argument or defined in the function itself (i.e. a free variable), these
#' should be supplied as a list to `.parallel`. This makes them available to
#' `.f` in the parallel processes. `purrr` requires these to be explicitly
#' supplied, rather than trying to determine them by analysing `.f`, as this
#' provides consistent, reliable behaviour that is easy to debug.
@lionel- (Member) commented Jan 23, 2025

This API feels a bit unfriendly.

Any thoughts on using carrier::crate() to create serialisable functions instead?
https://github.com/r-lib/carrier

Contributor Author:

This looks great! I'm certainly happy to recommend it in the docs. Were you thinking of forcing people to use this and not have the list API at all?

Contributor Author:

In f349d71 I remove the option of supplying a list to .parallel to make the interface less unwieldy.

We now rely on carrier::crate() to create self-contained functions where necessary and this is documented in ?parallelization.

Having the carrier package bear this responsibility makes for a more robust and maintainable solution.
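For illustration, a hedged sketch of what the crate()-based approach looks like (fun1 and fun2 here are hypothetical names; carrier::crate() requires everything the function uses to be declared explicitly):

```r
library(carrier)
library(purrr)

fun1 <- function(x) x + x %% 2

# crate() builds a self-contained function: fun1 is declared explicitly,
# so nothing else from the calling environment is captured or serialized.
fun2 <- crate(function(x) x + fun1(x), fun1 = fun1)

map(1:3, fun2, .parallel = TRUE)
```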

#' As an example, the below is evaluated in parallel using 7 daemons:
#'
#' \preformatted{
#' with(daemons(7), {
Member:

I thought having it work with with() is a nice touch :)

@@ -0,0 +1,104 @@
#' Parallelization in purrr
Member:

I think we need some documentation about what happens if you do nested map(.parallel = TRUE) calls, i.e. what happens in that case and what are the best practices.

In particular, if I want the outer nest to send elements to remote workers, but then I want the inner nest to run in parallel on those workers, how do I achieve that? That was a common furrr question.

Contributor Author:

Let me take a look at the furrr docs to refresh my understanding of why it's an issue there. From what I remember, it's the way that chunking is done, plus the lack of queuing in the old parallel backend. mirai should just work in this respect, but I'll add something to the docs on this point.

R/parallelization.R (outdated, resolved)
Comment on lines 36 to 39
#' # requires 'fun1' to be supplied in a list:
#' fun1 <- function(x) \{ x + x \%\% 2 \}
#' fun2 <- function(x) \{ x + fun1(x) \}
#' map(1:3, fun2, .parallel = list(fun1 = fun1))
Member:

How does fun1 get serialized? If fun1's environment contained some massive data frame, would that also get shipped over to the process? Or is the environment of a user defined function here "stripped down" in the same way I imagine it is done for fun2?

Contributor Author:

A function will get serialized as a closure, but the global environment is never serialized by R. This is just standard R serialization, by the way, not something bespoke.

Key here is that fun2 is defined in the global environment, but has the free variable fun1 (a function in this case, but it could be any object). As fun2's enclosing environment is the global environment, we need to ship fun1 and put it in the global environment of the remote process.

So if fun2 were a closure whose environment contains fun1, we wouldn't need to worry about separately shipping fun1.
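A small base-R illustration of that point (names are just for illustration): defining both functions inside local() makes fun2 a closure whose environment contains fun1, so the pair serializes as one self-contained unit.

```r
# fun2's enclosing environment is the local() environment, which holds
# fun1, so serializing fun2 automatically brings fun1 along with it.
fun2 <- local({
  fun1 <- function(x) x + x %% 2
  function(x) x + fun1(x)
})

fun2(3)  # fun1(3) is 4, so this returns 7
```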

At the end of the day, I do think most of the time it's going to be a package function here and then we're safe.

carrier::crate() also seems to provide a safe way to construct these closures, so we could recommend that.

Member:

This is what I am worried about, and something I get questions about in furrr a lot.

"Why is this furrr thing so slow??" Because it isn't obvious that serializing helper also captured its enclosing environment, so data came along for the ride and was shipped to each worker. Often data itself is huge, and you never intended for it to get there.

library(purrr)
library(mirai)
library(tibble)

daemons(2)
#> [1] 2

do_the_thing <- function(data) {
  # Imagine this is a function that doesn't use `data` directly at all,
  # so it should not need access to it. I expect that `env$data`
  # should return `NULL`.
  helper <- function(x) {
    env <- parent.env(environment())
    env$data
  }
  
  # `helper` should be serialized in a special way where its
  # environment is forcibly set to the global environment,
  # preventing anything else from "accidentally" getting
  # serialized along with it
  map(data$x, helper, .parallel = TRUE)
}

data <- tibble(
  x = list(1, 2, 3),
  y = 1:3
)

do_the_thing(data)
#> [[1]]
#> # A tibble: 3 × 2
#>   x             y
#>   <list>    <int>
#> 1 <dbl [1]>     1
#> 2 <dbl [1]>     2
#> 3 <dbl [1]>     3
#> 
#> [[2]]
#> # A tibble: 3 × 2
#>   x             y
#>   <list>    <int>
#> 1 <dbl [1]>     1
#> 2 <dbl [1]>     2
#> 3 <dbl [1]>     3
#> 
#> [[3]]
#> # A tibble: 3 × 2
#>   x             y
#>   <list>    <int>
#> 1 <dbl [1]>     1
#> 2 <dbl [1]>     2
#> 3 <dbl [1]>     3

Created on 2025-01-23 with reprex v2.1.1

@DavisVaughan (Member) commented Jan 23, 2025

And before you say "well, defining a named helper function inside another function isn't that common!", note that the same behavior also applies to anonymous functions, and something like this is very common:

do_the_thing <- function(data) {
  map(data$x, .parallel = TRUE, function(x) {
    env <- parent.env(environment())
    env$data
  })
}

Contributor Author:

Thanks, I can see that data is being passed to the daemons along with .f.

As far as I know, stripping function environments isn't done in any existing implementation? I'd have to consider the consequences. There would also need to be exceptions for package namespaces and 'crate' objects at least, now that we rely on carrier::crate() to 'crate' our functions :)

Comment on lines +60 to +63
#' * `n`: the number of daemons to launch on your local machine, e.g.
#' `daemons(7)`. As a rule of thumb, for maximum efficiency this should be (at
#' most) one less than the number of cores on your machine, leaving one core
#' for the main R process.
Member:

I feel like Henrik would kill me if we didn't at least consider mentioning parallelly::availableCores() as a way to figure out the number of cores on your machine. Anything we can do to help prevent people from maxing out their shared HPC workers is probably a good thing.
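For example, something along these lines (a sketch; availableCores() and its omit argument are from the parallelly package):

```r
library(mirai)

# parallelly::availableCores() respects container/cgroup limits and HPC
# scheduler allocations, unlike parallel::detectCores(); omit = 1
# reserves a core for the main R process.
daemons(parallelly::availableCores(omit = 1))
```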

Contributor Author:

This is indeed the reason why the number of daemons is user-supplied. It's not clear that any automated function is going to get this right, precisely because we can't know what else is going on in the compute environment at the same time.

Comment on lines +120 to +123
x <- withCallingHandlers(
mirai::collect_mirai(m, options = options),
error = function(cnd) {
location <- cnd$location
Member:

Maybe extract out with_parallel_indexed_errors() to be put near with_indexed_errors() so it is easier for us to keep them in sync in the future?

Contributor Author:

I had a go, but it seems cleaner to keep it as is: the interrupt handler requires a reference to the mirai object to be passed in, which would be inconsistent with taking a general expr in the fashion of with_indexed_errors().

Development

Successfully merging this pull request may close these issues.

Parallelized Map
4 participants