Performing operations on multiple columns in a PySpark DataFrame

Matthew Powers
3 min readDec 7, 2017

--

You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame.

Using iterators to apply the same operation on multiple columns is vital for maintaining a DRY codebase.

Let’s explore different ways to lowercase all of the columns in a DataFrame to illustrate this concept.

If you’re using the Scala API, see this blog post on performing operations on multiple columns in a Spark DataFrame with foldLeft.

Lowercase all columns with reduce

Let’s import the reduce function from functools and use it to lowercase all the columns in a DataFrame.

source_df = spark.createDataFrame(
[
("Jose", "BLUE"),
("lI", "BrOwN")
],
["name", "eye_color"]
)

actual_df = (reduce(
lambda memo_df, col_name: memo_df.withColumn(col_name, lower(col(col_name))),
source_df.columns,
source_df
))

print(actual_df.show())
+----+---------+
|name|eye_color|
+----+---------+
|jose| blue|
| li| brown|
+----+---------+

The physical plan that’s generated by this code looks efficient.

print(actual_df.explain())== Physical Plan ==
*Project [lower(name#0) AS name#5, lower(eye_color#1) AS eye_color#9]
+- Scan ExistingRDD[name#0,eye_color#1]

It is no secret that reduce is not among the favored functions of the Pythonistas. — dawg

Let’s see how we can achieve the same result with a for loop.

Lowercase all columns with a for loop

Let’s use the same source_df as earlier and build up the actual_df with a for loop.

actual_df = source_df

for col_name in actual_df.columns:
actual_df = actual_df.withColumn(col_name, lower(col(col_name)))

This code is a bit ugly, but Spark is smart and generates the same physical plan.

print(actual_df.explain())== Physical Plan ==
*Project [lower(name#18) AS name#23, lower(eye_color#19) AS eye_color#27]
+- Scan ExistingRDD[name#18,eye_color#19]

Let’s see how we can also use a list comprehension to write this code.

Lowercase all columns with a list comprehension

Let’s use the same source_df as earlier and lowercase all the columns with list comprehensions that are beloved by Pythonistas far and wide.

actual_df = source_df.select(
*[lower(col(col_name)).name(col_name) for col_name in source_df.columns]
)

Spark is still smart and generates the same physical plan.

print(actual_df.explain())== Physical Plan ==
*Project [lower(name#36) AS name#41, lower(eye_color#37) AS eye_color#42]
+- Scan ExistingRDD[name#36,eye_color#37]

Let’s mix it up and see how these solutions work when they’re run on some, but not all, of the columns in a DataFrame.

Performing operations on a subset of the DataFrame columns

Let’s define a remove_some_chars function that removes all exclamation points and question marks from a column.

def remove_some_chars(col_name):
removed_chars = ("!", "?")
regexp = "|".join('\{0}'.format(i) for i in removed_chars)
return regexp_replace(col_name, regexp, "")

Let’s use reduce to apply the remove_some_chars function to two colums in a new DataFrame.

source_df = spark.createDataFrame(
[
("h!o!c!k!e!y", "rangers", "new york"),
("soccer", "??nacional!!", "medellin")
],
["sport", "team", "city"]
)

print(source_df.show())
+-----------+------------+--------+
| sport| team| city|
+-----------+------------+--------+
|h!o!c!k!e!y| rangers|new york|
| soccer|??nacional!!|medellin|
+-----------+------------+--------+
actual_df = (reduce(
lambda memo_df, col_name: memo_df.withColumn(col_name, remove_some_chars(col_name)),
["sport", "team"],
source_df
))

print(actual_df.show())
+------+--------+--------+
| sport| team| city|
+------+--------+--------+
|hockey| rangers|new york|
|soccer|nacional|medellin|
+------+--------+--------+

Let’s try building up the actual_df with a for loop.

actual_df = source_df

for col_name in ["sport", "team"]:
actual_df = actual_df.withColumn(col_name, remove_some_chars(col_name))

The for loop looks pretty clean. Now let’s try it with a list comprehension.

source_df.select(
*[remove_some_chars(col_name).name(col_name) if col_name in ["sport", "team"] else col_name for col_name in source_df.columns]
)

Wow, the list comprehension is really ugly for a subset of the columns 😿

reduce, for, and list comprehensions are all outputting the same physical plan as in the previous example, so each option is equally performant when executed.

== Physical Plan ==
*Project [regexp_replace(sport#109, \!|\?, ) AS sport#116, regexp_replace(team#110, \!|\?, ) AS team#117, city#111]
+- Scan ExistingRDD[sport#109,team#110,city#111]

What approach should you use?

for loops seem to yield the most readable code. List comprehensions can be used for operations that are performed on all columns of a DataFrame, but should be avoided for operations performed on a subset of the columns. The reduce code is pretty clean too, so that’s also a viable alternative.

It’s best to write functions that operate on a single column and wrap the iterator in a separate DataFrame transformation so the code can easily be applied to multiple columns.

Let’s define a multi_remove_some_chars DataFrame transformation that takes an array of col_names as an argument and applies remove_some_chars to each col_name.

def multi_remove_some_chars(col_names):
def inner(df):
for col_name in col_names:
df = df.withColumn(
col_name,
remove_some_chars(col_name)
)
return df
return inner

We can invoke multi_remove_some_chars as follows:

multi_remove_some_chars(["sport", "team"])(source_df)

This separation of concerns creates a codebase that’s easy to test and reuse.

--

--

Matthew Powers
Matthew Powers

Written by Matthew Powers

Spark coder, live in Colombia / Brazil / US, love Scala / Python / Ruby, working on empowering Latinos and Latinas in tech

Responses (7)