-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathattachment_helpers.py
98 lines (81 loc) · 3.08 KB
/
attachment_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import boto3
from botocore.exceptions import ClientError
from django.conf import settings
logger = logging.getLogger(__name__)
S3_CLIENT = boto3.client("s3")
def get_all_study_attachments(study_uuid):
"""
Get all video responses to a study by fetching all objects in the bucket with
key name videoStream_<study_uuid>
"""
s3 = boto3.resource("s3")
bucket = s3.Bucket(settings.BUCKET_NAME)
return bucket.objects.filter(Prefix=f"videoStream_{study_uuid}")
def get_url(video_key, recording_method_is_pipe, set_attachment_header):
"""
Generate a presigned url for the video that expires in 10 minutes.
"""
url = None
if recording_method_is_pipe:
# Pipe bucket
bucket = settings.BUCKET_NAME
else:
# RecordRTC bucket
bucket = settings.S3_BUCKET_NAME
params = {"Bucket": bucket, "Key": video_key}
if set_attachment_header:
params["ResponseContentDisposition"] = "attachment"
try:
url = S3_CLIENT.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=600,
)
except ClientError as e:
logger.warning(f"Video {video_key} not found in bucket. {e}")
return None
return url
def get_study_attachments(study, orderby="key", match=None):
"""
Fetches study attachments from s3
"""
sort = "created_at" if "created_at" in orderby else "key"
attachments = [
att
for att in get_all_study_attachments(str(study.uuid))
if "PREVIEW_DATA_DISREGARD" not in att.key
]
if match:
attachments = [att for att in attachments if match in att.key]
return sorted(
attachments,
key=lambda x: getattr(x, sort),
reverse=True if "-" in orderby else False,
)
def rename_stored_video(old_name, new_name, ext):
"""
Renames a stored video on S3. old_name and new_name are both without extension ext.
Returns 1 if success, 0 if old_name video did not exist. May throw error if
other problems encountered.
"url":"https://bucketname.s3.amazonaws.com/vs1457013120534_862.mp4",
"snapshotUrl":"https://bucketname.s3.amazonaws.com/vs1457013120534_862.jpg",
"""
s3 = boto3.resource("s3")
old_name_full = old_name + "." + ext
old_name_thum = old_name + ".jpg"
new_name_full = new_name + "." + ext
# No way to directly rename in boto3, so copy and delete original (this is dumb, but let's get it working)
try: # Create a copy with the correct new name, if the original exists. Could also
# wait until old_name_full exists using orig_video.wait_until_exists()
s3.Object(settings.BUCKET_NAME, new_name_full).copy_from(
CopySource=(settings.BUCKET_NAME + "/" + old_name_full)
)
except ClientError: # old_name_full not found!
return False
else: # Go on to remove the originals
orig_video = s3.Object(settings.BUCKET_NAME, old_name_full)
orig_video.delete()
# remove the .jpg thumbnail.
s3.Object(settings.BUCKET_NAME, old_name_thum).delete()
return True