Testing Spark Applications

Matthew Powers
7 min read · Jan 14, 2017


Testing Spark applications allows for a rapid development workflow and gives you confidence that your code will work in production.

Most Spark users spin up clusters with sample data sets to develop code — this is slow (clusters are slow to start) and costly (you need to pay for computing resources).

An automated test suite lets you develop code on your local machine free of charge. Test files should run in under a minute, so it’s easy to rapidly iterate.

The test suite documents how the code functions, reduces bugs, and makes it easier to add new features without breaking existing code.

If you’re using the Scala API, continue reading this blog post. If you’re using Python, switch to this blog post on Testing PySpark Code.

We’ll talk about more benefits of testing later. Let’s start with some simple examples!

Hello World Example

The spark-test-examples repository contains all the code snippets covered in this tutorial! The spark-fast-tests library is used to make DataFrame comparisons.
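If you prefer to wire up your own project instead of cloning the repo, the test dependencies in build.sbt look roughly like the following. The coordinates and version numbers here are illustrative assumptions, not copied from the repo; check the spark-fast-tests README for the exact ones that match your Spark version.

// Illustrative build.sbt snippet (assumed coordinates/versions; verify against the spark-fast-tests README)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "org.scalatest" %% "scalatest" % "3.0.1" % "test",
  "com.github.mrpowers" %% "spark-fast-tests" % "0.21.1" % "test" // version is a placeholder
)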

The following HelloWorld object contains a withGreeting method that appends a greeting column to a DataFrame.

package com.github.mrpowers.spark.test.example

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object HelloWorld {

  def withGreeting()(df: DataFrame): DataFrame = {
    df.withColumn("greeting", lit("hello world"))
  }

}
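The empty first parameter list looks a little unusual, but it lets withGreeting() slot into the DataFrame transform method. Here's a rough usage sketch, assuming a SparkSession named spark is already in scope (the test wrapper trait below provides one):

// assumes a SparkSession named `spark` is in scope
import spark.implicits._

val df = Seq("miguel", "luisa").toDF("name")
df.transform(HelloWorld.withGreeting()).show()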

Suppose we start with a DataFrame that looks like this:

+------+
| name|
+------+
|miguel|
| luisa|
+------+

When we run the HelloWorld.withGreeting() method, we should get a new DataFrame that looks like this:

+------+-----------+
| name| greeting|
+------+-----------+
|miguel|hello world|
| luisa|hello world|
+------+-----------+

Add a SparkSessionTestWrapper trait in the test directory so we can create DataFrames in our test suite via the SparkSession.

package com.github.mrpowers.spark.test.example

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local")
      .appName("spark test example")
      .getOrCreate()
  }

}
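One optional tweak that is not part of the trait above: dropping the number of shuffle partitions keeps local tests from spinning up 200 tiny tasks whenever a shuffle happens. A sketch of the adjusted builder chain:

// assumed variant of the builder chain, with fewer shuffle partitions for faster local runs
SparkSession
  .builder()
  .master("local")
  .appName("spark test example")
  .config("spark.sql.shuffle.partitions", "1")
  .getOrCreate()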

Let’s write a test that creates a DataFrame, runs the withGreeting() method, and confirms that the greeting column has been properly appended to the DataFrame.

package com.github.mrpowers.spark.test.example

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.scalatest.FunSpec

class HelloWorldSpec
    extends FunSpec
    with DataFrameComparer
    with SparkSessionTestWrapper {

  import spark.implicits._

  it("appends a greeting column to a Dataframe") {

    val sourceDF = Seq(
      ("miguel"),
      ("luisa")
    ).toDF("name")

    val actualDF = sourceDF.transform(HelloWorld.withGreeting())

    val expectedSchema = List(
      StructField("name", StringType, true),
      StructField("greeting", StringType, false)
    )

    val expectedData = Seq(
      Row("miguel", "hello world"),
      Row("luisa", "hello world")
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    assertSmallDataFrameEquality(actualDF, expectedDF)

  }

}

The test file is pretty verbose… welcome to Scala!

Some notable points in the test file:

  1. We need to run import spark.implicits._ to access the toDF helper method that creates sourceDF.
  2. The expectedDF cannot be created with the toDF helper method. toDF marks the greeting column as nullable (note the third argument in StructField("greeting", StringType, true)), but we need the greeting column to be StructField("greeting", StringType, false). The snippet after this list shows the mismatch.
  3. The assertSmallDataFrameEquality() function compares the equality of two DataFrames. We need to include the DataFrameComparer trait in the test class definition and set up the project with spark-fast-tests to access this method.
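To make the second point concrete, here's what we'd get if we tried to build the expected DataFrame with toDF: the greeting column comes out nullable, so it wouldn't line up with the non-nullable column that lit() produces.

val naiveExpectedDF = Seq(
  ("miguel", "hello world"),
  ("luisa", "hello world")
).toDF("name", "greeting")

naiveExpectedDF.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- greeting: string (nullable = true)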

The HelloWorld and HelloWorldSpec files are checked into GitHub if you’d like to clone the repo and play with the examples yourself.

Deeper Dive into StructField

StructField takes three arguments:

  1. The column name
  2. The column type (notice that these are imported from org.apache.spark.sql.types)
  3. A boolean value that indicates whether the column is nullable. If this argument is set to true, the column can contain null values. The snippet below shows how to inspect the nullability of an existing DataFrame.
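When a test fails because of a schema mismatch, printing each column's nullability can pinpoint the offending field. This little loop is just an illustrative debugging aid, not code from the repo:

// print each column's name, type, and nullability for the actualDF from the spec above
actualDF.schema.fields.foreach { field =>
  println(s"${field.name}: ${field.dataType}, nullable = ${field.nullable}")
}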

Testing a User Defined Function

Let’s create a user defined function that returns true if a number is even and false otherwise.

The code is quite simple.

package com.github.mrpowers.spark.test.example

import org.apache.spark.sql.functions._

object NumberFun {

  def isEven(n: Integer): Boolean = {
    n % 2 == 0
  }

  val isEvenUDF = udf[Boolean, Integer](isEven)

}

The test isn’t too complicated, but prepare yourself for a wall of code.

package com.github.mrpowers.spark.test.example

import org.scalatest.FunSpec
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import com.github.mrpowers.spark.fast.tests.DataFrameComparer

class NumberFunSpec
    extends FunSpec
    with DataFrameComparer
    with SparkSessionTestWrapper {

  import spark.implicits._

  it("appends an is_even column to a Dataframe") {

    val sourceDF = Seq(
      (1),
      (8),
      (12)
    ).toDF("number")

    val actualDF = sourceDF
      .withColumn("is_even", NumberFun.isEvenUDF(col("number")))

    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("is_even", BooleanType, true)
    )

    val expectedData = Seq(
      Row(1, false),
      Row(8, true),
      Row(12, true)
    )

    val expectedDF = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    )

    assertSmallDataFrameEquality(actualDF, expectedDF)

  }
}

We create a DataFrame, run the NumberFun.isEvenUDF() function, create another expected DataFrame, and compare the actual result with our expectations using assertSmallDataFrameEquality() from spark-fast-tests.

We can improve our test coverage by exercising isEven() on a standalone basis and covering the edge cases. Here are some tests we might like to add.

describe(".isEven") {
it("returns true for even numbers") {
assert(NumberFun.isEven(4) === true)
}

it("returns false for odd numbers") {
assert(NumberFun.isEven(3) === false)
}

it("returns false for null values") {
assert(NumberFun.isEven(null) === false)
}
}

The first two tests pass with our existing code, but the third one causes the code to error out with a NullPointerException. If we can guarantee the user defined function will never be called on nullable columns, we might be able to get away with ignoring null values.

It's probably safer to account for null values and refactor the code accordingly. This blog post explains how to deal with null gracefully in UDFs.
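For reference, one common way to handle this (a sketch of the approach, not necessarily the exact code from that post) is to wrap the input in an Option inside the NumberFun object, so null inputs flow through as null instead of blowing up:

// Option(null) is None, so a null input produces a null in the result column instead of a NullPointerException
def isEvenOption(n: Integer): Option[Boolean] = {
  Option(n).map(_ % 2 == 0)
}

val isEvenOptionUDF = udf[Option[Boolean], Integer](isEvenOption)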

A Real Test

Let’s write a test for a function that converts all the column names of a DataFrame to snake_case. This will make it a lot easier to run SQL queries off of the DataFrame.

package com.github.mrpowers.spark.test.example

import org.apache.spark.sql.DataFrame

object Converter {

  def snakecaseify(s: String): String = {
    s.toLowerCase().replace(" ", "_")
  }

  def snakeCaseColumns(df: DataFrame): DataFrame = {
    df.columns.foldLeft(df) { (acc, cn) =>
      acc.withColumnRenamed(cn, snakecaseify(cn))
    }
  }

}
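To see why the snake_case names matter, here's a hypothetical usage sketch (assuming a SparkSession named spark and spark.implicits._ are in scope): the renamed columns can be referenced in SQL without backticks.

val df = Seq(("funny", "joke")).toDF("A b C", "de F")
val cleanDF = Converter.snakeCaseColumns(df)

cleanDF.createOrReplaceTempView("jokes")
spark.sql("SELECT a_b_c, de_f FROM jokes").show()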

snakecaseify is a pure function and will be tested using the ScalaTest assert() method. We'll compare the equality of two DataFrames to test the snakeCaseColumns method.

package com.github.mrpowers.spark.test.example

import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import org.scalatest.FunSpec

class ConverterSpec
    extends FunSpec
    with DataFrameComparer
    with SparkSessionTestWrapper {

  import spark.implicits._

  describe(".snakecaseify") {

    it("downcases uppercase letters") {
      assert(Converter.snakecaseify("HeLlO") === "hello")
    }

    it("converts spaces to underscores") {
      assert(Converter.snakecaseify("Hi There") === "hi_there")
    }

  }

  describe(".snakeCaseColumns") {

    it("snake_cases the column names of a DataFrame") {

      val sourceDF = Seq(
        ("funny", "joke")
      ).toDF("A b C", "de F")

      val actualDF = Converter.snakeCaseColumns(sourceDF)

      val expectedDF = Seq(
        ("funny", "joke")
      ).toDF("a_b_c", "de_f")

      assertSmallDataFrameEquality(actualDF, expectedDF)

    }

  }

}

This test file uses the describe method to group tests associated with the snakecaseify() and snakeCaseColumns() methods. This separates code in the test file and makes the console output clearer when the tests are run.
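When the suite runs, the grouped console output reads roughly like this (exact formatting depends on your ScalaTest version and reporter):

ConverterSpec:
.snakecaseify
- downcases uppercase letters
- converts spaces to underscores
.snakeCaseColumns
- snake_cases the column names of a DataFrame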

How Testing Improves Your Codebase

Sandi Metz lists some benefits of testing in her book Practical Object Oriented Design with Ruby. Let’s see how her list applies to Spark applications.

Finding Bugs

When writing user defined functions or DataFrame transformations that will process billions of rows of data, you will likely encounter bad data. There will be strange characters, null values, and other inconsistencies. Testing encourages you to proactively deal with edge cases. If your code breaks with a production anomaly, you can add another test to make sure the edge case doesn’t catch you again.

Supplying Documentation

It is often easier to understand code by reading the tests! When I need to grok some new code, I start with the tests and then progress to the source code.

API documentation can sometimes fall out of sync with the actual code. A developer may update the code and forget to update the API documentation.

The test suite won’t fall out of sync with the code. If the code changes and the tests start failing, the developer will remember to update the test suite.

Exposing Design Flaws

Poorly designed code is difficult to test. If it’s hard to write a test, you’ll be forced to refactor the code. Automated tests incentivize well written code.

Advice from the Trenches

The spark-test-example GitHub repository contains all the examples that were covered in this blog. Clone the repo, run sbt test, and play around with the code to get your hands dirty.

The FunSpec trait is included in the test suites to make the code more readable. Following the Ruby convention of grouping tests for individual functions in a describe block and giving each spec a descriptive title makes it easier to read the test output.

Use a continuous integration tool to build your project every time code is merged into master.

CircleCI Output in Slack

When the test suite fails, it should be broadcasted loudly and the bugs should be fixed immediately.

Running a Single Test File

Create a workflow that enables you to run a single test file, so the tests run quicker. IntelliJ makes it easy to run a single test file, or you can use the following command in your Terminal.

sbt "test-only *HelloWorldSpec"

Spend the time to develop a fluid development workflow, so testing is a delight.

Debugging with show()

The show() method can be called in the test suite to output the DataFrame contents in the console. Use actualDF.show() and expectedDF.show() whenever you’re debugging a failing spec with a cryptic error message.
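A minimal debugging routine pairs show() with printSchema(), since nullability mismatches won't show up in the row output alone:

actualDF.show()
actualDF.printSchema()
expectedDF.show()
expectedDF.printSchema()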

Mocking is Limited in Scala!

ScalaMock3 can only mock traits and no-args classes — Paul Butcher, author of ScalaMock

ScalaMock3 doesn’t support objects or classes with arguments, arguably the most common language constructs you’d like to mock.

ScalaMock4 should be significantly more usable, but the project doesn't appear to be making forward progress. ScalaMock4 was initially blocked on the release of scala.meta; scala.meta has since been released, but development of ScalaMock4 still doesn't seem to have picked up.

Let’s hope mocking in Scala gets better soon!

Should You Test Your Spark Projects?

I started using Spark in the browser-based Databricks notebooks and the development workflow was painful. Editing text in the browser is slow and buggy. I was manually testing my code and other developers were reusing my code by copying and pasting the functions.

Developing Spark code in tested SBT projects is much better!

Other programming languages make testing easier. With Ruby, for example, it's easier to stub and mock, the testing frameworks have more features, and the community encourages testing. Scala has only one book on testing, and it doesn't get great reviews.

Testing in Scala / Spark isn’t perfect, but it’s still a lot better than not writing tests at all. It’s taken me a few months to develop a workflow that I’m comfortable with and I hope this blog post will accelerate your path to developing an awesome Spark testing workflow!

Once you’ve got the basics down, make sure to read this blog post on designing easily testable Spark code and this blog post on cutting the run time of a Spark test suite by 40%+.
