Skip to content

Commit

Permalink
weights-vapor-connector refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
deksa89 committed Mar 23, 2023
1 parent 89356db commit d46ea46
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 33 deletions.
109 changes: 107 additions & 2 deletions exec/topology-gocdb-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import uvloop

from argo_connectors.singleton_config import ConfigClass
from argo_connectors.config import Global, CustomerConf
from argo_connectors.exceptions import ConnectorError, ConnectorParseError, ConnectorHttpError
from argo_connectors.log import Logger
from argo_connectors.tasks.common import write_state
Expand Down Expand Up @@ -58,17 +59,95 @@ def main():
help='write data for this date', type=str, required=False)
args = parser.parse_args()

# logger = Logger(os.path.basename(sys.argv[0]))

# fixed_date = None
# if args.date and date_check(args.date):
# fixed_date = args.date

# confpath = args.gloconf[0] if args.gloconf else None
# cglob = Global(sys.argv[0], confpath)
# globopts = cglob.parse()
# pass_extensions = eval(globopts['GeneralPassExtensions'.lower()])

# confpath = args.custconf[0] if args.custconf else None
# confcust = CustomerConf(sys.argv[0], confpath)
# confcust.parse()
# confcust.make_dirstruct()
# confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()])

# topofeed = confcust.get_topofeed()
# topofeedpaging = confcust.get_topofeedpaging()
# uidservendp = confcust.get_uidserviceendpoints()
# topofetchtype = confcust.get_topofetchtype()
# custname = confcust.get_custname()
# #logger.customer = custname #TODO: VIDIT DAL NAM TREBA

# auth_custopts = confcust.get_authopts()
# auth_opts = cglob.merge_opts(auth_custopts, 'authentication')

# auth_complete, missing = cglob.is_complete(auth_opts, 'authentication')
# if not auth_complete:
# logger.error('%s options incomplete, missing %s' %
# ('authentication', ' '.join(missing)))
# raise SystemExit(1)

# bdii_opts = get_bdii_opts(confcust)
# # print("bdii_opts: ", bdii_opts) # za EGI: {'bdii': 'True', 'bdiihost': 'bdii.egi.cro-ngi.hr', 'bdiiport': '2170', 'bdiiquerybase': 'o=grid', 'bdiiqueryfiltersrm': '(&(objectClass=GlueService)(|(GlueServiceType=srm_v1)(GlueServiceType=srm)))', 'bdiiqueryattributessrm': 'GlueServiceEndpoint', 'bdiiqueryfiltersepath': '(objectClass=GlueSATop)', 'bdiiqueryattributessepath': 'GlueVOInfoAccessControlBaseRule GlueVOInfoPath'}

# webapi_opts = get_webapi_opts(cglob, confcust)
# # print("webapi_opts: ", webapi_opts) # za EGI: {'webapitoken': '505c3be00e9e30400b72dbfb0c06268aa73f694b', 'webapihost': 'api.devel.argo.grnet.gr'}

# toposcope = confcust.get_toposcope()
# topofeedendpoints = confcust.get_topofeedendpoints()
# topofeedservicegroups = confcust.get_topofeedservicegroups()
# topofeedsites = confcust.get_topofeedsites()
# notiflag = confcust.get_notif_flag()

# if toposcope:
# SERVICE_ENDPOINTS_PI = topofeedendpoints + toposcope
# SERVICE_GROUPS_PI = topofeedservicegroups + toposcope
# SITES_PI = topofeedsites + toposcope

# else:
# SERVICE_ENDPOINTS_PI = topofeedendpoints
# SERVICE_GROUPS_PI = topofeedservicegroups
# SITES_PI = topofeedsites

# loop = uvloop.new_event_loop()
# asyncio.set_event_loop(loop)


###############################################################################################

loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

config = ConfigClass(args)
fixed_date = config.get_fixed_date()

###############################################################################################


try:
task = TaskGocdbTopology(config, loop)
# task = TaskGocdbTopology(
# loop, logger, sys.argv[0], SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI,
# SITES_PI, globopts, auth_opts, webapi_opts, bdii_opts, confcust,
# custname, topofeed, topofetchtype, fixed_date, uidservendp,
# pass_extensions, topofeedpaging, notiflag)

