Dependency Injection with Spark

Matthew Powers
Jun 25, 2019

Dependency injection is a design pattern that lets you write Spark code that’s more flexible and easier to test.

This blog post introduces code that has a dependency, shows how to inject the path as a dependency, and then shows how to inject an entire DataFrame.

Code with a dependency

Let’s create a withStateFullName method that appends a state_name column to a DataFrame.

def withStateFullName()(df: DataFrame): DataFrame = {
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

The withStateFullName method appends the state_name column with a broadcast join.

withStateFullName depends on the Config object; in other words, it “has a dependency”. This is the dependency that’ll be “injected”.

The Config object is defined as follows:

object Config {

  val test: Map[String, String] = {
    Map(
      "stateMappingsPath" -> new java.io.File("./src/test/resources/state_mappings.csv").getCanonicalPath
    )
  }

  val production: Map[String, String] = {
    Map(
      "stateMappingsPath" -> "s3a://some-fake-bucket/state_mappings.csv"
    )
  }

  var environment = sys.env.getOrElse("PROJECT_ENV", "production")

  def get(key: String): String = {
    if (environment == "test") {
      test(key)
    } else {
      production(key)
    }
  }

}

This blog post describes environment-specific configuration in Scala projects if you’re interested in more details about this design pattern.
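For example, a test suite can point Config at the test paths before looking anything up (a minimal sketch; alternatively, launch the tests with the PROJECT_ENV environment variable set to test):

Config.environment = "test"
// The canonical path resolves against the project root, so on a typical
// Linux / macOS setup it points at the test fixture defined below.
assert(Config.get("stateMappingsPath").contains("src/test/resources"))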

Let’s define a src/test/resources/state_mappings.csv file, so we can run the withStateFullName method on some sample data.

state_name,state_abbreviation
Tennessee,TN
New York,NY
Mississippi,MS

Run the withStateFullName method.

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullName())
  .show()
+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
| john| 23| TN| Tennessee|
| sally| 48| NY| New York|
+----------+---+-----+----------+

Let’s refactor withStateFullName so it does not depend on the Config object. In other words, let’s remove the Config dependency from withStateFullName with the dependency injection design pattern.

Injecting a path

Let’s create a withStateFullNameInjectPath method that takes the path to the state mappings data as an argument.

def withStateFullNameInjectPath(
  stateMappingsPath: String = Config.get("stateMappingsPath")
)(df: DataFrame): DataFrame = {
  val stateMappingsDF = spark
    .read
    .option("header", true)
    .csv(stateMappingsPath)
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

The stateMappingsPath leverages a smart default, so users can easily use the function without explicitly referring to the path. This code is more flexible because it allows users to override the smart default and use any stateMappingsPath when running the function.

Let’s rely on the smart default and run this code.

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectPath())
  .show()
+----------+---+-----+----------+
|first_name|age|state|state_name|
+----------+---+-----+----------+
| john| 23| TN| Tennessee|
| sally| 48| NY| New York|
+----------+---+-----+----------+

When the caller supplies a path, the withStateFullNameInjectPath method no longer depends on the Config object; the Config lookup only lives in the smart default.
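Overriding the smart default is just a matter of passing a different path, for example the test fixture defined earlier (a hypothetical call, reusing the df from the previous example):

// Inject the path explicitly instead of relying on the smart default,
// e.g. to point at the test fixture rather than the production S3 bucket.
df
  .transform(withStateFullNameInjectPath("./src/test/resources/state_mappings.csv"))
  .show()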

Injecting an entire DataFrame

Let’s refactor the code again to inject the entire DataFrame as an argument, again with a smart default.

def withStateFullNameInjectDF(
  stateMappingsDF: DataFrame = spark
    .read
    .option("header", true)
    .csv(Config.get("stateMappingsPath"))
)(df: DataFrame): DataFrame = {
  df
    .join(
      broadcast(stateMappingsDF),
      df("state") <=> stateMappingsDF("state_abbreviation"),
      "left_outer"
    )
    .drop("state_abbreviation")
}

This code provides the same functionality and is even more flexible. We can now run the function with any DataFrame. We can read a Parquet file and run this code or create a DataFrame with toDF in our test suite.
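For example, the mapping data could just as easily come from a Parquet file (the path below is made up, purely for illustration):

// Hypothetical: inject a state mappings DataFrame read from Parquet instead
// of the default CSV (this file would need a state_abbreviation column).
val parquetMappingsDF = spark
  .read
  .parquet("s3a://some-fake-bucket/state_mappings.parquet")

df.transform(withStateFullNameInjectDF(parquetMappingsDF))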

Let’s override the smart default and run this code in our test suite:

val stateMappingsDF = Seq(
  ("Tennessee", "TN"),
  ("New York", "NY")
).toDF("state_full_name", "state_abbreviation")

val df = Seq(
  ("john", 23, "TN"),
  ("sally", 48, "NY")
).toDF("first_name", "age", "state")

df
  .transform(withStateFullNameInjectDF(stateMappingsDF))
  .show()
+----------+---+-----+---------------+
|first_name|age|state|state_full_name|
+----------+---+-----+---------------+
| john| 23| TN| Tennessee|
| sally| 48| NY| New York|
+----------+---+-----+---------------+

Injecting the entire DataFrame as a dependency allows us to test our code without reading from a file. Avoiding file I/O in your test suite is a great way to make your tests run faster.

This design pattern also makes your tests more readable. Your coworkers won’t need to open up random CSV files to understand the tests.
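Here’s a minimal sketch of what such a test could look like, using a plain assertion on the collected rows rather than any particular testing library:

// A minimal test sketch (no test framework assumed): run the transformation
// on the inline DataFrames above and assert on the collected rows.
val actual = df
  .transform(withStateFullNameInjectDF(stateMappingsDF))
  .select("first_name", "state_full_name")
  .collect()
  .map(row => (row.getString(0), row.getString(1)))
  .toSet

assert(actual == Set(("john", "Tennessee"), ("sally", "New York")))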

Conclusion

Dependency injection can be used to make code that’s more flexible and easier to test.

We went from having code that relied on a CSV file stored in a certain path to code that’s flexible enough to be run with any DataFrame.

Before productionalizing this code, it’d be a good idea to run some DataFrame validations (on both the underlying DataFrame and the injected DataFrame) and make the code even more flexible by making it schema independent.
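A minimal sketch of such a validation, assuming we only need to verify that the required columns exist before running the join (validatePresenceOfColumns is a hypothetical helper, not part of Spark):

// Hypothetical helper: fail fast if a DataFrame is missing required columns.
def validatePresenceOfColumns(df: DataFrame, requiredColNames: Seq[String]): Unit = {
  val missingColNames = requiredColNames.diff(df.columns.toSeq)
  require(
    missingColNames.isEmpty,
    s"The DataFrame is missing these columns: ${missingColNames.mkString(", ")}"
  )
}

validatePresenceOfColumns(stateMappingsDF, Seq("state_abbreviation"))
validatePresenceOfColumns(df, Seq("state"))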

Make sure to leverage this design pattern so you don’t need to read from CSV / Parquet files in your test suite anymore!
