Dependency Injection with Spark

Dependency injection is a design pattern that lets you write Spark code that's more flexible and easier to test.

This blog post introduces code that has a dependency, shows how to inject the path as a dependency, and then shows how to inject an entire DataFrame.

Code with a dependency

Let’s create a method that appends a column to a DataFrame.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

def withStateFullName()(df: DataFrame): DataFrame = {
  // Read the state mappings from the path defined in the Config object
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

The method appends the state_name column with a broadcast join.

withStateFullName depends on the Config object; it "has a dependency". This is the dependency that'll be "injected".

The Config object is defined as follows:

object Config {

  val test: Map[String, String] = {
    Map(
      "stateMappingsPath" -> new java.io.File("./src/test/resources/state_mappings.csv").getCanonicalPath
    )
  }

  val production: Map[String, String] = {
    Map(
      "stateMappingsPath" -> "s3a://some-fake-bucket/state_mappings.csv"
    )
  }

  var environment = sys.env.getOrElse("PROJECT_ENV", "production")

  def get(key: String): String = {
    if (environment == "test") {
      test(key)
    } else {
      production(key)
    }
  }

}

See this blog post on environment-specific configuration in Scala projects if you're interested in more details about this design pattern.
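
Note that environment is a var, so a test suite can also force the test configuration explicitly instead of relying on the PROJECT_ENV environment variable. A minimal sketch of a test setup step (the assertion is just for illustration):

// In a test setup block, switch to the test configuration
Config.environment = "test"
assert(Config.get("stateMappingsPath").endsWith("state_mappings.csv"))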

Let's create a state_mappings.csv file in src/test/resources, so we can run the method on some sample data.

state_name,state_abbreviation
Tennessee,TN
New York,NY
Mississippi,MS

Run the method.

import spark.implicits._ // for the toDF helper

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullName())
  .show()

+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
|      john| 23|   TN| Tennessee|
|     sally| 48|   NY|  New York|
+----------+---+-----+----------+

Let's refactor the code so it does not depend on the Config object. In other words, let's remove the dependency from withStateFullName with the dependency injection design pattern.

Injecting a path

Let’s create a method that takes the path to the state mappings data as an argument.

def withStateFullNameInjectPath(
  stateMappingsPath: String = Config.get("stateMappingsPath")
)(df: DataFrame): DataFrame = {
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(stateMappingsPath)
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

The stateMappingsPath argument leverages a smart default, so users can easily use the function without explicitly referring to the path. This code is more flexible because it allows users to override the smart default and use any path when running the function.

Let’s rely on the smart default and run this code.

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectPath())
  .show()

+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
|      john| 23|   TN| Tennessee|
|     sally| 48|   NY|  New York|
+----------+---+-----+----------+

The withStateFullNameInjectPath method no longer hardcodes the dependency on the Config object; the Config lookup only appears as an overridable default.
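
We can also override the smart default with any path, so the Config object is never consulted. Here's a minimal sketch using the test file we defined above:

val testMappingsPath =
  new java.io.File("./src/test/resources/state_mappings.csv").getCanonicalPath

df
  .transform(withStateFullNameInjectPath(testMappingsPath))
  .show()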

Injecting an entire DataFrame

Let’s refactor the code again to inject the entire DataFrame as an argument, again with a smart default.

def withStateFullNameInjectDF(
  stateMappingsDF: DataFrame = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
)(df: DataFrame): DataFrame = {
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

This code provides the same functionality and is even more flexible. We can now run the function with any DataFrame. We can read a Parquet file and run this code, or create a DataFrame with toDF in our test suite.
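
For example, here's a sketch of injecting mappings read from a Parquet file (the bucket and file name are made up for illustration; any DataFrame with a state_abbreviation column will work):

val parquetMappingsDF = spark
  .read
  .parquet("s3a://some-fake-bucket/state_mappings.parquet")

df
  .transform(withStateFullNameInjectDF(parquetMappingsDF))
  .show()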

Let’s override the smart default and run this code in our test suite:

val stateMappingsDF = Seq(
  ("Tennessee", "TN"),
  ("New York", "NY")
).toDF("state_full_name", "state_abbreviation")

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectDF(stateMappingsDF))
  .show()

+----------+---+-----+---------------+
|first_name|age|state|state_full_name|
+----------+---+-----+---------------+
|      john| 23|   TN|      Tennessee|
|     sally| 48|   NY|       New York|
+----------+---+-----+---------------+

Injecting the entire DataFrame as a dependency allows us to test our code without reading from a file. Avoiding file I/O in your test suite is a great way to make your tests run faster.

This design pattern also makes your tests more readable. Your coworkers won’t need to open up random CSV files to understand the tests.
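
Here's a sketch of what such a test might look like with ScalaTest (the surrounding test class and SparkSession setup are assumed):

it("appends the full state name using the injected mappings") {
  val stateMappingsDF = Seq(
    ("Tennessee", "TN")
  ).toDF("state_full_name", "state_abbreviation")
  val sourceDF = Seq(
    ("john", 23, "TN")
  ).toDF("first_name", "age", "state")
  val resultDF = sourceDF.transform(withStateFullNameInjectDF(stateMappingsDF))
  // No file I/O: everything the test needs is defined right here
  assert(resultDF.select("state_full_name").head.getString(0) == "Tennessee")
}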

Conclusion

Dependency injection can be used to make code that’s more flexible and easier to test.

We went from having code that relied on a CSV file stored in a certain path to code that’s flexible enough to be run with any DataFrame.

Before productionalizing this code, it’d be a good idea to run some DataFrame validations (on both the underlying DataFrame and the injected DataFrame) and make the code even more flexible by making it schema independent.
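
Here's a minimal sketch of such a validation with a hand-rolled helper (the function name is made up for illustration; libraries like spark-daria offer similar checks):

def validateColumnsPresent(df: DataFrame, requiredColumns: Seq[String]): Unit = {
  val missingColumns = requiredColumns.diff(df.columns.toSeq)
  require(
    missingColumns.isEmpty,
    s"DataFrame is missing required columns: ${missingColumns.mkString(", ")}"
  )
}

// Fail fast before running the join
validateColumnsPresent(stateMappingsDF, Seq("state_abbreviation"))
validateColumnsPresent(df, Seq("state"))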

Make sure to leverage this design pattern so you don’t need to read from CSV / Parquet files in your test suite anymore!

Spark coder, live in Colombia / Brazil / US, love Scala / Python / Ruby, working on empowering Latinos and Latinas in tech
