Description
A common setup is to initiate a process on one server, pass it to the head node of a cluster, then parallelize on the cluster from the head node. future is a good tool for this scenario, but the code for this is a bit ... idiomatic:
plan(list(<some sort of remote setup>, <futurebatchtools>))
future_lapply(X = 1L, function(x) {
  future_lapply(X = 1L:10L, function(z) {
    # actually do the thing you want parallelized
  })
})
The problem with the above setup is that if you start out writing your code for a local multiprocess plan and then decide to switch to a cluster backend like the one in the above plan (i.e. passing through a remote head node), you need to go back and wrap every call to future_lapply() in another future_lapply() call.
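To make the cost of that extra wrapper concrete, here is a minimal sketch that uses base lapply() as a stand-in for future_lapply() (an assumption, so that it runs without any cluster or extra packages; `work` is a hypothetical placeholder for the real task). It only shows how the one-element outer call changes the shape of the result:

```r
# Base lapply() stands in for future_lapply(); `work` is a hypothetical task.
work <- function(z) z^2

# Local version: a single level of lapply.
local_result <- lapply(1:3, work)

# "Pass-through" version: a one-element outer call wraps the real one,
# so the result comes back nested one level deeper.
wrapped <- lapply(1L, function(x) lapply(1:3, work))

# Dropping the outer level recovers the local result.
unwrapped <- wrapped[[1]]
print(identical(unwrapped, local_result))
```

So every wrapped call site needs both the extra outer call and an extra unwrapping step, which is the boilerplate I'd like to avoid.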
I've written a function that passes objects through an arbitrary number of remotes, if they exist, before actually doing the specified lapply.
This should (in theory) be a drop-in replacement for future_lapply:
future_lapply_backend <- function(X, FUN, ..., future.stdout = TRUE,
                                  future.conditions = NULL, future.globals = FALSE,
                                  future.packages = NULL, future.lazy = FALSE, future.seed = FALSE,
                                  future.scheduling = 1.0, future.chunk.size = NULL,
                                  future.label = "future_lapply-%d") {
  # first class of the current plan that is not one of the generic wrapper classes
  current_plan <- setdiff(class(future::plan()),
                          c("FutureStrategy", "tweaked", "function"))[1]
  # we're just using match.call here to find the arguments which were actually specified,
  # but we want to pass on their values rather than the unevaluated expressions
  call <- match.call(expand.dots = FALSE)
  specified <- setdiff(names(call)[-1], "...") # the first element is the function symbol
  # evaluate all specified arguments; mget() keeps arguments whose value is NULL
  # (assigning NULL with [[<- would drop them), and anything passed via ... is appended
  arg_list <- c(mget(specified, envir = environment()), list(...))
  # "mypackage" contains future_lapply_backend, thus ensuring that it gets passed, allowing recursion
  if (!"mypackage" %in% arg_list[["future.packages"]]) {
    arg_list[["future.packages"]] <- c("mypackage", arg_list[["future.packages"]])
  }
  if (current_plan == "remote") {
    # create a copy of the arg list. everything stays the same *except* X and FUN
    # X becomes 1L just to signify that this only needs to be run once
    # (as this level of the plan, being remote, is only passing through)
    # and FUN is just a recursive call to start over again
    # (it accepts and ignores ..., since those are already stored in arg_list)
    arg_list_pass <- arg_list
    arg_list_pass[["X"]] <- 1L
    arg_list_pass[["FUN"]] <- function(x, ...) do.call(future_lapply_backend, args = arg_list)
    out <- do.call(future.apply::future_lapply, args = arg_list_pass)
    # return out[[1]] to drop the top level of the list created by the
    # future_lapply used to pass through the remote
    return(out[[1]])
  } else {
    out <- do.call(future.apply::future_lapply, args = arg_list)
    return(out)
  }
}
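The recursion is easier to see with the future-specific parts stripped away. The following is only a rough sketch of the same control flow, not the function above: `lapply_through` and its `layers` argument are hypothetical names, `layers` is a plain character vector standing in for the plan stack, and base lapply() stands in for future_lapply().

```r
# Hypothetical stand-in: `layers` mimics the plan stack, outermost layer first.
lapply_through <- function(layers, X, FUN) {
  if (length(layers) > 0L && layers[[1]] == "remote") {
    # A "remote" layer only passes through: run once, recurse on the
    # remaining layers, then drop the one-element wrapper list.
    out <- lapply(1L, function(i) lapply_through(layers[-1], X, FUN))
    return(out[[1]])
  }
  # Any other layer does the real work.
  lapply(X, FUN)
}

double_it <- function(z) z * 2

# Two pass-through layers followed by the layer that does the work.
res <- lapply_through(c("remote", "remote", "batchtools"), 1:3, double_it)
print(identical(res, lapply(1:3, double_it)))
```

Whatever the number of leading "remote" layers, the caller gets back the same shape as a plain lapply(), which is the property the drop-in replacement relies on.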
I've worked out most of the kinks but I wonder if there's a better approach to dealing with this?