Schema Independent DataFrame Transformations with Spark
DataFrame transformations can be defined with arguments so they don’t make assumptions about the schema of the underlying DataFrame.
Schema independent transformations are easier to reuse than schema dependent transformations and improve the quality of your code.
This blog post will demonstrate how to create schema independent transformations and explain when they should be used.
Schema Dependent DataFrame Transformation Refresher
Let’s take a look at a schema dependent transformation and then convert it to a schema independent transformation. Read this blog post first if you’re new to DataFrame transformations.
Suppose you have the following kittensDF.
+-----+---+
| name|age|
+-----+---+
|tiger| 1|
| lily| 2|
| jack| 4|
+-----+---+
Let’s write a DataFrame transformation that appends an age_plus_one column.
def withAgePlusOne()(df: DataFrame): DataFrame = {
  df.withColumn("age_plus_one", $"age" + 1) // the $ column syntax assumes spark.implicits._ is in scope
}
Notice that the withAgePlusOne transformation only works for DataFrames that have an age column; in other words, withAgePlusOne is a schema dependent transformation.
Let’s run the withAgePlusOne transformation and view the output.
kittensDF.transform(withAgePlusOne()).show()
+-----+---+------------+
| name|age|age_plus_one|
+-----+---+------------+
|tiger| 1| 2|
| lily| 2| 3|
| jack| 4| 5|
+-----+---+------------+
The withAgePlusOne transformation requires that the input DataFrame have an age column and does not allow any customization of the column that is appended. Let’s refactor this transformation to make it more flexible.
Creating a Schema Independent DataFrame Transformation
Suppose you have the following puppyDF.
+--------+-------+
|pup_name|pup_age|
+--------+-------+
| max| 5|
| charlie| 6|
| daisy| 7|
+--------+-------+
Let’s write a schema independent DataFrame transformation that appends an age_plus_one column.
def withAgePlusOne(
    ageColName: String,
    resultColName: String
)(df: DataFrame): DataFrame = {
  df.withColumn(resultColName, col(ageColName) + 1) // col comes from org.apache.spark.sql.functions
}
The withAgePlusOne function has two parameter lists. The first parameter list takes ageColName and resultColName and the second parameter list takes the DataFrame.
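As a quick illustration of why the currying matters (the addPupAge value is just a name for this example), applying only the first parameter list yields a DataFrame => DataFrame function, which is exactly the shape that transform expects.
val addPupAge: DataFrame => DataFrame =
  withAgePlusOne("pup_age", "pup_age_plus_one")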
The withAgePlusOne function doesn’t hardcode any column names in the function body, so it’s schema independent 🎊
Let’s run the code and view the output.
puppyDF.transform(
  withAgePlusOne("pup_age", "pup_age_plus_one")
).show()
+--------+-------+----------------+
|pup_name|pup_age|pup_age_plus_one|
+--------+-------+----------------+
| max| 5| 6|
| charlie| 6| 7|
| daisy| 7| 8|
+--------+-------+----------------+
The refactored withAgePlusOne transformation does not make schema assumptions, so it’s easier to reuse.
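For example, the same function also works on the kittensDF from earlier, simply by pointing it at that schema’s column names.
kittensDF.transform(withAgePlusOne("age", "age_plus_one")).show()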
Validating DataFrame Dependencies
We can use the spark-daria library to validate the DataFrame requirements of schema independent transformations.
def withAgePlusOne(
    ageColName: String,
    resultColName: String
)(df: DataFrame): DataFrame = {
  validatePresenceOfColumns(df, Seq(ageColName)) // provided by spark-daria's DataFrameValidator
  df.withColumn(resultColName, col(ageColName) + 1)
}
The validatePresenceOfColumns check ensures that the code throws an error with a descriptive message if the underlying DataFrame doesn’t contain the ageColName column.
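If you’d rather not pull in spark-daria, a minimal hand-rolled version of the same check might look like this (a sketch, not spark-daria’s actual implementation).
import org.apache.spark.sql.DataFrame

def validatePresenceOfColumns(df: DataFrame, requiredColNames: Seq[String]): Unit = {
  // Compare the required column names with the columns actually present
  val missingColNames = requiredColNames.diff(df.columns.toSeq)
  require(
    missingColNames.isEmpty,
    s"The DataFrame is missing the following required columns: ${missingColNames.mkString(", ")}"
  )
}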
Read this blog post for a detailed discussion on validating DataFrame schemas.
When to use schema dependent transformations vs schema independent transformations
The public interface of your code should expose schema independent transformations for functions that will be used on a variety of schemas.
Schema dependent transformations are useful for functions that are only expected to run on a single schema.
Suppose you have the following sourceDF.
+-----+------+----+-----+
|fname| lname| ssn| zip|
+-----+------+----+-----+
| bob|barker|1234|11111|
| rich| piano|4444|22222|
+-----+------+----+-----+
Let’s write a withPersonId transformation to append a person_id column to the DataFrame.
def withPersonId()(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(concat_ws(",", $"fname", $"lname", $"ssn", $"zip"))
  )
}
Let’s run the code and see the results.
sourceDF.transform(withPersonId()).show()
+-----+------+----+-----+--------------------+
|fname| lname| ssn| zip| person_id|
+-----+------+----+-----+--------------------+
| bob|barker|1234|11111|fa57b17f53ebfe07e...|
| rich| piano|4444|22222|b73a570f4240c5ba5...|
+-----+------+----+-----+--------------------+
This code is a little bulky when it’s expressed as a schema independent DataFrame transformation.
def withPersonId(
    fnameColName: String,
    lnameColName: String,
    ssnColName: String,
    zipColName: String
)(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(
      concat_ws(
        ",",
        col(fnameColName),
        col(lnameColName),
        col(ssnColName),
        col(zipColName)
      )
    )
  )
}
Invoking the schema independent DataFrame transformation is also a bit bulkier.
sourceDF.transform(
  withPersonId("fname", "lname", "ssn", "zip")
).show()
We can set smart defaults to clean up the function invocation of the schema independent DataFrame transformation.
def withPersonId(
    fnameColName: String = "fname",
    lnameColName: String = "lname",
    ssnColName: String = "ssn",
    zipColName: String = "zip"
)(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(
      concat_ws(
        ",",
        col(fnameColName),
        col(lnameColName),
        col(ssnColName),
        col(zipColName)
      )
    )
  )
}
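With the smart defaults in place, the common-case invocation reads just like the schema dependent version, and the column names can still be overridden when a DataFrame uses a different schema (otherDF is a hypothetical DataFrame used purely for illustration).
sourceDF.transform(withPersonId()).show()
otherDF.transform(withPersonId(fnameColName = "first_name")).show()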
So… should withPersonId be a schema dependent or schema independent DataFrame transformation? It depends 😉
If withPersonId will be called on a variety of DataFrame schemas, we should make it a flexible, schema independent transformation.
If withPersonId will only be called on DataFrames with a single schema, we can use a schema dependent transformation.
What about user defined functions?
User defined functions will obviously be used extensively in your codebase, but should we expose UDFs as part of our public interface?
It’s OK to expose UDFs that are high-level and don’t contain business logic. Spark’s org.apache.spark.sql.functions package provides a lot of good examples of generic functions that should be exposed.
spark-daria also exposes high-level functions.
It’s better to wrap user defined functions that contain business logic in DataFrame transformations and expose the DataFrame transformations as part of your public interface.
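As a sketch of what that looks like (the isAdultCat rule and the withIsAdultCat name are hypothetical, purely for illustration), the business-logic UDF stays private and the schema independent transformation becomes the public entry point.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object CatTransformations {
  // Business logic lives in a UDF that is private to this object
  private val isAdultCat = udf((age: Int) => age >= 2)

  // Only the schema independent transformation is exposed publicly
  def withIsAdultCat(
      ageColName: String,
      resultColName: String = "is_adult_cat"
  )(df: DataFrame): DataFrame = {
    df.withColumn(resultColName, isAdultCat(col(ageColName)))
  }
}

kittensDF.transform(CatTransformations.withIsAdultCat("age")).show()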
Conclusion
Adding schema independent DataFrame transformations to your repertoire will make you a more complete Spark programmer.
Practice the art of optimally selecting a schema dependent transformation, schema independent transformation, or user defined function given your circumstances.
Spark is a demanding engine and it’s vitally important for programmers to manage complexity by exposing the right public interfaces. Make sure to use schema independent transformations as appropriate.