###############################################################################################


task = TaskGocdbTopology(config, loop) #TODO: OVAKO TREBA IZGLEDATI


###############################################################################################

loop.run_until_complete(task.run())



except (ConnectorError, ConnectorParseError, ConnectorHttpError, KeyboardInterrupt) as exc:
logger.error(repr(exc))
loop.run_until_complete(
Expand All @@ -81,3 +160,29 @@ def main():

if __name__ == '__main__':
main()







# print("loop: ", loop) # <uvloop.Loop running=False closed=False debug=False>
# print("logger: ", logger) # <argo_connectors.log.Logger object at 0x7f66f53b8940>
# print("sys.argv[0]: ", sys.argv[0]) # /usr/libexec/argo-connectors/topology-gocdb-connector.py
# print("SERVICE_ENDPOINTS_PI: ", SERVICE_ENDPOINTS_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_service_endpoint&scope=
# print("SERVICE_GROUPS_PI: ", SERVICE_GROUPS_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_service_group&scope=
# print("SITES_PI: ", SITES_PI) # https://goc-sdc.argo.grnet.gr//gocdbpi/private/?method=get_site&scope=
# print("globopts: ", globopts) # {'generalwritejson': 'True', 'generalpublishwebapi': 'False', 'generalpassextensions': 'True', 'generalcompressjson': 'False', 'authenticationhostkey': '/etc/grid-security/hostkey.pem', 'authenticationhostcert': '/etc/grid-security/hostcert.pem', 'authenticationcapath': '/etc/grid-security/certificates', 'authenticationcafile': '/etc/pki/tls/certs/ca-bundle.crt', 'authenticationverifyservercert': 'True', 'authenticationuseplainhttpauth': 'False', 'authenticationhttpuser': 'xxxx', 'authenticationhttppass': 'xxxx', 'connectiontimeout': '180', 'connectionretry': '60', 'connectionsleepretry': '60', 'connectionretryrandom': 'True', 'connectionsleeprandomretrymax': '300', 'inputstatesavedir': '/var/lib/argo-connectors/states/', 'inputstatedays': '3', 'webapihost': 'api.devel.argo.grnet.gr', 'outputtopologygroupofendpoints': 'group_endpoints_DATE.json', 'outputtopologygroupofgroups': 'group_groups_DATE.json'}
# print("auth_opts: ", auth_opts) # {'authenticationhostkey': '/etc/grid-security/hostkey.pem', 'authenticationhostcert': '/etc/grid-security/hostcert.pem', 'authenticationcapath': '/etc/grid-security/certificates', 'authenticationcafile': '/etc/pki/tls/certs/ca-bundle.crt', 'authenticationverifyservercert': 'True', 'authenticationuseplainhttpauth': 'False', 'authenticationhttpuser': 'xxxx', 'authenticationhttppass': 'xxxx'}
# print("webapi_opts: ", webapi_opts) # {'webapitoken': '4473153af6c67a650a74d81d367e9e83f70e2b7b', 'webapihost': 'api.devel.argo.grnet.gr'}
# print("bdii_opts: ", bdii_opts) # None
# print("confcust: ", confcust) # <argo_connectors.config.CustomerConf object at 0x7f66f53d32b0>
# print("custname: ", custname) # SDC
# print("topofeed: ", topofeed) # https://goc-sdc.argo.grnet.gr/
# print("topofetchtype: ", topofetchtype) # ['sites', 'servicegroups']
# print("fixed_date: ", fixed_date) # None
# print("uidservendp: ", uidservendp) # False
# print("pass_extensions: ", pass_extensions) # True
# print("topofeedpaging: ", topofeedpaging) # False
# print("notiflag: ", notiflag) # True
53 changes: 35 additions & 18 deletions exec/weights-vapor-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@
import asyncio
import uvloop

from argo_connectors.singleton_config import ConfigClass
from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError
from argo_connectors.tasks.vapor_weights import TaskVaporWeights
from argo_connectors.tasks.common import write_weights_metricprofile_state as write_state
from argo_connectors.log import Logger

from argo_connectors.config import Global, CustomerConf
#from argo_connectors.config import Global, CustomerConf
from argo_connectors.utils import date_check

globopts = {}
logger = None


def main():
global logger, globopts
#global logger, globopts
parser = argparse.ArgumentParser(description="""Fetch weights information from Gstat provider
for every job listed in customer.conf""")
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf',
Expand All @@ -33,22 +34,36 @@ def main():

logger = Logger(os.path.basename(sys.argv[0]))

fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
# fixed_date = None
# if args.date and date_check(args.date):
# fixed_date = args.date

confpath = args.gloconf[0] if args.gloconf else None
cglob = Global(sys.argv[0], confpath)
globopts = cglob.parse()
# confpath = args.gloconf[0] if args.gloconf else None
# cglob = Global(sys.argv[0], confpath)
# globopts = cglob.parse()

confpath = args.custconf[0] if args.custconf else None
confcust = CustomerConf(sys.argv[0], confpath)
confcust.parse()
confcust.make_dirstruct()
confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()])
# confpath = args.custconf[0] if args.custconf else None
# confcust = CustomerConf(sys.argv[0], confpath)
# confcust.parse()
# confcust.make_dirstruct()
# confcust.make_dirstruct(globopts['InputStateSaveDir'.lower()])

