Skip to content

Commit

Permalink
Cred client (#353)
Browse files Browse the repository at this point in the history
* credentials client

* don't overwrite job option if present

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* fix flake8

* better logging

* fix for when time is longer than 1 day

* s3 debugging

* try stripping / from s3 keys

* for S3, the ETAG is the md5 checksum. use it

* add upload ETAG test

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Jul 17, 2023
1 parent adbcd74 commit c4267b8
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 34 deletions.
23 changes: 14 additions & 9 deletions iceprod/core/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,16 +415,21 @@ def _d():
headers={'Content-Type': m.content_type})
r.raise_for_status()
if checksum: # get checksum
r = s.get(url, stream=True, timeout=300)
try:
with open(local+'.tmp', 'wb') as f:
for chunk in r.iter_content(65536):
f.write(chunk)
r.raise_for_status()
if sha512sum(local+'.tmp') != chksum:
if 'ETAG' in r.headers:
md5 = r.headers['ETAG'].strip('"\'')
if md5sum(local) != md5:
raise Exception('http checksum error')
finally:
removedirs(local+'.tmp')
else:
r = s.get(url, stream=True, timeout=300)
try:
with open(local+'.tmp', 'wb') as f:
for chunk in r.iter_content(65536):
f.write(chunk)
r.raise_for_status()
if sha512sum(local+'.tmp') != chksum:
raise Exception('http checksum error')
finally:
removedirs(local+'.tmp')
await asyncio.get_event_loop().run_in_executor(None, _d)
elif url.startswith('file:'):
# use copy command
Expand Down
73 changes: 73 additions & 0 deletions iceprod/credentials/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import argparse
import asyncio
from ..client_auth import add_auth_to_argparse, create_rest_client


async def main():
parser = argparse.ArgumentParser()

subparsers = parser.add_subparsers(title='Cred Types', description='Register a credential', required=True)

def generic_args(p):
p.add_argument('url', default='https://data.icecube.aq', help='data base url')
g = p.add_mutually_exclusive_group(required=True)
g.add_argument('--user', help='username to assign cred to')
g.add_argument('--group', help='group to assign cred to')

parser_s3 = subparsers.add_parser('S3', aliases=['s3'], help='Register an S3 credential')
parser_s3.set_defaults(type_='s3')
generic_args(parser_s3)
parser_s3.add_argument('access_key', metavar='access-key', help='s3 access key')
parser_s3.add_argument('secret_key', metavar='secret-key', help='s3 secret key')
parser_s3.add_argument('bucket', action='append', help='bucket(s) available (use multiple times for more buckets)')

parser_oauth = subparsers.add_parser('OAuth', aliases=['oauth'], help='Register an OAuth credential', description='Must include either an access or refresh token')
parser_oauth.set_defaults(type_='oauth')
generic_args(parser_oauth)
parser_oauth.add_argument('--access-token', dest='access_token', help='access token')
parser_oauth.add_argument('--refresh-token', dest='refresh_token', help='refresh token')
parser_oauth.add_argument('--expire-date', dest='expire_date', type=float, default=None, help='(optional) manual expiration date in unix time')
parser_oauth.add_argument('--last-use', dest='last_use', type=float, default=None, help='(optional) manual last use date in unix time')

add_auth_to_argparse(parser)
args = parser.parse_args()

if args.type_ == 'oauth' and not (args.access_token or args.refresh_token):
raise argparse.ArgumentError(argument=args.refresh_token, message='--access-token or --refresh-token is required')

args.rest_url = 'https://credentials.iceprod.icecube.aq'
rc = create_rest_client(args)

if args.user:
url = f'/users/{args.user}/credentials'
else:
url = f'/groups/{args.group}/credentials'

data = {
'url': args.url,
'type': args.type_,
}

if args.type_ == 's3':
data.update({
'access_key': args.access_key,
'secret_key': args.secret_key,
'buckets': args.bucket,
})
elif args.type_ == 'oauth':
if args.access_token:
data['access_token'] = args.access_token
if args.refresh_token:
data['refresh_token'] = args.refresh_token
if args.expire_date:
data['expire_date'] = args.expire_date
if args.last_use:
data['last_use'] = args.last_use
else:
raise RuntimeError('bad type')

await rc.request('POST', url, data)


