Skip to content

Commit

Permalink
Merge pull request sciencemesh#1 from JarCz/share_api
Browse files Browse the repository at this point in the history
Share api
  • Loading branch information
JarCz authored Sep 7, 2020
2 parents e60b6ae + 3f95a4f commit 10afff6
Show file tree
Hide file tree
Showing 20 changed files with 422 additions and 339 deletions.
3 changes: 1 addition & 2 deletions cs3api_test_ext/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from notebook.utils import url_path_join

from ._version import __version__
from .cs3apischeckpoint import CS3APIsCheckpoints
from .cs3apismanager import CS3APIsManager
from cs3api_test_ext.api.cs3apischeckpoint import CS3APIsCheckpoints
from .handlers import handlers


Expand Down
Empty file added cs3api_test_ext/api/__init__.py
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -7,102 +7,47 @@
"""

import http
import sys
import time

import cs3.gateway.v1beta1.gateway_api_pb2 as cs3gw
import cs3.gateway.v1beta1.gateway_api_pb2_grpc as cs3gw_grpc
import cs3.rpc.code_pb2 as cs3code
import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp
import cs3.storage.provider.v1beta1.resources_pb2 as cs3spr
import cs3.types.v1beta1.types_pb2 as types
import grpc
import requests

from cs3api_test_ext.auth.channel_connector import ChannelConnector
from cs3api_test_ext.auth.authenticator import Authenticator
from cs3api_test_ext.api.file_utils import FileUtils as file_utils

class Cs3FileApi:
tokens = {} # map userid [string] to {authentication token, token expiration time}

class Cs3FileApi:
log = None
chunksize = 4194304
authtokenvalidity = 3600
auth_token_validity = 3600
cs3_stub = None
home_dir = ""

client_id = None
client_secret = None

def __init__(self, config, log):
connector = None
auth = None

def __init__(self, config, log):

self.log = log
self.chunksize = int(config['chunksize'])
self.authtokenvalidity = int(config['authtokenvalidity'])

secure_channel = bool(config['secure_channel'])
reva_host = config['revahost']

self.auth_token_validity = int(config['auth_token_validity'])
self.client_id = config['client_id']
self.client_secret = config['client_secret']
self.home_dir = config['home_dir']
self.connector = ChannelConnector(config, log)

# prepare the gRPC connection

if secure_channel:

try:

cert = open(config['client_cert'], 'rb').read()
key = open(config['client_key'], 'rb').read()

ca_cert = open(config['ca_cert'], 'rb').read()
credentials = grpc.ssl_channel_credentials(ca_cert, key, cert)

ch = grpc.secure_channel(reva_host, credentials)

except:
ex = sys.exc_info()[0]
self.log.error('msg="Error create secure channel" reason="%s"' % ex)
raise IOError(ex)

else:
ch = grpc.insecure_channel(reva_host)

self.cs3_stub = cs3gw_grpc.GatewayAPIStub(ch)
channel = self.connector.get_channel()
self.auth = Authenticator(config, channel)
self.cs3_stub = cs3gw_grpc.GatewayAPIStub(channel)
return

def _authenticate(self, userid):

# ToDo: use real authentication data or get token from author provider
# authReq = cs3gw.AuthenticateRequest(type='bearer', client_secret=userid)

if userid not in self.tokens or self.tokens[userid]['exp'] < time.time():
auth_req = cs3gw.AuthenticateRequest(type='basic', client_id=self.client_id, client_secret=self.client_secret)
auth_res = self.cs3_stub.Authenticate(auth_req)
self.tokens[userid] = {'tok': auth_res.token, 'exp': time.time() + self.authtokenvalidity}

# piggy back on the opportunity to expire old tokens, but asynchronously
# as to not impact the current session: let's use python3.7's coroutines support
# asyncio.run(_async_cleanuptokens())

return self.tokens[userid]['tok']

def _cs3_reference(self, fileid, endpoint=None):

if len(fileid) > 0 and fileid[0] == '/':

# assume this is a filepath
if len(self.home_dir) > 0 and not fileid.startswith(self.home_dir):
fileid = self.home_dir + fileid

file = cs3spr.Reference(path=fileid)
return file

if endpoint == 'default' or endpoint is None:
raise IOError('A CS3API-compatible storage endpoint must be identified by a storage UUID')

# assume we have an opaque fileid
return cs3spr.Reference(id=cs3spr.ResourceId(storage_id=endpoint, opaque_id=fileid))

def stat(self, fileid, userid, endpoint=None):

"""
Expand All @@ -113,9 +58,10 @@ def stat(self, fileid, userid, endpoint=None):

time_start = time.time()

ref = self._cs3_reference(fileid, endpoint)
ref = file_utils.get_reference(fileid, self.home_dir, endpoint)

stat_info = self.cs3_stub.Stat(request=cs3sp.StatRequest(ref=ref), metadata=[('x-access-token', self._authenticate(userid))])
stat_info = self.cs3_stub.Stat(request=cs3sp.StatRequest(ref=ref),
metadata=[('x-access-token', self.auth.authenticate(userid))])

