Skip to content

Commit

Permalink
services: add handling of async results of upload
Browse files Browse the repository at this point in the history
* adds the task results to the record
* addresses cernanalysispreservation#1970

Signed-off-by: Ilias Koutsakis <[email protected]>
  • Loading branch information
Lilykos committed Dec 3, 2020
1 parent a4fd8be commit edf3af4
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 20 deletions.
8 changes: 4 additions & 4 deletions cap/modules/deposit/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
UpdateDepositPermission)

from .review import Reviewable
from .tasks import upload_to_zenodo
from .tasks import create_zenodo_upload_tasks
from .utils import create_zenodo_deposit

_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)
Expand Down Expand Up @@ -287,9 +287,9 @@ def upload(self, pid, *args, **kwargs):
self.commit()

# upload files to zenodo deposit
upload_to_zenodo.delay(files, recid, token,
deposit['id'],
deposit['links']['bucket'])
create_zenodo_upload_tasks(files, recid, token,
deposit['id'],
deposit['links']['bucket'])
else:
raise FileUploadError(
'You cannot create an empty Zenodo deposit. '
Expand Down
84 changes: 71 additions & 13 deletions cap/modules/deposit/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,96 @@
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Tasks."""
"""Zenodo Upload Tasks."""

from __future__ import absolute_import, print_function

import requests
from flask import current_app
from celery import shared_task
from celery import chord, group, current_task, shared_task

from invenio_files_rest.models import FileInstance
from invenio_db import db

from cap.modules.experiments.errors import ExternalAPIException
from .utils import get_zenodo_deposit_from_record


def create_zenodo_upload_tasks(files, recid, token,
zenodo_depid, zenodo_bucket_url):
"""Create the upload tasks and get the results."""
current_app.logger.info(
f'Uploading files to Zenodo {zenodo_depid}: {files}.')

# the only way to have a task that waits for
# other tasks to finish is the `chord` structure
upload_callback = save_results_to_record.s(depid=zenodo_depid, recid=recid)
upload_tasks = group(
upload.s(filename, recid, token, zenodo_bucket_url)
for filename in files
)

chord(upload_tasks, upload_callback).delay()


@shared_task(autoretry_for=(Exception, ),
retry_kwargs={
'max_retries': 5,
'countdown': 10
})
def upload_to_zenodo(files, recid, token, zenodo_depid, zenodo_bucket_url):
"""Upload to Zenodo the files the user selected."""
def upload(filename, recid, token, zenodo_bucket_url):
"""Upload file to Zenodo."""
from cap.modules.deposit.api import CAPDeposit
rec = CAPDeposit.get_record(recid)
record = CAPDeposit.get_record(recid)

for filename in files:
file_obj = rec.files[filename]
file_ins = FileInstance.get(file_obj.file_id)
file_obj = record.files[filename]
file_ins = FileInstance.get(file_obj.file_id)
task_id = current_task.request.id

with open(file_ins.uri, 'rb') as fp:
with open(file_ins.uri, 'rb') as fp:
try:
resp = requests.put(
url=f'{zenodo_bucket_url}/{filename}',
data=fp,
params=dict(access_token=token),
)

if not resp.ok:
current_app.logger.error(
f'Uploading file {filename} to deposit {zenodo_depid} '
f'failed with {resp.status_code}.')
current_app.logger.error(
f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.') # noqa

status = resp.status_code
msg = resp.json()
except (ValueError, ExternalAPIException) as err:
status = 'FAILED'
msg = str(err)

current_app.logger.error(
f'{task_id}: Something went wrong with the task:\n{msg}')
finally:
return {
'task_id': task_id,
'result': {'file': filename, 'status': status, 'message': msg}
}


@shared_task(autoretry_for=(Exception, ),
retry_kwargs={
'max_retries': 5,
'countdown': 10
})
def save_results_to_record(tasks, depid, recid):
"""Save the results of uploading to the record."""
from cap.modules.deposit.api import CAPDeposit
record = CAPDeposit.get_record(recid)

# update the tasks of the specified zenodo deposit (filename: status)
# this way we can attach multiple deposits
zenodo = get_zenodo_deposit_from_record(record, depid)
for task in tasks:
zenodo['tasks'][task['task_id']] = task['result']

record.commit()
db.session.commit()

current_app.logger.info(
f'COMPLETED: Zenodo {depid} uploads:\n{tasks}')
14 changes: 14 additions & 0 deletions cap/modules/deposit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ def add_api_to_links(links):
return response


def get_zenodo_deposit_from_record(record, pid):
"""Get the related Zenodo information from a record."""
try:
index = [idx for idx, deposit in enumerate(record['_zenodo'])
if deposit['id'] == pid][0]

# set an empty dict as tasks if there is none
record['_zenodo'][index].setdefault('tasks', {})
return record['_zenodo'][index]
except IndexError:
raise FileUploadError(
'The Zenodo pid you provided is not associated with this record.')


def create_zenodo_deposit(token, data=None):
"""Create a Zenodo deposit using the logged in user's credentials."""
zenodo_url = current_app.config.get("ZENODO_SERVER_URL")
Expand Down
18 changes: 18 additions & 0 deletions cap/modules/services/views/zenodo.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@

import requests
from flask import current_app, jsonify
from invenio_pidstore.resolver import Resolver

from . import blueprint
from cap.modules.access.utils import login_required
from cap.modules.deposit.api import CAPDeposit


def _get_zenodo_record(zenodo_id):
Expand All @@ -47,3 +50,18 @@ def get_zenodo_record(zenodo_id):
"""Get record from zenodo (route)."""
resp, status = _get_zenodo_record(zenodo_id)
return jsonify(resp), status


@blueprint.route('/zenodo/tasks/<depid>')
@login_required
def get_zenodo_tasks(depid):
"""Get record from zenodo (route)."""
resolver = Resolver(pid_type='depid',
object_type='rec',
getter=lambda x: x)

_, uuid = resolver.resolve(depid)
record = CAPDeposit.get_record(uuid)
tasks = record.get('_zenodo', {}).get('tasks', [])

return jsonify(tasks), 200
5 changes: 2 additions & 3 deletions tests/integration/test_zenodo_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_create_and_upload_to_zenodo(mock_token, app, users, deposit_with_file,

assert len(record['_zenodo']) == 1
assert record['_zenodo'][0]['id'] == 111
assert record['_zenodo'][0]['title'] == None
assert record['_zenodo'][0]['title'] is None
assert record['_zenodo'][0]['created'] == '2020-11-20T11:49:39.147767+00:00'


Expand Down Expand Up @@ -382,8 +382,7 @@ def test_zenodo_upload_file_not_uploaded_error(mock_token, app, users, deposit_w
assert resp.status_code == 201

captured = capsys.readouterr()
assert 'Uploading file test-file.txt to deposit 111 failed with 500' \
in captured.err
assert 'Zenodo upload of file `test-file.txt`: 500.' in captured.err


@patch('cap.modules.deposit.api._fetch_token', return_value='test-token')
Expand Down

0 comments on commit edf3af4

Please sign in to comment.