Spark User Defined Functions (UDFs)

Matthew Powers
3 min read · Dec 27, 2017

Spark lets you define custom SQL functions called user defined functions (UDFs). UDFs are great when built-in SQL functions aren’t sufficient, but they should be used sparingly because they aren’t performant.

This blog post demonstrates how to define UDFs and shows how to avoid them, when possible, by leveraging native Spark functions.

Simple UDF example

Let’s define a UDF that removes all the whitespace and lowercases all the characters in a string.

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StringType

def lowerRemoveAllWhitespace(s: String): String = {
  s.toLowerCase().replaceAll("\\s", "")
}

val lowerRemoveAllWhitespaceUDF = udf[String, String](lowerRemoveAllWhitespace)
// createDF is a DataFrame creation helper from the spark-daria library
val sourceDF = spark.createDF(
  List(
    (" HI THERE "),
    (" GivE mE PresenTS ")
  ), List(
    ("aaa", StringType, true)
  )
)

sourceDF.select(
  lowerRemoveAllWhitespaceUDF(col("aaa")).as("clean_aaa")
).show()
+--------------+
| clean_aaa|
+--------------+
| hithere|
|givemepresents|
+--------------+
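
As a side note (a sketch that isn’t in the original post), the same Scala function can also be registered for use in SQL expressions; the temp view name source is illustrative.

// Register the function so SQL expressions can call it by name
spark.udf.register("lowerRemoveAllWhitespace", lowerRemoveAllWhitespace _)
sourceDF.createOrReplaceTempView("source")
spark.sql("SELECT lowerRemoveAllWhitespace(aaa) AS clean_aaa FROM source").show()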

This code will unfortunately error out if the DataFrame column contains a null value.

val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  lowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Caused by: java.lang.NullPointerException
Cause: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Cause: java.lang.NullPointerException

Let’s write a betterLowerRemoveAllWhitespace function that won’t error out when the DataFrame contains null values.

def betterLowerRemoveAllWhitespace(s: String): Option[String] = {
  // Wrap the input in an Option so a null yields None instead of a NullPointerException
  val str = Option(s).getOrElse(return None)
  Some(str.toLowerCase().replaceAll("\\s", ""))
}

val betterLowerRemoveAllWhitespaceUDF = udf[Option[String], String](betterLowerRemoveAllWhitespace)
val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  betterLowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()
+---------+
|clean_cry|
+---------+
| boo|
| hoo|
| null|
+---------+
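
As an aside (a sketch that isn’t in the original post), you can also keep the null-unsafe UDF and guard the call site with native null logic, since when() only evaluates its value expression for rows where the condition holds.

import org.apache.spark.sql.functions.when

// Rows where cry is null fall through the when() and yield null,
// so the null-unsafe UDF is never invoked on a null input
anotherDF.select(
  when(col("cry").isNotNull, lowerRemoveAllWhitespaceUDF(col("cry"))).as("clean_cry")
).show()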

We can use the explain() method to demonstrate that UDFs are a black box for the Spark engine.
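
Here’s the explain() call, mirroring the select we just ran:

anotherDF.select(
  betterLowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).explain()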

== Physical Plan ==
*Project [UDF(cry#15) AS clean_cry#24]
+- Scan ExistingRDD[cry#15]

Spark doesn’t know how to convert the UDF into native Spark instructions, so it can’t optimize it. Let’s refactor this code with native Spark functions so Spark can generate a physical plan that can be optimized.

Using Column Functions

Let’s define a function that takes a Column argument, returns a Column, and leverages native Spark functions to lowercase and remove all whitespace from a string.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lower, regexp_replace}

def bestLowerRemoveAllWhitespace()(col: Column): Column = {
  lower(regexp_replace(col, "\\s+", ""))
}

val anotherDF = spark.createDF(
  List(
    (" BOO "),
    (" HOO "),
    (null)
  ), List(
    ("cry", StringType, true)
  )
)

anotherDF.select(
  bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).show()
+---------+
|clean_cry|
+---------+
| boo|
| hoo|
| null|
+---------+

Notice that bestLowerRemoveAllWhitespace handles the null case elegantly, without any special null logic: native functions like lower and regexp_replace simply return null when their input is null.

anotherDF.select(
  bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).explain()
== Physical Plan ==
*Project [lower(regexp_replace(cry#29, \s+, )) AS clean_cry#38]
+- Scan ExistingRDD[cry#29]

Spark can see the internals of the bestLowerRemoveAllWhitespace function and optimize the physical plan accordingly. Functions that take a Column argument and return a Column are transparent to the Spark engine, whereas UDFs are a black box it cannot optimize.
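
As a usage sketch (this withColumn pipeline is illustrative, not from the original post), a Column function drops in anywhere a Column expression is expected and composes with other native functions:

// Hypothetical usage: append the cleaned column, then filter on it
anotherDF
  .withColumn("clean_cry", bestLowerRemoveAllWhitespace()(col("cry")))
  .filter(col("clean_cry") === "boo")
  .show()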

Conclusion

Spark UDFs should be avoided whenever possible. If you need to write a UDF, make sure to handle the null case as this is a common cause of errors.