time_end = time.time()
self.log.info('msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (fileid, (time_end - time_start) * 1000))
Expand Down Expand Up @@ -143,28 +89,32 @@ def read_file(self, filepath, userid, endpoint=None):
#
# Prepare endpoint
#
reference = self._cs3_reference(filepath, endpoint)
reference = file_utils.get_reference(filepath, self.home_dir, endpoint)

req = cs3sp.InitiateFileDownloadRequest(ref=reference)

init_file_download = self.cs3_stub.InitiateFileDownload(request=req, metadata=[('x-access-token', self._authenticate(userid))])
init_file_download = self.cs3_stub.InitiateFileDownload(request=req, metadata=[
('x-access-token', self.auth.authenticate(userid))])

if init_file_download.status.code == cs3code.CODE_NOT_FOUND:
self.log.info('msg="File not found on read" filepath="%s"' % filepath)
raise IOError('No such file or directory')

elif init_file_download.status.code != cs3code.CODE_OK:
self.log.debug('msg="Failed to initiateFileDownload on read" filepath="%s" reason="%s"' % filepath, init_file_download.status.message)
self.log.debug('msg="Failed to initiateFileDownload on read" filepath="%s" reason="%s"' % filepath,
init_file_download.status.message)
raise IOError(init_file_download.status.message)

self.log.debug('msg="readfile: InitiateFileDownloadRes returned" endpoint="%s"' % init_file_download.download_endpoint)
self.log.debug(
'msg="readfile: InitiateFileDownloadRes returned" endpoint="%s"' % init_file_download.download_endpoint)

#
# Download
#
file_get = None
try:
file_get = requests.get(url=init_file_download.download_endpoint, headers={'x-access-token': self._authenticate(userid)})
file_get = requests.get(url=init_file_download.download_endpoint,
headers={'x-access-token': self.auth.authenticate(userid)})
except requests.exceptions.RequestException as e:
self.log.error('msg="Exception when downloading file from Reva" reason="%s"' % e)
raise IOError(e)
Expand All @@ -173,10 +123,12 @@ def read_file(self, filepath, userid, endpoint=None):
data = file_get.content

if file_get.status_code != http.HTTPStatus.OK:
self.log.error('msg="Error downloading file from Reva" code="%d" reason="%s"' % (file_get.status_code, file_get.reason))
self.log.error('msg="Error downloading file from Reva" code="%d" reason="%s"' % (
file_get.status_code, file_get.reason))
raise IOError(file_get.reason)
else:
self.log.info('msg="File open for read" filepath="%s" elapsedTimems="%.1f"' % (filepath, (time_end - time_start) * 1000))
self.log.info('msg="File open for read" filepath="%s" elapsedTimems="%.1f"' % (
filepath, (time_end - time_start) * 1000))
for i in range(0, len(data), self.chunksize):
yield data[i:i + self.chunksize]

Expand All @@ -191,24 +143,27 @@ def write_file(self, filepath, userid, content, endpoint=None):
#
time_start = time.time()

reference = self._cs3_reference(filepath, endpoint)
reference = file_utils.get_reference(filepath, self.home_dir, endpoint)

if isinstance(content, str):
content_size = str(len(content))
else:
content_size = str(len(content.decode('utf-8')))

meta_data = types.Opaque(map={"Upload-Length": types.OpaqueEntry(decoder="plain", value=str.encode(content_size))})
meta_data = types.Opaque(
map={"Upload-Length": types.OpaqueEntry(decoder="plain", value=str.encode(content_size))})

req = cs3sp.InitiateFileUploadRequest(ref=reference, opaque=meta_data)
init_file_upload_res = self.cs3_stub.InitiateFileUpload(request=req, metadata=[('x-access-token', self._authenticate(userid))])
init_file_upload_res = self.cs3_stub.InitiateFileUpload(request=req, metadata=[
('x-access-token', self.auth.authenticate(userid))])

if init_file_upload_res.status.code != cs3code.CODE_OK:
self.log.debug('msg="Failed to initiateFileUpload on write" filepath="%s" reason="%s"' % \
(filepath, init_file_upload_res.status.message))
raise IOError(init_file_upload_res.status.message)

self.log.debug('msg="writefile: InitiateFileUploadRes returned" endpoint="%s"' % init_file_upload_res.upload_endpoint)
self.log.debug(
'msg="writefile: InitiateFileUploadRes returned" endpoint="%s"' % init_file_upload_res.upload_endpoint)

#
# Upload
Expand All @@ -218,7 +173,7 @@ def write_file(self, filepath, userid, content, endpoint=None):
'Tus-Resumable': '1.0.0',
'File-Path': filepath,
'File-Size': content_size,
'x-access-token': self._authenticate(userid),
'x-access-token': self.auth.authenticate(userid),
'X-Reva-Transfer': init_file_upload_res.token
}
put_res = requests.put(url=init_file_upload_res.upload_endpoint, data=content, headers=headers)
Expand All @@ -230,20 +185,22 @@ def write_file(self, filepath, userid, content, endpoint=None):
time_end = time.time()

