Apply a Function to a Data Frame Split by Factors via Futures

future_by(
  data,
  INDICES,
  FUN,
  ...,
  simplify = TRUE,
  future.envir = parent.frame()
)

Arguments

data

An R object, normally a data frame, possibly a matrix.

INDICES

A factor or a list of factors, each of length nrow(data).

FUN

A function to be applied to (usually data-frame) subsets of data.

simplify

logical: see base::tapply.

future.envir

An environment passed as argument envir to future::future() as-is.

...

Additional arguments passed to future_lapply() and then on to FUN().

Value

An object of class "by", giving the results for each subset. This is always a list if simplify = FALSE, otherwise a list or array (see base::tapply). See also base::by() for details.

Details

Internally, data is grouped by INDICES into a list of data subset elements which is then processed by future_lapply(). When the groups differ significantly in size, the processing time may differ significantly between the groups. To correct for processing-time imbalances, adjust the amount of chunking via the future_lapply() arguments future.scheduling and future.chunk.size, which can be passed via ....

Note on 'stringsAsFactors'

The future_by() function is modeled as closely as possible on the behavior of base::by(). Both functions have "default" S3 methods that call data <- as.data.frame(data) internally. This call may in turn call an S3 method for as.data.frame() that coerces strings to factors or not, depending on whether it has a stringsAsFactors argument and what its default is. For example, the S3 method of as.data.frame() for lists changed its (effective) default from stringsAsFactors = TRUE to stringsAsFactors = FALSE in R 4.0.0.

Examples

## ---------------------------------------------------------
## by()
## ---------------------------------------------------------
library(datasets) ## warpbreaks
library(stats)    ## lm()

y0 <- by(warpbreaks, warpbreaks[,"tension"],
         function(x) lm(breaks ~ wool, data = x))

plan(multisession)
y1 <- future_by(warpbreaks, warpbreaks[,"tension"],
                function(x) lm(breaks ~ wool, data = x))
