This post shows a number of different package and approaches for leveraging parallel processing with R and Python.
Multicore Data Science in R and Python
Time is precious. Data science involves increasingly demanding processing requirements. From training ever larger models, to feature engineering, to hyper parameter tuning, processing power is often the bottleneck to experimentation and ideation.
Leveraging large machine instances on the cloud allows data scientists to use the statistical programming environments they already know and love, such as R and Python, on ever larger datasets, more complex models, and more demanding configurations. Massive hardware such as the AWS X1 instances, with 128 cores and 2 terabytes of RAM, have pushed the boundaries of what can be done without requiring complex, hard-to-manage, unfamiliar distributed systems.
Leveraging larger datasets and more processing power allows data scientists to do more experiments, be more confident about solutions, and build more accurate models for the business. Parallel processing used to require specialized skillset and understanding, leveraging basic building blocks of fine grained multi-threading and message passing. The modern data science stack provides high-level APIs, where data scientists can take advantage of large compute instances while working at the level of abstraction where they’re most productive.
In the Multicore Data Science on R and Python video we cover a number of R and Python tools that allow data scientists to leverage large-scale architectures to collect, write, munge, and manipulate data, as well as train and validate models on multicore architectures. You will see sample code, real-world benchmarks, and running of experiments on AWS X1 instances using Domino.
For R, we cover the parallel package, data.table, caret, and multidplyr. In Python, we cover paratext, joblib, and scikit-learn.
Finally, we show that there exists powerful language-agnostic tools for data scientists to take advantage of multicore architectures. In the video we use H2O.ai and xgboost—two cutting-edge machine learning tools that can leverage machines with many cores by setting a single parameter.
Below is a sampling of the material covered in the full video.
Package parallel was first included in R 2.14.0 and provides drop-in parallel replacements for most of the functionality of apply, with integrated handling of random-number generation.
Parallelism can be done in computation at many different levels: this package is principally concerned with ‘coarse-grained parallelization’. The crucial point is that these chunks of computation are unrelated and do not need to communicate in any way.
In this sample code, we leverage parallel to read a directory of 100 csv files from a folder in parallel. This can allow us to leverage multicore architectures to parse and ingest data on disk more quickly.
numCores <- detectCores() # get the number of cores available
results <- mclapply(1:100,
FUN=function(i) read.csv(paste0("./data/datafile-", i, ".csv")),
mc.cores = numCores)
The first multicore concepts introduced in this code are on line 3, where we call the detectCores() function to retrieve the number of cores available to this process. This queries the underlying operating system and returns an integer representing the number of processors. It’s important to note that often optimal parallelism does not mean using all cores available, as it may saturate other resources and cause thrashing, so remember to experiment and benchmark.
On line 5 we call the mclapply() (or multicore lapply) function imported from the parallel package. This is a nearly drop-in replacement for R’s venerable lapply function.
There are two differences practitioners should be aware of:
- The mc.cores parameter provides the user a way to set the number of cores to leverage (in our case simply all cores detected).
- The code in FUN is being executed inside of a separate R process, and therefore inherits forking semantics.
The main takeaway is that accessing global variables and dealing with global state will necessarily be different than when all code is executing in a single process.
Parallel provides a great tool to quickly scale up existing code with a few minor tweaks. It’s important to measure the true performance as the number of cores is scaled up and to remember that forking semantics will require some reorganization of code if global variables are used, however this can all be worth if it you want to speed up your code by over 100x!
Data.table is a venerable and powerful package written primarily by Matt Dowle. It is a high-performance implementation of R’s data frame construct, with an enhanced syntax. There have been innumerable benchmarks showcasing the power of the data.table package. It provides a highly optimized tabular data structure for most common analytical operations.
Matt Dowle cautions against using data.table in multicore environments, so why are discussing it in a multicore webinar and blog post? Announced in November of 2016, data.table got a fully parallelized version of fwrite! Allowing R to write out data with significant speedup!
In this sample code, we use the data.table fwrite package to write a large CSV leveraging multicore architectures.
numCores <- detectCores()
nrows_mult <- 1000
big_data <- do.call("rbind", replicate(nrows_mult, iris, simplify = FALSE))
big_data <- do.call("cbind", replicate(ncols_mult, big_data, simplify = FALSE))
fwrite(big_data, "/tmp/bigdata.csv", nThread=numCores)
In lines 6-10 we are replicating the iris dataset a number of times across to make it a very large in-memory data structure. This would take a significant amount of time to write to disk using standard tools.
To leverage multiple cores, on line 12 we call the fwrite() function with the parameter nThread with the number of cores we detected on line 4. There are limitations to the amount of parallelism that makes sense in this scenario, because the IO subsystem may not be able to keep up with massive numbers of threads, but as the benchmarks in the video show, it can make a significant difference; sometimes, writing out files 50% faster is the difference between success and failure.
The caret package (Classification And REgression Training) is a set of functions that streamline the process for creating predictive models. The package contains tools for data splitting, preprocessing, feature selection, model tuning using resampling, variable importance estimation, and other functionality.
There are many different modeling functions in R. Some have different syntax for model training and/or prediction. The package began as a way to provide a uniform interface for the functions themselves, as well as to standardize common tasks such parameter tuning and variable importance.
The caret package leverages multicore functionality seamlessly and easily. It takes advantage of the fact that many of the operations—such as training with different hyperparemeters and cross validation—in model training are parallel.
In the sample code, we take advantage of caret’s multicore ability to train a glmnet model while doing a hyperparameter sweep and cross-validation:
numCores <- detectCores()
registerDoMc(cores = numCores)
model_fit<- train(price ~ ., data=diamonds, method="glmnet", preProcess=c("center", "scale"), tuneLength=10)
This code should look familiar by now. The primary difference comes at line 2 where we include the doMC package. This package provides a multicore “back-end” to the caret package, and handles all of the distribution of jobs across cores.
On line 6 we register the number of cores available to the doMC cluster.
On line 8 we train the model, doing some pre-processing on the values.
There is no need to pass the number of cores to caret, as it automatically inherits this information from the already created cluster.
There are a number of parameters that can be passed to caret. In this example we pass a tuneLength of 10, which creates a large tuning grid of hyperparemeters. This will create dozens if not hundreds of models with different configurations, and give us the best model based on optimizing the metric that makes the most sense, in this case RMSE.
Multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition(), and then the data stays on each node until you explicitly retrieve it with collect(). This minimizes time spent moving data around, and maximizes parallel performance.
Due to the overhead associated with communicating between the nodes, you won't see much performance improvement on basic dplyr verbs with fewer than ~10 million observations. However, you'll see improvements much faster if you're doing more complex operations with do().
In the sample code we use multidplyr to train a large number of GAM models on a dataset:
numCores <- detectCores()
cluster <- create_cluster(cores)
by_dest <- flights %>%
filter(n >= 365) %>%
semi_join(flights, .) %>%
mutate(yday = lubridate::yday(ISOdate(year, month, day))) %>%
partition(dest, cluster = cluster)
models <- by_dest %>%
do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
This code is slightly more complex than our previous examples. The main differences are that on line 8 we are explicitly initializing a cluster with the number of cores detected in line 6.
The multidplyr package handles all of the underlying challenges of spinning up the cluster, and does so in a transparent fashion for us via that simplified interface.
Lines 10-15 we are taking the flights dataset, doing some feature engineering to it, then partitioning it to the cluster on line 15.
This means we are taking subsets of the dataset and sending it to each one of the cores for processing. This abstraction is at a lower level than other abstractions such as caret’s, but it does allow us a significant amount of power to decide exactly how code is distributed across multiple cores.
On line 17 we broadcast to the cluster that it will require the “mgcv” library in order to execute the following pipeline.
Finally, on lines 18 and 19, we train a large number of GAM models in parallel across the cluster, each partitioned by the destination.
Multidplyr is still an early package and is being used by Hadley and his team to work through and understand how to bring multicore power to the tidyverse. It is useful now for a number of use cases and is an exciting preview of things to come.
Reading CSVs can is a time-consuming task and often bottleneck to data processing. If you are leveraging large-scale hardware with dozens of cores, it can be humbling to see your server sitting mostly idle as one core is utilized 100% while reading a CSV file.
The paratext library provides mutlicore processing to CSV reading and parsing. ParaText is a C++ library to read text files in parallel on multi-core machines. The alpha release includes a CSV reader and Python bindings. The library itself has no dependencies other than the standard library.
According to our benchmarks, ParaText is the fastest CSV reading library in the world.
In the sample code, we use paratext to read a very large CSV while leveraging all of the available cores.
import pandas as pd
mydf = paratext.load_csv_to_pandas("data/big_data.csv")
In this incredibly simple code, we load the paratext library and use the load_csv_to_pandas function to create a pandas dataframe from big_data.csv. This function will automatically leverage an optimal number of cores and provide a significant acceleration.
The only challenge with paratext is getting it built on your particular environment can be nontrivial. However, after installation, it provides significant performance with minimal effort.
Joblib is a set of tools for lightweight pipelining in Python. In particular, joblib offers:
- Transparent disk-caching of the output values and lazy re-evaluation (memorize pattern)
- easy simple parallel computing
- logging and tracing of the execution
Joblib is optimized to be fast and robust in particular on large data and has specific optimizations for numpy arrays. Joblib is a fundamental building block of parallel processing in Python, not just for data science but for many other distributed and multicore processing tasks.
In the sample code, joblib is used to find which line-segments are fully contained by other line-segments in the population—an embarrassingly parallel task:
import numpy as np
from matplotlib.path import Path
from joblib import Parallel, delayed
## Create pairs of points for line segments
all_segments = zip(np.random.rand(10000,2), np.random.rand(10000,2))
test_segments = zip(np.random.rand(800,2),np.random.rand(800,2))
## Check if one line segment contains another.
for other_path in all_segments:
chck = Path(other_path)
res = Parallel(n_jobs=128) (delayed(check_paths) (Path(test_segment)) for test_segment in test_segments)
All of the code up until line 19 is setting up our environment. Generating line-segments and segments to validate, and creating a function called check_paths to iterate through the segments and check if one contains the other.
Line 19 is our call to joblib, in which we create a parallel object with 128 threads (this was run on an AWS X1 instance). It iterates through the values in test_segments, creates a path object for that test_segment, then calls the check_paths function for the created object.
Note that the call to check_paths is wrapped in delayed(), which allows joblib to schedule it instead of having it executed immediately.
Scikit-learn is a free software machine learning library for the Python programming language. It features various classification, regression, and clustering algorithms including support for vector machines, random forests, gradient boosting, k-means and DBSCAN. It is designed to interoperate with the Python numerical and scientific libraries NumPy and SciPy.
Scikit-learn makes it easy to leverage large multicore servers by the simple usage of the n_jobs parameter. This works for many models, Grid searching, cross validation, etc.
In the sample code, we train a RandomForestClassifier to predict the species in the iris dataset leveraging multiple cores in parallel:
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics, datasets
iris = datasets.load_iris()
X = iris.data[:, :2] # we only take the first two features.
Y = iris.target
md = RandomForestClassifier(n_estimators = 500, n_jobs = -1)
The power of scikit-learn’s multicore capabilities is shown online 8: In order to leverage all of the cores available on any system, we just pass the value -1 to the n_jobs parameter.
There is no need to set up a cluster, introspect the machine, or anything else... This simple parameter can often provide a 100x speedup in the training of machine learning models for scikit-learn.
Not all models provide the n_jobs parameter, but the scikit-learn documentation provides a way to find out if your specific classifier supports this simple parallelization.
Learn More, Including H2O and Xgboost
Watch the full video on multicore data science with R and Python to learn about multicore capabilities in h2o and xgboost, two of the most popular machine learning packages available today.
Working with the world’s most cutting-edge software, on supercomputer-class hardware is a real privilege. It’s exciting to see how much more productive these tools have made me in my data science practice, and will hopefully impact you in a similar way.
The Domino Enterprise MLOps platform provides access to large-scale compute environments with powerful environment management tools, making it easy to test this software on large hardware and manage the configuration of multiple versions of these packages.
If you are interested in learning more, you can request a demo of Domino.