-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
219 lines (183 loc) · 6.86 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import numpy
from celery import shared_task
from django.contrib.contenttypes.models import ContentType
from django.db.models import Avg, Count, F, Window
from django.db.models.functions import DenseRank
from django.utils import timezone
from exports.models import Export
from movies import managers, models
from movies.services import ml
from ratings.models import Rating
from suggestions.models import Suggestion
from users.models import User
def update_movie_ratings(all=False, count: int | None = None):
    """Refresh the cached ratings data (average, count, score) stored on movies.

    Args:
        all (bool, optional): If True, every movie is eligible for an update;
            otherwise only movies selected by ``filter_outdated_rating()`` are.
            Defaults to False.
        count (int, optional): Stop once this many movies have actually been
            updated. ``None`` (or 0) means no limit. Defaults to None.

    Returns:
        int: The number of movie rows actually updated.
    """
    content_type = ContentType.objects.get_for_model(models.Movie)
    # One row per rated movie: its id, its average rating and its number of ratings.
    aggregated_ratings = (
        Rating.objects.filter(content_type=content_type)
        .values("object_id")
        .annotate(average=Avg("value"), count=Count("object_id"))
    )
    queryset: managers.MovieManager = models.Movie.objects.all().order_by(
        "rating_last_updated"
    )
    if not all:
        queryset = queryset.filter_outdated_rating()
    # NOTE(review): the loop follows the order of ``aggregated_ratings``, not
    # ``rating_last_updated``, so with ``count`` set the stalest movies are not
    # necessarily refreshed first — confirm whether that is intended.
    updated = 0
    for agg in aggregated_ratings:
        # ``update()`` returns the number of rows it changed. That is 0 when the
        # movie is excluded by the outdated filter, so only real updates count
        # toward the result and toward the ``count`` limit.
        updated += queryset.filter(pk=agg["object_id"]).update(
            ratings_average=agg["average"],
            ratings_count=agg["count"],
            score=agg["average"] * agg["count"],
            rating_last_updated=timezone.now(),
        )
        if count and updated >= count:
            break
    return updated
@shared_task(name="update_movie_ratings_outdated")
def update_movie_ratings_outdated():
    """Celery task: refresh ratings data only for movies whose ratings are outdated.

    Returns:
        int: The number of movies reported as updated by ``update_movie_ratings``.
    """
    refreshed = update_movie_ratings(all=False)
    return refreshed
@shared_task(name="export_movie_ratings_dataset")
def export_movie_ratings_dataset(filename: str | None = None) -> str | None:
    """Celery task: export the movie ratings dataset to a file.

    The exported columns are ``userId``, ``movieId``, ``rating`` and
    ``createdAt``, built from the ratings attached to movies.

    Args:
        filename (str | None, optional): Name for the destination file.
            Falls back to ``"movie_ratings"`` when empty or None.

    Returns:
        str: Filesystem path of the exported file.
    """
    movie_ctype = ContentType.objects.get_for_model(models.Movie)
    dataset = Rating.objects.to_dataset(movie_ctype, item_column_name="movieId")
    export = Export.from_dataset(
        dataset,
        fieldnames=["userId", "movieId", "rating", "createdAt"],
        filename=filename or "movie_ratings",
        content_type=ContentType.objects.get_for_model(Rating),
    )
    return export.file.path
@shared_task(name="export_movies_dataset")
def export_movies_dataset():
    """Celery task: export the movies dataset to a file named ``movies``.

    Returns:
        str: Filesystem path of the exported file.
    """
    columns = [
        "movieId",
        "movieIndex",
        "title",
        "releasedAt",
        "ratingsAverage",
        "ratingsCount",
    ]
    export = Export.from_dataset(
        models.Movie.objects.to_dataset(),
        fieldnames=columns,
        filename="movies",
        content_type=ContentType.objects.get_for_model(models.Movie),
    )
    return export.file.path
@shared_task(name="train_and_export_surprise_model")
def train_and_export_surprise_model(epochs: int = 20):
    """Celery task: train a Surprise model and persist it to disk.

    The exported model's name is derived from the accuracy returned by training.

    Args:
        epochs (int, optional): Number of training epochs. Defaults to 20.

    Returns:
        str: Path of the exported model file, as returned by ``ml.export_model``.
    """
    trained_model, training_accuracy, _unused = ml.train_surprise_model(epochs)
    return ml.export_model(
        trained_model, ml.get_model_name_from_accuracy(training_accuracy)
    )
