Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARGO-3859 Add performance/time trackers in connectors async tasks #271

Open
wants to merge 11 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion exec/downtimes-csv-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def main():
parser.add_argument('-d', dest='date', nargs=1, metavar='YEAR-MONTH-DAY', required=True)
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

logger = Logger(os.path.basename(sys.argv[0]))
performance = args.performance
confpath = args.gloconf[0] if args.gloconf else None
cglob = Global(sys.argv[0], confpath)
globopts = cglob.parse()
Expand Down Expand Up @@ -77,7 +79,7 @@ def main():
webapi_opts, confcust,
confcust.get_custname(cust), feed,
current_date, uidservtype, args.date[0],
timestamp)
timestamp, performance)
loop.run_until_complete(task.run())

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
Expand Down
5 changes: 4 additions & 1 deletion exec/downtimes-gocdb-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ def main():
help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf',
help='path to global configuration file', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance",
help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

logger = Logger(os.path.basename(sys.argv[0]))
performance = args.performance
confpath = args.gloconf[0] if args.gloconf else None
cglob = Global(sys.argv[0], confpath)
globopts = cglob.parse()
Expand Down Expand Up @@ -91,7 +94,7 @@ def main():
task = TaskGocdbDowntimes(loop, logger, sys.argv[0], globopts,
auth_opts, webapi_opts, confcust,
confcust.get_custname(cust), downtime_feed, start,
end, uidservtype, args.date[0], timestamp)
end, uidservtype, args.date[0], timestamp, performance)
loop.run_until_complete(task.run())

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
Expand Down
4 changes: 3 additions & 1 deletion exec/metricprofile-webapi-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ def main():
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

logger = Logger(os.path.basename(sys.argv[0]))
performance = args.performance

fixed_date = None
if args.date and date_check(args.date):
Expand All @@ -48,7 +50,7 @@ def main():
for cust in confcust.get_customers():
try:
task = TaskWebApiMetricProfile(
loop, logger, sys.argv[0], globopts, cglob, confcust, cust, fixed_date
loop, logger, sys.argv[0], globopts, cglob, confcust, cust, fixed_date, performance
)
loop.run_until_complete(task.run())

