Skip to content

Commit

Permalink
Merge pull request #16 from monarch-initiative/string-agg-closure-lists
Browse files Browse the repository at this point in the history
Uses string_agg instead of array_agg to gather the closure ID and label lists, so that they are written as pipe-separated lists in the TSV. Also adds a dry_run flag that only prints the query.
  • Loading branch information
kevinschaper authored Nov 1, 2023
2 parents 21d5f74 + e205093 commit 6be736d
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 43 deletions.
4 changes: 3 additions & 1 deletion closurizer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
@click.option('--output', '-o', required=True, help='file write kgx file with closure fields added')
@click.option('--fields', multiple=True, help='fields to closurize')
@click.option('--grouping-fields', multiple=True, help='fields to populate a single value grouping_key field')
@click.option('--dry-run', is_flag=True, help='A dry run will not write the output file, but will print the SQL query')
def main(kg: str,
closure: str,
output: str,
dry_run: bool = False,
fields: List[str] = None,
grouping_fields: List[str] = None):
add_closure(kg_archive=kg, closure_file=closure, fields=fields, output_file=output, grouping_fields=grouping_fields)
add_closure(kg_archive=kg, closure_file=closure, fields=fields, output_file=output, dry_run=dry_run, grouping_fields=grouping_fields)

if __name__ == "__main__":
main()
85 changes: 44 additions & 41 deletions closurizer/closurizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def add_closure(kg_archive: str,
closure_file: str,
output_file: str,
fields: List[str] = ['subject', 'object'],
dry_run: bool = False,
evidence_fields: List[str] = None,
grouping_fields: List[str] = None
):
Expand All @@ -63,43 +64,45 @@ def add_closure(kg_archive: str,
if grouping_fields is None or len(grouping_fields) == 0:
grouping_fields = ['subject', 'negated', 'predicate', 'object']

print(f"fields: {','.join(fields)}")
print(f"output_file: {output_file}")

tar = tarfile.open(f"{kg_archive}")
if not dry_run:
print(f"fields: {','.join(fields)}")
print(f"output_file: {output_file}")

print("Loading node table...")
node_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_nodes.tsv') ][0]
tar.extract(node_file_name,)
node_file = f"{node_file_name}"
print(f"node_file: {node_file}")
tar = tarfile.open(f"{kg_archive}")

db.sql(f"""
create or replace table nodes as select *, substr(id, 1, instr(id,':') -1) as namespace from read_csv('{node_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE)
""")
print("Loading node table...")
node_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_nodes.tsv') ][0]
tar.extract(node_file_name,)
node_file = f"{node_file_name}"
print(f"node_file: {node_file}")

edge_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_edges.tsv') ][0]
tar.extract(edge_file_name)
edge_file = f"{edge_file_name}"
print(f"edge_file: {edge_file}")
db.sql(f"""
create or replace table nodes as select *, substr(id, 1, instr(id,':') -1) as namespace from read_csv('{node_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE)
""")

db.sql(f"""
create or replace table edges as select * from read_csv('{edge_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE)
""")
edge_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_edges.tsv') ][0]
tar.extract(edge_file_name)
edge_file = f"{edge_file_name}"
print(f"edge_file: {edge_file}")

# Load the relation graph tsv in long format mapping a node to each of its ancestors
db.sql(f"""
create or replace table closure as select * from read_csv('{closure_file}', sep='\t', names=['subject_id', 'predicate_id', 'object_id'], AUTO_DETECT=TRUE)
""")
db.sql(f"""
create or replace table edges as select * from read_csv('{edge_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE)
""")

db.sql("""
create or replace table closure_id as select subject_id as id, array_agg(object_id) as closure from closure group by subject_id
""")
# Load the relation graph tsv in long format mapping a node to each of its ancestors
db.sql(f"""
create or replace table closure as select * from read_csv('{closure_file}', sep='\t', names=['subject_id', 'predicate_id', 'object_id'], AUTO_DETECT=TRUE)
""")

db.sql("""
create or replace table closure_label as select subject_id as id, array_agg(name) as closure_label from closure join nodes on object_id = id
group by subject_id
""")
db.sql("""
create or replace table closure_id as select subject_id as id, string_agg(object_id, '|') as closure from closure group by subject_id
""")

db.sql("""
create or replace table closure_label as select subject_id as id, string_agg(name, '|') as closure_label from closure join nodes on object_id = id
group by subject_id
""")

query = f"""
create or replace table denormalized_edges as
Expand All @@ -112,16 +115,16 @@ def add_closure(kg_archive: str,
"""

print(query)
db.query(query)

db.query(f"""
-- write denormalized_edges as tsv
copy (select * from denormalized_edges) to '{output_file}' (header, delimiter '\t')
""")


# Clean up extracted node & edge files
if os.path.exists(f"{node_file}"):
os.remove(f"{node_file}")
if os.path.exists(f"{edge_file}"):
os.remove(f"{edge_file}")
if not dry_run:
db.query(query)
db.query(f"""
-- write denormalized_edges as tsv
copy (select * from denormalized_edges) to '{output_file}' (header, delimiter '\t')
""")

# Clean up extracted node & edge files
if os.path.exists(f"{node_file}"):
os.remove(f"{node_file}")
if os.path.exists(f"{edge_file}"):
os.remove(f"{edge_file}")
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "closurizer"
version = "0.4.0"
version = "0.4.1"
description = "Add closure expansion fields to kgx files following the Golr pattern"
authors = ["Kevin Schaper <[email protected]>"]

Expand Down

0 comments on commit 6be736d

Please sign in to comment.