The different types of Spark functions (custom transformations, Column functions, UDFs)

Spark code can be organized into custom transformations, Column functions, and user defined functions (UDFs).

Here’s how the different functions should be used in general:

  1. Use custom transformations when adding / removing columns or rows from a DataFrame
  2. Use Column functions when you need a custom Spark SQL function that can be defined with the native Spark API
  3. Use native functions (aka Catalyst expressions) when you want high performance execution
  4. Use UDFs when the native Spark API isn’t sufficient and you can’t express the logic you need with Column functions

Custom transformations

Let’s start with some code that appends two columns to a DataFrame with withColumn.

val df = List(
  ("joao"),
  ("gabriel")
).toDF("first_name")

val df2 = df.withColumn(
  "greeting",
  lit("HEY!")
)

val df3 = df2.withColumn(
  "fav_activity",
  lit("surfing")
)

df3.show()

+----------+--------+------------+
|first_name|greeting|fav_activity|
+----------+--------+------------+
|      joao|    HEY!|     surfing|
|   gabriel|    HEY!|     surfing|
+----------+--------+------------+

Let’s refactor this code with custom transformations and see how they can be chained to produce the same result.

def withGreeting()(df: DataFrame): DataFrame = {
  df.withColumn(
    "greeting",
    lit("HEY!")
  )
}

def withFavActivity()(df: DataFrame): DataFrame = {
  df.withColumn(
    "fav_activity",
    lit("surfing")
  )
}

val df = List(
  ("joao"),
  ("gabriel")
).toDF("first_name")

df
  .transform(withGreeting())
  .transform(withFavActivity())
  .show()

+----------+--------+------------+
|first_name|greeting|fav_activity|
+----------+--------+------------+
|      joao|    HEY!|     surfing|
|   gabriel|    HEY!|     surfing|
+----------+--------+------------+

Custom transformations eliminate the order-dependent variable assignments and produce code that’s easy to test 😅
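Because each custom transformation is just a function from DataFrame to DataFrame, it can be called directly in a test. Here’s a rough sketch with a plain assertion (the test data and assertion style are illustrative, not from the original post):

// Hypothetical test sketch: run the transformation on a tiny DataFrame
// and check the values in the generated column
val actual = List(("maria")).toDF("first_name").transform(withGreeting())
assert(actual.select("greeting").collect().map(_.getString(0)).toSeq == Seq("HEY!"))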

Here’s the generic method signature for custom transformations.

def customTransformName(arg1: String)(df: DataFrame): DataFrame = {
  // code that returns a DataFrame
}

Custom transformations should be used when adding columns, removing columns, adding rows, or removing rows from a DataFrame.
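For instance, a custom transformation that takes an argument and removes rows could look like this (a hypothetical helper, not from the original post, assuming import org.apache.spark.sql.functions._):

// Hypothetical example: the first parameter list holds configuration and
// the second takes the DataFrame, so it still composes with .transform()
def withoutShortNames(minLength: Int)(df: DataFrame): DataFrame = {
  df.filter(length(col("first_name")) >= minLength)
}

df.transform(withoutShortNames(5)).show()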

This blog post discusses custom transformations in more detail.

Column functions

def removeAllWhitespace(col: Column): Column = {
  regexp_replace(col, "\\s+", "")
}

Column functions can be used just like the built-in Spark SQL functions.

List(
  ("I LIKE food"),
  (" this fun")
).toDF("words")
  .withColumn(
    "clean_words",
    removeAllWhitespace(col("words"))
  )
  .show()

+--------------+-----------+
|         words|clean_words|
+--------------+-----------+
|   I LIKE food|  ILIKEfood|
|      this fun|    thisfun|
+--------------+-----------+

Column functions are preferable to user defined functions, as discussed in this blog post.
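Column functions also compose nicely with custom transformations. Here’s a rough sketch combining the two patterns from above (the helper name is hypothetical):

// Hypothetical example: reuse the Column function inside a custom transformation
def withCleanWords()(df: DataFrame): DataFrame = {
  df.withColumn("clean_words", removeAllWhitespace(col("words")))
}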

Catalyst functions

Spark native functions (Catalyst expressions) give the best performance because Catalyst can optimize and code-generate them like the built-in functions, and they’re also a great way to learn how Spark works under the hood.
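Here’s a rough sketch of the idea (not spark-daria code): a native function returns a Column built directly from a Catalyst expression, so there’s no UDF-style black box. Writing a truly new native function means defining your own Expression class, which is more involved.

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Lower

// Minimal sketch (equivalent to the built-in lower function): the Column
// wraps a Catalyst expression directly, relying on the Column constructor
// that accepts a Catalyst Expression
def lowerNative(col: Column): Column = {
  new Column(Lower(col.expr))
}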

See this blog post for more information on how to write Spark native functions.

User defined functions

Here’s a UDF to lowercase a string.

def toLowerFun(str: String): Option[String] = {
  // Guard against null input before calling toLowerCase
  val s = Option(str).getOrElse(return None)
  Some(s.toLowerCase())
}

val toLower = udf[Option[String], String](toLowerFun)

Let’s look at the toLower UDF in action.

val df = List(
  ("HI ThErE"),
  ("ME so HAPPY"),
  (null)
).toDF("phrase")

df
  .withColumn(
    "lower_phrase",
    toLower(col("phrase"))
  )
  .show()

+-----------+------------+
|     phrase|lower_phrase|
+-----------+------------+
|   HI ThErE|    hi there|
|ME so HAPPY| me so happy|
|       null|        null|
+-----------+------------+

This is a contrived example; it’s obviously better to use the built-in Spark lower function to lowercase a string.
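For comparison, the built-in function gives the same result with no null handling at all, since lower passes null values straight through:

df
  .withColumn("lower_phrase", lower(col("phrase")))
  .show()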

UDFs aren’t desirable because they require complicated null handling logic and are a black box, so the Catalyst optimizer can’t optimize them. See this blog post for more details.

Conclusion

I use the spark-daria functions combined with private Column functions in almost all of the production custom transformations I write.
