Nested Attributes & Functions Operating on Nested Types in PySpark

In this notebook we will be working with spotify songs Dataset from Kaggle. Specifically we will work with nested data types where the columns are of type ARRAYS or MAPS.

Kaggle: Spotify Dataset 1921–2020, 160k+ Tracks

Recently, I needed to work with Spark dataframes having Map datatypes for one of our projects. I realized that Map and Array are the two most commonly used datatypes. So, I explored in detail how can we create, query, explode and implode columns of array and map datatypes. I created this notebook to be a handy reference for myself. Please feel free to checkout this notebook on if you also need something quick and handy while working with these nested datatypes. The full notebook is on Github.

Note: You can reap benefits from Spark if you use it for large datasets. This dataset is small and used for illustrative purposes. I hope you enjoy reviewing it as much as I had writing it. Please let me know if you have any suggestions to improve this.

The original dataset can be downloaded from the Kaggle dataset page. The original dataset has been modified a bit for this notebook.

import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
from pyspark.sql.window import Window

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
import warnings
warnings.filterwarnings("ignore")
pd.set_option('display.max_columns', 20)
pd.set_option('display.max_colwidth', 400)
# setting random seed for notebook reproducability
rnd_seed=23
np.random.seed=rnd_seed
np.random.set_state=rnd_seed
spark = SparkSession.builder.master("local[*]").appName("working-with-nested-data-types").getOrCreate()
spotify_df = spark.read.csv(path='data/spotify-songs.csv', inferSchema=True, header=True).cache()spotify_df.limit(10).toPandas()
png
  • Combine the columns [‘key’, ‘mode’, ‘target’] into an array using the array function of PySpark.
  • Transform the acoustic qualities {‘acousticness’, ‘tempo’, ‘liveness’, ‘instrumentalness’, ‘energy’, ‘danceability’, ‘speechiness’, ‘loudness’} of a song from individual columns into a map (key being acoustic quality). Although create_map function is meant to create map between a pair of columns but here we use the F.lit(...) function to generate the string key name for the acoustic quality. http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.create_map
  • Alos, you cannot have mixed data types for the Key or the Value. All Keys must be of same type. All Values must be of same type.
spotify_map_df = (spotify_df
.select('id', 'song_title', 'artist', 'duration_ms',
F.array('key', 'mode', 'target').alias('audience'),
F.create_map(
F.lit('acousticness'), 'acousticness',
F.lit('danceability'), 'acousticness',
F.lit('energy'), 'energy',
F.lit('instrumentalness'), 'instrumentalness',
F.lit('liveness'), 'liveness',
F.lit('loudness'), 'loudness',
F.lit('speechiness'), 'speechiness',
F.lit('tempo'), 'tempo'
).alias('qualities'),
'time_signature',
'valence')
.cache())
spotify_map_df.limit(10).toPandas()
png
# Let's check the schema of the new DataFrame
spotify_map_df.printSchema()
root
|-- id: integer (nullable = true)
|-- song_title: string (nullable = true)
|-- artist: string (nullable = true)
|-- duration_ms: integer (nullable = true)
|-- audience: array (nullable = false)
| |-- element: integer (containsNull = true)
|-- qualities: map (nullable = false)
| |-- key: string
| |-- value: double (valueContainsNull = true)
|-- time_signature: integer (nullable = true)
|-- valence: double (nullable = true)

Write the DataFrame to a json file:

spotify_map_df.coalesce(1).write.json(path='data/spotify-songs-json', mode="overwrite")
nested_schema = StructType([
StructField('id', IntegerType(), nullable=False),
StructField('song_title', StringType(), nullable=False),
StructField('artist', StringType(), nullable=False),
StructField('duration_ms', IntegerType(), nullable=False),
StructField('audience', ArrayType(elementType=IntegerType()), nullable=False),
StructField('qualities', MapType(keyType=StringType(), valueType=DoubleType(), valueContainsNull=False), nullable=True),
StructField('time_signature', IntegerType(), nullable=False),
StructField('valence', DoubleType(), nullable=False),
])
spotify_reloaded_df = spark.read.json(path='data/spotify-songs-json', schema=nested_schema).cache()spotify_reloaded_df.limit(10).toPandas()
png
spotify_reloaded_df.printSchema()root
|-- id: integer (nullable = true)
|-- song_title: string (nullable = true)
|-- artist: string (nullable = true)
|-- duration_ms: integer (nullable = true)
|-- audience: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- qualities: map (nullable = true)
| |-- key: string
| |-- value: double (valueContainsNull = true)
|-- time_signature: integer (nullable = true)
|-- valence: double (nullable = true)

We can extract out each nested attribute within an array or map into a column of its own.

Extract out ARRAY elements:
The audience column is a combination of three attributes ‘key’, ‘mode’ and ‘target’. Extract out each array element into a column of its own.

(spotify_map_df
.select('song_title',
col('audience').getItem(0).alias('key'),
col('audience').getItem(1).alias('mode'),
col('audience').getItem(2).alias('target'))
.limit(10)
.toPandas())
png

Extract out MAP attributes:
The acoustic column is a map created from attributes ‘acousticness’, ‘tempo’, ‘liveness’, ‘instrumentalness’, etc. of a song. Extract out those qualities into individual columns.

(spotify_map_df
.select('song_title',
col('qualities').getItem('acousticness').alias('acousticness'),
col('qualities').getItem('speechiness').alias('speechiness')
)
.limit(10)
.toPandas())
png

Refactor: We can refactor the above code to be more concise and to generate a more efficient parsed logical plan.

cols = [F.col("song_title")] + list(map(
lambda f: F.col("qualities").getItem(f).alias(str(f)), ["acousticness", "speechiness", "liveness", "tempo"]))

