Joining External Data Files with Spark DataFrames

You’ll often need to incorporate external data files in your Spark applications. You’ll also want to store data for large arrays or maps in data files rather than store them in code.

You’ll greatly improve the quality and maintainability of your code by knowing when to make data file abstractions and by maintaining these files as single source of truth data stores.

This post explains how to structure data files, test code that uses data files, join data files with DataFrames, and convert data files to Arrays / Maps.

Joining a data file with a DataFrame

Suppose we have a state_mappings.csv data file with the following contents:

state_name,state_abbreviation
Tennessee,TN
New York,NY
Mississippi,MS

Let’s create a DataFrame and use a broadcast join to append the state_abbreviation column to the DataFrame.

import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // needed for toDF

val sourceDF = Seq(
  ("britney spears", "Mississippi"),
  ("romeo santos", "New York"),
  ("miley cyrus", "Tennessee"),
  ("random dude", null),
  (null, "Dubai")
).toDF("name", "birth_state")

// Read the mapping file; the first line holds the column names
val stateMappingsDF = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv("/some/path/state_mappings.csv")

// Broadcast the small mapping DataFrame and keep every row of sourceDF
val resultDF = sourceDF.join(
  broadcast(stateMappingsDF),
  sourceDF("birth_state") <=> stateMappingsDF("state_name"),
  "left_outer"
).drop(stateMappingsDF("state_name"))

resultDF.show()
+--------------+-----------+------------------+
|          name|birth_state|state_abbreviation|
+--------------+-----------+------------------+
|britney spears|Mississippi|                MS|
|  romeo santos|   New York|                NY|
|   miley cyrus|  Tennessee|                TN|
|   random dude|       null|              null|
|          null|      Dubai|              null|
+--------------+-----------+------------------+

You’ll need to replace /some/path/state_mappings.csv with the path to the file on your machine to get this code to work.

Here are some common design patterns you’ll follow when appending a column to a DataFrame based on a data file:

  • Using the null safe equality operator (<=>) to match columns when making joins
  • Doing a broadcast join (broadcast(stateMappingsDF))
  • Doing a left_outer join
  • Dropping columns in the mapping DataFrame after the join is executed (drop(stateMappingsDF("state_name")))
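
To see what the null safe equality operator does differently, here’s a minimal sketch that compares === and <=> on a tiny DataFrame. The column names x and y and the local SparkSession are made up for illustration; the snippet is meant to be pasted into spark-shell or run with Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell,
// getOrCreate() simply reuses the session that already exists.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("null-safe-equality")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: x and y are illustrative column names
val df = Seq[(String, String)](
  ("TN", "TN"),
  ("TN", null),
  (null, null)
).toDF("x", "y")

val compared = df.select(
  ($"x" === $"y").as("regular_eq"),  // null whenever either side is null
  ($"x" <=> $"y").as("null_safe_eq") // always true or false, never null
)

compared.show()
```

The regular_eq column is null for the last two rows, while null_safe_eq is false for ("TN", null) and true for (null, null). In the join above, both operators would give the same result because the mapping file has no null state_name; <=> changes the outcome only when both sides of the join can contain null keys.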

This design pattern is only performant when the data file is small enough to be broadcast. You’ll need to take a different approach for big data files.

Let’s refactor this code, so we don’t need to hardcode the data file path.

Managing data file paths with a Config object

Let’s make a Config object with different paths for the data file depending on the PROJECT_ENV environment variable.

object Config {

  val test: Map[String, String] = {
    Map(
      "stateMappingsPath" -> new java.io.File("./src/test/resources/state_mappings.csv").getCanonicalPath
    )
  }

  val production: Map[String, String] = {
    Map(
      "stateMappingsPath" -> "s3a://some-fake-bucket/state_mappings.csv"
    )
  }

  // Defaults to production when PROJECT_ENV is not set
  var environment = sys.env.getOrElse("PROJECT_ENV", "production")

  def get(key: String): String = {
    if (environment == "test") {
      test(key)
    } else {
      production(key)
    }
  }

}

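Now let’s update the build.sbt file to set the PROJECT_ENV variable to test when the test suite runs. sbt only applies envVars to forked JVMs, so forking needs to be turned on for tests as well.

fork in Test := true
envVars in Test := Map("PROJECT_ENV" -> "test")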

We’re ready to refactor our code that reads in the mapping CSV file.

val stateMappingsDF = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(Config.get("stateMappingsPath"))

The same code will read in different mapping files depending on the PROJECT_ENV — amazing! 😺
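
One thing to watch with this approach: test(key) and production(key) throw a NoSuchElementException when a key is missing from one of the maps, and it’s easy to add a path to one environment and forget the other. Here’s a minimal sketch of a lookup that reports the problem explicitly. ConfigSketch, its lookup method, the environment parameter, and the error message are all hypothetical names for illustration, not part of the Config object above.

```scala
// Hypothetical variant of Config: the environment is passed in as an
// argument so the lookup can be exercised without touching PROJECT_ENV.
object ConfigSketch {

  val test: Map[String, String] = Map(
    "stateMappingsPath" -> "./src/test/resources/state_mappings.csv"
  )

  val production: Map[String, String] = Map(
    "stateMappingsPath" -> "s3a://some-fake-bucket/state_mappings.csv"
  )

  // Returns Right(path) when the key exists, Left(message) otherwise
  def lookup(key: String, environment: String): Either[String, String] = {
    val settings = if (environment == "test") test else production
    settings.get(key).toRight(s"No '$key' setting for the '$environment' environment")
  }

}
```

ConfigSketch.lookup("stateMappingsPath", "test") returns the local path wrapped in Right, and a key that’s missing from the chosen map comes back as a Left with a readable message instead of an exception.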

Accessing data file columns as arrays

spark-daria defines helper methods to convert DataFrames into Arrays or Maps. Let’s use the DataFrameHelpers.columnToArray method defined in spark-daria to convert the state_abbreviation column from the state_mappings.csv file to an Array of strings.

import com.github.mrpowers.spark.daria.sql.DataFrameHelpers

val stateMappingsDF = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(Config.get("stateMappingsPath"))

// Collect one column of the (small) mapping file into a local Array
val abbreviations = DataFrameHelpers.columnToArray[String](
  stateMappingsDF,
  "state_abbreviation"
)
println(abbreviations.mkString(" ")) // prints "TN NY MS"

Let’s create another DataFrame and use the abbreviations Array to append an is_valid_state_abbr column.

import org.apache.spark.sql.functions.col

val sourceDF = Seq(
  ("NY"),
  ("nowhere"),
  (null)
).toDF("state_abbr")

// Expand the Array into varargs for isin
val resultDF = sourceDF.withColumn(
  "is_valid_state_abbr",
  col("state_abbr").isin(abbreviations: _*)
)

resultDF.show()
+----------+-------------------+
|state_abbr|is_valid_state_abbr|
+----------+-------------------+
|        NY|               true|
|   nowhere|              false|
|      null|               null|
+----------+-------------------+
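
Notice that isin returns null, not false, for the row with a null state_abbr. If you’d rather treat a missing abbreviation as invalid, one option is to wrap the check in coalesce. This is a minimal sketch; the abbreviations Array is hardcoded and the strictDF name is made up so the snippet stands on its own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, lit}

// Assumption: a local session for illustration; spark-shell already has one
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("isin-null-handling")
  .getOrCreate()
import spark.implicits._

// Assumption: hardcoded to keep the sketch self-contained;
// in the post this Array comes from the data file.
val abbreviations = Array("TN", "NY", "MS")

val sourceDF = Seq[String]("NY", "nowhere", null).toDF("state_abbr")

val strictDF = sourceDF.withColumn(
  "is_valid_state_abbr",
  // isin yields null for a null input, so fall back to false
  coalesce(col("state_abbr").isin(abbreviations: _*), lit(false))
)

strictDF.show()
```

With this change the is_valid_state_abbr column is true for NY and false for both nowhere and the null row.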

It’s usually better to abstract large arrays into data files. You shouldn’t manage huge arrays in your codebase.
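
The same idea extends to Maps. spark-daria has helpers for this as well; the sketch below sticks to plain Spark calls so that nothing about the library’s API is assumed. The DataFrame is built inline and the nameToAbbreviation name is made up for illustration; in practice the data would be read from state_mappings.csv.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already has one
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("data-file-to-map")
  .getOrCreate()
import spark.implicits._

// Assumption: built inline to keep the sketch self-contained
val stateMappingsDF = Seq(
  ("Tennessee", "TN"),
  ("New York", "NY"),
  ("Mississippi", "MS")
).toDF("state_name", "state_abbreviation")

// Collect the (small) DataFrame to the driver and build a lookup Map
val nameToAbbreviation: Map[String, String] = stateMappingsDF
  .select("state_name", "state_abbreviation")
  .collect()
  .map(row => row.getString(0) -> row.getString(1))
  .toMap

println(nameToAbbreviation("New York")) // prints "NY"
```

Like the Array conversion, this pulls every row to the driver, so it only makes sense for small data files.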

Structuring data files

Here’s an example of a well-structured data file:

city,state
houston,texas
charleston,south carolina
louisville,kentucky

Each row in the data file contains related data and each row has two columns.
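
One way to keep a data file well structured is to check it before it’s used. The sketch below, in plain Scala, flags rows whose column count doesn’t match the header. DataFileCheck and malformedRows are hypothetical names, and the naive split on commas assumes the file has no quoted fields.

```scala
object DataFileCheck {

  // Returns (1-based line number, line) for every line whose column
  // count differs from the header's. Assumes no quoted commas.
  def malformedRows(lines: Seq[String]): Seq[(Int, String)] = {
    lines.headOption match {
      case None => Seq.empty
      case Some(header) =>
        // limit -1 keeps trailing empty fields, so "houston," counts as two columns
        val expected = header.split(",", -1).length
        lines.zipWithIndex.collect {
          case (line, idx) if line.split(",", -1).length != expected => (idx + 1, line)
        }
    }
  }

}
```

Running malformedRows on the four lines of the city / state file above returns an empty Seq; appending a line like "miami" would be reported as line 5.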

Storing small data files in GitHub

GitHub data files are easily searchable and benefit from version control.

You can use the Python Pandas library and Invoke to create tasks that sort, de-duplicate and upload your data files from the Git repository to S3. Message me in the comments if you’d like more instructions and code snippets on how to do this easily.
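
To give a feel for the sort and de-duplicate step, here’s a minimal sketch in plain Scala rather than Pandas and Invoke. It only tidies the lines of a file that’s already been read into memory and doesn’t touch S3; DataFileTidy and sortAndDedupe are hypothetical names.

```scala
object DataFileTidy {

  // Keeps the header first, then drops exact duplicate rows and sorts the rest
  def sortAndDedupe(lines: Seq[String]): Seq[String] = lines match {
    case Seq()          => Seq.empty
    case header +: rows => header +: rows.distinct.sorted
  }

}
```

Given the city / state file with the louisville row repeated, sortAndDedupe returns the header followed by the charleston, houston, and louisville rows, each appearing once.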

Next steps

You can manage the data files in GitHub or a relational database and create automated tasks to upload the data files to S3. I’ve found Spark connectors to relational databases like Redshift to be slow and finicky, so it’s best to make your data files available to Spark via S3.

It’s important to manage your data files as single source of truth data stores. The data in GitHub (or the relational database) should be treated as the single source of truth and the replication in S3 is only to make it easier to work with Spark.

Never modify a data file in S3 directly. Always update the single source of truth data store and then run the automated task to upload the new data file to S3.

Remember to use the spark-daria helper methods whenever converting a data file to an Array or a Map. You don’t want to litter your codebase with that complex logic.

External data files make your code easier to maintain and test, and easier for non-coders to understand! You can even put non-coders in charge of the data files, so they can make changes that are immediately reflected in your systems without doing a code deploy.
