Creating a PySpark project with pytest, pyenv, and egg files
This tutorial will show you how to create a PySpark project with a DataFrame transformation, a test, and a module that manages the SparkSession from scratch.
New PySpark projects should use Poetry to build wheel files as described in this blog post. The pip / egg workflow outlined in this post still works, but the Poetry / wheel approach is better.
Create a project skeleton
Let’s start by creating a .python-version file to specify the Python version with pyenv. Add 3.6.1 to the .python-version file to ensure everyone working on the project uses the same Python version.
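Once all the files described in this post are in place, the project skeleton might look like this (a sketch; the exact layout is an assumption):

```
gill/
├── .python-version
├── requirements.txt
├── setup.py
├── gill/
│   ├── __init__.py
│   ├── spark.py
│   └── mission.py
└── tests/
    └── test_mission.py
```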
Let’s follow Python packaging conventions and add a gill/ subdirectory for the application code. While we’re at it, we’ll create an empty __init__.py file in the gill/ directory.
Add the SparkSession
Add a spark.py file so we can access the SparkSession in our application code and test suite. Add this code to the spark.py file:

```python
from pyspark.sql import SparkSession
from functools import lru_cache

@lru_cache(maxsize=None)
def get_spark():
    return SparkSession.builder.appName("gill").getOrCreate()
```
You can import spark.py into other files to access the SparkSession. Notice that the getOrCreate() method will reuse any SparkSession that has already been created. Creating a SparkSession is expensive, and getOrCreate() will make our test suite run faster because all tests will use the same SparkSession. The get_spark() function is memoized with the @lru_cache decorator, since the function will be called frequently.
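As a quick illustration of the memoization, here is a minimal pure-Python sketch. It uses a plain object as a stand-in for the expensive SparkSession, so it runs without Spark installed:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def get_session():
    # Stand-in for an expensive SparkSession build; in spark.py this
    # would be SparkSession.builder.getOrCreate().
    print("building session...")
    return object()

first = get_session()   # prints "building session..."
second = get_session()  # cached: the function body does not run again
print(first is second)  # True - every caller shares the same session
```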
Your production environment will define a specialized SparkSession. For example, the Databricks production environment provides a SparkSession configured to run on EC2 clusters. The getOrCreate() method will fetch the SparkSession that is created by Databricks when this code is run in production.
This SparkSession management solution works perfectly in the test, development, and production environments. Win, win, win!
Add a DataFrame Transformation
Let’s add a mission.py file with a DataFrame transformation that appends a life_goal column to a DataFrame. Add the following DataFrame transformation to the mission.py file:

```python
import pyspark.sql.functions as F

def with_life_goal(df):
    return df.withColumn("life_goal", F.lit("escape!"))
```
Important notes on the code:
- Consistent with PySpark best practices, we’re importing the PySpark SQL functions as F.
- The DataFrame.withColumn method is used to append a column to a DataFrame.
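To see the shape of this transformation pattern without spinning up Spark, here is a hypothetical pure-Python mock. The FakeDataFrame class is invented for illustration and only mimics the withColumn call used above:

```python
class FakeDataFrame:
    """Tiny stand-in for pyspark.sql.DataFrame, just for illustration."""
    def __init__(self, rows, columns):
        self.rows = rows
        self.columns = columns

    def withColumn(self, name, value):
        # Like DataFrame.withColumn: returns a new "DataFrame" with the
        # extra column appended to every row, leaving the original intact.
        return FakeDataFrame([row + (value,) for row in self.rows],
                             self.columns + [name])

def with_life_goal(df):
    return df.withColumn("life_goal", "escape!")

df = FakeDataFrame([("jose", 1), ("li", 2)], ["name", "age"])
result = with_life_goal(df)
print(result.columns)  # ['name', 'age', 'life_goal']
print(result.rows[0])  # ('jose', 1, 'escape!')
```

The key design point carries over to real PySpark: a transformation is just a function that takes a DataFrame and returns a new DataFrame, so transformations compose cleanly.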
Let’s add a test_mission.py file to ensure the with_life_goal() transformation is working as expected.
Here are the contents of the test_mission.py file:

```python
from gill.spark import get_spark
from gill.mission import with_life_goal

def test_with_life_goal():
    source_data = [
        ("jose", 1),
        ("li", 2)
    ]
    source_df = get_spark().createDataFrame(
        source_data,
        ["name", "age"]
    )

    actual_df = with_life_goal(source_df)

    expected_data = [
        ("jose", 1, "escape!"),
        ("li", 2, "escape!")
    ]
    expected_df = get_spark().createDataFrame(
        expected_data,
        ["name", "age", "life_goal"]
    )

    assert(expected_df.collect() == actual_df.collect())
```
Run pytest on the command line to execute the test.
Important notes on the code:
- We import the spark.py code that provides the get_spark() function to access the SparkSession. We use the createDataFrame() method with the SparkSession to create the source and expected DataFrames.
- The collect() method is used to compare DataFrame equality. This DataFrame comparison methodology is fast and yields readable error messages. Try to break the test intentionally to see the error message.
Your environment probably isn’t set up correctly if you’re unable to run the tests. Try following this guide to set up your machine.
Setting up Python and PySpark environments is hard. I’ll try to create a better guide on how to set up PySpark soon!
Packaging egg files
Create a setup.py file to describe the project:

```python
from setuptools import setup

setup(
    name='gill',
    version='0.0.1',
    description='A sample PySpark application',
    packages=['gill']
)
```

Run python setup.py bdist_egg on the command line to package your code in a dist/gill-0.0.1-py3.6.egg file that can be attached to Spark clusters in production or included in a PySpark console.
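Here is a self-contained sketch of the packaging step, built in a throwaway directory so it doesn’t touch your real project (it assumes python3 and setuptools are available; the Python minor version in the egg name will match your interpreter):

```shell
# Build a minimal egg in a scratch directory.
mkdir -p /tmp/gill_demo/gill
cd /tmp/gill_demo
touch gill/__init__.py
cat > setup.py <<'EOF'
from setuptools import setup
setup(name='gill', version='0.0.1', packages=['gill'])
EOF
python3 setup.py bdist_egg
ls dist/   # gill-0.0.1-py3.X.egg
```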
Include the egg file in the PySpark console
Change into the Spark directory on your computer.
Start the PySpark console and attach the egg file.
./pyspark --py-files ~/Documents/code/my_apps/gill/dist/gill-0.0.1-py3.6.egg
From the PySpark REPL, you can import the gill code and execute the application code.
```
>>> from gill.spark import *
>>> from gill.mission import with_life_goal
>>> source_data = [
...     ("jose", 1),
...     ("li", 2)
... ]
>>> source_df = spark.createDataFrame(
...     source_data,
...     ["name", "age"]
... )
>>> actual_df = with_life_goal(source_df)
>>> actual_df.show()
+----+---+---------+
|name|age|life_goal|
+----+---+---------+
|jose|  1|  escape!|
|  li|  2|  escape!|
+----+---+---------+
```
N.B. It’s almost always better to write tests than use the PySpark console.
The gill library can be attached to spark-submit commands for launching applications in a similar manner.
Add a requirements.txt file
Add a requirements.txt file to specify your project requirements. All Python projects should have a requirements.txt file so it’s easier for other developers to install the project requirements on their machines.
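For example, a requirements.txt for this project might look like the following (the version pins are illustrative assumptions, not the exact versions used in this post):

```
pyspark==2.4.0
pytest==3.6.0
```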
You can now create a project, package a DataFrame transformation as an egg file, upload the egg file to a service provider like Databricks, and run production analyses. Congratulations!
Understanding the basics that we’ve covered in this blog post will help you explore more complicated Python libraries with ease.