Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service cleanup #184

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions services/data/db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ def aiopg_exception_handling(exception):
body = {"err_msg": err_msg}
if isinstance(exception, psycopg2.IntegrityError):
if "duplicate key" in err_msg:
return DBResponse(response_code=409, body=json.dumps(body))
return DBResponse(response_code=409, body=body)
elif "foreign key" in err_msg:
return DBResponse(response_code=404, body=json.dumps(body))
return DBResponse(response_code=404, body=body)
else:
return DBResponse(response_code=500, body=json.dumps(body))
return DBResponse(response_code=500, body=body)
elif isinstance(exception, psycopg2.errors.UniqueViolation):
return DBResponse(response_code=409, body=json.dumps(body))
return DBResponse(response_code=409, body=body)
elif isinstance(exception, IndexError):
return DBResponse(response_code=404, body={})
else:
return DBResponse(response_code=500, body=json.dumps(body))
return DBResponse(response_code=500, body=body)


def get_db_ts_epoch_str():
Expand Down
28 changes: 13 additions & 15 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
import datetime
from services.utils import logging
from typing import List
from typing import List, Tuple

from .db_utils import DBResponse, DBPagination, aiopg_exception_handling, \
get_db_ts_epoch_str, translate_run_key, translate_task_key
Expand Down Expand Up @@ -74,9 +74,10 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT
try:
self.pool = await aiopg.create_pool(
db_conf.dsn,
timeout=db_conf.pool_timeout,
minsize=db_conf.pool_min,
maxsize=db_conf.pool_max,
timeout=db_conf.timeout,
pool_recycle=db_conf.pool_recycle,
echo=AIOPG_ECHO)

# Clean existing trigger functions before creating new ones
Expand All @@ -89,9 +90,11 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT

self.logger.info(
"Connection established.\n"
" Pool min: {pool_min} max: {pool_max}\n".format(
" Pool min: {pool_min} max: {pool_max} timeout: {pool_timeout} recycle: {pool_recycle}\n".format(
pool_min=self.pool.minsize,
pool_max=self.pool.maxsize))
pool_max=self.pool.maxsize,
pool_timeout=self.pool.timeout,
pool_recycle=db_conf.pool_recycle))

break # Break the retry loop
except Exception as e:
Expand All @@ -107,17 +110,15 @@ def get_table_by_name(self, table_name: str):
return None