# VAPORPI = confcust.get_vaporpi()
# feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI)

#####################################################################

config = ConfigClass(args)

fixed_date = config.get_fixed_date()
globopts, pass_extensions, cglob = config.get_globopts_n_pass_ext()
confcust = config.get_confcust(globopts)
VAPORPI = config.vaporrpi_data(confcust)
feeds = config.get_feeds(confcust, VAPORPI)


#####################################################################

VAPORPI = confcust.get_vaporpi()
feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI)

loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
Expand All @@ -64,9 +79,11 @@ def main():
logger.customer = customers

try:
task = TaskVaporWeights(loop, logger, sys.argv[0], globopts,
confcust, VAPORPI, jobcust, cglob,
fixed_date)
# task = TaskVaporWeights(loop, logger, sys.argv[0], globopts,
# confcust, VAPORPI, jobcust, cglob,
# fixed_date)

task = TaskVaporWeights(config, loop, jobcust)
loop.run_until_complete(task.run())

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
Expand Down
10 changes: 8 additions & 2 deletions modules/singleton_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_globopts_n_pass_ext(self):
cglob = Global(sys.argv[0], confpath)
globopts = cglob.parse()
pass_extensions = eval(globopts['GeneralPassExtensions'.lower()])
return globopts, pass_extensions
return globopts, pass_extensions, cglob

def get_confcust(self, globopts):
confpath = self.args.custconf[0] if self.args.custconf else None
Expand Down Expand Up @@ -146,4 +146,10 @@ def service_data(self, confcust):

return SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI, SITES_PI

# return loop, logger, sys.argv[0], SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI, SITES_PI, globopts, auth_opts, webapi_opts, bdii_opts, confcust, custname, topofeed, topofetchtype, fixed_date, uidservendp, pass_extensions, topofeedpaging, notiflag
def vaporrpi_data(self, confcust):
VAPORPI = confcust.get_vaporpi()
return VAPORPI

def get_feeds(self, confcust, VAPORPI):
feeds = confcust.get_mapfeedjobs(sys.argv[0], deffeed=VAPORPI)
return feeds
38 changes: 36 additions & 2 deletions modules/tasks/gocdb_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from concurrent.futures import ProcessPoolExecutor
from functools import partial

from argo_connectors.singleton_config import ConfigClass
from argo_connectors.parse.gocdb_topology import ParseServiceGroups, ParseServiceEndpoints, ParseSites
from argo_connectors.parse.gocdb_contacts import ParseServiceEndpointContacts, ParseSitesWithContacts, ParseServiceGroupWithContacts
from argo_connectors.exceptions import ConnectorError, ConnectorParseError, ConnectorHttpError
Expand Down Expand Up @@ -93,7 +94,7 @@ def _parse(self):
return count, cursor


