-
Notifications
You must be signed in to change notification settings - Fork 26
/
utils.py
352 lines (265 loc) · 10.4 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
import base64
import io
import json
import os
import subprocess
import tempfile
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from fastapi import UploadFile, status
from fastapi.responses import JSONResponse
from PIL import Image
from pydantic import BaseModel, Field
class MediaURL(BaseModel):
    """A URL from which media can be accessed."""

    # Required field (`...` means no default value).
    url: str = Field(
        ...,
        description="The URL where the media can be accessed.",
    )
class Media(MediaURL):
    """A media object containing information about the generated media."""

    # Subclasses MediaURL, so the inherited `url` field belongs to this model too.
    seed: int = Field(
        ...,
        description="The seed used to generate the media.",
    )
    # TODO: Make nsfw property optional once Go codegen tool supports
    # OAPI 3.1 https://github.com/deepmap/oapi-codegen/issues/373
    nsfw: bool = Field(
        ...,
        description="Whether the media was flagged as NSFW.",
    )
class ImageResponse(BaseModel):
    """Response model for image generation."""

    # Each entry carries the image URL plus its seed and NSFW flag (see Media).
    images: List[Media] = Field(
        ...,
        description="The generated images.",
    )
class VideoResponse(BaseModel):
    """Response model for video generation."""

    # Nested list of Media objects.
    # NOTE(review): presumably one inner list of frames per generated video - confirm.
    frames: List[List[Media]] = Field(
        ...,
        description="The generated video frames.",
    )
class AudioResponse(BaseModel):
    """Response model for audio generation."""

    # Only a URL is returned for audio; there is no seed or NSFW flag here.
    audio: MediaURL = Field(
        ...,
        description="The generated audio.",
    )
class MasksResponse(BaseModel):
    """Response model for object segmentation."""

    # All three fields are plain strings.
    # NOTE(review): presumably serialized arrays - confirm with the producer.
    masks: str = Field(..., description="The generated masks.")
    scores: str = Field(
        ...,
        description="The model's confidence scores for each generated mask.",
    )
    logits: str = Field(
        ...,
        description="The raw, unnormalized predictions (logits) for the masks.",
    )
class Chunk(BaseModel):
    """A chunk of text with a timestamp."""

    # Untyped tuple: neither the length nor the element types are constrained.
    timestamp: Tuple = Field(
        ...,
        description="The timestamp of the chunk.",
    )
    text: str = Field(
        ...,
        description="The text of the chunk.",
    )
class TextResponse(BaseModel):
    """Response model for text generation."""

    # Full text plus the same content broken into timestamped chunks.
    text: str = Field(
        ...,
        description="The generated text.",
    )
    chunks: List[Chunk] = Field(
        ...,
        description="The generated text chunks.",
    )
class LLMResponse(BaseModel):
    # Both fields are required; unlike the sibling models they carry no schema
    # descriptions.
    response: str
    # NOTE(review): presumably a token count for the request - confirm what is counted.
    tokens_used: int
class ImageToTextResponse(BaseModel):
    """Response model for image-to-text generation."""

    # Unlike TextResponse, this model has no `chunks` field.
    text: str = Field(..., description="The generated text.")
class LiveVideoToVideoResponse(BaseModel):
    """Response model for live video-to-video generation."""

    # Where the incoming stream is read from.
    subscribe_url: str = Field(
        ...,
        description="Source URL of the incoming stream to subscribe to",
    )
    # Where the outgoing stream is written to.
    publish_url: str = Field(
        ...,
        description="Destination URL of the outgoing stream to publish to",
    )
class APIError(BaseModel):
    """API error response model."""

    # Nested under the `detail` key of HTTPError.
    msg: str = Field(
        ...,
        description="The error message.",
    )
class HTTPError(BaseModel):
    """HTTP error response model."""

    # Serializes as {"detail": {"msg": ...}}.
    detail: APIError = Field(
        ...,
        description="Detailed error information.",
    )
def http_error(msg: str) -> Dict[str, Dict[str, str]]:
    """Create an HTTP error response body with the specified message.

    The result is a plain, JSON-serializable dictionary with the same shape as the
    ``HTTPError`` model (``{"detail": {"msg": ...}}``), not a model instance.

    Args:
        msg: The error message.

    Returns:
        The HTTP error response body.
    """
    return {"detail": {"msg": msg}}
