How to Write Spark ETL Processes
Spark is a powerful tool for extracting data, running transformations, and loading the results into a data store.
Spark runs computations in parallel, so execution is lightning fast, and clusters can be scaled up for big data. Spark’s native API and spark-daria’s EtlDefinition object allow for elegant definitions of ETL logic.
Extract
Suppose you have a data lake of Parquet files. Here’s some example code that will read from the data lake, filter the data, and then repartition the data subset.
import org.apache.spark.sql.functions.col

val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")

val extractDF = dataLakeDF
  .where(col("mood") === "happy")
  .repartition(10000)
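These snippets assume a SparkSession named spark is already in scope, as it is in spark-shell and Databricks notebooks. If you’re building a standalone application, here’s a minimal sketch of that setup (the app name is an arbitrary placeholder):
import org.apache.spark.sql.SparkSession

// Minimal sketch: build the SparkSession the snippets refer to as `spark`.
val spark = SparkSession
  .builder()
  .appName("spark-etl-example")
  .getOrCreate()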
Read this blog post for more information about repartitioning DataFrames. We’re now ready to transform the extractDF.
Transform
To transform the extractDF, we can define custom transformation functions that take a DataFrame as an argument and return a DataFrame. Custom transformation functions are reusable and easily testable, so this creates a high-quality codebase.
Let’s define a couple of DataFrame transformations.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def withGreeting()(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withFarewell()(df: DataFrame): DataFrame = {
  df.withColumn("farewell", lit("goodbye"))
}
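Because each transformation is just a DataFrame => DataFrame function, you can exercise it in isolation. Here’s a minimal sketch of a check for withGreeting using plain assertions (the test data is arbitrary; a real project would typically use a testing library like ScalaTest or spark-fast-tests):
import spark.implicits._

// Minimal sketch: apply withGreeting to a tiny DataFrame and check the new column.
val testDF = Seq("happy", "sad").toDF("mood")
val actualDF = testDF.transform(withGreeting())

assert(actualDF.columns.contains("greeting"))
assert(actualDF.select("greeting").head.getString(0) == "hello world")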
Let’s create a model() function that chains the custom transformations.
def model()(df: DataFrame): DataFrame = {
  df
    .transform(withGreeting())
    .transform(withFarewell())
}
We can run extractDF.transform(model()) to apply the transformations to our extract. Easy peasy 😎
Check out this blog post for more details on chaining custom DataFrame transformations.
Load (or Report)
We can use Spark’s DataFrameWriter to define a generic function that writes a DataFrame to a given location in S3.
import org.apache.spark.sql.SaveMode

def exampleWriter()(df: DataFrame): Unit = {
  val path = "s3a://some-bucket/extracts/bar"
  df.write.mode(SaveMode.Overwrite).parquet(path)
}
The writer function should take a DataFrame as an argument and return nothing (Unit).
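If you’d like to reuse one writer for several destinations, you can put the path in the first parameter list and keep the same curried shape. Here’s a minimal sketch (the function name and path are illustrative):
// Sketch: pass the destination path explicitly while keeping the DataFrame => Unit shape.
def parquetWriter(path: String)(df: DataFrame): Unit = {
  df.write.mode(SaveMode.Overwrite).parquet(path)
}

// Partially applying the path yields a DataFrame => Unit function.
val barWriter: (DataFrame => Unit) = parquetWriter("s3a://some-bucket/extracts/bar")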
EtlDefinition
Let’s instantiate the EtlDefinition case class defined in spark-daria and use the process() method to execute the ETL code.
val etl = new EtlDefinition(
  sourceDF = extractDF,
  transform = model(),
  write = exampleWriter()
)
Here’s how to execute the ETL code:
etl.process()
Wow, that was easy 😉 The EtlDefinition object can even be repurposed for making Slack messages! I’ll cover that in another blog post.
Take a look at the method signatures of the EtlDefinition arguments and make sure you understand how the functions we’ve defined fit into this mold.
case class EtlDefinition(
    sourceDF: DataFrame,
    transform: (DataFrame => DataFrame),
    write: (DataFrame => Unit),
    metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
) {

  def process(): Unit = {
    write(sourceDF.transform(transform))
  }
}
Take note that EtlDefinition objects can optionally be instantiated with an arbitrary metadata Map.
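For example, here’s a minimal sketch of attaching metadata at construction time (the keys and values are arbitrary placeholders):
val etlWithMetadata = new EtlDefinition(
  sourceDF = extractDF,
  transform = model(),
  write = exampleWriter(),
  metadata = scala.collection.mutable.Map[String, Any]("team" -> "analytics", "owner" -> "data-eng")
)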
Multiple EtlDefinitions
You can organize a collection of EtlDefinition objects in a mutable Map, so they’re easy to fetch and execute.
val etls = scala.collection.mutable.Map[String, EtlDefinition]()
etls += ("bar" -> etl)
etls += ("baz" -> etl2)
etls("bar").process()
Next Steps
Here are the key steps to writing good ETL code in Spark.
- Make sure to repartition the DataFrame after filtering
- Custom DataFrame transformations should be broken up, tested individually, and then chained in a model() method
- Create EtlDefinition objects to organize your ETL logic and make sure all of your method signatures are correct
I use the Databricks API, AWS Lambda, and Slack Slash commands to execute ETL jobs directly from Slack. I highly recommend this workflow!