Spark User Defined Functions (UDFs)
Spark lets you define custom SQL functions called user defined functions (UDFs). UDFs are great when built-in SQL functions aren’t sufficient, but they should be used sparingly because Spark cannot optimize them, so they’re slower than native functions.
This blog post will demonstrate how to define UDFs and will show how to avoid UDFs, when possible, by leveraging native Spark functions.
Simple UDF example
Let’s define a UDF that removes all the whitespace and lowercases all the characters in a string.
def lowerRemoveAllWhitespace(s: String): String = {
  s.toLowerCase().replaceAll("\\s", "")
}
val lowerRemoveAllWhitespaceUDF = udf[String, String](lowerRemoveAllWhitespace)

val sourceDF = spark.createDF(
List(
(" HI THERE "),
(" GivE mE PresenTS ")
), List(
("aaa", StringType, true)
)
)
sourceDF.select(
lowerRemoveAllWhitespaceUDF(col("aaa")).as("clean_aaa")
).show()

+--------------+
| clean_aaa|
+--------------+
| hithere|
|givemepresents|
+--------------+
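The regex logic is easy to sanity-check in plain Scala before wiring it into Spark (no Spark is needed for this check):

```scala
// "\\s" matches any single whitespace character (space, tab, newline);
// replaceAll deletes every match after the string is lowercased.
val cleaned = " GivE mE PresenTS ".toLowerCase.replaceAll("\\s", "")
println(cleaned)  // givemepresents
```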
This code will unfortunately error out if the DataFrame column contains a null
value.
val anotherDF = spark.createDF(
List(
(" BOO "),
(" HOO "),
(null)
), List(
("cry", StringType, true)
)
)
anotherDF.select(
lowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Caused by: java.lang.NullPointerException
Cause: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => string)
Cause: java.lang.NullPointerException
Let’s write a betterLowerRemoveAllWhitespace function that won’t error out when the DataFrame contains null values.
def betterLowerRemoveAllWhitespace(s: String): Option[String] = {
  val str = Option(s).getOrElse(return None)
  Some(str.toLowerCase().replaceAll("\\s", ""))
}
val betterLowerRemoveAllWhitespaceUDF = udf[Option[String], String](betterLowerRemoveAllWhitespace)

val anotherDF = spark.createDF(
List(
(" BOO "),
(" HOO "),
(null)
), List(
("cry", StringType, true)
)
)
anotherDF.select(
betterLowerRemoveAllWhitespaceUDF(col("cry")).as("clean_cry")
).show()

+---------+
|clean_cry|
+---------+
| boo|
| hoo|
| null|
+---------+
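The getOrElse(return None) early-return trick works, but the same null-safe behavior can be written more idiomatically with Option.map. Here is an equivalent sketch (the name betterLowerRemoveAllWhitespaceViaMap is just for illustration):

```scala
// Option(s) is None when s is null, so map is simply skipped for null
// input and we never touch the null value directly.
def betterLowerRemoveAllWhitespaceViaMap(s: String): Option[String] =
  Option(s).map(_.toLowerCase.replaceAll("\\s", ""))

println(betterLowerRemoveAllWhitespaceViaMap(" BOO "))  // Some(boo)
println(betterLowerRemoveAllWhitespaceViaMap(null))     // None
```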
We can use the explain() method to demonstrate that UDFs are a black box for the Spark engine.
== Physical Plan ==
*Project [UDF(cry#15) AS clean_cry#24]
+- Scan ExistingRDD[cry#15]
Spark doesn’t know how to convert the UDF into native Spark instructions. Let’s use the native Spark library to refactor this code and help Spark generate a physical plan that can be optimized.
Using Column Functions
Let’s define a function that takes a Column argument, returns a Column, and leverages native Spark functions to lowercase and remove all whitespace from a string.
def bestLowerRemoveAllWhitespace()(col: Column): Column = {
  lower(regexp_replace(col, "\\s+", ""))
}

val anotherDF = spark.createDF(
List(
(" BOO "),
(" HOO "),
(null)
), List(
("cry", StringType, true)
)
)
anotherDF.select(
bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).show()

+---------+
|clean_cry|
+---------+
| boo|
| hoo|
| null|
+---------+
Notice that the bestLowerRemoveAllWhitespace function elegantly handles the null case and does not require us to add any special null logic.
anotherDF.select(
bestLowerRemoveAllWhitespace()(col("cry")).as("clean_cry")
).explain()

== Physical Plan ==
*Project [lower(regexp_replace(cry#29, \s+, )) AS clean_cry#38]
+- Scan ExistingRDD[cry#29]
Spark can view the internals of the bestLowerRemoveAllWhitespace function and optimize the physical plan accordingly. UDFs are a black box for the Spark engine, whereas functions that take a Column argument and return a Column are not.
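A side benefit of the curried ()(col: Column): Column shape is that a configured column function is just an ordinary Column => Column value, so column functions compose like any other Scala functions. Here is the same idea sketched with String => String so it runs without Spark (lowerNoWs and take3 are illustrative names, not Spark APIs):

```scala
// Once configured, each stage is a plain function value, so stages
// chain with andThen just like Column => Column functions would.
val lowerNoWs: String => String = _.toLowerCase.replaceAll("\\s", "")
val take3: String => String = _.take(3)
val pipeline: String => String = lowerNoWs.andThen(take3)

println(pipeline(" BOO HOO "))  // boo
```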
Conclusion
Spark UDFs should be avoided whenever possible. If you do need to write a UDF, make sure to handle the null case, as this is a common cause of errors.