def image_to_base64(img: Image.Image, format: str = "png") -> str:
    """Convert an image to a base64 string.

    Args:
        img: The image to convert.
        format: The image format to use. Defaults to "png".

    Returns:
        The base64-encoded image.
    """
    # Encode into an in-memory buffer so nothing is written to disk.
    buffered = io.BytesIO()
    img.save(buffered, format=format)
    return base64.b64encode(buffered.getvalue()).decode("utf-8")
def image_to_data_url(img: Image.Image, format: str = "png") -> str:
    """Convert an image to a data URL.

    Args:
        img: The image to convert.
        format: The image format to use. Defaults to "png".

    Returns:
        The data URL for the image.
    """
    # Derive the media type from the requested format so that, for example, a JPEG
    # payload is not labelled as image/png. Media subtypes are lowercase.
    media_type = f"image/{format.lower()}"
    return f"data:{media_type};base64," + image_to_base64(img, format=format)
def audio_to_data_url(buffer: io.BytesIO, format: str = "wav") -> str:
    """Build a base64 data URL from an in-memory audio buffer.

    Args:
        buffer: The audio buffer to convert. It is read from its current position.
        format: The audio format to use. Defaults to "wav".

    Returns:
        The data URL for the audio.
    """
    raw_audio = buffer.read()
    encoded_audio = base64.b64encode(raw_audio).decode("utf-8")
    return "data:audio/{};base64,{}".format(format, encoded_audio)
def file_exceeds_max_size(
    input_file: UploadFile, max_size: int = 10 * 1024 * 1024
) -> bool:
    """Checks if the uploaded file exceeds the specified maximum size.

    Args:
        input_file: The uploaded file to check.
        max_size: The maximum allowed file size in bytes. Defaults to 10 MB.

    Returns:
        True if the file exceeds the maximum size, False otherwise. False is also
        returned when there is no underlying file object or the size cannot be
        determined.
    """
    try:
        stream = input_file.file
        if not stream:
            # No underlying file object: report "not too large" explicitly rather
            # than falling through and returning None.
            return False
        # Get size by moving the cursor to the end of the file and back.
        stream.seek(0, os.SEEK_END)
        file_size = stream.tell()
        stream.seek(0)
        return file_size > max_size
    except Exception as e:
        # Best effort: a failed size check is reported and treated as within limits.
        print(f"Error checking file size: {e}")
        return False
def json_str_to_np_array(
    data: Optional[str], var_name: Optional[str] = None
) -> Optional[np.ndarray]:
    """Parse a JSON string and wrap the result in a NumPy array.

    Args:
        data: The JSON string to convert. ``None`` or an empty string yields ``None``.
        var_name: The name of the variable being converted. Used in error messages.

    Returns:
        The NumPy array built from the parsed JSON, or None when no data was given.

    Raises:
        ValueError: If an error occurs during JSON parsing.
    """
    if not data:
        return None

    try:
        return np.array(json.loads(data))
    except json.JSONDecodeError as e:
        # Mention the offending variable only when the caller supplied its name.
        subject = f" for {var_name}" if var_name else ""
        raise ValueError(f"Error parsing JSON{subject}: {e}")
