Performing operations on multiple columns in a PySpark DataFrame
You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame.
Using iterators to apply the same operation on multiple columns is vital for maintaining a DRY codebase.
Let’s explore different ways to lowercase all of the columns in a DataFrame to illustrate this concept.
If you’re using the Scala API, see this blog post on performing operations on multiple columns in a Spark DataFrame with foldLeft.
Lowercase all columns with reduce
Let’s import the reduce function from functools and use it to lowercase all the columns in a DataFrame.
from functools import reduce

from pyspark.sql.functions import col, lower

source_df = spark.createDataFrame(
    [
        ("Jose", "BLUE"),
        ("lI", "BrOwN")
    ],
    ["name", "eye_color"]
)

actual_df = reduce(
    lambda memo_df, col_name: memo_df.withColumn(col_name, lower(col(col_name))),
    source_df.columns,
    source_df
)
actual_df.show()

+----+---------+
|name|eye_color|
+----+---------+
|jose|     blue|
|  li|    brown|
+----+---------+
The physical plan that’s generated by this code looks efficient.
actual_df.explain()

== Physical Plan ==
*Project [lower(name#0) AS name#5, lower(eye_color#1) AS eye_color#9]
+- Scan ExistingRDD[name#0,eye_color#1]
It is no secret that reduce is not among the favored functions of the Pythonistas. — dawg
Let’s see how we can achieve the same result with a for loop.
Lowercase all columns with a for loop
Let’s use the same source_df as earlier and build up the actual_df with a for loop.
actual_df = source_df
for col_name in actual_df.columns:
    actual_df = actual_df.withColumn(col_name, lower(col(col_name)))
This code is a bit ugly, but Spark is smart and generates the same physical plan.
actual_df.explain()

== Physical Plan ==
*Project [lower(name#18) AS name#23, lower(eye_color#19) AS eye_color#27]
+- Scan ExistingRDD[name#18,eye_color#19]
Let’s see how we can also use a list comprehension to write this code.
Lowercase all columns with a list comprehension
Let’s use the same source_df as earlier and lowercase all the columns with list comprehensions, which are beloved by Pythonistas far and wide.
actual_df = source_df.select(
    *[lower(col(col_name)).name(col_name) for col_name in source_df.columns]
)
Spark is still smart and generates the same physical plan.
actual_df.explain()

== Physical Plan ==
*Project [lower(name#36) AS name#41, lower(eye_color#37) AS eye_color#42]
+- Scan ExistingRDD[name#36,eye_color#37]
Let’s mix it up and see how these solutions work when they’re run on some, but not all, of the columns in a DataFrame.
Performing operations on a subset of the DataFrame columns
Let’s define a remove_some_chars function that removes all exclamation points and question marks from a column.
from pyspark.sql.functions import regexp_replace

def remove_some_chars(col_name):
    removed_chars = ("!", "?")
    # escape each character so it's matched literally, e.g. \!|\?
    regexp = "|".join("\\{0}".format(i) for i in removed_chars)
    return regexp_replace(col_name, regexp, "")
Let’s use reduce to apply the remove_some_chars function to two columns in a new DataFrame.
source_df = spark.createDataFrame(
    [
        ("h!o!c!k!e!y", "rangers", "new york"),
        ("soccer", "??nacional!!", "medellin")
    ],
    ["sport", "team", "city"]
)
source_df.show()

+-----------+------------+--------+
|      sport|        team|    city|
+-----------+------------+--------+
|h!o!c!k!e!y|     rangers|new york|
|     soccer|??nacional!!|medellin|
+-----------+------------+--------+

actual_df = reduce(
    lambda memo_df, col_name: memo_df.withColumn(col_name, remove_some_chars(col_name)),
    ["sport", "team"],
    source_df
)

actual_df.show()

+------+--------+--------+
| sport|    team|    city|
+------+--------+--------+
|hockey| rangers|new york|
|soccer|nacional|medellin|
+------+--------+--------+
Let’s try building up the actual_df with a for loop.
actual_df = source_df
for col_name in ["sport", "team"]:
    actual_df = actual_df.withColumn(col_name, remove_some_chars(col_name))
The for loop looks pretty clean. Now let’s try it with a list comprehension.
source_df.select(
    *[remove_some_chars(col_name).name(col_name) if col_name in ["sport", "team"] else col_name for col_name in source_df.columns]
)
Wow, the list comprehension is really ugly for a subset of the columns 😿
reduce, for loops, and list comprehensions all output the same physical plan as in the previous example, so each option is equally performant when executed.
== Physical Plan ==
*Project [regexp_replace(sport#109, \!|\?, ) AS sport#116, regexp_replace(team#110, \!|\?, ) AS team#117, city#111]
+- Scan ExistingRDD[sport#109,team#110,city#111]
What approach should you use?
for loops seem to yield the most readable code. List comprehensions can be used for operations that are performed on all columns of a DataFrame, but should be avoided for operations performed on a subset of the columns. The reduce code is pretty clean too, so that’s also a viable alternative.
It’s best to write functions that operate on a single column and wrap the iterator in a separate DataFrame transformation so the code can easily be applied to multiple columns.
Let’s define a multi_remove_some_chars DataFrame transformation that takes a list of col_names as an argument and applies remove_some_chars to each col_name.
def multi_remove_some_chars(col_names):
    def inner(df):
        for col_name in col_names:
            df = df.withColumn(
                col_name,
                remove_some_chars(col_name)
            )
        return df
    return inner
We can invoke multi_remove_some_chars as follows:
multi_remove_some_chars(["sport", "team"])(source_df)
This separation of concerns creates a codebase that’s easy to test and reuse.
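If you’re on Spark 3.0 or later, custom transformations written this way also chain nicely with DataFrame.transform. Here’s a minimal sketch that reuses the source_df and functions defined above (the clean_df name and the extra lowercase step are just for illustration):
# Assumes Spark 3.0+ for DataFrame.transform, plus the source_df and functions above.
clean_df = (
    source_df
    .transform(multi_remove_some_chars(["sport", "team"]))  # strip "!" and "?" from sport and team
    .transform(lambda df: df.withColumn("city", lower(col("city"))))  # illustrative extra step
)
clean_df.show()
Each transformation stays focused on a single concern, and the pipeline reads cleanly from top to bottom.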