if __name__ == '__main__':
asyncio.run(main())
9 changes: 7 additions & 2 deletions iceprod/server/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,8 @@ def expand_remote(cfg):
processing_time = timedelta(seconds=self.queue_cfg['max_task_processing_time'])
except Exception:
processing_time = timedelta(seconds=86400*2)
expiration = (queued_time + processing_time).seconds
expiration = (queued_time + processing_time).total_seconds()
logger.info(f's3 cred expire time: {expiration}')

def presign_s3(cfg):
new_data = []
Expand All @@ -469,7 +470,11 @@ def presign_s3(cfg):
if bucket not in s3_creds[url]['buckets']:
raise RuntimeError('bad s3 bucket')

while key.startswith('/'):
key = key[1:]

s = S3(url, s3_creds[url]['access_key'], s3_creds[url]['secret_key'], bucket=bucket)
logger.info(f'S3 url={url} bucket={bucket} key={key}')
if d['movement'] == 'input':
d['remote'] = s.get_presigned(key, expiration=expiration)
new_data.append(d)
Expand Down Expand Up @@ -591,7 +596,7 @@ def create_config(self, task):
config['options']['debug'] = task['debug']
config['options']['upload'] = 'logging'
config['options']['gridspec'] = self.gridspec
if 'site_temp' in self.cfg['queue']:
if (not config['options'].get('site_temp','')) and 'site_temp' in self.cfg['queue']:
config['options']['site_temp'] = self.cfg['queue']['site_temp']
if ('download' in self.cfg and 'http_username' in self.cfg['download']
and self.cfg['download']['http_username']):
Expand Down
4 changes: 3 additions & 1 deletion iceprod/server/plugins/condor_direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ async def get_hold_reason(self, submit_dir, resources=None):
if resource_type:
if val:
resources[resource_type] = val
reason = f'Resource overusage for {resource_type}: {resources[resource_type]}'
reason = f'Resource overusage for {resource_type}: '
if resource_type in resources:
reason += f'{resources[resource_type]}'
break
elif 'Transfer output files failure' in line:
reason = 'Failed to transfer output files'
Expand Down
15 changes: 9 additions & 6 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ asyncache==0.3.1
# via iceprod (setup.py)
babel==2.12.1
# via sphinx
boto3==1.28.1
boto3==1.28.3
# via iceprod (setup.py)
botocore==1.31.1
botocore==1.31.3
# via
# boto3
# s3transfer
Expand All @@ -33,12 +33,12 @@ cffi==1.15.1
# via cryptography
charset-normalizer==3.2.0
# via requests
cryptography==41.0.1
cryptography==41.0.2
# via
# iceprod (setup.py)
# pyjwt
# pyopenssl
dnspython==2.3.0
dnspython==2.4.0
# via pymongo
docutils==0.20.1
# via sphinx
Expand All @@ -47,7 +47,9 @@ exceptiongroup==1.1.2
h11==0.14.0
# via httpcore
httpcore==0.17.3
# via httpx
# via
# dnspython
# httpx
httpx==0.24.1
# via iceprod (setup.py)
idna==3.4
Expand Down Expand Up @@ -81,7 +83,7 @@ pygments==2.15.1
# via sphinx
pyjwt[crypto]==2.7.0
# via wipac-rest-tools
pymongo==4.4.0
pymongo==4.4.1
# via
# iceprod (setup.py)
# motor
Expand Down Expand Up @@ -120,6 +122,7 @@ six==1.16.0
sniffio==1.3.0
# via
# anyio
# dnspython
# httpcore
# httpx
snowballstemmer==2.2.0
Expand Down
21 changes: 12 additions & 9 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ asyncache==0.3.1
# via iceprod (setup.py)
beautifulsoup4==4.12.2
# via iceprod (setup.py)
boto3==1.28.1
boto3==1.28.3
# via
# iceprod (setup.py)
# moto
botocore==1.31.1
botocore==1.31.3
# via
# boto3
# moto
Expand All @@ -38,13 +38,13 @@ coverage[toml]==7.2.7
# via
# iceprod (setup.py)
# pytest-cov
cryptography==41.0.1
cryptography==41.0.2
# via
# iceprod (setup.py)
# moto
# pyjwt
# pyopenssl
dnspython==2.3.0
dnspython==2.4.0
# via pymongo
exceptiongroup==1.1.2
# via
Expand All @@ -57,7 +57,9 @@ flexmock==0.11.3
h11==0.14.0
# via httpcore
httpcore==0.17.3
# via httpx
# via
# dnspython
# httpx
httpx==0.24.1
# via
# iceprod (setup.py)
Expand All @@ -83,9 +85,9 @@ markupsafe==2.1.3
# werkzeug
mccabe==0.7.0
# via flake8
mock==5.0.2
mock==5.1.0
# via iceprod (setup.py)
moto==4.1.12
moto==4.1.13
# via iceprod (setup.py)
motor==3.2.0
# via iceprod (setup.py)
Expand All @@ -105,7 +107,7 @@ pyflakes==3.0.1
# via flake8
pyjwt[crypto]==2.7.0
# via wipac-rest-tools
pymongo==4.4.0
pymongo==4.4.1
# via
# iceprod (setup.py)
# motor
Expand All @@ -119,7 +121,7 @@ pytest==7.4.0
# pytest-asyncio
# pytest-cov
# pytest-mock
pytest-asyncio==0.21.0
pytest-asyncio==0.21.1
# via iceprod (setup.py)
pytest-cov==4.1.0
# via iceprod (setup.py)
Expand Down Expand Up @@ -169,6 +171,7 @@ six==1.16.0
sniffio==1.3.0
# via
# anyio
# dnspython
# httpcore
# httpx
soupsieve==2.4.1
Expand Down
15 changes: 9 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ anyio==3.7.1
# via httpcore
asyncache==0.3.1
# via iceprod (setup.py)
boto3==1.28.1
boto3==1.28.3
# via iceprod (setup.py)
botocore==1.31.1
botocore==1.31.3
# via
# boto3
# s3transfer
Expand All @@ -29,19 +29,21 @@ cffi==1.15.1
# via cryptography
charset-normalizer==3.2.0
# via requests
cryptography==41.0.1
cryptography==41.0.2
# via
# iceprod (setup.py)
# pyjwt
# pyopenssl
dnspython==2.3.0
dnspython==2.4.0
# via pymongo
exceptiongroup==1.1.2
# via anyio
h11==0.14.0
# via httpcore
httpcore==0.17.3
# via httpx
# via
# dnspython
# httpx
httpx==0.24.1
# via iceprod (setup.py)
idna==3.4
Expand All @@ -65,7 +67,7 @@ pycparser==2.21
# via cffi
pyjwt[crypto]==2.7.0
# via wipac-rest-tools
pymongo==4.4.0
pymongo==4.4.1
# via
# iceprod (setup.py)
# motor
Expand Down Expand Up @@ -103,6 +105,7 @@ six==1.16.0
sniffio==1.3.0
# via
# anyio
# dnspython
# httpcore
# httpx
statsd==4.0.1
Expand Down
31 changes: 30 additions & 1 deletion tests/core/functions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ async def test_402_upload(self, http_mock):
options=download_options)

