Managing Spark Partitions with Coalesce and Repartition
Spark splits data into partitions and executes computations on the partitions in parallel. You should understand how data is partitioned and when you need to manually adjust the partitioning to keep your Spark computations running efficiently.
Understanding memory partitioning is absolutely critical for Spark programmers. This post covers memory partitioning and you should check out Beautiful Spark for details on related topics like building partitioned lakes on disk and how to perform fast filtering operations.
Intro to partitions
Let’s create a DataFrame of numbers to illustrate how data is partitioned:
val x = (1 to 10).toList
val numbersDf = x.toDF("number")
On my machine, the numbersDf is split into four partitions:
numbersDf.rdd.partitions.size // => 4
Each partition is written as a separate CSV file when you write the DataFrame to disk.
numbersDf.write.csv("/Users/powers/Desktop/spark_output/numbers")
Here is how the data is separated on the different partitions.
Partition A: 1, 2
Partition B: 3, 4, 5
Partition C: 6, 7
Partition D: 8, 9, 10
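If you want to see the partition contents on your own machine, one option is to collect each partition as a list. The following is a minimal sketch, assuming a local session with four cores (the local[4] master and the InspectPartitions object name are illustrative, not from the original code). The exact layout you get may differ from the one shown above.

```scala
import org.apache.spark.sql.SparkSession

object InspectPartitions {
  // Returns the rows of numbersDf grouped by the partition they live in
  def contents(spark: SparkSession): Seq[List[Int]] = {
    import spark.implicits._
    val numbersDf = (1 to 10).toList.toDF("number")
    // glom() turns every partition into a single array of its rows
    numbersDf.as[Int].rdd.glom().collect().map(_.toList).toSeq
  }

  def main(args: Array[String]): Unit = {
    // local[4] is an assumption used to mimic a four-core machine
    val spark = SparkSession.builder().master("local[4]").appName("inspect-partitions").getOrCreate()
    contents(spark).zipWithIndex.foreach { case (rows, i) =>
      println(s"Partition $i: ${rows.mkString(", ")}")
    }
    spark.stop()
  }
}
```

Collecting every partition to the driver is only sensible for tiny DataFrames like this one.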
coalesce
The coalesce method reduces the number of partitions in a DataFrame. Here’s how to consolidate the data in two partitions:
val numbersDf2 = numbersDf.coalesce(2)
We can verify coalesce has created a new DataFrame with only two partitions:
numbersDf2.rdd.partitions.size // => 2
numbersDf2 will be written out to disk as two CSV files:
numbersDf2.write.csv("/Users/powers/Desktop/spark_output/numbers2")
The partitions in numbersDf2 have the following data:
Partition A: 1, 2, 3, 4, 5
Partition C: 6, 7, 8, 9, 10
The coalesce algorithm moved the data from Partition B to Partition A and moved the data from Partition D to Partition C. The data in Partition A and Partition C does not move with the coalesce algorithm. This algorithm is fast in certain situations because it minimizes data movement.
Increasing partitions
You can try to increase the number of partitions with coalesce, but it won’t work!
val numbersDf3 = numbersDf.coalesce(6)
numbersDf3.rdd.partitions.size // => 4
numbersDf3 keeps four partitions even though we attempted to create 6 partitions with coalesce(6).
The coalesce algorithm changes the number of partitions by moving data from some partitions into other existing partitions. This algorithm cannot increase the number of partitions.
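To make the merging behavior concrete, here is a toy model in plain Scala with no Spark dependency. It is only a sketch of the idea (whole partitions are merged, never split), not Spark’s actual implementation, and the CoalesceSketch name is made up for this illustration.

```scala
object CoalesceSketch {
  // Toy model: merge neighboring partitions into at most `target` groups.
  // Whole partitions are moved together, so the count can never grow.
  def coalesce[A](partitions: Vector[Vector[A]], target: Int): Vector[Vector[A]] = {
    val n = partitions.length
    val groups = math.min(n, target)
    if (groups <= 0) Vector.empty
    else Vector.tabulate(groups) { i =>
      // Each group takes a contiguous range of the existing partitions
      val start = (i.toLong * n / groups).toInt
      val end = ((i + 1).toLong * n / groups).toInt
      partitions.slice(start, end).flatten
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Vector(Vector(1, 2), Vector(3, 4, 5), Vector(6, 7), Vector(8, 9, 10))
    println(coalesce(parts, 2)) // two merged partitions
    println(coalesce(parts, 6)) // still four partitions
  }
}
```

In this model, asking for two partitions merges A with B and C with D, and asking for six returns the original four partitions unchanged, which mirrors the coalesce(2) and coalesce(6) results above.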
repartition
The repartition method can be used to either increase or decrease the number of partitions in a DataFrame.
Let’s create a homerDf from the numbersDf with two partitions.
val homerDf = numbersDf.repartition(2)
homerDf.rdd.partitions.size // => 2
Let’s examine the data on each partition in homerDf:
Partition ABC: 1, 3, 5, 6, 8, 10
Partition XYZ: 2, 4, 7, 9
Partition ABC contains data from Partition A, Partition B, Partition C, and Partition D. Partition XYZ also contains data from each original partition. The repartition algorithm does a full data shuffle and distributes the data roughly evenly among the partitions (six rows and four rows in this example). It does not attempt to minimize data movement like the coalesce algorithm.
Increasing partitions
The repartition method can be used to increase the number of partitions as well.
val bartDf = numbersDf.repartition(6)
bartDf.rdd.partitions.size // => 6
Here’s how the data is split up amongst the partitions in the bartDf.
Partition 00000: 5, 7
Partition 00001: 1
Partition 00002: 2
Partition 00003: 8
Partition 00004: 3, 9
Partition 00005: 4, 6, 10
The repartition method does a full shuffle of the data, so the number of partitions can be increased.
Differences between coalesce and repartition
The repartition algorithm does a full shuffle of the data and creates roughly equal sized partitions. coalesce combines existing partitions to avoid a full shuffle, so it can only reduce the partition count and may leave partitions with uneven sizes.
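One way to see the difference yourself is to print the physical plans. This is a sketch under the assumption of a local four-core session (the CoalesceVsRepartition object and its helper are hypothetical names). You would typically expect the repartition plan to contain an Exchange step, which is how Spark reports a shuffle, while the coalesce plan does not, though the exact plan text varies by Spark version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CoalesceVsRepartition {
  // Rebuilds the numbers DataFrame used throughout this post
  def numbers(spark: SparkSession): DataFrame = {
    import spark.implicits._
    (1 to 10).toList.toDF("number")
  }

  def main(args: Array[String]): Unit = {
    // local[4] is an assumption used to mimic a four-core machine
    val spark = SparkSession.builder().master("local[4]").appName("coalesce-vs-repartition").getOrCreate()
    val numbersDf = numbers(spark)

    // Print the physical plan of each approach for comparison
    numbersDf.coalesce(2).explain()
    numbersDf.repartition(2).explain()

    // Print the resulting partition counts
    println(numbersDf.coalesce(2).rdd.getNumPartitions)
    println(numbersDf.repartition(6).rdd.getNumPartitions)

    spark.stop()
  }
}
```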
repartition by column
Let’s use the following data to examine how a DataFrame can be repartitioned by a particular column.
+-----+-------+
| age | color |
+-----+-------+
| 10 | blue |
| 13 | red |
| 15 | blue |
| 99 | red |
| 67 | blue |
+-----+-------+
We’ll start by creating the DataFrame:
val people = List(
(10, "blue"),
(13, "red"),
(15, "blue"),
(99, "red"),
(67, "blue")
)
val peopleDf = people.toDF("age", "color")
Let’s repartition the DataFrame by the color column:
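val colorDf = peopleDf.repartition($"color")
When you repartition by a column without specifying a partition count, Spark uses the value of spark.sql.shuffle.partitions, which is 200 by default. This example will have two partitions with data and 198 empty partitions (newer Spark versions with adaptive query execution enabled may merge the empty ones).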
Partition 00091
13,red
99,red
Partition 00168
10,blue
15,blue
67,blue
The colorDf contains different partitions for each color and is optimized for extracts by color. Partitioning by a column is similar to indexing a column in a relational database.
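To check that rows with the same color really end up together, you can tag each row with its partition ID. The sketch below assumes a local session, and the RepartitionByColumn object and partitionsPerColor helper are illustrative names. The specific partition IDs, and whether two colors happen to share a partition, depend on your Spark version and settings, so the code only checks that each color maps to a single partition.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.spark_partition_id

object RepartitionByColumn {
  // For each color, returns the set of partition IDs that hold rows of that color
  def partitionsPerColor(spark: SparkSession): Map[String, Set[Int]] = {
    import spark.implicits._
    val peopleDf = List(
      (10, "blue"), (13, "red"), (15, "blue"), (99, "red"), (67, "blue")
    ).toDF("age", "color")

    peopleDf
      .repartition($"color")
      .select($"color", spark_partition_id().as("pid"))
      .as[(String, Int)]
      .collect()
      .groupBy(_._1)
      .map { case (color, rows) => color -> rows.map(_._2).toSet }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("repartition-by-column").getOrCreate()
    partitionsPerColor(spark).foreach { case (color, pids) =>
      println(s"$color lives in partition(s): ${pids.mkString(", ")}")
    }
    spark.stop()
  }
}
```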
Real World Example
Suppose you have a data lake that contains 2 billion rows of data (1TB) split in 13,000 partitions.
You’d like to create a data puddle that’s a random sampling of one millionth of the data lake. The data puddle will be used in development and the data lake will be reserved for production grade code. You’d like to write the data puddle out to S3 for easy access.
Here’s how you’d structure the code:
val dataPuddle = dataLake.sample(true, 0.000001)
dataPuddle.write.parquet("s3a://my_bucket/puddle/")
Spark doesn’t adjust the number of partitions when a large DataFrame is sampled or filtered, so the dataPuddle will also have 13,000 partitions. The dataPuddle only contains about 2,000 rows of data, so most of the partitions will be empty. It’s not efficient to read or write thousands of empty files on S3, so we should improve this code by repartitioning.
val dataPuddle = dataLake.sample(true, 0.000001)
val goodPuddle = dataPuddle.repartition(4)
goodPuddle.write.parquet("s3a://my_bucket/puddle/")
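You can reproduce this effect at a small scale without a real data lake. The following sketch uses a deterministic filter instead of sample so the row count is predictable, and every name in it (FilterKeepsPartitions, lake, puddle) is a stand-in invented for this illustration.

```scala
import org.apache.spark.sql.SparkSession

object FilterKeepsPartitions {
  // Returns the partition counts of (lake, filtered puddle, repartitioned puddle)
  def counts(spark: SparkSession): (Int, Int, Int) = {
    import spark.implicits._
    // Stand-in "lake": one million rows spread over 200 partitions
    val lake = spark.range(0L, 1000000L, 1L, 200)
    // Keep only 10 rows; the partition count is not adjusted by the filter
    val puddle = lake.filter($"id" % 100000 === 0)
    val goodPuddle = puddle.repartition(4)
    (lake.rdd.getNumPartitions, puddle.rdd.getNumPartitions, goodPuddle.rdd.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("filter-keeps-partitions").getOrCreate()
    println(counts(spark))
    spark.stop()
  }
}
```

The filtered DataFrame keeps all 200 partitions even though it holds only 10 rows, and the explicit repartition(4) brings it down to four.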
Why did we choose 4 partitions for the data puddle?
The data is a million times smaller, so we reduce the number of partitions by a million and keep the same amount of data per partition. 13,000 partitions / 1,000,000 = 1 partition (rounded up). We used 4 partitions so the data puddle can leverage the parallelism of Spark.
In general, you can determine the number of partitions by multiplying the number of CPU cores in the cluster by 2, 3, or 4.
number_of_partitions = number_of_cpus * 4
If you’re writing the data out to a file system, you can choose a partition count that will create reasonably sized files (around 100MB each). When the data is read back, Spark picks the number of partitions itself, based on the file sizes and the cluster’s configuration.
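The rules of thumb in this section boil down to simple arithmetic. Here is a small sketch in plain Scala; the PartitionSizing object and its function names are hypothetical helpers, and the 100MB target is just the figure mentioned above, not a Spark setting.

```scala
object PartitionSizing {
  // Scale an existing partition count by the sampling fraction, never going below one
  def scaledPartitions(sourcePartitions: Int, fraction: Double): Int =
    math.max(1, math.ceil(sourcePartitions * fraction).toInt)

  // Rule of thumb from the text: a small multiple of the available CPU cores
  def byCores(cores: Int, multiplier: Int = 4): Int = cores * multiplier

  // Enough partitions that each output file is roughly targetBytes in size
  def byTargetFileSize(totalBytes: Long, targetBytes: Long = 100L * 1024 * 1024): Int =
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt

  def main(args: Array[String]): Unit = {
    println(scaledPartitions(13000, 0.000001)) // 1, as in the data puddle example
    println(byCores(8))                        // 32 for a hypothetical 8-core cluster
    println(byTargetFileSize(1L << 40))        // 10486 files of about 100MB for 1TB
  }
}
```

These numbers are starting points; the right value still depends on your cluster and workload.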
Why did we use the repartition method instead of coalesce?
A full data shuffle is an expensive operation for large data sets, but our data puddle is only 2,000 rows. The repartition method produces roughly equal sized files, which are more efficient for downstream consumers.
Actual performance improvement
It took 241 seconds to count the rows in the data puddle when the data wasn’t repartitioned (on a 5 node cluster). It only took 2 seconds to count the data puddle when the data was repartitioned. That’s roughly a 120x speed improvement!
Writing out single files
repartition(1) and coalesce(1) can be used to write out DataFrames to single files. You won’t typically want to write out data to a single file because it’s slow (and will error out if the dataset is big). You’ll only want to write out data to a single file when the DataFrame is tiny.
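Here is a minimal sketch of the single-file case, assuming a local session and a temporary output directory (the SingleFileWrite object and the directory name are made up for this example). Note that Spark still writes a directory; it simply contains one part file.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object SingleFileWrite {
  // Writes a tiny DataFrame through one partition and returns the number of part files produced
  def partFileCount(spark: SparkSession): Int = {
    import spark.implicits._
    val tinyDf = (1 to 10).toList.toDF("number")
    // Temporary output location, used only for this illustration
    val outDir = Files.createTempDirectory("single_file_demo").resolve("numbers").toString
    tinyDf.coalesce(1).write.csv(outDir)
    // Ignore marker and checksum files; count only the data files
    new File(outDir).listFiles().count(_.getName.startsWith("part-"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("single-file-write").getOrCreate()
    println(partFileCount(spark))
    spark.stop()
  }
}
```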
Building partitioned datasets on disk
partitionBy can be used to create data that’s partitioned on disk. repartition and coalesce partition data in memory. Partitioning on disk is a separate concept, so check out Beautiful Spark to learn about this important distinction.
Separate posts explain how to partition data on disk with partitionBy and how querying partitioned lakes can be much faster.
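For a quick feel of the on-disk layout, the sketch below writes the people data with partitionBy and lists the resulting directories. It assumes a local session and a temporary output directory, and the PartitionByDemo object name is hypothetical.

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object PartitionByDemo {
  // Writes the people data partitioned by color and returns the directory names created on disk
  def partitionDirs(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val peopleDf = List(
      (10, "blue"), (13, "red"), (15, "blue"), (99, "red"), (67, "blue")
    ).toDF("age", "color")
    // Temporary output location, used only for this illustration
    val outDir = Files.createTempDirectory("partition_by_demo").resolve("people").toString
    peopleDf.write.partitionBy("color").csv(outDir)
    // Each distinct color gets its own subdirectory
    new File(outDir).listFiles().filter(_.isDirectory).map(_.getName).sorted.toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("partition-by-demo").getOrCreate()
    partitionDirs(spark).foreach(println)
    spark.stop()
  }
}
```

The directories are named after the column values (color=blue and color=red), which is what lets readers skip whole directories when filtering on color.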
You probably need to think about partitions
The partitioning of DataFrames seems like a low level implementation detail that should be managed by the framework, but it’s not. When filtering large DataFrames into smaller ones, you should almost always repartition the data.
You’ll probably be filtering large DataFrames into smaller ones frequently, so get used to repartitioning. Grok it!