Expand Down
4 changes: 3 additions & 1 deletion exec/service-types-csv-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ def main():
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('--initial', dest='initsync', help='initial sync of service types', action='store_true', default=False, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date

logger = Logger(os.path.basename(sys.argv[0]))
performance = args.performance
confpath = args.gloconf[0] if args.gloconf else None
cglob = Global(sys.argv[0], confpath)
globopts = cglob.parse()
Expand Down Expand Up @@ -72,7 +74,7 @@ def main():
try:
task = TaskFlatServiceTypes(
loop, logger, sys.argv[0], globopts, auth_opts, webapi_opts,
confcust, custname, feed, fixed_date, is_csv=True,
confcust, custname, feed, fixed_date, performance, is_csv=True,
initsync=args.initsync
)
loop.run_until_complete(task.run())
Expand Down
4 changes: 3 additions & 1 deletion exec/service-types-gocdb-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ def main():
parser.add_argument('--initial', dest='initsync', help='initial sync of service types', action='store_true', default=False, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date

performance = args.performance
logger = Logger(os.path.basename(sys.argv[0]))
confpath = args.gloconf[0] if args.gloconf else None
cglob = Global(sys.argv[0], confpath)
Expand Down Expand Up @@ -72,7 +74,7 @@ def main():
try:
task = TaskGocdbServiceTypes(
loop, logger, sys.argv[0], globopts, auth_opts, webapi_opts,
confcust, custname, feed, fixed_date, args.initsync
confcust, custname, feed, fixed_date, args.initsync, performance
)
loop.run_until_complete(task.run())

Expand Down
8 changes: 5 additions & 3 deletions exec/topology-agora-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from argo_connectors.log import Logger
from argo_connectors.config import Global, CustomerConf
from argo_connectors.utils import date_check
from argo_connectors.tasks.agora_topology import TaskProviderTopology
from argo_connectors.tasks.agora_topology import AgoraProviderTopology
from argo_connectors.tasks.common import write_state


Expand All @@ -37,9 +37,11 @@ def main():
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

logger = Logger(os.path.basename(sys.argv[0]))
performance = args.performance
fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
Expand All @@ -64,9 +66,9 @@ def main():
asyncio.set_event_loop(loop)

try:
task = TaskProviderTopology(
task = AgoraProviderTopology(
loop, logger, sys.argv[0], globopts, webapi_opts, confcust,
uidservendp, fetchtype, fixed_date
uidservendp, fetchtype, fixed_date, performance
)
loop.run_until_complete(task.run())

Expand Down
5 changes: 3 additions & 2 deletions exec/topology-csv-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ def main():
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()
logger = Logger(os.path.basename(sys.argv[0]))

performance = args.performance
fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
Expand Down Expand Up @@ -78,7 +79,7 @@ def main():
task = TaskFlatTopology(
loop, logger, sys.argv[0], globopts, webapi_opts, confcust,
custname, topofeed, topofetchtype, fixed_date, uidservendp,
is_csv=True
performance, is_csv=True
)
loop.run_until_complete(task.run())

Expand Down
9 changes: 7 additions & 2 deletions exec/topology-gocdb-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,15 @@ def main():
help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY',
help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance",
help='Set verbosity level', action='count', default=0)


args = parser.parse_args()
group_endpoints, group_groups = [], []
logger = Logger(os.path.basename(sys.argv[0]))


performance = args.performance
fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
Expand Down Expand Up @@ -116,7 +121,7 @@ def main():
loop, logger, sys.argv[0], SERVICE_ENDPOINTS_PI, SERVICE_GROUPS_PI,
SITES_PI, globopts, auth_opts, webapi_opts, bdii_opts, confcust,
custname, topofeed, topofetchtype, fixed_date, uidservendp,
pass_extensions, topofeedpaging, notiflag
pass_extensions, topofeedpaging, notiflag, performance
)
loop.run_until_complete(task.run())

Expand Down
5 changes: 3 additions & 2 deletions exec/topology-provider-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ def main():
parser.add_argument('-c', dest='custconf', nargs=1, metavar='customer.conf', help='path to customer configuration file', type=str, required=False)
parser.add_argument('-g', dest='gloconf', nargs=1, metavar='global.conf', help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY', help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance", help='Set verbosity level', action='count', default=0)
args = parser.parse_args()
group_endpoints, group_groups = list(), list()
logger = Logger(os.path.basename(sys.argv[0]))

performance = args.performance
fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
Expand Down Expand Up @@ -78,7 +79,7 @@ def main():
try:
task = TaskProviderTopology(
loop, logger, sys.argv[0], globopts, webapi_opts, confcust,
topofeedpaging, uidservendp, fetchtype, fixed_date
topofeedpaging, uidservendp, fetchtype, fixed_date, performance
)
loop.run_until_complete(task.run())

Expand Down
7 changes: 5 additions & 2 deletions exec/weights-vapor-connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ def main():
help='path to global configuration file', type=str, required=False)
parser.add_argument('-d', dest='date', metavar='YEAR-MONTH-DAY',
help='write data for this date', type=str, required=False)
parser.add_argument('-v', '--verbose', dest="performance",
help='Set verbosity level', action='count', default=0)
args = parser.parse_args()

logger = Logger(os.path.basename(sys.argv[0]))

performance = args.performance

fixed_date = None
if args.date and date_check(args.date):
fixed_date = args.date
Expand Down Expand Up @@ -66,7 +69,7 @@ def main():
try:
task = TaskVaporWeights(loop, logger, sys.argv[0], globopts,
confcust, VAPORPI, jobcust, cglob,
fixed_date)
fixed_date, performance)
loop.run_until_complete(task.run())

except (ConnectorHttpError, ConnectorParseError, KeyboardInterrupt) as exc:
Expand Down
102 changes: 62 additions & 40 deletions modules/tasks/agora_topology.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import asyncio
from urllib.parse import urlparse
import time

from argo_connectors.io.http import SessionWithRetry
from argo_connectors.io.webapi import WebAPI
from argo_connectors.parse.agora_topology import ParseAgoraTopo
from argo_connectors.tasks.common import write_topo_json as write_json, write_state
from argo_connectors.exceptions import ConnectorError, ConnectorHttpError
from argo_connectors.exceptions import ConnectorHttpError, ConnectorParseError, ConnectorError


def contains_exception(list):
Expand All @@ -16,9 +17,9 @@ def contains_exception(list):
return (False, None)