async def get_run_ids(self, flow_id: str, run_id: str):
run = await self.run_table_postgres.get_run(flow_id, run_id,
return await self.run_table_postgres.get_run(flow_id, run_id,
expanded=True)
return run.body['run_number'], run.body['run_id']

async def get_task_ids(self, flow_id: str, run_id: str,
step_name: str, task_name: str):

task = await self.task_table_postgres.get_task(flow_id, run_id,
return await self.task_table_postgres.get_task(flow_id, run_id,
step_name, task_name,
expanded=True)
return task.body['task_id'], task.body['task_name']


class AsyncPostgresDB(object):
Expand Down Expand Up @@ -183,7 +184,7 @@ async def get_records(self, filter_dict={}, fetch_single=False,

async def find_records(self, conditions: List[str] = None, values=[], fetch_single=False,
limit: int = 0, offset: int = 0, order: List[str] = None, expanded=False,
enable_joins=False) -> (DBResponse, DBPagination):
enable_joins=False) -> Tuple[DBResponse, DBPagination]:
sql_template = """
SELECT * FROM (
SELECT
Expand Down Expand Up @@ -212,7 +213,7 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_sing
expanded=expanded, limit=limit, offset=offset)

async def execute_sql(self, select_sql: str, values=[], fetch_single=False,
expanded=False, limit: int = 0, offset: int = 0) -> (DBResponse, DBPagination):
expanded=False, limit: int = 0, offset: int = 0) -> Tuple[DBResponse, DBPagination]:
try:
with (
await self.db.pool.cursor(
Expand Down Expand Up @@ -544,9 +545,7 @@ async def update_heartbeat(self, flow_id: str, run_id: str):
update_dict=set_dict)
body = {"wait_time_in_seconds": WAIT_TIME}

return DBResponse(response_code=result.response_code,
body=json.dumps(body))

return DBResponse(response_code=result.response_code, body=body)

class AsyncStepTablePostgres(AsyncPostgresTable):
step_dict = {}
Expand Down Expand Up @@ -689,8 +688,7 @@ async def update_heartbeat(self, flow_id: str, run_id: str, step_name: str,

body = {"wait_time_in_seconds": WAIT_TIME}

return DBResponse(response_code=result.response_code,
body=json.dumps(body))
return DBResponse(response_code=result.response_code, body=body)


class AsyncMetadataTablePostgres(AsyncPostgresTable):
Expand Down
12 changes: 6 additions & 6 deletions services/metadata_service/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def version(self, request):
tags:
- Admin
produces:
- 'text/plain'
- text/plain
responses:
"200":
description: successful operation. Return the version number
Expand All @@ -46,7 +46,7 @@ async def ping(self, request):
tags:
- Admin
produces:
- 'text/plain'
- text/plain
responses:
"202":
description: successful operation. Return "pong" text
Expand All @@ -64,7 +64,7 @@ async def healthcheck(self, request):
tags:
- Admin
produces:
- 'application/json'
- application/json
responses:
"202":
description: successful operation.
Expand Down Expand Up @@ -96,11 +96,11 @@ async def healthcheck(self, request):
async def get_authorization_token(self, request):
"""
---
description: this is used exclusively for sandbox auth
description: This endpoint is used exclusively for sandbox auth
tags:
- Auth
produces:
- text/plain
- application/json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you verified that this change wouldn't affect the sandbox? We have a similar issue where the heartbeat endpoint returns an actual JSON but is tagged as text/plain which has caused issues in the metaflow codebase before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to rework things and push a PR to make it so that everything works on application/json. I will also check with Ferras if there was a reason for the string only implementation.

responses:
"200":
description: successfully returned certs
Expand Down Expand Up @@ -139,5 +139,5 @@ async def get_authorization_token(self, request):

return web.Response(status=200, body=json.dumps(credentials))
except Exception as ex:
body = {"err_msg": str(ex), "traceback": get_traceback_str()}
body = {"message": str(ex), "traceback": get_traceback_str()}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this the expected body for the authorization route? the http_500 helper only has a detail field for the error message. Consider keeping it consistent with the helper output if possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am trying to make them all consistent. Still working on it. After I pushed this, I realized I missed a few things.

return web.Response(status=500, body=json.dumps(body))
80 changes: 44 additions & 36 deletions services/metadata_service/api/artifact.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from aiohttp import web
from services.data.postgres_async_db import AsyncPostgresDB
from services.data.db_utils import filter_artifacts_by_attempt_id_for_tasks
from services.utils import read_body
from services.data.db_utils import DBResponse, filter_artifacts_by_attempt_id_for_tasks
from services.metadata_service.api.utils import format_response, \
handle_exceptions
import json
Expand Down Expand Up @@ -43,12 +42,12 @@ def __init__(self, app):
self._async_table = AsyncPostgresDB.get_instance().artifact_table_postgres
self._db = AsyncPostgresDB.get_instance()

@format_response
@handle_exceptions
@format_response
async def get_artifact(self, request):
"""
---
description: get all artifacts associated with the specified task.
description: get a specific artifact
tags:
- Artifacts
parameters:
Expand Down Expand Up @@ -78,10 +77,12 @@ async def get_artifact(self, request):
required: true
type: "string"
produces:
- text/plain
- application/json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the comment above.

responses:
"200":
description: successful operation
"404":
description: no such artifact
"405":
description: invalid HTTP Method
"""
Expand All @@ -95,6 +96,8 @@ async def get_artifact(self, request):
flow_name, run_number, step_name, task_id, artifact_name
)

@handle_exceptions
@format_response
async def get_artifacts_by_task(self, request):
"""
---
Expand Down Expand Up @@ -123,7 +126,7 @@ async def get_artifacts_by_task(self, request):
required: true
type: "string"
produces:
- text/plain
- application/json
responses:
"200":
description: successful operation
Expand All @@ -138,17 +141,17 @@ async def get_artifacts_by_task(self, request):
artifacts = await self._async_table.get_artifact_in_task(
flow_name, run_number, step_name, task_id
)
if artifacts.response_code == 200:
artifacts.body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return artifacts

filtered_body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return web.Response(
status=artifacts.response_code, body=json.dumps(filtered_body)
)

@handle_exceptions
@format_response
async def get_artifacts_by_step(self, request):
"""
---
description: get all artifacts associated with the specified task.
description: get all artifacts associated with a given step
tags:
- Artifacts
parameters:
Expand All @@ -168,7 +171,7 @@ async def get_artifacts_by_step(self, request):
required: true
type: "string"
produces:
- text/plain
- application/json
responses:
"200":
description: successful operation
Expand All @@ -183,16 +186,17 @@ async def get_artifacts_by_step(self, request):
flow_name, run_number, step_name
)