spotify_map_df.select(cols).limit(10).toPandas()
png

Extract out MAP attributes programmatically:

Manually appending the columns is fine if we know all the distinct keys in the map. If we don’t know all the distinct keys, we’ll need a programatic solution, but be warned — this approach is slow! I learnt this approach from [1] and I have modified it a bit.

# Step 1: Create a DataFrame with all the unique keys
keys_df = spotify_map_df.select(F.collect_set(F.map_keys(F.col("qualities"))))
keys_df.show()+------------------------------------------------------------------------------------------------+
|collect_set(map_keys(qualities)) |
+------------------------------------------------------------------------------------------------+
|[[acousticness, danceability, energy, instrumentalness, liveness, loudness, speechiness, tempo]]|
+------------------------------------------------------------------------------------------------+
# Step 2: Convert the DataFrame to a list with all the unique keys
keys = keys_df.collect()[0][0][0]
keys['acousticness',
'danceability',
'energy',
'instrumentalness',
'liveness',
'loudness',
'speechiness',
'tempo']

The collect() method gathers all the data on the driver node, which can be slow. We call collect_set() to limit the data that’s being collected on the driver node. Collecting data on a single node and leaving the worker nodes idle should be avoided whenever possible. We’re only using collect() here cause it’s the only option.

# Step 3: Create an array of column objects for the map items
key_cols = list(map(lambda f: F.col("qualities").getItem(f).alias(str(f)), keys))
# Step 4: Add any additional columns before calculating the final result
final_cols = [F.col("song_title")] + key_cols
# Step 5: Run a select() to get the final result
spotify_map_df.select(final_cols).limit(10).toPandas()
png

Use the explain() function to print the logical plans and see if the parsed logical plan needs a lot of optimizations:

spotify_map_df.select(final_cols).explain(True)== Parsed Logical Plan ==
'Project [unresolvedalias('song_title, None), 'qualities[acousticness] AS acousticness#2018, 'qualities[danceability] AS danceability#2019, 'qualities[energy] AS energy#2020, 'qualities[instrumentalness] AS instrumentalness#2021, 'qualities[liveness] AS liveness#2022, 'qualities[loudness] AS loudness#2023, 'qualities[speechiness] AS speechiness#2024, 'qualities[tempo] AS tempo#2025]
+- Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, acousticness#19, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- Relation[id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] csv

== Analyzed Logical Plan ==
song_title: string, acousticness: double, danceability: double, energy: double, instrumentalness: double, liveness: double, loudness: double, speechiness: double, tempo: double
Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, acousticness#19, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- Relation[id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] csv

== Optimized Logical Plan ==
Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- InMemoryRelation [id#16, song_title#17, artist#18, duration_ms#21, audience#526, qualities#527, time_signature#30, valence#31], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, acousticness#19, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- InMemoryTableScan [acousticness#19, artist#18, duration_ms#21, energy#22, id#16, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, song_title#17, speechiness#28, target#32, tempo#29, time_signature#30, valence#31]
+- InMemoryRelation [id#16, song_title#17, artist#18, acousticness#19, danceability#20, duration_ms#21, energy#22, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, speechiness#28, tempo#29, time_signature#30, valence#31, target#32], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/user-home/Data-Science-with-Spark/kaggle-spotify-dataset-working-with..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,song_title:string,artist:string,acousticness:double,danceability:double,duration_ms...

== Physical Plan ==
*(1) Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- InMemoryTableScan [qualities#527, song_title#17]
+- InMemoryRelation [id#16, song_title#17, artist#18, duration_ms#21, audience#526, qualities#527, time_signature#30, valence#31], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, acousticness#19, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- InMemoryTableScan [acousticness#19, artist#18, duration_ms#21, energy#22, id#16, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, song_title#17, speechiness#28, target#32, tempo#29, time_signature#30, valence#31]
+- InMemoryRelation [id#16, song_title#17, artist#18, acousticness#19, danceability#20, duration_ms#21, energy#22, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, speechiness#28, tempo#29, time_signature#30, valence#31, target#32], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/user-home/Data-Science-with-Spark/kaggle-spotify-dataset-working-with..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,song_title:string,artist:string,acousticness:double,danceability:double,duration_ms...

We can use all our learnings from above to reconstruct the original table.

(spotify_reloaded_df
.select('id', 'song_title', 'artist',
col('qualities').getItem('acousticness').alias('acousticness'),
col('qualities').getItem('danceability').alias('danceability'),
'duration_ms',
col('qualities').getItem('energy').alias('energy'),
col('qualities').getItem('instrumentalness').alias('instrumentalness'),
col('audience').getItem(0).alias('key'),
col('qualities').getItem('liveness').alias('liveness'),
col('qualities').getItem('loudness').alias('loudness'),
col('audience').getItem(1).alias('mode'),
col('qualities').getItem('speechiness').alias('speechiness'),
col('qualities').getItem('tempo').alias('tempo'),
'time_signature',
'valence',
col('audience').getItem(2).alias('target')
)
.limit(10)
.toPandas())
png

Using posexplode function we can extract array element into a new row for each element with position in the given array.

(spotify_reloaded_df
.select('song_title', F.posexplode('audience'))
.limit(10)
.toPandas())
png

Using explode function we can extract a new row for each element in the given array or map.

(spotify_reloaded_df
.select('song_title', F.explode('qualities').alias("qualities", "value"))
.limit(10)
.toPandas())
png
spark.stop()

References
These resources helped me a lot to understand about Map Datatype and their usage. Please visit these notebooks, they are great resources on their own merit.

Machine Learning Platform Engineer at Lyft Inc.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store