Adding StructType columns to Spark DataFrames
StructType objects define the schema of Spark DataFrames. StructType objects contain a list of StructField objects that define the name, type, and nullable flag for each column in a DataFrame.
Let’s start with an overview of StructType objects and then demonstrate how StructType columns can be added to DataFrame schemas (essentially creating a nested schema).
StructType columns are a great way to eliminate order dependencies from Spark code.
StructType overview
The StructType case class can be used to define a DataFrame schema as follows.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
  Row(1, "a"),
  Row(5, "z")
)

val schema = StructType(
  List(
    StructField("num", IntegerType, true),
    StructField("letter", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
df.show()

+---+------+
|num|letter|
+---+------+
|  1|     a|
|  5|     z|
+---+------+
The DataFrame schema method returns a StructType object.
print(df.schema)

StructType(
  StructField(num, IntegerType, true),
  StructField(letter, StringType, true)
)
Let’s look at another example to see how StructType columns can be appended to DataFrames.
Appending StructType columns
Let’s use the struct() function to append a StructType column to a DataFrame.
val data = Seq(
  Row(20.0, "dog"),
  Row(3.5, "cat"),
  Row(0.000006, "ant")
)

val schema = StructType(
  List(
    StructField("weight", DoubleType, true),
    StructField("animal_type", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
import org.apache.spark.sql.functions._

val actualDF = df.withColumn(
  "animal_interpretation",
  struct(
    (col("weight") > 5).as("is_large_animal"),
    col("animal_type").isin("rat", "cat", "dog").as("is_mammal")
  )
)
actualDF.show(truncate = false)

+------+-----------+---------------------+
|weight|animal_type|animal_interpretation|
+------+-----------+---------------------+
|20.0  |dog        |[true,true]          |
|3.5   |cat        |[false,true]         |
|6.0E-6|ant        |[false,false]        |
+------+-----------+---------------------+
Let’s take a look at the schema.
print(actualDF.schema)

StructType(
  StructField(weight,DoubleType,true),
  StructField(animal_type,StringType,true),
  StructField(animal_interpretation, StructType(
    StructField(is_large_animal,BooleanType,true),
    StructField(is_mammal,BooleanType,true)
  ), false)
)
The animal_interpretation column has a StructType type, so this DataFrame has a nested schema. It’s easier to view the schema with the printSchema method.
actualDF.printSchema()

root
 |-- weight: double (nullable = true)
 |-- animal_type: string (nullable = true)
 |-- animal_interpretation: struct (nullable = false)
 |    |-- is_large_animal: boolean (nullable = true)
 |    |-- is_mammal: boolean (nullable = true)
We can flatten the DataFrame as follows.
actualDF.select(
  col("animal_type"),
  col("animal_interpretation")("is_large_animal")
    .as("is_large_animal"),
  col("animal_interpretation")("is_mammal")
    .as("is_mammal")
).show(truncate = false)

+-----------+---------------+---------+
|animal_type|is_large_animal|is_mammal|
+-----------+---------------+---------+
|dog        |true           |true     |
|cat        |false          |true     |
|ant        |false          |false    |
+-----------+---------------+---------+
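When every field of the struct is needed, Spark’s star syntax expands all the nested fields in one shot (a sketch; the expanded columns keep their bare field names, so watch for collisions with existing top-level columns):

```scala
// Expand all fields of the animal_interpretation struct at once
actualDF
  .select("animal_type", "animal_interpretation.*")
  .show(truncate = false)
```

This produces the same flattened result as the explicit select above without listing each field by hand.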
Using StructTypes to eliminate order dependencies
Let’s demonstrate some order dependent code and then use a StructType column to eliminate the order dependencies.
Let’s consider three custom transformations that add is_teenager, has_positive_mood, and what_to_do columns to a DataFrame.
def withIsTeenager()(df: DataFrame): DataFrame = {
  df.withColumn("is_teenager", col("age").between(13, 19))
}

def withHasPositiveMood()(df: DataFrame): DataFrame = {
  df.withColumn(
    "has_positive_mood",
    col("mood").isin("happy", "glad")
  )
}

def withWhatToDo()(df: DataFrame): DataFrame = {
  df.withColumn(
    "what_to_do",
    when(
      col("is_teenager") && col("has_positive_mood"),
      "have a chat"
    )
  )
}
Notice that both the withIsTeenager and withHasPositiveMood transformations must be run before the withWhatToDo transformation can be run. The functions have an order dependency because they must be run in a certain order for the code to work.
Let’s build a DataFrame and execute the functions in the right order so the code will run.
val data = Seq(
  Row(30, "happy"),
  Row(13, "sad"),
  Row(18, "glad")
)

val schema = StructType(
  List(
    StructField("age", IntegerType, true),
    StructField("mood", StringType, true)
  )
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)
df
  .transform(withIsTeenager())
  .transform(withHasPositiveMood())
  .transform(withWhatToDo())
  .show()

+---+-----+-----------+-----------------+-----------+
|age| mood|is_teenager|has_positive_mood| what_to_do|
+---+-----+-----------+-----------------+-----------+
| 30|happy|      false|             true|       null|
| 13|  sad|       true|            false|       null|
| 18| glad|       true|             true|have a chat|
+---+-----+-----------+-----------------+-----------+
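If the order is flipped, Spark fails before any job even runs, because the missing columns can’t be resolved at analysis time (a sketch, using the df and transformations defined above; the exact error message varies by Spark version):

```scala
// This fails immediately: is_teenager and has_positive_mood
// don't exist on the raw DataFrame yet.
df.transform(withWhatToDo())
// org.apache.spark.sql.AnalysisException: cannot resolve '`is_teenager`' ...
```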
Let’s use the struct function to append a StructType column to the DataFrame and remove the order dependencies from this code.
val isTeenager = col("age").between(13, 19)
val hasPositiveMood = col("mood").isin("happy", "glad")

df.withColumn(
  "best_action",
  struct(
    isTeenager.as("is_teenager"),
    hasPositiveMood.as("has_positive_mood"),
    when(
      isTeenager && hasPositiveMood,
      "have a chat"
    ).as("what_to_do")
  )
).show(truncate = false)

+---+-----+-----------------------+
|age|mood |best_action            |
+---+-----+-----------------------+
|30 |happy|[false,true,null]      |
|13 |sad  |[true,false,null]      |
|18 |glad |[true,true,have a chat]|
+---+-----+-----------------------+
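The nested fields are still easy to pull back out whenever a flat column is needed (a sketch; withBestActionDF is a hypothetical val holding the result of the withColumn call above):

```scala
// Extract a single field from the best_action struct
withBestActionDF
  .select(col("best_action.what_to_do").as("what_to_do"))
  .show()
```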
Order dependencies can be a big problem in large Spark codebases
If your code is organized as DataFrame transformations, order dependencies can become a big problem.
You might need to figure out how to call 20 functions in exactly the right order to get the desired result.
StructType columns are one way to eliminate order dependencies from your code. I’ll discuss other strategies in more detail in a future blog post!