Get a list of results so that we can avoid foreign key errors on the artifacts

john-dupuy committed Oct 5, 2020
1 parent 392e894, commit 09e8b6c

Showing 1 changed file with 25 additions and 10 deletions.

backend/ibutsu_server/scripts/mongo2postgres.py (35 changes: 25 additions & 10 deletions)
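For context, the failure this commit works around is PostgreSQL rejecting an artifact row whose parent result was never migrated (the time-based filter below can exclude it). A minimal sketch of such a foreign-key constraint, using illustrative SQLAlchemy models rather than the real ibutsu_server schema, which is not shown in this diff:

    # Illustrative models only; names and columns are assumptions, not code
    # from ibutsu_server.
    from sqlalchemy import Column, ForeignKey, String
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Result(Base):
        __tablename__ = "results"
        id = Column(String, primary_key=True)

    class Artifact(Base):
        __tablename__ = "artifacts"
        id = Column(String, primary_key=True)
        # Inserting an Artifact whose result_id is absent from "results"
        # raises IntegrityError in PostgreSQL; that is the error this commit
        # avoids by only migrating artifacts whose results were migrated.
        result_id = Column(String, ForeignKey("results.id"))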
@@ -136,8 +136,8 @@ def migrate_table(collection, Model, vprint, filter_=None):
     """Migrate a collection from MongoDB into a table in PostgreSQL"""
     # TODO: update indexes once we know them
 
-    if Model.__tablename__ == "runs":
-        run_ids = []
+    if Model.__tablename__ in ["runs", "results"]:
+        ids = []
 
     for idx, row in enumerate(collection.find(filter_, sort=[("_id", -1)])):
         if idx > MIGRATION_LIMIT:
@@ -214,9 +214,9 @@ def migrate_table(collection, Model, vprint, filter_=None):
 
         if idx % ROWS_TO_COMMIT_AT_ONCE == 0:
             session.commit()
-        # for each run migrate the result specific to the run
-        if Model.__tablename__ == "runs":
-            run_ids.append(str(mongo_id))
+
+        if Model.__tablename__ in ["runs", "results"]:
+            ids.append(str(mongo_id))
 
     session.commit()
     # at the end of the session do a little cleanup
@@ -227,9 +227,9 @@ def migrate_table(collection, Model, vprint, filter_=None):
         conn.execute(sql_delete)
     vprint(" done")
 
-    if Model.__tablename__ == "runs":
+    if Model.__tablename__ in ["runs", "results"]:
         # return run_ids for the results to use
-        return run_ids
+        return ids
 
 
 def migrate_file(collection, Model, vprint, filter_=None):
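The net effect of the migrate_table() changes above: the function now accumulates the migrated Mongo ids (as strings) for both the runs and results tables and returns them, while any other table still falls through and returns None. A stripped-down, self-contained sketch of that shape, with the Mongo cursor faked (everything here is illustrative, not part of the commit):

    # "fake_rows" stands in for collection.find(...); no database is needed.
    def collect_ids(tablename, fake_rows):
        if tablename in ["runs", "results"]:
            ids = []
        for row in fake_rows:
            mongo_id = row["_id"]
            # ... the row would be converted and inserted into PostgreSQL here ...
            if tablename in ["runs", "results"]:
                ids.append(str(mongo_id))
        if tablename in ["runs", "results"]:
            return ids

    print(collect_ids("results", [{"_id": 1}, {"_id": 2}]))  # ['1', '2']
    print(collect_ids("projects", [{"_id": 3}]))             # None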
@@ -279,7 +279,7 @@ def migrate_tables(mongo, vprint, migrate_files=False):
     """Migrate all the tables"""
     # first get the time range
     sort = [("_id", -1)]
-    most_recent_record = mongo["runs"].find_one(sort=sort)
+    most_recent_record = mongo["results"].find_one(sort=sort)
     most_recent_create_time = most_recent_record["_id"].generation_time
     # only include most recent runs and results
     filter_ = {
@@ -308,6 +308,9 @@ def migrate_tables(mongo, vprint, migrate_files=False):
             vprint(".", end="")
             conn.execute(sql_index)
 
+        # get a list of result_ids
+        result_ids = []
+
         # migrate the table over
         vprint("Migrating {} ".format(collection), end="")
         if collection == "runs":
@@ -317,13 +320,25 @@ def migrate_tables(mongo, vprint, migrate_files=False):
             run_chunks = [run_ids[i : i + 100] for i in range(0, len(run_ids), 100)]
             for run_list in run_chunks:
                 result_filter = {"metadata.run": {"$in": run_list}}  # filter on runs we know exist
-                migrate_table(mongo[collection], model, vprint, filter_=result_filter)
+                result_ids.extend(
+                    migrate_table(mongo[collection], model, vprint, filter_=result_filter)
+                )
         else:
             migrate_table(mongo[collection], model, vprint)
     if migrate_files:
         for collection, model in FILE_MAP:
             vprint("Migrating {} ".format(collection), end="")
-            migrate_file(GridFSBucket(mongo, collection), model, vprint, filter_=file_filter)
+            if collection == "fs":
+                # migrate in chunks of 1000 results at a time
+                result_chunks = [result_ids[i : i + 1000] for i in range(0, len(result_ids), 1000)]
+                artifact_filter = file_filter.copy()
+                for result_list in result_chunks:
+                    artifact_filter.update({"metadata.resultId": {"$in": result_list}})
+                    migrate_file(
+                        GridFSBucket(mongo, collection), model, vprint, filter_=artifact_filter
+                    )
+            else:
+                migrate_file(GridFSBucket(mongo, collection), model, vprint, filter_=file_filter)
 
 
 def build_mongo_connection(url):
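Both new call sites in migrate_tables() rely on the same slicing idiom: break a long id list into fixed-size chunks so each MongoDB "$in" query stays small (100 run ids per query for results, 1000 result ids per query for artifacts). A minimal, self-contained sketch of that pattern; the chunked() helper name is illustrative, not part of the commit:

    def chunked(ids, size):
        """Yield successive slices of `ids` no longer than `size` elements."""
        for i in range(0, len(ids), size):
            yield ids[i : i + size]

    result_ids = [str(n) for n in range(2500)]
    filters = [{"metadata.resultId": {"$in": chunk}} for chunk in chunked(result_ids, 1000)]
    print([len(f["metadata.resultId"]["$in"]) for f in filters])  # [1000, 1000, 500]

Note also that artifact_filter starts as file_filter.copy(), so the upload-date condition is preserved, and each update() call overwrites only the "$in" list before the next chunk is migrated.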
