Creating a PySpark project with pytest, pyenv, and egg files
This tutorial will show you how to create a PySpark project from scratch, with a DataFrame transformation, a test, and a module that manages the SparkSession.
New PySpark projects should use Poetry to build wheel files as described in this blog post. The pip / egg workflow outlined in this post still works, but the Poetry / wheel approach is better.
We will create an application called gill, and the example source code is available on GitHub.
Create a project skeleton
Let’s start by creating a .python-version file to specify the Python version with pyenv. Our directory structure will look like this.
gill/
  .python-version
Add 3.6.1 to the .python-version file to ensure everyone uses the right Python version.
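If you manage interpreters with pyenv, it can write this file for you. Here’s a quick sketch, assuming pyenv is installed and 3.6.1 is available to it:

pyenv install 3.6.1   # install the interpreter if it isn't already present
pyenv local 3.6.1     # writes 3.6.1 to .python-version in the current directory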
Let’s follow Python packaging conventions and add a gill/ subdirectory for the application code. While we’re at it, we’ll create an __init__.py file.
gill/
  gill/
    __init__.py
  .python-version
Add the SparkSession
Add a spark.py file so we can access the SparkSession in our application code and test suite.
gill/
  gill/
    __init__.py
    spark.py
  .python-version
Add this code to the spark.py file:
from pyspark.sql import SparkSession
from functools import lru_cache


@lru_cache(maxsize=None)
def get_spark():
    return (SparkSession.builder
            .master("local")
            .appName("gill")
            .getOrCreate())
You can import spark.py into other files to access the SparkSession. Notice that the getOrCreate() method will reuse any SparkSession that has already been created. Creating a SparkSession is expensive, and getOrCreate() will make our test suite run faster because all tests will use the same SparkSession.
The get_spark() function is memoized with the @lru_cache decorator, since the function will be called frequently.
Your production environment will define a specialized SparkSession. For example, the Databricks production environment provides a SparkSession configured to run on EC2 clusters. The getOrCreate() method will fetch the SparkSession that is created by Databricks when this code is run in production.
This SparkSession management solution works perfectly in the test, development, and production environments. Win, win, win!
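As a quick sanity check, you can confirm the reuse behavior in a Python REPL (this assumes the gill package is importable, e.g. you’re running Python from the project root):

from gill.spark import get_spark

# The first call builds the SparkSession; later calls hit the lru_cache
# (and getOrCreate) and return the exact same object.
assert get_spark() is get_spark()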
Add a DataFrame Transformation
Let’s add a mission.py file with a DataFrame transformation that appends a life_goal column to a DataFrame.
gill/
  gill/
    __init__.py
    mission.py
    spark.py
  .python-version
Add the following DataFrame transformation to the mission.py file.
import pyspark.sql.functions as F


def with_life_goal(df):
    return df.withColumn("life_goal", F.lit("escape!"))
Important notes on the code:
- Consistent with PySpark best practices, we’re importing the PySpark SQL functions as F.
- The DataFrame.withColumn method is used to append a column to a DataFrame.
Adding tests
Let’s add a test_mission.py file to ensure the with_life_goal() transformation is working as expected. Here’s what the directory structure will look like after the file is added.
gill/
  gill/
    __init__.py
    mission.py
    spark.py
  tests/
    test_mission.py
  .python-version
Here are the contents of the test_mission.py file:
import pytest

from gill.spark import get_spark
from gill.mission import with_life_goal


class TestMission(object):

    def test_with_life_goal(self):
        source_data = [
            ("jose", 1),
            ("li", 2)
        ]
        source_df = get_spark().createDataFrame(
            source_data,
            ["name", "age"]
        )

        actual_df = with_life_goal(source_df)

        expected_data = [
            ("jose", 1, "escape!"),
            ("li", 2, "escape!")
        ]
        expected_df = get_spark().createDataFrame(
            expected_data,
            ["name", "age", "life_goal"]
        )

        assert(expected_df.collect() == actual_df.collect())
Run pytest on the command line to execute the test.
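For example, from the project root (assuming pytest is installed in the active environment), you can run the whole suite or just this file with verbose output:

pytest
pytest tests/test_mission.py -v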
Important notes on the code:
- We import the spark.py code that provides a get_spark() function to access the SparkSession. We use the createDataFrame() method with the SparkSession to create the source_df and expected_df.
- The collect() method is used to compare DataFrame equality. This DataFrame comparison methodology is fast and yields readable error messages. Try to break the test intentionally to see the error message.
Troubleshooting
Your environment probably isn’t set up correctly if you’re unable to run the tests. Try following this guide to set up your machine.
Setting up Python and PySpark environments is hard. I’ll try to create a better guide on how to set up PySpark soon!
Packaging egg files
Add a setup.py file to describe the project.
from setuptools import setup

setup(name='gill',
      version='0.0.1',
      description='A sample PySpark application',
      url='http://github.com/mrpowers/gill',
      author='MrPowers',
      author_email='matthewkevinpowers@gmail.com',
      packages=['gill'],
      zip_safe=False)
Run python setup.py bdist_egg on the command line to package your code in a dist/gill-0.0.1-py3.6.egg file that can be attached to Spark clusters in production or included in a PySpark console.
Wheels are the new standard for packaging Python projects and replace egg files. There’s even a website that’s tracking how many Python projects have transitioned to the Wheel standard.
It’s easy to use Poetry to build wheel files.
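As a rough sketch, once the project is described in a pyproject.toml and Poetry is installed, a single command builds the wheel:

poetry build   # writes a wheel (.whl) and an sdist to the dist/ directory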
Include the egg file in the PySpark console
Change into the Spark directory on your computer.
cd ~/spark-2.2.0-bin-hadoop2.7/bin
Start the PySpark console and attach the egg file.
./pyspark --py-files ~/Documents/code/my_apps/gill/dist/gill-0.0.1-py3.6.egg
From the PySpark REPL, you can import the gill code and execute the application code.
>>> from gill.spark import *
>>> from gill.mission import with_life_goal
>>> source_data = [
... ("jose", 1),
... ("li", 2)
... ]
>>> source_df = spark.createDataFrame(
... source_data,
... ["name", "age"]
... )
>>> actual_df = with_life_goal(source_df)
>>> actual_df.show()
+----+---+---------+
|name|age|life_goal|
+----+---+---------+
|jose| 1| escape!|
| li| 2| escape!|
+----+---+---------+
N.B. It’s almost always better to write tests than use the PySpark console.
The gill library can be attached to spark-submit commands for launching applications in a similar manner, as in the sketch below.
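Here my_job.py is a hypothetical driver script that imports with_life_goal; it isn’t part of the gill repo.

./spark-submit \
  --py-files ~/Documents/code/my_apps/gill/dist/gill-0.0.1-py3.6.egg \
  my_job.py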
Add a requirements.txt file
Add a requirements.txt file to specify your project requirements.
pytest==3.2.2
pyspark==2.2.0
setuptools==28.8.0
All Python projects should have requirements.txt files so it’s easier for other developers to install the project requirements on their machines.
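Other developers can then install everything in one step (assuming pip is available in their pyenv-managed environment):

pip install -r requirements.txt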
Next steps
You can now create a project, package a DataFrame transformation as an egg file, upload the egg file to a service provider like Databricks, and run production analyses. Congratulations!
Look at the Optimus data munging library and the Quinn helper library for functions that will make you a more efficient PySpark programmer.
Then start playing around with TensorFlowOnSpark and the pyspark.ml package.
Understanding the basics that we’ve covered in this blog post will help you explore more complicated Python libraries with ease.