#> [13:58:15.725] future_lapply() ...
#> [13:58:15.733] Number of chunks: 3
#> [13:58:15.733] getGlobalsAndPackagesXApply() ...
#> [13:58:15.734] - future.globals: TRUE
#> [13:58:15.737] - globals found/used: [n=1] ‘FUN’
#> [13:58:15.737] - needed namespaces: [n=1] ‘stats’
#> [13:58:15.737] Finding globals ... DONE
#> [13:58:15.738] - use_args: TRUE
#> [13:58:15.738] - Getting '...' globals ...
#> [13:58:15.738] - '...' content: [n=0]
#> [13:58:15.738] List of 1 #> [13:58:15.738] $ ...: list() #> [13:58:15.738] ..- attr(*, "class")= chr [1:2] "DotDotDotList" "list" #> [13:58:15.738] - attr(*, "class")= chr [1:3] "FutureGlobals" "Globals" "list" #> [13:58:15.738] - attr(*, "where")=List of 1 #> [13:58:15.738] ..$ ...:<environment: 0x5572e8d02528> #> [13:58:15.738] - attr(*, "resolved")= logi TRUE #> [13:58:15.738] - attr(*, "total_size")= num NA
#> [13:58:15.742] - Getting '...' globals ... DONE
#> [13:58:15.742] Globals to be used in all futures (chunks): [n=2] ‘...future.FUN’, ‘...’
#> [13:58:15.743] List of 2 #> [13:58:15.743] $ ...future.FUN:function (x) #> [13:58:15.743] ..- attr(*, "srcref")= 'srcref' int [1:8] 12 17 12 55 17 55 12 12 #> [13:58:15.743] .. ..- attr(*, "srcfile")=Classes 'srcfilecopy', 'srcfile' <environment: 0x5572e706f788> #> [13:58:15.743] $ ... : list() #> [13:58:15.743] ..- attr(*, "class")= chr [1:2] "DotDotDotList" "list" #> [13:58:15.743] - attr(*, "class")= chr [1:3] "FutureGlobals" "Globals" "list" #> [13:58:15.743] - attr(*, "where")=List of 2 #> [13:58:15.743] ..$ ...future.FUN:<environment: R_EmptyEnv> #> [13:58:15.743] ..$ ... :<environment: 0x5572e8d02528> #> [13:58:15.743] - attr(*, "resolved")= logi FALSE #> [13:58:15.743] - attr(*, "total_size")= num 1776
#> [13:58:15.747] Packages to be attached in all futures: [n=1] ‘stats’
#> [13:58:15.748] getGlobalsAndPackagesXApply() ... DONE
#> [13:58:15.748] Number of futures (= number of chunks): 3
#> [13:58:15.748] Launching 3 futures (chunks) ...
#> [13:58:15.748] Chunk #1 of 3 ...
#> [13:58:15.748] - Finding globals in 'X' for chunk #1 ...
#> [13:58:15.749] + additional globals found: [n=0]
#> [13:58:15.749] + additional namespaces needed: [n=0]
#> [13:58:15.749] - Finding globals in 'X' for chunk #1 ... DONE
#> [13:58:15.749] - seeds: <none>
#> [13:58:15.809] Created future:
#> [13:58:15.809] MultisessionFuture: #> [13:58:15.809] Label: ‘future_by-1’ #> [13:58:15.809] Expression: #> [13:58:15.809] { #> [13:58:15.809] do.call(function(...) { #> [13:58:15.809] ...future.globals.maxSize.org <- getOption("future.globals.maxSize") #> [13:58:15.809] if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) { #> [13:58:15.809] oopts <- options(future.globals.maxSize = ...future.globals.maxSize) #> [13:58:15.809] on.exit(options(oopts), add = TRUE) #> [13:58:15.809] } #> [13:58:15.809] { #> [13:58:15.809] lapply(seq_along(...future.elements_ii), FUN = function(jj) { #> [13:58:15.809] ...future.X_jj <- ...future.elements_ii[[jj]] #> [13:58:15.809] ...future.FUN(...future.X_jj, ...) #> [13:58:15.809] }) #> [13:58:15.809] } #> [13:58:15.809] }, args = future.call.arguments) #> [13:58:15.809] } #> [13:58:15.809] Lazy evaluation: FALSE #> [13:58:15.809] Asynchronous evaluation: TRUE #> [13:58:15.809] Local evaluation: TRUE #> [13:58:15.809] Environment: 0x5572ea4ef220 #> [13:58:15.809] Capture standard output: TRUE #> [13:58:15.809] Capture condition classes: ‘condition’ (excluding ‘nothing’) #> [13:58:15.809] Globals: 5 objects totaling 3.34 KiB (function ‘...future.FUN’ of 1.73 KiB, DotDotDotList ‘future.call.arguments’ of 0 bytes, list ‘...future.elements_ii’ of 1.60 KiB, NULL ‘...future.seeds_ii’ of 0 bytes, NULL ‘...future.globals.maxSize’ of 0 bytes) #> [13:58:15.809] Packages: 1 packages (‘stats’) #> [13:58:15.809] L'Ecuyer-CMRG RNG seed: <none> (seed = FALSE) #> [13:58:15.809] Resolved: FALSE #> [13:58:15.809] Value: <not collected> #> [13:58:15.809] Conditions captured: <none> #> [13:58:15.809] Early signaling: FALSE #> [13:58:15.809] Owner process: 9d04a638-80d7-2cb1-c9c4-7dbd6de2ef73 #> [13:58:15.809] Class: ‘MultisessionFuture’, ‘ClusterFuture’, ‘MultiprocessFuture’, ‘Future’, ‘environment’
#> [13:58:15.825] Chunk #1 of 3 ... DONE
#> [13:58:15.825] Chunk #2 of 3 ...
#> [13:58:15.826] - Finding globals in 'X' for chunk #2 ...
#> [13:58:15.827] + additional globals found: [n=0]
#> [13:58:15.828] + additional namespaces needed: [n=0]
#> [13:58:15.828] - Finding globals in 'X' for chunk #2 ... DONE
#> [13:58:15.828] - seeds: <none>
#> [13:58:15.903] Created future:
#> [13:58:15.903] MultisessionFuture: #> [13:58:15.903] Label: ‘future_by-2’ #> [13:58:15.903] Expression: #> [13:58:15.903] { #> [13:58:15.903] do.call(function(...) { #> [13:58:15.903] ...future.globals.maxSize.org <- getOption("future.globals.maxSize") #> [13:58:15.903] if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) { #> [13:58:15.903] oopts <- options(future.globals.maxSize = ...future.globals.maxSize) #> [13:58:15.903] on.exit(options(oopts), add = TRUE) #> [13:58:15.903] } #> [13:58:15.903] { #> [13:58:15.903] lapply(seq_along(...future.elements_ii), FUN = function(jj) { #> [13:58:15.903] ...future.X_jj <- ...future.elements_ii[[jj]] #> [13:58:15.903] ...future.FUN(...future.X_jj, ...) #> [13:58:15.903] }) #> [13:58:15.903] } #> [13:58:15.903] }, args = future.call.arguments) #> [13:58:15.903] } #> [13:58:15.903] Lazy evaluation: FALSE #> [13:58:15.903] Asynchronous evaluation: TRUE #> [13:58:15.903] Local evaluation: TRUE #> [13:58:15.903] Environment: 0x5572ea4ef220 #> [13:58:15.903] Capture standard output: TRUE #> [13:58:15.903] Capture condition classes: ‘condition’ (excluding ‘nothing’) #> [13:58:15.903] Globals: 5 objects totaling 3.34 KiB (function ‘...future.FUN’ of 1.73 KiB, DotDotDotList ‘future.call.arguments’ of 0 bytes, list ‘...future.elements_ii’ of 1.60 KiB, NULL ‘...future.seeds_ii’ of 0 bytes, NULL ‘...future.globals.maxSize’ of 0 bytes) #> [13:58:15.903] Packages: 1 packages (‘stats’) #> [13:58:15.903] L'Ecuyer-CMRG RNG seed: <none> (seed = FALSE) #> [13:58:15.903] Resolved: FALSE #> [13:58:15.903] Value: <not collected> #> [13:58:15.903] Conditions captured: <none> #> [13:58:15.903] Early signaling: FALSE #> [13:58:15.903] Owner process: 9d04a638-80d7-2cb1-c9c4-7dbd6de2ef73 #> [13:58:15.903] Class: ‘MultisessionFuture’, ‘ClusterFuture’, ‘MultiprocessFuture’, ‘Future’, ‘environment’
#> [13:58:15.917] Chunk #2 of 3 ... DONE
#> [13:58:15.918] Chunk #3 of 3 ...
#> [13:58:15.919] - Finding globals in 'X' for chunk #3 ...
#> [13:58:15.921] + additional globals found: [n=0]
#> [13:58:15.922] + additional namespaces needed: [n=0]
#> [13:58:15.922] - Finding globals in 'X' for chunk #3 ... DONE
#> [13:58:15.923] - seeds: <none>
#> [13:58:16.018] Created future:
#> [13:58:16.019] MultisessionFuture: #> [13:58:16.019] Label: ‘future_by-3’ #> [13:58:16.019] Expression: #> [13:58:16.019] { #> [13:58:16.019] do.call(function(...) { #> [13:58:16.019] ...future.globals.maxSize.org <- getOption("future.globals.maxSize") #> [13:58:16.019] if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) { #> [13:58:16.019] oopts <- options(future.globals.maxSize = ...future.globals.maxSize) #> [13:58:16.019] on.exit(options(oopts), add = TRUE) #> [13:58:16.019] } #> [13:58:16.019] { #> [13:58:16.019] lapply(seq_along(...future.elements_ii), FUN = function(jj) { #> [13:58:16.019] ...future.X_jj <- ...future.elements_ii[[jj]] #> [13:58:16.019] ...future.FUN(...future.X_jj, ...) #> [13:58:16.019] }) #> [13:58:16.019] } #> [13:58:16.019] }, args = future.call.arguments) #> [13:58:16.019] } #> [13:58:16.019] Lazy evaluation: FALSE #> [13:58:16.019] Asynchronous evaluation: TRUE #> [13:58:16.019] Local evaluation: TRUE #> [13:58:16.019] Environment: 0x5572ea4ef220 #> [13:58:16.019] Capture standard output: TRUE #> [13:58:16.019] Capture condition classes: ‘condition’ (excluding ‘nothing’) #> [13:58:16.019] Globals: 5 objects totaling 3.34 KiB (function ‘...future.FUN’ of 1.73 KiB, DotDotDotList ‘future.call.arguments’ of 0 bytes, list ‘...future.elements_ii’ of 1.60 KiB, NULL ‘...future.seeds_ii’ of 0 bytes, NULL ‘...future.globals.maxSize’ of 0 bytes) #> [13:58:16.019] Packages: 1 packages (‘stats’) #> [13:58:16.019] L'Ecuyer-CMRG RNG seed: <none> (seed = FALSE) #> [13:58:16.019] Resolved: FALSE #> [13:58:16.019] Value: <not collected> #> [13:58:16.019] Conditions captured: <none> #> [13:58:16.019] Early signaling: FALSE #> [13:58:16.019] Owner process: 9d04a638-80d7-2cb1-c9c4-7dbd6de2ef73 #> [13:58:16.019] Class: ‘MultisessionFuture’, ‘ClusterFuture’, ‘MultiprocessFuture’, ‘Future’, ‘environment’
#> [13:58:16.032] Chunk #3 of 3 ... DONE
#> [13:58:16.032] Launching 3 futures (chunks) ... DONE
#> [13:58:16.032] Resolving 3 futures (chunks) ...
#> [13:58:16.194] - Number of value chunks collected: 3
#> [13:58:16.195] Resolving 3 futures (chunks) ... DONE
#> [13:58:16.195] Reducing values from 3 chunks ...
#> [13:58:16.196] - Number of values collected after concatenation: 3
#> [13:58:16.196] - Number of values expected: 3
#> [13:58:16.197] Reducing values from 3 chunks ... DONE
#> [13:58:16.198] future_lapply() ... DONE
plan(sequential)
y2 <- future_by(warpbreaks, warpbreaks[,"tension"],
                function(x) lm(breaks ~ wool, data = x))
