Schema Independent DataFrame Transformations with Spark

DataFrame transformations can be defined with arguments so they don’t make assumptions about the schema of the underlying DataFrame.

Schema independent transformations are easier to reuse than schema dependent transformations and improve the quality of your code.

This blog post will demonstrate how to create schema independent transformations and explain when they should be used.

Schema Dependent DataFrame Transformation Refresher

Let’s take a look at a schema dependent transformation and then convert it to a schema independent transformation. Read this blog post first if you’re new to DataFrame transformations.

Suppose you have the following kittensDF.

+-----+---+
| name|age|
+-----+---+
|tiger| 1|
| lily| 2|
| jack| 4|
+-----+---+

Let’s write a DataFrame transformation that appends an age_plus_one column.

def withAgePlusOne()(df: DataFrame): DataFrame = {
  df.withColumn("age_plus_one", $"age" + 1)
}

Notice that the transformation only works for DataFrames that have an age column, i.e. withAgePlusOne is a schema dependent transformation.

Let’s run the transformation and view the output.

kittensDF.transform(withAgePlusOne()).show()

+-----+---+------------+
| name|age|age_plus_one|
+-----+---+------------+
|tiger| 1| 2|
| lily| 2| 3|
| jack| 4| 5|
+-----+---+------------+

The transformation requires that the input DataFrame has an age column and does not allow for customization of the column that is appended. Let’s refactor this transformation to make it more flexible.

Creating a Schema Independent DataFrame Transformation

Suppose you have the following puppyDF.

+--------+-------+
|pup_name|pup_age|
+--------+-------+
| max| 5|
| charlie| 6|
| daisy| 7|
+--------+-------+

Let’s write a schema independent DataFrame transformation that appends an incremented age column.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def withAgePlusOne(
  ageColName: String,
  resultColName: String
)(df: DataFrame): DataFrame = {
  df.withColumn(resultColName, col(ageColName) + 1)
}

The withAgePlusOne function has two parameter lists. The first parameter list takes ageColName and resultColName, and the second parameter list takes the DataFrame.

The function doesn’t hardcode any column names in the function body, so it’s schema independent 🎊
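Because the column arguments live in their own parameter list, they can be applied first and the resulting DataFrame => DataFrame function passed straight to transform(). Here’s a quick sketch (the val name addPupAgePlusOne is just for illustration):

// Partially apply the column names; the result is a DataFrame => DataFrame
// function that transform() accepts directly.
val addPupAgePlusOne: DataFrame => DataFrame =
  withAgePlusOne("pup_age", "pup_age_plus_one")

puppyDF.transform(addPupAgePlusOne)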

Let’s run the code and view the output.

puppyDF.transform(
  withAgePlusOne("pup_age", "pup_age_plus_one")
).show()
+--------+-------+----------------+
|pup_name|pup_age|pup_age_plus_one|
+--------+-------+----------------+
| max| 5| 6|
| charlie| 6| 7|
| daisy| 7| 8|
+--------+-------+----------------+

The refactored transformation does not make any schema assumptions, so it’s easier to reuse.
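For example, the same function can be reused on the kittensDF from earlier, even though its columns are named differently:

kittensDF.transform(
  withAgePlusOne("age", "age_plus_one")
).show()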

Validating DataFrame Dependencies

We can use the spark-daria library to validate the DataFrame requirements of schema independent transformations.

// validatePresenceOfColumns is provided by the spark-daria library
def withAgePlusOne(
  ageColName: String,
  resultColName: String
)(df: DataFrame): DataFrame = {
  validatePresenceOfColumns(df, Seq(ageColName))
  df.withColumn(resultColName, col(ageColName) + 1)
}

The check ensures that the code will throw a descriptive error message if the underlying DataFrame doesn’t contain the ageColName column.
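If you’d rather not pull in spark-daria just for this, a minimal hand-rolled check works the same way (requireColumns is an illustrative name, not a library function):

import org.apache.spark.sql.DataFrame

// Throws an IllegalArgumentException listing the missing columns
// instead of letting Spark fail later with an unresolved column error.
def requireColumns(df: DataFrame, requiredColNames: Seq[String]): Unit = {
  val missing = requiredColNames.diff(df.columns.toSeq)
  require(
    missing.isEmpty,
    s"The DataFrame is missing the columns: ${missing.mkString(", ")}. " +
      s"Available columns: ${df.columns.mkString(", ")}"
  )
}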

Read this blog post for a detailed discussion on validating DataFrame schemas.

When to use schema dependent transformations vs schema independent transformations

The public interface of your code should expose schema independent transformations for functions that will be used on a variety of schemas.

Schema dependent transformations are useful for functions that are only expected to run on a single schema.

Suppose you have the following sourceDF.

+-----+------+----+-----+
|fname| lname| ssn| zip|
+-----+------+----+-----+
| bob|barker|1234|11111|
| rich| piano|4444|22222|
+-----+------+----+-----+

Let’s write a withPersonId transformation to append a person_id column to the DataFrame.

import org.apache.spark.sql.functions.{concat_ws, md5}

def withPersonId()(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(concat_ws(",", $"fname", $"lname", $"ssn", $"zip"))
  )
}

Let’s run the code and see the results.

sourceDF.transform(withPersonId()).show()

+-----+------+----+-----+--------------------+
|fname| lname| ssn| zip| person_id|
+-----+------+----+-----+--------------------+
| bob|barker|1234|11111|fa57b17f53ebfe07e...|
| rich| piano|4444|22222|b73a570f4240c5ba5...|
+-----+------+----+-----+--------------------+

This code is a little bulky when it’s expressed as a schema independent DataFrame transformation.

def withPersonId(
  fnameColName: String,
  lnameColName: String,
  ssnColName: String,
  zipColName: String
)(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(
      concat_ws(
        ",",
        col(fnameColName),
        col(lnameColName),
        col(ssnColName),
        col(zipColName)
      )
    )
  )
}

Invoking the schema independent DataFrame transformation is also a bit bulkier.

sourceDF.transform(
  withPersonId("fname", "lname", "ssn", "zip")
).show()

We can set smart defaults to clean up the function invocation of the schema independent DataFrame transformation.

def withPersonId(
  fnameColName: String = "fname",
  lnameColName: String = "lname",
  ssnColName: String = "ssn",
  zipColName: String = "zip"
)(df: DataFrame): DataFrame = {
  df.withColumn(
    "person_id",
    md5(
      concat_ws(
        ",",
        col(fnameColName),
        col(lnameColName),
        col(ssnColName),
        col(zipColName)
      )
    )
  )
}
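With the defaults in place, callers that use the standard column names can invoke the transformation without any arguments (otherDF and its column names below are hypothetical):

sourceDF.transform(withPersonId()).show()

// Callers with nonstandard column names can still override the defaults:
// otherDF.transform(withPersonId("first_name", "last_name", "social", "zip_code"))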

So… should withPersonId be a schema dependent or schema independent DataFrame transformation? It depends 😉

If withPersonId will be called on a variety of DataFrame schemas, we should make it a flexible, schema independent transformation.

If withPersonId will only be called on DataFrames with a single schema, we can use a schema dependent transformation.

What about user defined functions?

User defined functions will obviously be used extensively in your codebase, but should we expose UDFs as part of our public interface?

It’s OK to expose UDFs that are high-level and don’t contain business logic. Spark’s org.apache.spark.sql.functions package provides a lot of good examples of generic functions that should be exposed.

spark-daria also exposes high-level functions.

It’s better to wrap user defined functions that contain business logic in DataFrame transformations and expose the DataFrame transformation as part of your public interface.
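Here’s a rough sketch of that pattern, with a made-up isSeniorCat rule and column names: the business logic lives in the UDF, but only the wrapping DataFrame transformation is meant to be exposed.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// The business rule lives in a UDF; keep it out of your public interface.
val isSeniorCat = udf((age: Int) => age >= 10)

// Expose the schema independent DataFrame transformation instead.
def withIsSeniorCat(
  ageColName: String,
  resultColName: String = "is_senior_cat"
)(df: DataFrame): DataFrame = {
  df.withColumn(resultColName, isSeniorCat(col(ageColName)))
}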

Conclusion

Adding schema independent DataFrame transformations to your repertoire will make you a more complete Spark programmer.

Practice the art of optimally selecting a schema dependent transformation, schema independent transformation, or user defined function given your circumstances.

Spark is a demanding engine and it’s vitally important for programmers to manage complexity by exposing the right public interfaces. Make sure to use schema independent transformations as appropriate.