# Global error handling configuration.
# NOTE: "" for default message, None for exception message.
# NOTE: handle_pipeline_exception tries the keys as case-insensitive substrings of
# the exception text in insertion order and stops at the first hit, so a specific
# pattern must be listed before any shorter pattern it contains.
ERROR_CONFIG: Dict[str, Tuple[Union[str, None], int]] = {
    # Specific error types.
    "LoraLoadingError": (None, status.HTTP_400_BAD_REQUEST),
    "InferenceError": (None, status.HTTP_400_BAD_REQUEST),
    "ValueError": ("", status.HTTP_400_BAD_REQUEST),
    "OutOfMemoryError": ("GPU out of memory.", status.HTTP_500_INTERNAL_SERVER_ERROR),
    # General error patterns (most specific first).
    "CUDA out of memory": ("GPU out of memory.", status.HTTP_500_INTERNAL_SERVER_ERROR),
    "out of memory": ("Out of memory.", status.HTTP_500_INTERNAL_SERVER_ERROR),
}
def handle_pipeline_exception(
    e: object,
    default_error_message: Union[str, Dict[str, object]] = "Pipeline error.",
    default_status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
    custom_error_config: Dict[str, Tuple[str, int]] = None,
) -> JSONResponse:
    """Translate a pipeline exception into a JSON error response.

    The class name of ``e`` is looked up in the error configuration first. If it is
    not listed, every configuration key is tried as a case-insensitive substring of
    ``str(e)`` and the first key that matches is used. A configured message of
    ``None`` reports the exception text followed by a period; a configured message
    of ``""`` reports ``default_error_message``.

    Args:
        e: The exception to handle. Can be any type of object.
        default_error_message: The default error message or content dictionary.
            Used if no specific error type is matched.
        default_status_code: The default status code to use if no specific error
            type is matched. Defaults to HTTP_500_INTERNAL_SERVER_ERROR.
        custom_error_config: Custom error configuration to override the application
            error configuration.

    Returns:
        The JSON response with appropriate status code and error message.
    """
    # Work on a copy so per-call overrides never leak into ERROR_CONFIG.
    lookup = dict(ERROR_CONFIG)
    if custom_error_config:
        lookup.update(custom_error_config)

    exception_name = type(e).__name__
    if exception_name in lookup:
        error_message, status_code = lookup[exception_name]
    else:
        # Fall back to substring matching; the first configured pattern wins.
        first_hit = next(
            (
                (message, code)
                for pattern, (message, code) in lookup.items()
                if pattern.lower() in str(e).lower()
            ),
            None,
        )
        if first_hit is None:
            error_message, status_code = default_error_message, default_status_code
        else:
            error_message, status_code = first_hit

    # Resolve the two placeholder values used in the configuration.
    if error_message is None:
        error_message = f"{e}."
    elif error_message == "":
        error_message = default_error_message

    # String messages are wrapped in the standard error body; a dictionary is
    # passed through as the response content unchanged.
    if isinstance(error_message, str):
        content = http_error(error_message)
    else:
        content = error_message

    return JSONResponse(status_code=status_code, content=content)
def parse_key_from_metadata(
    metadata: str, key: str, expected_type: type
) -> Optional[Union[str, int, float, bool]]:
    """Parse a specific key from the metadata JSON string.

    Args:
        metadata: The metadata JSON string. Must encode a JSON object.
        key: The key to parse from the metadata.
        expected_type: The expected type of the key's value. Values of another type
            are converted by calling ``expected_type(value)``.

    Returns:
        The value of the key (converted to the expected type if necessary), or None
        if the key is missing or its value is null.

    Raises:
        ValueError: If the metadata is not valid JSON or is not a JSON object.
        TypeError: If the value cannot be converted to the expected type.
    """
    try:
        metadata_dict = json.loads(metadata)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON: {e}") from e

    # Valid JSON can still be a list, string, number or null, none of which has
    # keys to look up; report that as bad input instead of an AttributeError.
    if not isinstance(metadata_dict, dict):
        raise ValueError("Invalid JSON: metadata must be a JSON object.")

    value = metadata_dict.get(key)
    if value is None:
        return None
    if isinstance(value, expected_type):
        return value

    # NOTE(review): conversion uses the type's constructor, so with
    # expected_type=bool any non-empty string (including "false") becomes True.
    try:
        return expected_type(value)
    except (ValueError, TypeError) as e:
        raise TypeError(
            f"Invalid {key} value. Must be of type {expected_type.__name__}."
        ) from e
def get_media_duration_ffmpeg(bytes: bytes) -> float:
    """Gets the duration of the media using ffprobe.

    The bytes are written to a temporary file, which is handed to ``ffprobe`` and
    removed again before returning, whether or not probing succeeded.

    Args:
        bytes: The media file as bytes.

    Returns:
        The duration of the media in seconds.

    Raises:
        Exception: If ffprobe cannot be run, exits with an error, or does not print
            a parsable duration.
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file_path = temp_file.name
    try:
        # Close the file before ffprobe opens it by path.
        with temp_file:
            temp_file.write(bytes)

        try:
            result = subprocess.run(
                [
                    "ffprobe",
                    "-v",
                    "error",
                    "-show_entries",
                    "format=duration",
                    "-of",
                    "default=noprint_wrappers=1:nokey=1",
                    temp_file_path,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
            if result.returncode != 0:
                # Surface ffprobe's own diagnostic instead of the opaque float("")
                # conversion error that an empty stdout would otherwise produce.
                raise RuntimeError(
                    result.stderr.strip()
                    or f"ffprobe exited with code {result.returncode}"
                )
            return float(result.stdout.strip())
        except Exception as e:
            raise Exception(f"Failed to get duration with ffmpeg: {e}") from e
    finally:
        # delete=False above, so the file has to be removed explicitly. This also
        # covers a failure while writing the bytes.
        os.remove(temp_file_path)