class TaskProviderTopology(object):
class AgoraProviderTopology(object):
def __init__(self, loop, logger, connector_name, globopts, webapi_opts,
confcust, uidservendp, fetchtype, fixed_date):
confcust, uidservendp, fetchtype, fixed_date, performance):
self.loop = loop
self.logger = logger
self.connector_name = connector_name
Expand All @@ -28,7 +29,7 @@ def __init__(self, loop, logger, connector_name, globopts, webapi_opts,
self.uidservendp = uidservendp
self.fixed_date = fixed_date
self.fetchtype = fetchtype

self.performance = performance

def parse_source_topo(self, resources, providers):
topo = ParseAgoraTopo(self.logger, providers, resources, self.uidservendp)
Expand All @@ -37,6 +38,7 @@ def parse_source_topo(self, resources, providers):


async def send_webapi(self, webapi_opts, data, topotype, fixed_date=None):
start_time = time.time()
webapi = WebAPI(self.connector_name, webapi_opts['webapihost'],
webapi_opts['webapitoken'], self.logger,
int(self.globopts['ConnectionRetry'.lower()]),
Expand All @@ -47,9 +49,13 @@ async def send_webapi(self, webapi_opts, data, topotype, fixed_date=None):
date=fixed_date)

await webapi.send(data, topotype)
elapsed_time = time.time() - start_time
if self.performance:
self.logger.info(f'send_webapi completed in {elapsed_time} seconds.')


async def fetch_data(self, feed):
start_time = time.time()
remote_topo = urlparse(feed)
session = SessionWithRetry(self.logger, self.logger.customer, self.globopts, handle_session_close=True)
headers = {
Expand All @@ -63,6 +69,9 @@ async def fetch_data(self, feed):
headers=headers)

await session.close()
elapsed_time = time.time() - start_time
if self.performance:
self.logger.info(f'fetch_data completed in {elapsed_time} seconds.')
return res

except ConnectorHttpError as exc:
Expand All @@ -71,39 +80,52 @@ async def fetch_data(self, feed):


async def run(self):
topofeedproviders = self.confcust.get_topofeedservicegroups()
topofeedresources = self.confcust.get_topofeedendpoints()

coros = [
self.fetch_data(topofeedresources),
self.fetch_data(topofeedproviders),
]

# fetch topology data concurrently in coroutines
fetched_data = await asyncio.gather(*coros, return_exceptions=True)

exc_raised, exc = contains_exception(fetched_data)
if exc_raised:
raise ConnectorError(repr(exc))

fetched_resources, fetched_providers = fetched_data
if fetched_resources and fetched_providers:
group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers)

await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)

numgg = len(group_providers)
numge = len(group_resources)

# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date),
self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date),
loop=self.loop
)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date)

self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))
try:
start_time = time.time()

topofeedproviders = self.confcust.get_topofeedservicegroups()
topofeedresources = self.confcust.get_topofeedendpoints()

coros = [
self.fetch_data(topofeedresources),
self.fetch_data(topofeedproviders),
]

# fetch topology data concurrently in coroutines
fetched_data = await asyncio.gather(*coros, return_exceptions=True)

exc_raised, exc = contains_exception(fetched_data)
if exc_raised:
raise ConnectorError(repr(exc))

fetched_resources, fetched_providers = fetched_data
if fetched_resources and fetched_providers:
group_providers, group_resources = self.parse_source_topo(fetched_resources, fetched_providers)

await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, True)

numgg = len(group_providers)
numge = len(group_resources)

# send concurrently to WEB-API in coroutines
if eval(self.globopts['GeneralPublishWebAPI'.lower()]):
await asyncio.gather(
self.send_webapi(self.webapi_opts, group_resources, 'endpoints', self.fixed_date),
self.send_webapi(self.webapi_opts, group_providers, 'groups', self.fixed_date),
)

if eval(self.globopts['GeneralWriteJson'.lower()]):
write_json(self.logger, self.globopts, self.confcust, group_providers, group_resources, self.fixed_date)

elapsed_time = time.time() - start_time
if self.performance:
self.logger.info(f'run completed in {elapsed_time} seconds.')
self.logger.info('Customer:' + self.logger.customer + ' Fetched Endpoints:%d' % (numge) + ' Groups(%s):%d' % (self.fetchtype, numgg))



except (ConnectorHttpError, ConnectorParseError, ConnectorError, KeyboardInterrupt) as exc:
self.logger.error(repr(exc))
await write_state(self.connector_name, self.globopts, self.confcust, self.fixed_date, False)


Loading