class TaskParseTopology(object):
class TaskParseTopology():
def __init__(self, logger, custname, uidservendp, pass_extensions,
notiflag):
self.logger = logger
Expand Down Expand Up @@ -156,29 +157,62 @@ def parse_servicegroups(logger, custname, uidservendp, pass_extensions,
class TaskParseContacts(object):
def __init__(self, logger):
self.logger = logger
#print("self.logger: ", self.logger)

def parse_siteswith_contacts(self, res):
contacts = ParseSitesWithContacts(self.logger, res)
#print("contacts1: ", contacts)
return contacts.get_contacts()

def parse_servicegroups_contacts(self, res):
contacts = ParseServiceGroupWithContacts(self.logger, res)
#print("contacts2: ", contacts)
return contacts.get_contacts()

def parse_serviceendpoints_contacts(self, res):
contacts = ParseServiceEndpointContacts(self.logger, res)

return contacts.get_contacts()


class TaskGocdbTopology(TaskParseContacts, TaskParseTopology):
# def __init__(self, loop, logger, connector_name, SERVICE_ENDPOINTS_PI,
# SERVICE_GROUPS_PI, SITES_PI, globopts, auth_opts, webapi_opts,
# bdii_opts, confcust, custname, topofeed, topofetchtype,
# fixed_date, uidservendp, pass_extensions, topofeedpaging,
# notiflag):
# TaskParseTopology.__init__(self, logger, custname, uidservendp,
# pass_extensions, notiflag)
# super(TaskGocdbTopology, self).__init__(logger)
# self.loop = loop
# self.logger = logger
# self.connector_name = connector_name
# self.SERVICE_ENDPOINTS_PI = SERVICE_ENDPOINTS_PI
# self.SERVICE_GROUPS_PI = SERVICE_GROUPS_PI
# self.SITES_PI = SITES_PI
# self.globopts = globopts
# self.auth_opts = auth_opts
# self.webapi_opts = webapi_opts
# self.bdii_opts = bdii_opts
# self.confcust = confcust
# self.custname = custname
# self.topofeed = topofeed
# self.topofetchtype = topofetchtype
# self.fixed_date = fixed_date
# self.uidservendp = uidservendp
# self.pass_extensions = pass_extensions
# self.topofeedpaging = topofeedpaging
# self.notification_flag = notiflag


def __init__(self, config, loop):
self.config = config
self.loop = loop

self.logger = self.config.get_logger()
self.connector_name = self.config.get_connector_name()
self.fixed_date = self.config.get_fixed_date()
self.globopts, self.pass_extensions = self.config.get_globopts_n_pass_ext()
self.globopts, self.pass_extensions, self.cglob = self.config.get_globopts_n_pass_ext()
self.confcust = self.config.get_confcust(self.globopts)
self.topofeed = self.config.topofeed_data(self.confcust)
self.topofeedpaging = self.config.topofeedpaging_data(self.confcust)
Expand Down
33 changes: 24 additions & 9 deletions modules/tasks/vapor_weights.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,32 @@


class TaskVaporWeights(object):
def __init__(self, loop, logger, connector_name, globopts, confcust, feed,
jobcust, cglob, fixed_date):
# def __init__(self, loop, logger, connector_name, globopts, confcust, feed,
# jobcust, cglob, fixed_date):
# self.event_loop = loop
# self.logger = logger
# self.connector_name = connector_name
# self.globopts = globopts
# self.confcust = confcust
# self.feed = feed
# self.jobcust = jobcust
# self.cglob = cglob
# self.fixed_date = fixed_date

########################################################

def __init__(self, config, loop, jobcust):
self.config = config
self.event_loop = loop
self.logger = logger
self.connector_name = connector_name
self.globopts = globopts
self.confcust = confcust
self.feed = feed
self.jobcust = jobcust
self.cglob = cglob
self.fixed_date = fixed_date

self.logger = config.get_logger()
self.connector_name = self.config.get_connector_name()
self.globopts, self.pass_extensions, self.cglob = self.config.get_globopts_n_pass_ext()
self.confcust = self.config.get_confcust(self.globopts)
self.feed = self.config.vaporrpi_data(self.confcust)
self.fixed_date = self.config.get_fixed_date()


async def fetch_data(self):
feed_parts = urlparse(self.feed)
Expand Down

0 comments on commit d46ea46

Please sign in to comment.