#> [13:58:16.212] future_lapply() ...
#> [13:58:16.215] Number of chunks: 1
#> [13:58:16.215] getGlobalsAndPackagesXApply() ...
#> [13:58:16.215] - future.globals: TRUE
#> [13:58:16.220] - globals found/used: [n=1] ‘FUN’
#> [13:58:16.220] - needed namespaces: [n=1] ‘stats’
#> [13:58:16.220] Finding globals ... DONE
#> [13:58:16.220] - use_args: TRUE
#> [13:58:16.221] - Getting '...' globals ...
#> [13:58:16.221] - '...' content: [n=0]
#> [13:58:16.221] List of 1 #> [13:58:16.221] $ ...: list() #> [13:58:16.221] ..- attr(*, "class")= chr [1:2] "DotDotDotList" "list" #> [13:58:16.221] - attr(*, "class")= chr [1:3] "FutureGlobals" "Globals" "list" #> [13:58:16.221] - attr(*, "where")=List of 1 #> [13:58:16.221] ..$ ...:<environment: 0x5572e923f990> #> [13:58:16.221] - attr(*, "resolved")= logi TRUE #> [13:58:16.221] - attr(*, "total_size")= num NA
#> [13:58:16.231] - Getting '...' globals ... DONE
#> [13:58:16.231] Globals to be used in all futures (chunks): [n=2] ‘...future.FUN’, ‘...’
#> [13:58:16.232] List of 2 #> [13:58:16.232] $ ...future.FUN:function (x) #> [13:58:16.232] ..- attr(*, "srcref")= 'srcref' int [1:8] 16 17 16 55 17 55 16 16 #> [13:58:16.232] .. ..- attr(*, "srcfile")=Classes 'srcfilecopy', 'srcfile' <environment: 0x5572e706f788> #> [13:58:16.232] $ ... : list() #> [13:58:16.232] ..- attr(*, "class")= chr [1:2] "DotDotDotList" "list" #> [13:58:16.232] - attr(*, "class")= chr [1:3] "FutureGlobals" "Globals" "list" #> [13:58:16.232] - attr(*, "where")=List of 2 #> [13:58:16.232] ..$ ...future.FUN:<environment: R_EmptyEnv> #> [13:58:16.232] ..$ ... :<environment: 0x5572e923f990> #> [13:58:16.232] - attr(*, "resolved")= logi FALSE #> [13:58:16.232] - attr(*, "total_size")= num 1776
#> [13:58:16.236] Packages to be attached in all futures: [n=1] ‘stats’
#> [13:58:16.236] getGlobalsAndPackagesXApply() ... DONE
#> [13:58:16.237] Number of futures (= number of chunks): 1
#> [13:58:16.237] Launching 1 futures (chunks) ...
#> [13:58:16.237] Chunk #1 of 1 ...
#> [13:58:16.237] - Finding globals in 'X' for chunk #1 ...
#> [13:58:16.238] + additional globals found: [n=0]
#> [13:58:16.238] + additional namespaces needed: [n=0]
#> [13:58:16.238] - Finding globals in 'X' for chunk #1 ... DONE
#> [13:58:16.238] - Adjusted option 'future.globals.maxSize': 5.24288e+08 -> 3 * 5.24288e+08 = 1.57286e+09 (bytes)
#> [13:58:16.238] - seeds: <none>
#> [13:58:16.244] Created future:
#> [13:58:16.245] SequentialFuture: #> [13:58:16.245] Label: ‘future_by-1’ #> [13:58:16.245] Expression: #> [13:58:16.245] { #> [13:58:16.245] do.call(function(...) { #> [13:58:16.245] ...future.globals.maxSize.org <- getOption("future.globals.maxSize") #> [13:58:16.245] if (!identical(...future.globals.maxSize.org, ...future.globals.maxSize)) { #> [13:58:16.245] oopts <- options(future.globals.maxSize = ...future.globals.maxSize) #> [13:58:16.245] on.exit(options(oopts), add = TRUE) #> [13:58:16.245] } #> [13:58:16.245] { #> [13:58:16.245] lapply(seq_along(...future.elements_ii), FUN = function(jj) { #> [13:58:16.245] ...future.X_jj <- ...future.elements_ii[[jj]] #> [13:58:16.245] ...future.FUN(...future.X_jj, ...) #> [13:58:16.245] }) #> [13:58:16.245] } #> [13:58:16.245] }, args = future.call.arguments) #> [13:58:16.245] } #> [13:58:16.245] Lazy evaluation: FALSE #> [13:58:16.245] Asynchronous evaluation: FALSE #> [13:58:16.245] Local evaluation: TRUE #> [13:58:16.245] Environment: 0x5572ea4ef220 #> [13:58:16.245] Capture standard output: TRUE #> [13:58:16.245] Capture condition classes: ‘condition’ (excluding ‘nothing’) #> [13:58:16.245] Globals: 5 objects totaling 6.54 KiB (function ‘...future.FUN’ of 1.73 KiB, DotDotDotList ‘future.call.arguments’ of 0 bytes, list ‘...future.elements_ii’ of 4.80 KiB, NULL ‘...future.seeds_ii’ of 0 bytes, NULL ‘...future.globals.maxSize’ of 0 bytes) #> [13:58:16.245] Packages: 1 packages (‘stats’) #> [13:58:16.245] L'Ecuyer-CMRG RNG seed: <none> (seed = FALSE) #> [13:58:16.245] Resolved: TRUE #> [13:58:16.245] Value: 25.57 KiB of class ‘list’ #> [13:58:16.245] Early signaling: FALSE #> [13:58:16.245] Owner process: 9d04a638-80d7-2cb1-c9c4-7dbd6de2ef73 #> [13:58:16.245] Class: ‘SequentialFuture’, ‘UniprocessFuture’, ‘Future’, ‘environment’
#> [13:58:16.247] Chunk #1 of 1 ... DONE
#> [13:58:16.247] Launching 1 futures (chunks) ... DONE
#> [13:58:16.247] Resolving 1 futures (chunks) ...
#> [13:58:16.248] - Number of value chunks collected: 1
#> [13:58:16.248] Resolving 1 futures (chunks) ... DONE
#> [13:58:16.248] Reducing values from 1 chunks ...
#> [13:58:16.248] - Number of values collected after concatenation: 3
#> [13:58:16.248] - Number of values expected: 3
#> [13:58:16.249] Reducing values from 1 chunks ... DONE
#> [13:58:16.249] future_lapply() ... DONE