multidplyr is a backend for dplyr that spreads work across multiple processes. Like all dplyr backends, it allows you to use the dplyr verbs that you’re already familiar with, but alters the underlying computational model to transparently support multi-process parallelism.
This vignette will show you the basics of multidplyr using the nycflights13 dataset.
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)
To start using multidplyr you must create a cluster. Here I used two
cores because it’s the maximum permitted by CRAN, but I suggest that you
use more. For best performance, I recommend using 1 or 2 fewer than the
total number of cores on your computer, which you can detect with
parallel::detectCores()
(leaving at least 1 core free means
that you should still be able to use your computer for other tasks while
your computation is running).
<- new_cluster(2)
cluster
cluster#> 2 session cluster [..]
(In the examples, you’ll also see the use of
default_cluster()
; this is designed specifically for the
constraints of R CMD check, so I don’t recommend using it in your own
code.)
A cluster consists of multiple R processes created by callr. When multiple processes are running at the same time, your operating system will take care of spreading the work across multiple cores.
There are two ways to get data to the workers in cluster:
partition()
a data frame that already loaded in the
interactive process.partition()
partition()
is useful if you have a single in-memory
data frame. For example, take nycflights13::flights
. This
dataset contains information for about ~300,000 flights departing New
York City in 2013. We group it by destination, then partition it:
<- flights %>% group_by(dest) %>% partition(cluster)
flights1
flights1#> Source: party_df [336,776 x 19]
#> Groups: dest
#> Shards: 2 [166,251--170,525 rows]
#>
#> # A data frame: 336,776 × 19
#> year month day dep_time sched_dep…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#> <int> <int> <int> <int> <int> <dbl> <int> <int> <dbl> <chr>
#> 1 2013 1 1 557 600 -3 709 723 -14 EV
#> 2 2013 1 1 557 600 -3 838 846 -8 B6
#> 3 2013 1 1 558 600 -2 849 851 -2 B6
#> 4 2013 1 1 558 600 -2 853 856 -3 B6
#> 5 2013 1 1 559 559 0 702 706 -4 B6
#> 6 2013 1 1 559 600 -1 854 902 -8 UA
#> # … with 336,770 more rows, 9 more variables: flight <int>, tailnum <chr>,
#> # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> # minute <dbl>, time_hour <dttm>, and abbreviated variable names
#> # ¹sched_dep_time, ²dep_delay, ³arr_time, ⁴sched_arr_time, ⁵arr_delay
partition()
splits flights1
into roughly
equal subsets on each worker, ensuring that all rows in a group are
transfered to the same worker. The result is a party_df
, or
partitioned data frame.
partition()
is simple to call, but it’s relatively
expensive because it copies a lot of data between processes. An
alternative strategy is for each worker to load the data (from files) it
needs directly.
To show how that might work, I’ll first split flights up by month and save as csv files:
<- tempfile()
path dir.create(path)
%>%
flights group_by(month) %>%
group_walk(~ vroom::vroom_write(.x, sprintf("%s/month-%02i.csv", path, .y$month)))
Now we find all the files in the directory, and divide them up so that each worker gets (approximately) the same number of pieces:
<- dir(path, full.names = TRUE)
files cluster_assign_partition(cluster, files = files)
Then we read in the files on each worker and use
party_df()
to create a partitioned dataframe:
cluster_send(cluster, flights2 <- vroom::vroom(files))
<- party_df(cluster, "flights2")
flights2
flights2#> Source: party_df [336,776 x 18]
#> Shards: 2 [166,158--170,618 rows]
#>
#> # A data frame: 336,776 × 18
#> year day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier flight
#> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <chr> <dbl>
#> 1 2013 1 517 515 2 830 819 11 UA 1545
#> 2 2013 1 533 529 4 850 830 20 UA 1714
#> 3 2013 1 542 540 2 923 850 33 AA 1141
#> 4 2013 1 544 545 -1 1004 1022 -18 B6 725
#> 5 2013 1 554 600 -6 812 837 -25 DL 461
#> 6 2013 1 554 558 -4 740 728 12 UA 1696
#> # … with 336,770 more rows, 8 more variables: tailnum <chr>, origin <chr>,
#> # dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
#> # time_hour <dttm>, and abbreviated variable names ¹sched_dep_time,
#> # ²dep_delay, ³arr_time, ⁴sched_arr_time, ⁵arr_delay
Once you have a partitioned data frame, you can operate on it with
the usual dplyr verbs. To bring the data back to the interactive
process, use collect()
:
%>%
flights1 summarise(dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
collect()
#> # A tibble: 105 × 2
#> dest dep_delay
#> <chr> <dbl>
#> 1 ABQ 13.7
#> 2 ALB 23.6
#> 3 AUS 13.0
#> 4 AVL 8.19
#> 5 BDL 17.7
#> 6 BGR 19.5
#> 7 BHM 29.7
#> 8 BNA 16.0
#> 9 BOS 8.73
#> 10 BZN 11.5
#> # … with 95 more rows
For this size of data and a simple transformation, using a local cluster actually makes performance much worse!
<- flights %>% group_by(dest)
by_dest
# Local computation
system.time(by_dest %>% summarise(mean(dep_delay, na.rm = TRUE)))
#> user system elapsed
#> 0.009 0.000 0.010
# Remote: partitioning
system.time(flights2 <- flights %>% partition(cluster))
#> user system elapsed
#> 0.193 0.041 0.266
# Remote: computation
system.time(flights3 <- flights2 %>% summarise(mean(dep_delay, na.rm = TRUE)))
#> user system elapsed
#> 0.004 0.001 0.038
# Remote: retrieve results
system.time(flights3 %>% collect())
#> user system elapsed
#> 0.004 0.002 0.051
That’s because of the overhead associated with sending the data to each worker and retrieving the results at the end. For basic dplyr verbs, multidplyr is unlikely to give you significant speed ups unless you have 10s or 100s of millions of data points (and in that scenario you should first try dtplyr, which uses data.table).
multipldyr might help, however, if you’re doing more complex things. Let’s see how that plays out when fitting a moderately complex model. We’ll start by selecting a subset of flights that have at least 50 occurrences, and we’ll compute the day of the year from the date:
<- flights %>%
daily_flights count(dest) %>%
filter(n >= 365)
<- flights %>%
common_dest semi_join(daily_flights, by = "dest") %>%
mutate(yday = lubridate::yday(ISOdate(year, month, day))) %>%
group_by(dest)
nrow(common_dest)
#> [1] 332942
That leaves us with ~332,000 observations. Let’s partition this smaller dataset:
<- common_dest %>% partition(cluster)
by_dest
by_dest#> Source: party_df [332,942 x 20]
#> Groups: dest
#> Shards: 2 [164,539--168,403 rows]
#>
#> # A data frame: 332,942 × 20
#> year month day dep_time sched_dep…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#> <int> <int> <int> <int> <int> <dbl> <int> <int> <dbl> <chr>
#> 1 2013 1 1 517 515 2 830 819 11 UA
#> 2 2013 1 1 533 529 4 850 830 20 UA
#> 3 2013 1 1 542 540 2 923 850 33 AA
#> 4 2013 1 1 554 558 -4 740 728 12 UA
#> 5 2013 1 1 555 600 -5 913 854 19 B6
#> 6 2013 1 1 558 600 -2 753 745 8 AA
#> # … with 332,936 more rows, 10 more variables: flight <int>, tailnum <chr>,
#> # origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> # minute <dbl>, time_hour <dttm>, yday <dbl>, and abbreviated variable names
#> # ¹sched_dep_time, ²dep_delay, ³arr_time, ⁴sched_arr_time, ⁵arr_delay
Let’s fit a smoothed generalised additive model to each destination,
estimating how delays vary over the course of the year and within a day.
Note that we need to use cluster_library()
to load the mgcv
package on every node. That takes around 3s:
cluster_library(cluster, "mgcv")
system.time({
<- by_dest %>%
models do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})#> user system elapsed
#> 0.005 0.002 1.395
Compared with around 5s doing it locally:
system.time({
<- common_dest %>%
models group_by(dest) %>%
do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})#> user system elapsed
#> 2.003 0.101 2.111
The cost of transmitting messages to the nodes is roughly constant, so the longer the task you’re parallelising, the closer you’ll get to a linear speed up.