Nested Attributes & Functions Operating on Nested Types in PySpark
In this notebook we will be working with the Spotify Songs dataset from Kaggle. Specifically, we will work with nested data types, i.e. columns of type ARRAY or MAP.
Kaggle: Spotify Dataset 1921–2020, 160k+ Tracks
Problem Statement:
Recently, I needed to work with Spark DataFrames having Map data types for one of our projects. I realized that Map and Array are the two most commonly used nested data types. So, I explored in detail how we can create, query, explode and implode columns of array and map types. I created this notebook to be a handy reference for myself. Please feel free to check out this notebook if you also need something quick and handy while working with these nested data types. The full notebook is on GitHub.
Note: Spark pays off when you use it on large datasets; this dataset is small and is used purely for illustration. I hope you enjoy reviewing it as much as I enjoyed writing it. Please let me know if you have any suggestions for improvement.
The original dataset can be downloaded from the Kaggle dataset page. It has been modified slightly for this notebook.
import os
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

import warnings
warnings.filterwarnings("ignore")

pd.set_option('display.max_columns', 20)
pd.set_option('display.max_colwidth', 400)

# setting a random seed for notebook reproducibility
rnd_seed = 23
np.random.seed(rnd_seed)
1. Create the Spark Session
spark = SparkSession.builder.master("local[*]").appName("working-with-nested-data-types").getOrCreate()
2. Load Spotify Songs Dataset
spotify_df = spark.read.csv(path='data/spotify-songs.csv', inferSchema=True, header=True).cache()

spotify_df.limit(10).toPandas()
3. Data Wrangling
3.1 Create Nested Types
- Combine the columns [‘key’, ‘mode’, ‘target’] into an array using PySpark's array function.
- Transform the acoustic qualities {‘acousticness’, ‘tempo’, ‘liveness’, ‘instrumentalness’, ‘energy’, ‘danceability’, ‘speechiness’, ‘loudness’} of a song from individual columns into a map (keyed by the name of the acoustic quality). Although the create_map function is meant to build a map from pairs of columns, here we use F.lit(...) to generate the literal string key for each acoustic quality. http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.create_map
- Also, you cannot mix data types within the keys or within the values: all keys must be of the same type, and all values must be of the same type.
spotify_map_df = (spotify_df
.select('id', 'song_title', 'artist', 'duration_ms',
F.array('key', 'mode', 'target').alias('audience'),
F.create_map(
F.lit('acousticness'), 'acousticness',
F.lit('danceability'), 'danceability',
F.lit('energy'), 'energy',
F.lit('instrumentalness'), 'instrumentalness',
F.lit('liveness'), 'liveness',
F.lit('loudness'), 'loudness',
F.lit('speechiness'), 'speechiness',
F.lit('tempo'), 'tempo'
).alias('qualities'),
'time_signature',
'valence')
.cache())

spotify_map_df.limit(10).toPandas()
# Let's check the schema of the new DataFrame
spotify_map_df.printSchema()

root
|-- id: integer (nullable = true)
|-- song_title: string (nullable = true)
|-- artist: string (nullable = true)
|-- duration_ms: integer (nullable = true)
|-- audience: array (nullable = false)
| |-- element: integer (containsNull = true)
|-- qualities: map (nullable = false)
| |-- key: string
| |-- value: double (valueContainsNull = true)
|-- time_signature: integer (nullable = true)
|-- valence: double (nullable = true)
Write the DataFrame to a JSON file:
spotify_map_df.coalesce(1).write.json(path='data/spotify-songs-json', mode="overwrite")
3.2 Reload the Restructured DataFrame Using a Schema with Nested Data Types
nested_schema = StructType([
StructField('id', IntegerType(), nullable=False),
StructField('song_title', StringType(), nullable=False),
StructField('artist', StringType(), nullable=False),
StructField('duration_ms', IntegerType(), nullable=False),
StructField('audience', ArrayType(elementType=IntegerType()), nullable=False),
StructField('qualities', MapType(keyType=StringType(), valueType=DoubleType(), valueContainsNull=False), nullable=True),
StructField('time_signature', IntegerType(), nullable=False),
StructField('valence', DoubleType(), nullable=False),
])

spotify_reloaded_df = spark.read.json(path='data/spotify-songs-json', schema=nested_schema).cache()

spotify_reloaded_df.limit(10).toPandas()
spotify_reloaded_df.printSchema()

root
|-- id: integer (nullable = true)
|-- song_title: string (nullable = true)
|-- artist: string (nullable = true)
|-- duration_ms: integer (nullable = true)
|-- audience: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- qualities: map (nullable = true)
| |-- key: string
| |-- value: double (valueContainsNull = true)
|-- time_signature: integer (nullable = true)
|-- valence: double (nullable = true)
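Notice that every field is reported as nullable even though nested_schema marked most of them nullable=False. This is expected: Spark's file-based sources, JSON included, force fields to nullable on read, since any value may be missing from the files. A quick check:
# the nullable=False hints in nested_schema are not enforced by the JSON reader
spotify_reloaded_df.schema['audience'].nullable  # True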
3.3 Extract Individual Nested/Complex Attributes as a Column
We can extract out each nested attribute within an array or map into a column of its own.
Extract out ARRAY elements:
The audience column is a combination of three attributes ‘key’, ‘mode’ and ‘target’. Extract out each array element into a column of its own.
(spotify_map_df
.select('song_title',
col('audience').getItem(0).alias('key'),
col('audience').getItem(1).alias('mode'),
col('audience').getItem(2).alias('target'))
.limit(10)
.toPandas())
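If you are on Spark 2.4 or later, F.element_at offers the same positional lookup with 1-based indexing; a minimal equivalent of the query above:
# element_at uses 1-based positions, unlike getItem's 0-based indexing (Spark 2.4+)
(spotify_map_df
 .select('song_title',
         F.element_at('audience', 1).alias('key'),
         F.element_at('audience', 2).alias('mode'),
         F.element_at('audience', 3).alias('target'))
 .limit(10)
 .toPandas())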
Extract out MAP attributes:
The qualities column is a map created from the attributes ‘acousticness’, ‘tempo’, ‘liveness’, ‘instrumentalness’, etc. of a song. Extract those qualities into individual columns.
(spotify_map_df
.select('song_title',
col('qualities').getItem('acousticness').alias('acousticness'),
col('qualities').getItem('speechiness').alias('speechiness')
)
.limit(10)
.toPandas())
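Column objects also support Python's bracket notation, which is shorthand for getItem and works for arrays and maps alike; the query above could equally be written as:
# col(...)[key] is shorthand for col(...).getItem(key)
(spotify_map_df
 .select('song_title',
         col('qualities')['acousticness'].alias('acousticness'),
         col('qualities')['speechiness'].alias('speechiness'))
 .limit(10)
 .toPandas())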
Refactor: We can refactor the above code to be more concise and easier to extend:
cols = ([F.col("song_title")] +
        [F.col("qualities").getItem(k).alias(k)
         for k in ["acousticness", "speechiness", "liveness", "tempo"]])
spotify_map_df.select(cols).limit(10).toPandas()
Extract out MAP attributes programmatically:
Manually appending the columns is fine if we know all the distinct keys in the map. If we don't know all the distinct keys, we need a programmatic solution, but be warned: this approach is slow! I learned this approach from [1] and have modified it a bit.
# Step 1: Create a DataFrame with all the unique keys
keys_df = spotify_map_df.select(F.collect_set(F.map_keys(F.col("qualities"))))

keys_df.show()

+------------------------------------------------------------------------------------------------+
|collect_set(map_keys(qualities))                                                                |
+------------------------------------------------------------------------------------------------+
|[[acousticness, danceability, energy, instrumentalness, liveness, loudness, speechiness, tempo]]|
+------------------------------------------------------------------------------------------------+

# Step 2: Convert the DataFrame to a list with all the unique keys
keys = keys_df.collect()[0][0][0]

keys

['acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo']
The collect() method gathers all the data on the driver node, which can be slow. We call collect_set() first to limit how much data is shipped to the driver. Collecting data on a single node while the worker nodes sit idle should be avoided whenever possible; we only use collect() here because it is the only option.
# Step 3: Create a list of column objects for the map items
key_cols = [F.col("qualities").getItem(k).alias(k) for k in keys]

# Step 4: Add any additional columns before calculating the final result
final_cols = [F.col("song_title")] + key_cols

# Step 5: Run a select() to get the final result
spotify_map_df.select(final_cols).limit(10).toPandas()
Examining logical plans
Use the explain() function to print the plans and check whether the parsed logical plan needs a lot of optimization:

spotify_map_df.select(final_cols).explain(True)

== Parsed Logical Plan ==
'Project [unresolvedalias('song_title, None), 'qualities[acousticness] AS acousticness#2018, 'qualities[danceability] AS danceability#2019, 'qualities[energy] AS energy#2020, 'qualities[instrumentalness] AS instrumentalness#2021, 'qualities[liveness] AS liveness#2022, 'qualities[loudness] AS loudness#2023, 'qualities[speechiness] AS speechiness#2024, 'qualities[tempo] AS tempo#2025]
+- Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, danceability#20, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- Relation[id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] csv
== Analyzed Logical Plan ==
song_title: string, acousticness: double, danceability: double, energy: double, instrumentalness: double, liveness: double, loudness: double, speechiness: double, tempo: double
Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, danceability#20, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- Relation[id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] csv
== Optimized Logical Plan ==
Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- InMemoryRelation [id#16, song_title#17, artist#18, duration_ms#21, audience#526, qualities#527, time_signature#30, valence#31], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, danceability#20, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- InMemoryTableScan [acousticness#19, artist#18, danceability#20, duration_ms#21, energy#22, id#16, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, song_title#17, speechiness#28, target#32, tempo#29, time_signature#30, valence#31]
+- InMemoryRelation [id#16, song_title#17, artist#18, acousticness#19, danceability#20, duration_ms#21, energy#22, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, speechiness#28, tempo#29, time_signature#30, valence#31, target#32], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/user-home/Data-Science-with-Spark/kaggle-spotify-dataset-working-with..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,song_title:string,artist:string,acousticness:double,danceability:double,duration_ms...
== Physical Plan ==
*(1) Project [song_title#17, qualities#527[acousticness] AS acousticness#2018, qualities#527[danceability] AS danceability#2019, qualities#527[energy] AS energy#2020, qualities#527[instrumentalness] AS instrumentalness#2021, qualities#527[liveness] AS liveness#2022, qualities#527[loudness] AS loudness#2023, qualities#527[speechiness] AS speechiness#2024, qualities#527[tempo] AS tempo#2025]
+- InMemoryTableScan [qualities#527, song_title#17]
+- InMemoryRelation [id#16, song_title#17, artist#18, duration_ms#21, audience#526, qualities#527, time_signature#30, valence#31], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [id#16, song_title#17, artist#18, duration_ms#21, array(key#24, mode#27, target#32) AS audience#526, map(acousticness, acousticness#19, danceability, danceability#20, energy, energy#22, instrumentalness, instrumentalness#23, liveness, liveness#25, loudness, loudness#26, speechiness, speechiness#28, tempo, tempo#29) AS qualities#527, time_signature#30, valence#31]
+- InMemoryTableScan [acousticness#19, artist#18, danceability#20, duration_ms#21, energy#22, id#16, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, song_title#17, speechiness#28, target#32, tempo#29, time_signature#30, valence#31]
+- InMemoryRelation [id#16, song_title#17, artist#18, acousticness#19, danceability#20, duration_ms#21, energy#22, instrumentalness#23, key#24, liveness#25, loudness#26, mode#27, speechiness#28, tempo#29, time_signature#30, valence#31, target#32], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [id#16,song_title#17,artist#18,acousticness#19,danceability#20,duration_ms#21,energy#22,instrumentalness#23,key#24,liveness#25,loudness#26,mode#27,speechiness#28,tempo#29,time_signature#30,valence#31,target#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/user-home/Data-Science-with-Spark/kaggle-spotify-dataset-working-with..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,song_title:string,artist:string,acousticness:double,danceability:double,duration_ms...
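If you are on Spark 3.x, explain() also accepts a mode argument (e.g. 'simple', 'extended', 'formatted', 'cost') that can render these plans in a more readable layout:
# 'formatted' splits the physical plan into an outline plus per-node details (Spark 3.0+)
spotify_map_df.select(final_cols).explain(mode="formatted")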
Reconstruct the original Table:
We can use everything we have learned above to reconstruct the original table.
(spotify_reloaded_df
.select('id', 'song_title', 'artist',
col('qualities').getItem('acousticness').alias('acousticness'),
col('qualities').getItem('danceability').alias('danceability'),
'duration_ms',
col('qualities').getItem('energy').alias('energy'),
col('qualities').getItem('instrumentalness').alias('instrumentalness'),
col('audience').getItem(0).alias('key'),
col('qualities').getItem('liveness').alias('liveness'),
col('qualities').getItem('loudness').alias('loudness'),
col('audience').getItem(1).alias('mode'),
col('qualities').getItem('speechiness').alias('speechiness'),
col('qualities').getItem('tempo').alias('tempo'),
'time_signature',
'valence',
col('audience').getItem(2).alias('target')
)
.limit(10)
.toPandas())
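The same reconstruction can also be done programmatically by reusing the keys list from Step 2; a sketch (note that the column order follows the lists rather than the original CSV):
# rebuild the flat columns from the nested ones without naming each quality by hand
audience_cols = [col('audience').getItem(i).alias(name)
                 for i, name in enumerate(['key', 'mode', 'target'])]
quality_cols = [col('qualities').getItem(k).alias(k) for k in keys]

(spotify_reloaded_df
 .select(['id', 'song_title', 'artist', 'duration_ms'] + audience_cols +
         quality_cols + ['time_signature', 'valence'])
 .limit(10)
 .toPandas())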
3.4 Explode Individual Nested/Complex Attributes into Rows of Their Own
Using the posexplode function, we can extract each array element into a new row of its own, along with its position in the array.
(spotify_reloaded_df
.select('song_title', F.posexplode('audience'))
.limit(10)
.toPandas())
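If some arrays could be null or empty, posexplode silently drops those rows; posexplode_outer (Spark 2.3+) keeps them and emits nulls instead. Not an issue for this dataset, but worth knowing:
# posexplode_outer keeps rows with null/empty arrays, yielding null position and value
(spotify_reloaded_df
 .select('song_title', F.posexplode_outer('audience'))
 .limit(10)
 .toPandas())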
Using the explode function, we can create a new row for each element in a given array or map.
(spotify_reloaded_df
.select('song_title', F.explode('qualities').alias("quality", "value"))
.limit(10)
.toPandas())
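The introduction also promised an implode. There is no single implode function in PySpark; one way I would sketch it (Spark 2.4+) is to pack each key/value pair into a struct, gather the structs per song with collect_list, and turn them back into a map with map_from_entries:
# reverse the explode: rows of (quality, value) back into a single map column
exploded_df = spotify_reloaded_df.select(
    'id', 'song_title', F.explode('qualities').alias('quality', 'value'))

imploded_df = (exploded_df
               .groupBy('id', 'song_title')
               .agg(F.map_from_entries(
                        F.collect_list(F.struct('quality', 'value'))).alias('qualities')))

imploded_df.limit(10).toPandas()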
spark.stop()
References
These resources helped me a lot in understanding the Map data type and its usage. Please visit these notebooks; they are great resources in their own right.