Validating Spark DataFrame Schemas
This post demonstrates how to explicitly validate the schema of a DataFrame in custom transformations so your code is easier to read and provides better error messages.
Spark’s lazy evaluation and execution plan optimizations yield amazingly fast results, but they can also produce cryptic error messages. Explicit schema validations make code easier to read, maintain, and debug.
Custom Transformations Refresher
A custom transformation is a function that takes a DataFrame as an argument and returns a DataFrame. This blog post provides great background information on custom transformations.
Let’s look at an example of a custom transformation that makes an assumption.
The following transformation appends an is_senior_citizen column to a DataFrame.
def withIsSeniorCitizen()(df: DataFrame): DataFrame = {
  df.withColumn("is_senior_citizen", df("age") >= 65)
}
Suppose we have the following peopleDF:
+------+---+
|  name|age|
+------+---+
|miguel| 80|
|   liz| 10|
+------+---+
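For reference, here’s how a DataFrame like this can be created (a minimal sketch, assuming a SparkSession named spark is in scope and its implicits are imported):

import spark.implicits._

val peopleDF = Seq(
  ("miguel", 80),
  ("liz", 10)
).toDF("name", "age")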
Let’s run the withIsSeniorCitizen transformation.
val actualDF = peopleDF.transform(withIsSeniorCitizen())
actualDF will have the following data.
+------+---+-----------------+
|  name|age|is_senior_citizen|
+------+---+-----------------+
|miguel| 80|             true|
|   liz| 10|            false|
+------+---+-----------------+
withIsSeniorCitizen assumes that the DataFrame has an age column with the IntegerType. In this case, the withIsSeniorCitizen transformation’s assumption was correct and the code worked perfectly ;)
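If you want to verify this assumption by hand, you can inspect the schema directly (a quick check, assuming peopleDF was built with an integer age column as above):

peopleDF.printSchema()
peopleDF.schema("age").dataType // IntegerType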
A Custom Transformation Making a Bad Assumption
Let’s use the following withFullName transformation to illustrate how making incorrect assumptions yields bad error messages.
def withFullName()(df: DataFrame): DataFrame = {
  df.withColumn(
    "full_name",
    concat_ws(" ", col("first_name"), col("last_name"))
  )
}
Suppose we have the following animalDF.
+---+
|pet|
+---+
|cat|
|dog|
+---+
Let’s run the withFullName transformation.
animalDF.transform(withFullName())
The code will error out with this message.
org.apache.spark.sql.AnalysisException: cannot resolve ‘`first_name`’ given input columns: [pet]
The withFullName transformation assumes the DataFrame has first_name and last_name columns. The assumption isn’t met, so the code errors out.
The default error message isn’t terrible, but it’s not complete. We would like to have an error message that specifies both the first_name and last_name columns are required to run the withFullName transformation.
Column Presence Validation
Let’s use the spark-daria DataFrameValidator to specify the column assumptions within the withFullName transformation.
import com.github.mrpowers.spark.daria.sql.DataFrameValidator

def withFullName()(df: DataFrame): DataFrame = {
  validatePresenceOfColumns(df, Seq("first_name", "last_name"))
  df.withColumn(
    "full_name",
    concat_ws(" ", col("first_name"), col("last_name"))
  )
}
Let’s run the code again.
animalDF.transform(withFullName())
This is the new error message.
com.github.mrpowers.spark.daria.sql.MissingDataFrameColumnsException: The [first_name, last_name] columns are not included in the DataFrame with the following columns [pet]
validatePresenceOfColumns makes the withFullName transformation better in two important ways.
- withFullName will be easier to maintain and use because the transformation requirements are explicitly documented in the code
- When the withFullName assumptions aren’t met, the error message is more descriptive
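When the required columns are present, the validated transformation behaves exactly as before. Here’s a quick sketch with a hypothetical personDF (the data below is made up for illustration):

val personDF = Seq(
  ("miguel", "cabrera"),
  ("liz", "lemon")
).toDF("first_name", "last_name")

// appends a full_name column, e.g. "miguel cabrera"
personDF.transform(withFullName()).show()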
Full Schema Validation
We can also use the spark-daria DataFrameValidator to validate the presence of StructFields in DataFrames (i.e. validate the presence of the name, data type, and nullable property for each column that’s required).
Let’s look at a withSum transformation that adds the num1 and num2 columns in a DataFrame.
def withSum()(df: DataFrame): DataFrame = {
  df.withColumn(
    "sum",
    col("num1") + col("num2")
  )
}
When the num1 and num2 columns contain numerical data, the withSum transformation works as expected.
val numsDF = Seq(
  (1, 3),
  (7, 8)
).toDF("num1", "num2")
numsDF.transform(withSum()).show()

+----+----+---+
|num1|num2|sum|
+----+----+---+
|   1|   3|  4|
|   7|   8| 15|
+----+----+---+
withSum doesn’t work well when the num1 and num2 columns contain strings.
val wordsDF = Seq(
  ("one", "three"),
  ("seven", "eight")
).toDF("num1", "num2")
wordsDF.transform(withSum()).show()

+-----+-----+----+
| num1| num2| sum|
+-----+-----+----+
|  one|three|null|
|seven|eight|null|
+-----+-----+----+
withSum should error out if the num1 and num2 columns aren’t numeric. Let’s refactor the function to error out with a descriptive error message.
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

def withSum()(df: DataFrame): DataFrame = {
  val requiredSchema = StructType(
    List(
      StructField("num1", IntegerType, true),
      StructField("num2", IntegerType, true)
    )
  )
  validateSchema(df, requiredSchema)
  df.withColumn(
    "sum",
    col("num1") + col("num2")
  )
}
Let’s run the code again.
wordsDF.transform(withSum()).show()
Now we get a more descriptive error message.
com.github.mrpowers.spark.daria.sql.InvalidDataFrameSchemaException: The [StructField(num1,IntegerType,true), StructField(num2,IntegerType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,StringType,true), StructField(num2,StringType,true))]
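If you’d prefer not to pull in spark-daria, a rough hand-rolled equivalent of this check might look like the following (a sketch only; the spark-daria error messages are more descriptive):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

def validateRequiredFields(df: DataFrame, requiredSchema: StructType): Unit = {
  // StructType is a Seq[StructField], so diff returns the required fields
  // whose name, data type, or nullable property don't match the DataFrame schema
  val missingFields: Seq[StructField] = requiredSchema.diff(df.schema)
  require(
    missingFields.isEmpty,
    s"These StructFields are missing from the DataFrame: ${missingFields.mkString(", ")}"
  )
}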
Documenting DataFrame Assumptions is Especially Important for Chained DataFrame Transformations
Production applications will define several standalone transformations and chain them together for the final result.
val resultDF = df
  .transform(myFirstTransform()) // one set of assumptions
  .transform(mySecondTransform()) // more assumptions
  .transform(myThirdTransform()) // even more assumptions
Debugging order dependent transformations, each with a different set of assumptions, is a nightmare! Don’t torture yourself!
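A good habit is to put the validation at the top of every transformation in the chain, so a bad input DataFrame fails fast with a clear message instead of blowing up somewhere in the middle of the chain. Here’s a hypothetical sketch of the first transformation (assuming validatePresenceOfColumns is in scope, e.g. by mixing in DataFrameValidator as shown below):

def myFirstTransform()(df: DataFrame): DataFrame = {
  // document the assumptions before using the columns
  validatePresenceOfColumns(df, Seq("name", "age"))
  df.withColumn("is_senior_citizen", col("age") >= 65)
}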
Conclusion
DataFrame schema assumptions should be explicitly documented in the code with validations.
Code that doesn’t make implicit assumptions is easier to read, easier to maintain, and returns more descriptive error messages.
spark-daria contains the DataFrame validation functions you’ll need in your projects. Follow these setup instructions and write DataFrame transformations like this:
import com.github.mrpowers.spark.daria.sql.DataFrameValidator

object MyTransformations extends DataFrameValidator {

  def withStandardizedPersonInfo(df: DataFrame): DataFrame = {
    val requiredColNames = Seq("name", "age")
    validatePresenceOfColumns(df, requiredColNames)
    // some transformation code
  }

}
Applications with proper DataFrame schema validations are significantly easier to debug, especially when complex transformations are chained.
Alternate Path Forward?
Schema independent DataFrame transformations are another way to remove schema assumptions from your code. I’ll discuss these in detail in another blog post ;)