filtered_body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return web.Response(
status=artifacts.response_code, body=json.dumps(filtered_body)
)
if artifacts.response_code == 200:
artifacts.body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return artifacts

@handle_exceptions
@format_response
async def get_artifacts_by_run(self, request):
"""
---
description: get all artifacts associated with the specified task.
description: get all artifacts associated with the specified run.
tags:
- Artifacts
parameters:
Expand All @@ -207,7 +211,7 @@ async def get_artifacts_by_run(self, request):
required: true
type: "string"
produces:
- text/plain
- application/json
responses:
"200":
description: successful operation
Expand All @@ -218,16 +222,18 @@ async def get_artifacts_by_run(self, request):
run_number = request.match_info.get("run_number")

artifacts = await self._async_table.get_artifacts_in_runs(flow_name, run_number)
filtered_body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return web.Response(
status=artifacts.response_code, body=json.dumps(filtered_body)
)

if artifacts.response_code == 200:
artifacts.body = filter_artifacts_by_attempt_id_for_tasks(
artifacts.body)
return artifacts

@handle_exceptions
@format_response
async def create_artifacts(self, request):
"""
---
description: This end-point allow to test that service is up.
description: Registers artifacts with the service
tags:
- Artifacts
parameters:
Expand Down Expand Up @@ -277,7 +283,7 @@ async def create_artifacts(self, request):
system_tags:
type: object
produces:
- 'text/plain'
- application/json
responses:
"202":
description: successful operation.
Expand Down Expand Up @@ -306,16 +312,18 @@ async def create_artifacts(self, request):
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
body = await read_body(request.content)
body = await request.json()
count = 0

try:
run_number, run_id = await self._db.get_run_ids(flow_name, run_number)
task_id, task_name = await self._db.get_task_ids(flow_name, run_number,
step_name, task_id)
except Exception:
return web.Response(status=400, body=json.dumps(
{"message": "need to register run_id and task_id first"}))
run = await self._db.get_run_ids(flow_name, run_number)
task = await self._db.get_task_ids(flow_name, run_number,
step_name, task_id)
if run.response_code != 200 or task.response_code != 200:
return DBResponse(400, {"message": "need to register run_id and task_id first"})
Comment on lines +318 to +322
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more generic thought and not a blocker for merging the cleanup, but is there a reason why this kind of integrity check is not covered by a foreign key constraint in the table?

also affects the metadata create handler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is covered, I think; the issue is that it used to be silent (i.e., the db would raise the error, but it could silently make it through).

run_id = run.body['run_id']
run_number = run.body['run_number']
task_id = task.body['task_id']
task_name = task.body['task_name']

# todo change to bulk insert
for artifact in body:
Expand Down
Loading