if put_res.status_code != http.HTTPStatus.OK:
self.log.error('msg="Error uploading file to Reva" code="%d" reason="%s"' % (put_res.status_code, put_res.reason))
self.log.error(
'msg="Error uploading file to Reva" code="%d" reason="%s"' % (put_res.status_code, put_res.reason))
raise IOError(put_res.reason)

self.log.info('msg="File open for write" filepath="%s" elapsedTimems="%.1f"' % (filepath, (time_end - time_start) * 1000))
self.log.info(
'msg="File open for write" filepath="%s" elapsedTimems="%.1f"' % (filepath, (time_end - time_start) * 1000))

def remove(self, filepath, userid, endpoint=None):
"""
Remove a file or container using the given userid as access token.
"""

reference = self._cs3_reference(filepath, endpoint)
reference = file_utils.get_reference(filepath, self.home_dir, endpoint)

req = cs3sp.DeleteRequest(ref=reference)
res = self.cs3_stub.Delete(request=req, metadata=[('x-access-token', self._authenticate(userid))])
res = self.cs3_stub.Delete(request=req, metadata=[('x-access-token', self.auth.authenticate(userid))])

if res.status.code != cs3code.CODE_OK:
self.log.warning('msg="Failed to remove file or folder" filepath="%s" error="%s"' % (filepath, res))
Expand All @@ -259,17 +216,18 @@ def read_directory(self, path, userid, endpoint=None):

tstart = time.time()

reference = self._cs3_reference(path, endpoint)
reference = file_utils.get_reference(path, self.home_dir, endpoint)

req = cs3sp.ListContainerRequest(ref=reference, arbitrary_metadata_keys="*")
res = self.cs3_stub.ListContainer(request=req, metadata=[('x-access-token', self._authenticate(userid))])
res = self.cs3_stub.ListContainer(request=req, metadata=[('x-access-token', self.auth.authenticate(userid))])

if res.status.code != cs3code.CODE_OK:
self.log.warning('msg="Failed to read container" filepath="%s" reason="%s"' % (path, res.status.message))
raise IOError(res.status.message)

tend = time.time()
self.log.debug('msg="Invoked read container" filepath="%s" elapsedTimems="%.1f"' % (path, (tend - tstart) * 1000))
self.log.debug(
'msg="Invoked read container" filepath="%s" elapsedTimems="%.1f"' % (path, (tend - tstart) * 1000))

out = []
for info in res.infos:
Expand All @@ -289,18 +247,20 @@ def move(self, source_path, destination_path, userid, endpoint=None):

tstart = time.time()

src_reference = self._cs3_reference(source_path, endpoint)
dest_reference = self._cs3_reference(destination_path, endpoint)
src_reference = file_utils.get_reference(source_path, self.home_dir, endpoint)
dest_reference = file_utils.get_reference(destination_path, self.home_dir, endpoint)

req = cs3sp.MoveRequest(source=src_reference, destination=dest_reference)
res = self.cs3_stub.Move(request=req, metadata=[('x-access-token', self._authenticate(userid))])
res = self.cs3_stub.Move(request=req, metadata=[('x-access-token', self.auth.authenticate(userid))])

if res.status.code != cs3code.CODE_OK:
self.log.error('msg="Failed to move" source="%s" destination="%s" reason="%s"' % (source_path, destination_path, res.status.message))
self.log.error('msg="Failed to move" source="%s" destination="%s" reason="%s"' % (
source_path, destination_path, res.status.message))
raise IOError(res.status.message)

tend = time.time()
self.log.debug('msg="Invoked move" source="%s" destination="%s" elapsedTimems="%.1f"' % (source_path, destination_path, (tend - tstart) * 1000))
self.log.debug('msg="Invoked move" source="%s" destination="%s" elapsedTimems="%.1f"' % (
source_path, destination_path, (tend - tstart) * 1000))

def create_directory(self, path, userid, endpoint=None):

Expand All @@ -310,14 +270,15 @@ def create_directory(self, path, userid, endpoint=None):

tstart = time.time()

reference = self._cs3_reference(path, endpoint)
reference = file_utils.get_reference(path, self.home_dir, endpoint)

req = cs3sp.CreateContainerRequest(ref=reference)
res = self.cs3_stub.CreateContainer(request=req, metadata=[('x-access-token', self._authenticate(userid))])
res = self.cs3_stub.CreateContainer(request=req, metadata=[('x-access-token', self.auth.authenticate(userid))])

if res.status.code != cs3code.CODE_OK:
self.log.warning('msg="Failed to create container" filepath="%s" reason="%s"' % (path, res.status.message))
raise IOError(res.status.message)

tend = time.time()
self.log.debug('msg="Invoked create container" filepath="%s" elapsedTimems="%.1f"' % (path, (tend - tstart) * 1000))
self.log.debug(
'msg="Invoked create container" filepath="%s" elapsedTimems="%.1f"' % (path, (tend - tstart) * 1000))
Loading

0 comments on commit 10afff6

Please sign in to comment.