-
Notifications
You must be signed in to change notification settings - Fork 0
/
spotify_queries.py
283 lines (227 loc) · 11.8 KB
/
spotify_queries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# Spotify Web API queries with local JSON caching (expects an already-issued OAuth2 access token)
import json
import os
import requests
import logging
from urllib.parse import urlencode
# Initialize logging: runs at import time and sets the root logger to INFO so
# the logging.info(...) progress messages emitted by SpotifyQueries are shown.
logging.basicConfig(level=logging.INFO)
class SpotifyQueries:
    """
    Query the Spotify Web API for the current user and cache results as JSON.

    Cached datasets are stored as ``./cache/<file_name>.json`` relative to the
    current working directory. API helpers return the decoded JSON on HTTP 200
    and ``None`` on any other status code; network errors and timeouts raised
    by ``requests`` propagate to the caller.
    """

    # Seconds to wait for Spotify before giving up. Without a timeout a
    # stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, access_token):
        """
        Parameters:
        access_token (str): Token sent as ``Authorization: Bearer <token>`` with every request.
        """
        self.access_token = access_token

    @staticmethod
    def _cache_path(file_name):
        """Return the path of the cache file that backs ``file_name``."""
        return f'./cache/{file_name}.json'

    def _get(self, endpoint):
        """Send an authenticated GET request to ``endpoint`` and return the response."""
        headers = {"Authorization": f"Bearer {self.access_token}"}
        return requests.get(endpoint, headers=headers, timeout=self.REQUEST_TIMEOUT_SECONDS)

    def cached_data_available(self, file_name):
        """
        Check if the user's Spotify data is already cached in a JSON file.

        Parameters:
        file_name (str): The cache entry name, without directory or ``.json`` suffix.

        Returns:
        bool: True if the data is cached (i.e., file exists), False otherwise.
        """
        print(f"Checking cached_data_available for file: {file_name}")
        return os.path.exists(self._cache_path(file_name))

    def load_cached_data(self, file_name):
        """
        Load the user's Spotify data from a cached JSON file.

        Parameters:
        file_name (str): The cache entry name, without directory or ``.json`` suffix.

        Returns:
        dict or list: The data read from the JSON file.

        Raises:
        FileNotFoundError: If the specified JSON file does not exist in the cache.
        """
        file_path = self._cache_path(file_name)
        if not self.cached_data_available(file_name):
            raise FileNotFoundError(f"No cached data found at {file_path}")

        print("Spotify data in cache - Reading from file")
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    # TODO: Think about if you want to invalidate the cache after a certain amount of time
    def cache_data(self, data, file_name):
        """
        Cache the user's Spotify data in a JSON file.

        Parameters:
        data (dict or list): Data to be cached, which can be a dictionary or a list of dictionaries.
        file_name (str): The cache entry name, without directory or ``.json`` suffix.

        Raises:
        TypeError: If ``data`` is not JSON-serializable; no cache file is written in that case.
        """
        # Serialize before touching the file system so a serialization error
        # cannot leave a truncated file that later looks like a valid cache hit.
        payload = json.dumps(data)

        if not os.path.exists('./cache'):
            print("Creating 'cache' directory...")
        # exist_ok avoids failing if the directory appears between check and create.
        os.makedirs('./cache', exist_ok=True)

        with open(self._cache_path(file_name), 'w', encoding='utf-8') as f:
            f.write(payload)
        print("Spotify data cached successfully.")

    def load_or_fetch_playlists(self):
        """
        Retrieve the user's Spotify playlists. If cached data is available, it reads from the cache;
        otherwise, it fetches the data via API calls and caches it.

        A fetch that stopped early because a page request failed is returned
        to the caller but is NOT cached, so the next call retries the API.

        Returns:
        list: A list of dictionaries, each containing information about a playlist.
        """
        file_name = "users_playlists"
        if self.cached_data_available(file_name):
            print("Spotify playlist data in cache - Reading from file")
            return self.load_cached_data(file_name)

        print("Fetching Spotify playlist data via API...")
        playlists, complete = self._collect_playlists()
        if complete:
            self.cache_data(playlists, file_name)
        else:
            logging.warning("Playlist fetch was incomplete; not caching partial results.")
        return playlists

    def load_or_fetch_top_tracks(self):
        """
        Retrieve the user's Spotify top tracks. If cached data is available, it reads from the cache;
        otherwise, it fetches the data via an API call and caches it.

        A failed fetch (``None``) is not cached, so the next call retries the API.

        Returns:
        dict or None: The top-tracks response, or None if the API request failed.
        """
        file_name = "users_top_tracks"
        if self.cached_data_available(file_name):
            print("Spotify top tracks data in cache - Reading from file")
            return self.load_cached_data(file_name)

        print("Fetching Spotify top tracks data via API...")
        top_tracks = self.api_get_current_users_top_tracks()
        if top_tracks is not None:
            self.cache_data(top_tracks, file_name)
        else:
            logging.warning("Top tracks fetch failed; nothing was cached.")
        return top_tracks

    def _collect_playlists(self):
        """
        Page through the current user's playlists.

        Returns:
        tuple: ``(playlists, complete)`` where ``complete`` is False when a page
        request failed and ``playlists`` may therefore be partial.
        """
        all_playlists = []
        limit = 50  # Maximum allowed by Spotify
        offset = 0  # Start at the beginning

        while True:
            logging.info("Fetching playlists, Offset: %s", offset)
            batch = self.fetch_playlists_batch_from_api(limit=limit, offset=offset)

            # A missing batch is a failure: a genuine end of data still comes
            # back as a batch whose 'items' list is short or empty.
            if batch is None or 'items' not in batch:
                logging.error("Failed to fetch playlists or reached the end.")
                complete = False
                break

            items = batch['items']
            all_playlists.extend(items)
            logging.info("Fetched %s playlists in this batch.", len(items))

            # A short page means there is nothing left to request.
            if len(items) < limit:
                logging.info("Reached the end of playlists.")
                complete = True
                break

            offset += limit
            logging.info("Updating offset to %s", offset)

        logging.info("Total playlists fetched: %s", len(all_playlists))
        return all_playlists, complete

    def collect_all_playlists_for_user(self):
        """
        Collect all of the Spotify playlists for the current user.

        Iteratively calls the Spotify API, handling pagination via the 'limit'
        and 'offset' parameters. If a page request fails, the playlists
        collected so far are returned.

        Returns:
        all_playlists (list): List of dictionaries, each containing information about a playlist.
        """
        all_playlists, _complete = self._collect_playlists()
        return all_playlists

    def fetch_playlists_batch_from_api(self, limit, offset):
        """
        Fetch a batch of Spotify playlists for the current user from the Spotify API.

        Parameters:
        limit (int): The maximum number of playlists to return in each API call.
        offset (int): The index of the first playlist to return.

        Returns:
        dict or None: A dictionary containing the batch of playlists, or None if the
        response status is not 200.
        """
        logging.info("Fetching current user's playlists...")
        endpoint = f"https://api.spotify.com/v1/me/playlists?limit={limit}&offset={offset}"
        response = self._get(endpoint)

        if response.status_code == 200:
            logging.info("Successfully fetched playlists.")
            return json.loads(response.text)

        if response.status_code == 429:
            logging.error("Failed to fetch playlists due to rate limiting. More info: https://developer.spotify.com/documentation/web-api/concepts/rate-limits")
        else:
            logging.error("Failed to fetch playlists. Error %s: %s", response.status_code, response.text)
        return None

    def fetch_playlist_tracks_batch(self, query_playlists):
        """
        Fetch the tracks of several playlists and cache the combined result.

        Parameters:
        query_playlists (list): Playlist dictionaries; each must contain an 'id' key.

        Returns:
        list: One entry per input playlist, in order. An entry is None when the
        request for that playlist failed. The list is also written to the
        'fetched_playlists_tracks' cache entry.
        """
        logging.info("Fetching tracks for %s playlists...", len(query_playlists))
        fetched_playlists = [
            self.fetch_playlist_tracks(playlist_info['id'])
            for playlist_info in query_playlists
        ]
        self.cache_data(fetched_playlists, "fetched_playlists_tracks")
        logging.info("Successfully fetched playlist tracks.")
        return fetched_playlists

    def fetch_playlist_tracks(self, playlist_id):
        """
        Fetch the tracks for a given playlist from the Spotify API.

        Parameters:
        playlist_id (str): The ID of the playlist to fetch.

        Returns:
        dict or None: A dictionary containing the playlist's tracks if successful,
        or None if the response status is not 200.
        """
        # Restrict the response to the fields we use - https://developer.spotify.com/documentation/web-api/reference/get-playlist
        # NOTE(review): this makes a single request; if the endpoint pages long
        # playlists, only the first page of tracks is returned - confirm.
        fields = "id,name,tracks.items(track(artists(name,id,genres),name,id,popularity,album(name,id,release_date)))"
        encoded_params = urlencode({'fields': fields})
        endpoint = f"https://api.spotify.com/v1/playlists/{playlist_id}?{encoded_params}"
        response = self._get(endpoint)

        if response.status_code == 200:
            return json.loads(response.text)

        if response.status_code == 429:
            logging.error("Failed to fetch tracks due to rate limiting. More info: https://developer.spotify.com/documentation/web-api/concepts/rate-limits")
        elif response.status_code == 404:
            logging.error("Playlist not found. Check if the playlist ID %s is correct.", playlist_id)
        else:
            logging.error("Failed to fetch tracks. Error %s: %s", response.status_code, response.text)
        return None

    def api_get_current_users_top_tracks(self):
        """
        Fetch the current user's top tracks from Spotify and log the outcome.

        No 'limit' is sent, so the number of tracks returned is whatever the
        API defaults to (presumably 20 - verify against the Spotify reference).

        Returns:
        dict or None: A dictionary containing the user's top tracks if successful,
        or None if the response status is not 200.
        """
        logging.info("Fetching current user's top tracks...")
        response = self._get("https://api.spotify.com/v1/me/top/tracks")

        if response.status_code == 200:
            logging.info("Successfully fetched top tracks.")
            return json.loads(response.text)

        if response.status_code == 429:
            logging.error("Failed to fetch top tracks due to rate limiting. More info: https://developer.spotify.com/documentation/web-api/concepts/rate-limits")
        else:
            logging.error("Failed to fetch top tracks. Error %s: %s", response.status_code, response.text)
        return None