Working with Spark ArrayType and MapType Columns

Matthew Powers
Dec 17, 2017


Spark DataFrame columns support arrays and maps, which are great for data sets where each row can hold an arbitrary number of values.

This blog post has since been completely updated, and newer versions of it are available.

This blog post will demonstrate Spark methods that return ArrayType columns, describe how to create your own ArrayType / MapType columns, and explain when these column types are suitable for your DataFrames.

Splitting a string into an ArrayType column

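Let’s create a DataFrame with a name column and a pipe-delimited hit_songs string column. Then let’s use the split() function to convert hit_songs into an array of strings. split() treats its second argument as a regular expression, which is why the pipe is escaped as "\\|".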

import org.apache.spark.sql.functions._
import spark.implicits._

val singersDF = Seq(
  ("beatles", "help|hey jude"),
  ("romeo", "eres mia")
).toDF("name", "hit_songs")

val actualDF = singersDF.withColumn(
  "hit_songs",
  split(col("hit_songs"), "\\|")
)

actualDF.show()
+-------+----------------+
|   name|       hit_songs|
+-------+----------------+
|beatles|[help, hey jude]|
|  romeo|      [eres mia]|
+-------+----------------+

actualDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- hit_songs: array (nullable = true)
 |    |-- element: string (containsNull = true)

An ArrayType column is suitable in this example because a singer can have an arbitrary number of hit songs. We don’t want to create a DataFrame with hit_song1, hit_song2, …, hit_songN columns.
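Because all of a singer's songs live in one array, array functions work no matter how many songs each row holds. Here's a small sketch of that idea, assuming a local SparkSession (in spark-shell, getOrCreate() simply returns the existing session); the num_hits and first_hit column names are illustrative and not part of the original example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("array-size-sketch").getOrCreate()
import spark.implicits._

// Same pipe-delimited data as above, split into an ArrayType column
val songsDF = Seq(
  ("beatles", "help|hey jude"),
  ("romeo", "eres mia")
).toDF("name", "hit_songs")
  .withColumn("hit_songs", split(col("hit_songs"), "\\|"))

// size() counts the elements in each row's array, whatever its length;
// getItem(0) returns the first element (array indexes are 0-based)
val summaryDF = songsDF.select(
  col("name"),
  size(col("hit_songs")).as("num_hits"),
  col("hit_songs").getItem(0).as("first_hit")
)

summaryDF.show()
```

No hit_song1 … hit_songN columns are needed: the same expression handles a row with one song and a row with fifty.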

Directly creating an ArrayType column

Let’s use the spark-daria createDF method to create a DataFrame with an ArrayType column directly. A separate blog post covers the createDF method in more detail.

We’ll create another singersDF with some different artists.

import org.apache.spark.sql.types._
// createDF is added to SparkSession by spark-daria's SparkSessionExt implicits
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._

val singersDF = spark.createDF(
  List(
    ("bieber", Array("baby", "sorry")),
    ("ozuna", Array("criminal"))
  ), List(
    ("name", StringType, true),
    ("hit_songs", ArrayType(StringType, true), true)
  )
)

singersDF.show()
+------+-------------+
|  name|    hit_songs|
+------+-------------+
|bieber|[baby, sorry]|
| ozuna|   [criminal]|
+------+-------------+

singersDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- hit_songs: array (nullable = true)
 |    |-- element: string (containsNull = true)

The ArrayType case class is instantiated with an elementType (StringType here) and a containsNull flag that says whether the array may hold null elements.
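If spark-daria isn't on the classpath, plain Spark can build the same DataFrame by passing an explicit StructType to createDataFrame. This sketch swaps spark-daria's createDF for Spark's built-in createDataFrame(RDD[Row], StructType) overload and assumes a local SparkSession.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("arraytype-schema-sketch").getOrCreate()

// ArrayType(elementType, containsNull): an array of strings whose elements may be null
val hitSongsType = ArrayType(StringType, containsNull = true)

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("hit_songs", hitSongsType, nullable = true)
))

// Row values for an ArrayType column can be any Scala Seq
val rows = Seq(
  Row("bieber", Seq("baby", "sorry")),
  Row("ozuna", Seq("criminal"))
)

val singersDF = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

singersDF.printSchema()
```

createDF is essentially a more compact way to write this StructType / Row boilerplate.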

Directly creating a MapType column

Let’s create a MapType column that lists a good song and a bad song for some singers.

val singersDF = spark.createDF(
  List(
    ("sublime", Map(
      "good_song" -> "santeria",
      "bad_song" -> "doesn't exist")
    ),
    ("prince_royce", Map(
      "good_song" -> "darte un beso",
      "bad_song" -> "back it up")
    )
  ), List(
    ("name", StringType, true),
    ("songs", MapType(StringType, StringType, true), true)
  )
)

singersDF.show()
+------------+--------------------+
|        name|               songs|
+------------+--------------------+
|     sublime|Map(good_song -> ...|
|prince_royce|Map(good_song -> ...|
+------------+--------------------+

singersDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- songs: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

The MapType case class takes three arguments: the keyType, the valueType, and the valueContainsNull flag.
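The MapType and ArrayType companion objects also offer shorter constructors that default the null flag to true. Map keys can never be null in Spark, which is why printSchema() shows no nullable flag on key. The sketch below only touches the type classes, so it needs no SparkSession.

```scala
import org.apache.spark.sql.types._

// Three-argument form: keyType, valueType, valueContainsNull
val songsType = MapType(StringType, StringType, valueContainsNull = true)

// The two-argument companion constructor sets valueContainsNull to true
val defaultType = MapType(StringType, StringType)

// ArrayType's one-argument constructor likewise sets containsNull to true,
// so this matches the map-of-arrays type used later in this post
val nested = MapType(StringType, ArrayType(StringType))

println(songsType.keyType)
println(defaultType.valueContainsNull)
println(nested.simpleString)
```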

Here’s how we can display the singer name and their bad song:

singersDF
  .select(
    col("name"),
    col("songs")("bad_song").as("bad song!")
  ).show()
+------------+-------------+
|        name|    bad song!|
+------------+-------------+
|     sublime|doesn't exist|
|prince_royce|   back it up|
+------------+-------------+
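Looking up a single key isn't the only way to read a MapType column: map_keys() and map_values() (available since Spark 2.3) turn a map into ArrayType columns. This sketch assumes a local SparkSession and builds the DataFrame with toDF rather than createDF; sort_array is there only to make the output order deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("map-keys-sketch").getOrCreate()
import spark.implicits._

// toDF infers a MapType(StringType, StringType) column from a Scala Map
val singersDF = Seq(
  ("sublime", Map("good_song" -> "santeria", "bad_song" -> "doesn't exist")),
  ("prince_royce", Map("good_song" -> "darte un beso", "bad_song" -> "back it up"))
).toDF("name", "songs")

// map_keys / map_values return ArrayType columns; sort_array orders them ascending
val keysDF = singersDF.select(
  col("name"),
  sort_array(map_keys(col("songs"))).as("song_types"),
  sort_array(map_values(col("songs"))).as("song_titles")
)

keysDF.show(false)
```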

Creating a schema with a column that uses MapType and ArrayType

Let’s use MapType and ArrayType to create a column that lists the good songs and bad songs of select singers.

val singersDF = spark.createDF(
  List(
    ("miley", Map(
      "good_songs" -> Array("party in the usa", "wrecking ball"),
      "bad_songs" -> Array("younger now"))
    ),
    ("kesha", Map(
      "good_songs" -> Array("tik tok", "timber"),
      "bad_songs" -> Array("rainbow"))
    )
  ), List(
    ("name", StringType, true),
    ("songs", MapType(StringType, ArrayType(StringType, true), true), true)
  )
)

singersDF.show()
+-----+--------------------+
| name|               songs|
+-----+--------------------+
|miley|Map(good_songs ->...|
|kesha|Map(good_songs ->...|
+-----+--------------------+

singersDF.printSchema()
root
 |-- name: string (nullable = true)
 |-- songs: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)

Here’s how we can display the good songs for each singer.

singersDF
  .select(
    col("name"),
    col("songs")("good_songs").as("fun")
  ).show()
+-----+--------------------+
| name|                 fun|
+-----+--------------------+
|miley|[party in the usa...|
|kesha|   [tik tok, timber]|
+-----+--------------------+
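A lookup like col("songs")("good_songs") returns an ordinary ArrayType column, so any array function can be applied to it. Here's a sketch that counts good and bad songs per singer, assuming a local SparkSession and using Seq values in place of the Array values above; num_good and num_bad are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("nested-map-sketch").getOrCreate()
import spark.implicits._

// A map from String to an array of strings, matching the schema above
val singersDF = Seq(
  ("miley", Map(
    "good_songs" -> Seq("party in the usa", "wrecking ball"),
    "bad_songs" -> Seq("younger now"))),
  ("kesha", Map(
    "good_songs" -> Seq("tik tok", "timber"),
    "bad_songs" -> Seq("rainbow")))
).toDF("name", "songs")

// Each map lookup yields an ArrayType column, so size() can count its elements
val countsDF = singersDF.select(
  col("name"),
  size(col("songs")("good_songs")).as("num_good"),
  size(col("songs")("bad_songs")).as("num_bad")
)

countsDF.show()
```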

array_contains() and explode() methods for ArrayType columns

The Spark functions object provides helper methods for working with ArrayType columns. The array_contains method returns true if a row's array contains the specified value.

Let’s create a DataFrame with people and their favorite colors. Then let’s use array_contains to append a likes_red column that is true when the person likes red.

val peopleDF = spark.createDF(
  List(
    ("bob", Array("red", "blue")),
    ("maria", Array("green", "red")),
    ("sue", Array("black"))
  ), List(
    ("name", StringType, true),
    ("favorite_colors", ArrayType(StringType, true), true)
  )
)

val actualDF = peopleDF.withColumn(
  "likes_red",
  array_contains(col("favorite_colors"), "red")
)

actualDF.show()
+-----+---------------+---------+
| name|favorite_colors|likes_red|
+-----+---------------+---------+
|  bob|    [red, blue]|     true|
|maria|   [green, red]|     true|
|  sue|        [black]|    false|
+-----+---------------+---------+

The explode() method creates a new row for every element in an array.

peopleDF.select(
  col("name"),
  explode(col("favorite_colors")).as("color")
).show()
+-----+-----+
| name|color|
+-----+-----+
|  bob|  red|
|  bob| blue|
|maria|green|
|maria|  red|
|  sue|black|
+-----+-----+
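One subtlety: explode() produces no output rows for a row whose array is empty or null, so that row silently disappears. explode_outer() (added in Spark 2.2) keeps such rows and emits a null instead. In this sketch, which assumes a local SparkSession, the "sue" (empty array) and "ann" (null array) rows are made-up data chosen to show the difference.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("explode-outer-sketch").getOrCreate()
import spark.implicits._

// Illustrative data: one normal array, one empty array, one null array
val peopleDF = Seq[(String, Seq[String])](
  ("bob", Seq("red", "blue")),
  ("sue", Seq.empty[String]),
  ("ann", null)
).toDF("name", "favorite_colors")

// explode drops sue and ann because they have no elements to emit
val explodedDF = peopleDF.select(col("name"), explode(col("favorite_colors")).as("color"))

// explode_outer keeps sue and ann, each with a null color
val outerDF = peopleDF.select(col("name"), explode_outer(col("favorite_colors")).as("color"))

explodedDF.show()
outerDF.show()
```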

The spark-daria library defines forall() and exists() methods for ArrayType columns that work similarly to the forall() and exists() methods on Scala collections.
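Spark 3.0 later added built-in exists() and forall() functions to the functions object, each taking the array column and a Column => Column predicate. This sketch uses those Spark built-ins, not spark-daria's versions (whose signatures may differ), and assumes Spark 3.0+ with a local SparkSession; any_red and all_start_with_b are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("exists-forall-sketch").getOrCreate()
import spark.implicits._

val peopleDF = Seq(
  ("bob", Seq("red", "blue")),
  ("maria", Seq("green", "red")),
  ("sue", Seq("black"))
).toDF("name", "favorite_colors")

val checkedDF = peopleDF.select(
  col("name"),
  // true if at least one element satisfies the predicate
  exists(col("favorite_colors"), c => c === "red").as("any_red"),
  // true only if every element satisfies the predicate
  forall(col("favorite_colors"), c => c.startsWith("b")).as("all_start_with_b")
)

checkedDF.show()
```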

collect_list()
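collect_list() is an aggregate function that works in the opposite direction from explode(): it gathers the values in each group into an ArrayType column. Here's a sketch assuming a local SparkSession. The order of elements inside each collected list isn't guaranteed after a shuffle, so sort_array is applied to make the result deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("collect-list-sketch").getOrCreate()
import spark.implicits._

// One row per (person, color) pair, like the output of explode() above
val colorsDF = Seq(
  ("bob", "red"), ("bob", "blue"),
  ("maria", "green"), ("maria", "red"),
  ("sue", "black")
).toDF("name", "color")

// Group by person and collect their colors back into an array
val groupedDF = colorsDF
  .groupBy("name")
  .agg(sort_array(collect_list(col("color"))).as("favorite_colors"))
  .orderBy("name")

groupedDF.show()
```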

Next steps

ArrayType and MapType columns are vital for attaching arbitrary-length data structures to DataFrame rows. A lot of Spark programmers don’t know that ArrayType / MapType columns exist and have difficulty defining schemas for them. Make sure to study the simple examples in this post carefully, so you’re prepared to use these column types in your production applications! 😉
