
Commit

get all tests passing
dsschult committed Apr 19, 2024
1 parent bb60dfb commit 29ac821
Showing 45 changed files with 1,448 additions and 2,542 deletions.
iceprod/rest/handlers/jobs.py (2 changes: 1 addition & 1 deletion)
@@ -92,7 +92,7 @@ async def post(self):

         if 'job_id' in data:
             try:
-                job_id = uuid.UUID(hex=data['job_id']).hex
+                uuid.UUID(hex=data['job_id']).hex
             except Exception:
                 raise tornado.web.HTTPError(400, reason='job_id should be a valid uuid')

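The change above drops an unused variable binding and keeps the uuid.UUID() call purely as validation. A minimal standalone sketch of the same pattern (the is_valid_uuid helper is illustrative, not part of this commit):

    import uuid

    def is_valid_uuid(hex_string):
        # uuid.UUID raises ValueError on malformed input, so the bare
        # constructor call is enough to validate; no binding is needed.
        try:
            uuid.UUID(hex=hex_string)
        except Exception:
            return False
        return True

    assert is_valid_uuid('0' * 32)
    assert not is_valid_uuid('not-a-uuid')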
iceprod/rest/handlers/tasks.py (26 changes: 18 additions & 8 deletions)
@@ -156,6 +156,7 @@ async def post(self):
         opt_fields = {
             'status': str,
             'priority': float,
+            'instance_id': str,
         }
         for k in opt_fields:
             if k in data and not isinstance(data[k], opt_fields[k]):
@@ -181,12 +182,13 @@ async def post(self):
             'walltime_err': 0.0,
             'walltime_err_n': 0,
             'site': '',
-            'instance_id': '',
         })
         if 'status' not in data:
             data['status'] = TASK_STATUS_START
         if 'priority' not in data:
             data['priority'] = 1.
+        if 'instance_id' not in data:
+            data['instance_id'] = ''
 
         await self.db.tasks.insert_one(data)
         self.set_status(201)
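The reshuffle above matters because dict.update() overwrites keys the client already supplied, while the `if k not in data` guard only fills in gaps, matching the new 'instance_id' entry in opt_fields. A toy illustration of the difference (data values invented):

    data = {'instance_id': 'abc123'}        # client-supplied value

    clobbered = dict(data)
    clobbered.update({'instance_id': ''})   # unconditional default wins
    assert clobbered['instance_id'] == ''

    kept = dict(data)
    if 'instance_id' not in kept:           # conditional default defers to the client
        kept['instance_id'] = ''
    assert kept['instance_id'] == 'abc123'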
@@ -816,7 +818,7 @@ async def post(self, task_id):
             task_id (str): task id
         Body args (json):
-            instance_id (str): task instance id
+            instance_id (str): task instance id
             time_used (int): (optional) time used to run task, in seconds
             resources (dict): (optional) resources used by task
             site (str): (optional) site the task was running at
@@ -907,7 +909,11 @@ async def post(self, task_id):
         )
         if not ret:
             logger.info('filter_query: %r', filter_query)
-            self.send_error(400, reason="Task not found")
+            ret = await self.db.tasks.find_one({'task_id': task_id, 'instance_id': data['instance_id']})
+            if not ret:
+                self.send_error(404, reason="Task not found")
+            else:
+                self.send_error(400, reason="Bad state transition for status")
         else:
             self.write(ret)
         self.finish()
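The new error handling first tries an atomic find-and-update whose filter encodes both the instance and the allowed prior status, then falls back to a plain lookup to decide whether the failure was a missing task (404) or a disallowed state transition (400). A sketch of that control flow under assumed Motor-style APIs (function and argument names illustrative):

    async def set_status(db, task_id, instance_id, new_status, allowed_prior):
        # Atomic: only matches if the task exists AND is in an allowed state.
        ret = await db.tasks.find_one_and_update(
            {'task_id': task_id, 'instance_id': instance_id,
             'status': {'$in': allowed_prior}},
            {'$set': {'status': new_status}},
        )
        if ret is not None:
            return 200
        # Disambiguate the failure: absent task vs. bad state transition.
        if await db.tasks.find_one({'task_id': task_id, 'instance_id': instance_id}):
            return 400  # task exists, but its current status blocked the change
        return 404      # no such task instance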
@@ -930,7 +936,7 @@ async def post(self, task_id):
             task_id (str): task id
         Body args (json):
-            instance_id (str): task instance id
+            instance_id (str): task instance id
             time_used (int): (optional) time used to run task, in seconds
             site (str): (optional) site the task was running at
@@ -969,7 +975,11 @@ async def post(self, task_id):
         )
         if not ret:
             logger.info('filter_query: %r', filter_query)
-            self.send_error(400, reason="Task not found or not processing")
+            ret = await self.db.tasks.find_one({'task_id': task_id, 'instance_id': data['instance_id']})
+            if not ret:
+                self.send_error(404, reason="Task not found")
+            else:
+                self.send_error(400, reason="Bad state transition for status")
         else:
             self.write(ret)
         self.finish()
@@ -1107,7 +1117,7 @@ async def post(self, dataset_id):
             raise tornado.web.HTTPError(400, reason='Too many tasks specified (limit: 100k)')
         query['task_id'] = {'$in': tasks}
 
-        ret = await self.db.tasks.update_many(query, {'$set': update_data})
+        await self.db.tasks.update_many(query, {'$set': update_data})
         self.write({})
         self.finish()

@@ -1155,7 +1165,7 @@ async def post(self, dataset_id):
             raise tornado.web.HTTPError(400, reason='Too many tasks specified (limit: 100k)')
         query['task_id'] = {'$in': tasks}
 
-        ret = await self.db.tasks.update_many(query, {'$set': update_data})
+        await self.db.tasks.update_many(query, {'$set': update_data})
         self.write({})
         self.finish()

@@ -1204,7 +1214,7 @@ async def post(self, dataset_id):
             raise tornado.web.HTTPError(400, reason='Too many tasks specified (limit: 100k)')
         query['task_id'] = {'$in': tasks}
 
-        ret = await self.db.tasks.update_many(query, {'$set': update_data})
+        await self.db.tasks.update_many(query, {'$set': update_data})
        self.write({})
        self.finish()
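Dropping the unused ret binding in these bulk handlers is safe because the response body is a fixed {}; update_many does return a result object carrying match counts, so a variant that reports them would look roughly like this (a sketch, not the handlers' actual behavior):

    async def bulk_update(db, query, update_data):
        result = await db.tasks.update_many(query, {'$set': update_data})
        # The result object exposes the counts the handlers currently discard.
        return {'matched': result.matched_count, 'modified': result.modified_count}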

iceprod/scheduled_tasks/job_completion.py (4 changes: 2 additions & 2 deletions)
@@ -37,11 +37,11 @@ async def run(rest_client, dataset_id=None, debug=False):
                logger.info('dataset %s job %s status -> complete', dataset_id, job_id)
                args = {'status': 'complete'}
                await rest_client.request('PUT', '/datasets/{}/jobs/{}/status'.format(dataset_id,job_id), args)
-            elif task_statuses - {'complete', 'suspended'}:
+            elif not task_statuses - {'complete', 'suspended'}:
                logger.info('dataset %s job %s status -> suspended', dataset_id, job_id)
                args = {'status': 'suspended'}
                await rest_client.request('PUT', '/datasets/{}/jobs/{}/status'.format(dataset_id,job_id), args)
-            elif task_statuses - {'complete', 'failed', 'suspended'}:
+            elif not task_statuses - {'complete', 'failed', 'suspended'}:
                logger.info('dataset %s job %s status -> errors', dataset_id, job_id)
                args = {'status': 'errors'}
                await rest_client.request('PUT', '/datasets/{}/jobs/{}/status'.format(dataset_id,job_id), args)
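The fix flips the set-difference tests: task_statuses - S is truthy whenever some status falls outside S, so the old branches fired in exactly the wrong cases. `not (task_statuses - S)` is equivalent to the subset test task_statuses <= S, i.e. every task is in one of the listed states. For example:

    task_statuses = {'complete', 'suspended'}
    assert not task_statuses - {'complete', 'suspended'}   # all accounted for -> job suspended
    assert task_statuses - {'complete'}                    # something besides 'complete' remains

    task_statuses = {'complete', 'failed'}
    assert not task_statuses - {'complete', 'failed', 'suspended'}  # -> job has errors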
iceprod/server/__init__.py (74 changes: 0 additions & 74 deletions)
@@ -2,8 +2,6 @@
 Some general functions used by the iceprod server
 """
 
-from __future__ import absolute_import, division, print_function
-
 import os
 import sys
 import logging
@@ -43,78 +41,6 @@ def run_module(name,*args,**kwargs):
     return (getattr(x,class_name))(*args,**kwargs)
 
 
-class GlobalID(object):
-    """Global ID configuration and generation"""
-    import string
-    # never change these settings, otherwise all old ids will fail
-    CHARS = string.ascii_letters+string.digits
-    CHARS_LEN = len(CHARS)
-    # define dict to make reverse lookup super fast
-    INTS_DICT = {c:i for i,c in enumerate(CHARS)}
-    IDLEN = 15
-    MAXSITEID = 10**10
-    MAXLOCALID = 10**15
-
-    @classmethod
-    def int2char(cls,i):
-        if not isinstance(i,int) or i < 0:  # only deal with positive ints
-            logging.warning('bad input to int2char: %r',i)
-            raise Exception('bad input to int2char')
-        out = ''
-        while i >= 0:
-            out += cls.CHARS[i%cls.CHARS_LEN]
-            i = i//cls.CHARS_LEN - 1
-        return out[::-1]
-
-    @classmethod
-    def char2int(cls,c):
-        if not isinstance(c,str) or len(c) < 1:  # only deal with string
-            logging.warning('bad input to char2int: %r',c)
-            raise Exception('bad input to char2int')
-        out = -1
-        for i,cc in enumerate(reversed(c)):
-            if cc not in cls.CHARS:
-                raise Exception('non-char input to chars2int')
-            out += (cls.INTS_DICT[cc]+1)*(cls.CHARS_LEN**i)
-        return out
-
-    @classmethod
-    def siteID_gen(cls):
-        """Generate a new site id"""
-        import random
-        return cls.int2char(random.randint(0,cls.MAXSITEID-1))
-
-    @classmethod
-    def globalID_gen(cls,id,site_id):
-        """Generate a new global id given a local id and site id"""
-        if isinstance(id,str):
-            id = cls.char2int(id)
-        elif not isinstance(id,int):
-            raise Exception('id is not a string, int, or long')
-        if isinstance(site_id,str):
-            return cls.int2char(cls.char2int(site_id)*cls.MAXLOCALID+id)
-        elif isinstance(site_id,int):
-            return cls.int2char(site_id*cls.MAXLOCALID+id)
-        else:
-            raise Exception('Site id is not a string, int, or long')
-
-    @classmethod
-    def localID_ret(cls,id,type='str'):
-        """Retrieve a local id from a global id"""
-        ret = cls.char2int(id) % cls.MAXLOCALID
-        if type == 'str':
-            ret = cls.int2char(ret)
-        return ret
-
-    @classmethod
-    def siteID_ret(cls,id,type='str'):
-        """Retrieve a site id from a global id"""
-        ret = cls.char2int(id) // cls.MAXLOCALID
-        if type == 'str':
-            ret = cls.int2char(ret)
-        return ret
-
-
 def salt(length=2):
     """Returns a string of random letters"""
     import string
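For reference, the deleted encoder implemented a bijective base-62 numeral system over [a-zA-Z0-9] (0 maps to 'a', 61 to '9', 62 to 'aa', like spreadsheet column names). A minimal standalone sketch of the same round trip, outside the removed class:

    import string

    CHARS = string.ascii_letters + string.digits  # 62 symbols; order is significant

    def int2char(i):
        # Bijective base-62: every non-negative int gets a unique string.
        out = ''
        while i >= 0:
            out += CHARS[i % 62]
            i = i // 62 - 1
        return out[::-1]

    def char2int(c):
        out = -1
        for n, cc in enumerate(reversed(c)):
            out += (CHARS.index(cc) + 1) * (62 ** n)
        return out

    assert int2char(62) == 'aa' and char2int('aa') == 62
    assert all(char2int(int2char(i)) == i for i in range(1000))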
iceprod/server/config.py (1 change: 0 additions & 1 deletion)
@@ -9,7 +9,6 @@
 from pathlib import Path
 
 import jsonschema
-from iceprod.server import GlobalID
 
 logger = logging.getLogger('config')

iceprod/server/data/condor_transfer_plugins/gsiftp.py (27 changes: 14 additions & 13 deletions)
@@ -7,12 +7,8 @@
"""

import classad
import json
import glob
import os
import posixpath
import shutil
import socket
import sys
import subprocess
import time
@@ -25,7 +21,7 @@
 EXIT_AUTHENTICATION_REFRESH = 2
 
 
-def print_help(stream = sys.stderr):
+def print_help(stream=sys.stderr):
     help_msg = '''Usage: {0} -infile <input-filename> -outfile <output-filename>
        {0} -classad
@@ -39,16 +35,18 @@ def print_help(stream = sys.stderr):
 '''
     stream.write(help_msg.format(sys.argv[0]))
 
+
 def print_capabilities():
     capabilities = {
-        'MultipleFileSupport': True,
-        'PluginType': 'FileTransfer',
-        # SupportedMethods indicates which URL methods/types this plugin supports
-        'SupportedMethods': 'gsiftp',
-        'Version': PLUGIN_VERSION,
+        'MultipleFileSupport': True,
+        'PluginType': 'FileTransfer',
+        # SupportedMethods indicates which URL methods/types this plugin supports
+        'SupportedMethods': 'gsiftp',
+        'Version': PLUGIN_VERSION,
     }
     sys.stdout.write(classad.ClassAd(capabilities).printOld())
 
+
 def parse_args():
 
     # The only argument lists that are acceptable are
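For context, HTCondor probes a file-transfer plugin by invoking it with -classad and parsing the capability ad it prints, which is what print_capabilities serves. A hypothetical smoke test of that handshake (script path and exact attribute text assumed from the dict above):

    import subprocess

    # Invoke the plugin the way HTCondor would when discovering capabilities.
    out = subprocess.run(
        ['python3', 'gsiftp.py', '-classad'],
        capture_output=True, text=True, check=True,
    )
    assert 'SupportedMethods' in out.stdout and 'gsiftp' in out.stdout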
@@ -95,10 +93,12 @@ def parse_args():

     return {'infile': infile, 'outfile': outfile, 'upload': is_upload}
 
+
 def format_error(error):
     return '{0}: {1}'.format(type(error).__name__, str(error))
 
-def get_error_dict(error, url = ''):
+
+def get_error_dict(error, url=''):
     error_string = format_error(error)
     error_dict = {
         'TransferSuccess': False,
@@ -108,6 +108,7 @@ def get_error_dict(error, url = ''):

     return error_dict
 
+
 class GridftpPlugin:
 
     def setup_env(self):
@@ -196,7 +197,7 @@ def upload_file(self, url, local_file_path):
 gridftp_plugin = GridftpPlugin()
 gridftp_plugin.setup_env()
 
-# Parse in the classads stored in the input file.
+# Parse in the classads stored in the input file.
 # Each ad represents a single file to be transferred.
 try:
     infile_ads = classad.parseAds(open(args['infile'], 'r'))
@@ -223,7 +224,7 @@ def upload_file(self, url, local_file_path):

     except Exception as err:
         try:
-            outfile_dict = get_error_dict(err, url = ad['Url'])
+            outfile_dict = get_error_dict(err, url=ad['Url'])
             outfile.write(str(classad.ClassAd(outfile_dict)))
         except Exception:
             pass
iceprod/server/dataset_prio.py (6 changes: 2 additions & 4 deletions)
@@ -7,8 +7,6 @@
 from functools import partial
 from collections import defaultdict
 
-from iceprod.server import GlobalID
-
 
 def apply_group_prios(datasets, groups=None, filters=None):
     """
@@ -84,10 +82,10 @@ def calc_dataset_prio(dataset, queueing_factor_priority=1.0,
         # do not allow negative or overly large priorities (they skew things)
         p = 0
         logging.warning('Priority for dataset %s is invalid, using default',dataset['dataset_id'])
-    d = GlobalID.localID_ret(dataset['dataset_id'],type='int')
+    d = dataset['dataset']
     if d < 0:
         d = 0
-        logging.warning('Dataset for dataset %s is invalid, using default',dataset['dataset_id'])
+        logging.warning('Dataset num for dataset %s is invalid, using default',dataset['dataset_id'])
     t = dataset['tasks_submitted']
 
     # return prio
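With GlobalID gone, the dataset number is read straight off the dataset record rather than decoded from the id. A toy record showing the fields calc_dataset_prio now relies on (values invented for illustration):

    dataset = {
        'dataset_id': 'a1b2c3d4',   # opaque id; no longer decoded for the number
        'dataset': 1234,            # integer dataset number stored explicitly
        'priority': 1.0,
        'tasks_submitted': 1000,
    }
    d = dataset['dataset'] if dataset['dataset'] >= 0 else 0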
(diffs for the remaining 38 changed files are not shown)