@shared_task(name="batch_user_prediction")
def batch_user_prediction(
    users_ids: list[int] | None = None,
    start: int = 0,
    offset: int = 50,
    max: int = 1000,
    use_suggestions_up_to_days: int | None = 7,
) -> list[int]:
    """Generate and store model-predicted suggestions for a batch of users.

    Popular movies are processed page by page: ``offset`` movies per page,
    beginning at position ``start``. Processing stops when ``max`` movies have
    been processed or the popular-movie list runs out. Each page's suggestions
    (one per movie/user pair, valued with the model's estimate) are inserted
    with a single ``bulk_create``.

    Args:
        users_ids (list[int] | None, optional): The users to generate suggestions for.
            Defaults to ``User.objects.recent(ids_only=True)``.
        start (int, optional): Position in the popular-movies ordering to start from.
            Defaults to 0.
        offset (int, optional): Number of movies per page / bulk insert. Defaults to 50.
            Non-positive values produce no suggestions.
        max (int, optional): Maximum number of movies to process. Defaults to 1000.
        use_suggestions_up_to_days (int | None, optional): When truthy, skip
            (movie, user) pairs returned by ``Movie.objects.recent_suggestions``.
            Defaults to 7.

    Raises:
        ValueError: If the model is not found.

    Returns:
        list[int]: The IDs of the created suggestions.
    """
    model = ml.load_model()
    if not model:
        raise ValueError("Model not found.")
    if users_ids is None:
        users_ids = User.objects.recent(ids_only=True)
    # Materialize once: the users are iterated again for every movie.
    user_id_list = list(users_ids)
    if not user_id_list or offset <= 0:
        return []

    ctype = ContentType.objects.get_for_model(models.Movie)
    created_ids: list[int] = []
    count = 0
    while count < max:
        # Never fetch more movies than remain in the ``max`` budget.
        end = start + min(offset, max - count)
        movies_ids = (
            models.Movie.objects.all().popular().values_list("id", flat=True)[start:end]
        )
        if not movies_ids:
            # No more popular movies: stop rather than loop forever.
            break

        # Recomputed for every page so later pages also skip recent suggestions.
        # NOTE(review): only the truthiness of ``use_suggestions_up_to_days`` is
        # used; the day window is not forwarded — confirm which window
        # ``recent_suggestions`` applies.
        recent_suggestions = {}
        if use_suggestions_up_to_days:
            recent_suggestions = models.Movie.objects.recent_suggestions(
                users_ids, movies_ids
            )

        suggestions: list[Suggestion] = []
        for movie_id in movies_ids:
            # A set gives O(1) membership tests in the per-user loop below.
            users_covered = set(recent_suggestions.get(movie_id, ()))
            suggestions.extend(
                Suggestion(
                    user_id=user_id,
                    content_type=ctype,
                    object_id=movie_id,
                    value=model.predict(uid=user_id, iid=movie_id).est,
                )
                for user_id in user_id_list
                if user_id not in users_covered
            )
            count += 1

        # Collect the primary keys of the inserted rows. (Django fills them in
        # after bulk_create only on backends that support it.)
        created_ids.extend(
            suggestion.id for suggestion in Suggestion.objects.bulk_create(suggestions)
        )
        start = end
    return created_ids
@shared_task(name="update_movie_position_embeddings")
def update_movie_position_embeddings():
    """Set each movie's ``index`` to the zero-based dense rank of its ``id``.

    Only movies whose stored ``index`` differs from the computed value are saved.

    Returns:
        int: The number of movies updated.
    """
    ranked_movies = (
        models.Movie.objects.all()
        .annotate(embedding_index=Window(DenseRank(), order_by=[F("id").asc()]))
        # DenseRank starts at 1; shift it so positions start at 0.
        .annotate(new_index=F("embedding_index") - 1)
    )
    updated = 0
    for movie in ranked_movies:
        if movie.index != movie.new_index:
            movie.index = movie.new_index
            # Write only ``index``. A full save() would also rewrite every other
            # column with the values loaded when this query ran, which can
            # overwrite concurrent updates (e.g. ratings data).
            movie.save(update_fields=["index"])
            updated += 1
    return updated