@requests_mock.mock()
@unittest_reporter(name='upload() http')
@unittest_reporter(name='upload() http POST')
async def test_403_upload(self, http_mock):
"""Test the upload function"""
download_options = {'username': 'user',
Expand Down Expand Up @@ -875,6 +875,35 @@ async def test_403_upload(self, http_mock):
'http://prod-exe.icecube.wisc.edu/globus.tar.gz',
options=download_options)

@requests_mock.mock()
@unittest_reporter(name='upload() http s3 ETAG')
async def test_403_upload(self, http_mock):
"""Test the upload function with ETAG"""
download_options = {}
data = b'the data'
filename = os.path.join(self.test_dir, 'globus.tar.gz')
with open(filename, 'wb') as f:
f.write(data)

# upload file to http
http_mock.put('/globus.tar.gz', content=b'', headers={'ETAG': iceprod.core.functions.md5sum(filename)})
http_mock.get('/globus.tar.gz', content=b'', status_code=403)
await iceprod.core.functions.upload(filename,
'http://prod-exe.icecube.wisc.edu/globus.tar.gz',
options=download_options)
self.assertTrue(http_mock.called)
req = http_mock.request_history[0]
self.assertEqual(req.method, 'PUT', msg='not a PUT request first')
self.assertEqual(os.path.basename(req.url), 'globus.tar.gz', msg='bad upload url')
self.assertEqual(len(http_mock.request_history), 1, msg='more than one http request')

# test bad upload
http_mock.put('/globus.tar.gz', content=b'', headers={'ETAG': 'blah'})
with self.assertRaises(Exception):
await iceprod.core.functions.upload(filename,
'http://prod-exe.icecube.wisc.edu/globus.tar.gz',
options=download_options)

@unittest_reporter(name='upload() file')
async def test_404_upload(self):
"""Test the upload function"""
Expand Down

0 comments on commit c4267b8

Please sign in to comment.