How to write Spark ETL Processes

Spark is a powerful tool for extracting data, running transformations, and loading the results into a data store.

Spark runs computations in parallel, so execution is fast and clusters can be scaled up to handle big data. Spark’s native API and spark-daria’s EtlDefinition object allow for elegant definitions of ETL logic.

Extract

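import org.apache.spark.sql.functions.col

// Read the raw data, keep only the rows we need, then repartition the smaller result
val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")
val extractDF = dataLakeDF
  .where(col("mood") === "happy")
  .repartition(10000)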

Read this blog post for more information about repartitioning DataFrames. We’re now ready to transform the extractDF.
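To see what repartitioning after a filter does, here is a minimal sketch that runs on a local session. The RepartitionSketch object, the happyExtract helper, the sample rows, and the partition count of 4 are all made up for illustration; the right count depends on how much data survives the filter.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RepartitionSketch {
  // Local session for experimenting; a real job would use the cluster's session.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("repartition-sketch")
    .getOrCreate()

  // Hypothetical helper: keep the happy rows, then repartition the smaller result.
  def happyExtract(df: DataFrame, numPartitions: Int): DataFrame =
    df.where(col("mood") === "happy").repartition(numPartitions)

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    // Tiny made-up dataset standing in for the data lake
    val df = Seq(("a", "happy"), ("b", "sad"), ("c", "happy")).toDF("name", "mood")
    val extractDF = happyExtract(df, 4)
    println(s"partitions: ${extractDF.rdd.getNumPartitions}")
    println(s"rows: ${extractDF.count()}")
    spark.stop()
  }
}
```

Checking rdd.getNumPartitions before and after the filter is a quick way to confirm the extract has the partition count you asked for.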

Transform

Let’s define a couple of DataFrame transformations.

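import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Appends a constant greeting column
def withGreeting()(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

// Appends a constant farewell column
def withFarewell()(df: DataFrame): DataFrame = {
  df.withColumn("farewell", lit("goodbye"))
}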

Let’s create a model() function that chains the custom transformations.

def model()(df: DataFrame): DataFrame = {
  df
    .transform(withGreeting())
    .transform(withFarewell())
}

We can run extractDF.transform(model()) to run the transformations on our extract. Easy peasy 😎

Check out this blog post for more details on chaining custom DataFrame transformations.
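Because each transformation is just a function from DataFrame to DataFrame, it can be checked on a tiny in-memory DataFrame before it gets chained. The sketch below assumes a local session; the TransformSketch object and the sample names are invented for the example, while the three functions mirror the ones defined earlier.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object TransformSketch {
  // Local session for experimenting; a real job would use the cluster's session.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("transform-sketch")
    .getOrCreate()

  def withGreeting()(df: DataFrame): DataFrame =
    df.withColumn("greeting", lit("hello world"))

  def withFarewell()(df: DataFrame): DataFrame =
    df.withColumn("farewell", lit("goodbye"))

  // Chains the individual transformations
  def model()(df: DataFrame): DataFrame =
    df.transform(withGreeting()).transform(withFarewell())

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val sourceDF = Seq("alice", "bob").toDF("name")

    // Check one transformation on its own
    val greeted = sourceDF.transform(withGreeting())
    assert(greeted.columns.toSeq == Seq("name", "greeting"))

    // Check the chained model
    val modeled = sourceDF.transform(model())
    assert(modeled.columns.toSeq == Seq("name", "greeting", "farewell"))

    modeled.show()
    spark.stop()
  }
}
```

In a real project these assertions would probably live in a test suite instead of a main method, but the idea is the same.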

Load (or Report)

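import org.apache.spark.sql.{DataFrame, SaveMode}

// Overwrites the Parquet files at a fixed S3 path with the contents of df
def exampleWriter()(df: DataFrame): Unit = {
  val path = "s3a://some-bucket/extracts/bar"
  df.write.mode(SaveMode.Overwrite).parquet(path)
}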

The writer function should take a DataFrame as an argument and return nothing (Unit).
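A writer with a hardcoded path is hard to reuse, so one option is to move the path and save mode into the first parameter list. This is only a sketch: parquetWriter and the WriterSketch object are hypothetical names, and the example writes to a local temp directory instead of S3 so it can run anywhere.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object WriterSketch {
  // Local session for experimenting; a real job would use the cluster's session.
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("writer-sketch")
    .getOrCreate()

  // Hypothetical writer: the first parameter list configures it,
  // the second matches the DataFrame => Unit shape a writer needs.
  def parquetWriter(path: String, mode: SaveMode)(df: DataFrame): Unit =
    df.write.mode(mode).parquet(path)

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    // Temp directory standing in for a bucket path
    val path = Files.createTempDirectory("etl-sketch").resolve("bar").toString

    val write: DataFrame => Unit = parquetWriter(path, SaveMode.Overwrite)
    write(Seq("alice", "bob").toDF("name"))

    // Read the files back to confirm the write happened
    println(spark.read.parquet(path).count())
    spark.stop()
  }
}
```

Once the first parameter list is supplied, what is left is a plain DataFrame => Unit function, which is the shape described above.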

EtlDefinition

val etl = EtlDefinition(
  sourceDF = extractDF,
  transform = model(),
  write = exampleWriter()
)

Here’s how to execute the ETL code:

etl.process()

Wow, that was easy 😉 The EtlDefinition object can even be repurposed for making Slack messages! I’ll cover that in another blog post.

Take a look at the type signatures of the EtlDefinition arguments and make sure you understand how the functions we’ve defined fit into this mold.

case class EtlDefinition(
  sourceDF: DataFrame,
  transform: (DataFrame => DataFrame),
  write: (DataFrame => Unit),
  metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
) {

  def process(): Unit = {
    write(sourceDF.transform(transform))
  }

}

Take note that EtlDefinition objects can optionally be instantiated with an arbitrary metadata Map.
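Here is a rough sketch of what passing metadata could look like. To keep it runnable without spark-daria, it declares a local copy of the case class shown above; the metadata keys ("owner", "schedule"), the countingWriter helper, and the MetadataSketch object are all invented for the example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import scala.collection.mutable

object MetadataSketch {
  // Local copy of the case class shown above, so the sketch runs without spark-daria.
  case class EtlDefinition(
      sourceDF: DataFrame,
      transform: DataFrame => DataFrame,
      write: DataFrame => Unit,
      metadata: mutable.Map[String, Any] = mutable.Map[String, Any]()
  ) {
    def process(): Unit = write(sourceDF.transform(transform))
  }

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("metadata-sketch")
    .getOrCreate()

  def withGreeting()(df: DataFrame): DataFrame =
    df.withColumn("greeting", lit("hello world"))

  // Hypothetical writer that records the row count instead of writing to storage
  var lastRowCount: Long = -1L
  def countingWriter()(df: DataFrame): Unit = {
    lastRowCount = df.count()
  }

  def buildEtl(): EtlDefinition = {
    import spark.implicits._
    val sourceDF = Seq("alice", "bob").toDF("name")
    EtlDefinition(
      sourceDF = sourceDF,
      transform = withGreeting(),
      write = countingWriter(),
      // Made-up metadata keys; any String -> Any pairs are allowed
      metadata = mutable.Map[String, Any]("owner" -> "data-team", "schedule" -> "daily")
    )
  }

  def main(args: Array[String]): Unit = {
    val etl = buildEtl()
    etl.process()
    println(s"rows written: $lastRowCount, owner: ${etl.metadata("owner")}")
    spark.stop()
  }
}
```

The process() method never reads the metadata, so it is free-form: use it for whatever bookkeeping your own tooling needs.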

Multiple EtlDefinitions

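Several EtlDefinition objects can be collected in a mutable Map, so each one can be fetched and executed by name. Here etl2 stands for a second EtlDefinition built the same way as etl.

val etls = scala.collection.mutable.Map[String, EtlDefinition]()
etls += ("bar" -> etl)
etls += ("baz" -> etl2)
etls("bar").process()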
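With the definitions in a Map, running every job is a short loop. The sketch below is one way to do it, again with a local copy of the case class so it runs without spark-daria; runAll, countingWriter, the rowCounts map, and the sample data are hypothetical stand-ins for real jobs and writers.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable

object EtlRegistrySketch {
  // Local copy of the case class shown earlier, so the sketch runs without spark-daria.
  case class EtlDefinition(
      sourceDF: DataFrame,
      transform: DataFrame => DataFrame,
      write: DataFrame => Unit,
      metadata: mutable.Map[String, Any] = mutable.Map[String, Any]()
  ) {
    def process(): Unit = write(sourceDF.transform(transform))
  }

  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("etl-registry-sketch")
    .getOrCreate()

  // Stand-in for a real writer: records how many rows each named ETL produced
  val rowCounts = mutable.Map[String, Long]()
  def countingWriter(name: String)(df: DataFrame): Unit = {
    rowCounts(name) = df.count()
  }

  def buildEtls(): mutable.Map[String, EtlDefinition] = {
    import spark.implicits._
    val people = Seq("alice", "bob").toDF("name")
    val etls = mutable.Map[String, EtlDefinition]()
    etls += ("bar" -> EtlDefinition(people, df => df, countingWriter("bar")))
    etls += ("baz" -> EtlDefinition(people.limit(1), df => df, countingWriter("baz")))
    etls
  }

  // Runs every registered ETL; a mutable Map does not guarantee iteration order
  def runAll(etls: mutable.Map[String, EtlDefinition]): Unit =
    etls.foreach { case (name, etl) =>
      println(s"Running $name")
      etl.process()
    }

  def main(args: Array[String]): Unit = {
    runAll(buildEtls())
    println(rowCounts)
    spark.stop()
  }
}
```

If the jobs depend on each other, a collection with a defined order (a Seq of name and definition pairs, for instance) would be a safer choice than a hash-based Map.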

Next Steps

  • Make sure to repartition the DataFrame after filtering
  • Custom DataFrame transformations should be broken up, tested individually, and then chained in a model() method
  • Create EtlDefinition objects to organize your ETL logic and make sure all of your method signatures are correct

I use the Databricks API, AWS Lambda, and Slack slash commands to execute ETL jobs directly from Slack. I highly recommend this workflow!

Spark coder, live in Colombia / Brazil / US, love Scala / Python / Ruby, working on empowering Latinos and Latinas in tech
