From 33bb15a9369860190864576fcdb53878aec0a7fa Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 1 Nov 2023 14:17:41 -0700 Subject: [PATCH 1/2] Uses string_agg instead of array_agg to gather up closure ID and label lists to get pipe separated lists in the tsv. Also adds a dry_run flag to just print the query. --- closurizer/cli.py | 4 +- closurizer/closurizer.py | 85 +++++++++++++++++++++------------------- 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/closurizer/cli.py b/closurizer/cli.py index 40eac45..01f004a 100644 --- a/closurizer/cli.py +++ b/closurizer/cli.py @@ -9,12 +9,14 @@ @click.option('--output', '-o', required=True, help='file write kgx file with closure fields added') @click.option('--fields', multiple=True, help='fields to closurize') @click.option('--grouping-fields', multiple=True, help='fields to populate a single value grouping_key field') +@click.option('--dry-run', is_flag=True, help='A dry run will not write the output file, but will print the SQL query') def main(kg: str, closure: str, output: str, + dry_run: bool = False, fields: List[str] = None, grouping_fields: List[str] = None): - add_closure(kg_archive=kg, closure_file=closure, fields=fields, output_file=output, grouping_fields=grouping_fields) + add_closure(kg_archive=kg, closure_file=closure, fields=fields, output_file=output, dry_run=dry_run, grouping_fields=grouping_fields) if __name__ == "__main__": main() diff --git a/closurizer/closurizer.py b/closurizer/closurizer.py index 09808c2..13cd5f6 100644 --- a/closurizer/closurizer.py +++ b/closurizer/closurizer.py @@ -45,6 +45,7 @@ def add_closure(kg_archive: str, closure_file: str, output_file: str, fields: List[str] = ['subject', 'object'], + dry_run: bool = False, evidence_fields: List[str] = None, grouping_fields: List[str] = None ): @@ -63,43 +64,45 @@ def add_closure(kg_archive: str, if grouping_fields is None or len(grouping_fields) == 0: grouping_fields = ['subject', 'negated', 'predicate', 'object'] - print(f"fields: {','.join(fields)}") - print(f"output_file: {output_file}") - tar = tarfile.open(f"{kg_archive}") + if not dry_run: + print(f"fields: {','.join(fields)}") + print(f"output_file: {output_file}") - print("Loading node table...") - node_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_nodes.tsv') ][0] - tar.extract(node_file_name,) - node_file = f"{node_file_name}" - print(f"node_file: {node_file}") + tar = tarfile.open(f"{kg_archive}") - db.sql(f""" - create or replace table nodes as select *, substr(id, 1, instr(id,':') -1) as namespace from read_csv('{node_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE) - """) + print("Loading node table...") + node_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_nodes.tsv') ][0] + tar.extract(node_file_name,) + node_file = f"{node_file_name}" + print(f"node_file: {node_file}") - edge_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_edges.tsv') ][0] - tar.extract(edge_file_name) - edge_file = f"{edge_file_name}" - print(f"edge_file: {edge_file}") + db.sql(f""" + create or replace table nodes as select *, substr(id, 1, instr(id,':') -1) as namespace from read_csv('{node_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE) + """) - db.sql(f""" - create or replace table edges as select * from read_csv('{edge_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE) - """) + edge_file_name = [member.name for member in tar.getmembers() if member.name.endswith('_edges.tsv') ][0] + tar.extract(edge_file_name) + edge_file = f"{edge_file_name}" + print(f"edge_file: {edge_file}") - # Load the relation graph tsv in long format mapping a node to each of it's ancestors - db.sql(f""" - create or replace table closure as select * from read_csv('{closure_file}', sep='\t', names=['subject_id', 'predicate_id', 'object_id'], AUTO_DETECT=TRUE) - """) + db.sql(f""" + create or replace table edges as select * from read_csv('{edge_file_name}', header=True, sep='\t', AUTO_DETECT=TRUE) + """) - db.sql(""" - create or replace table closure_id as select subject_id as id, array_agg(object_id) as closure from closure group by subject_id - """) + # Load the relation graph tsv in long format mapping a node to each of it's ancestors + db.sql(f""" + create or replace table closure as select * from read_csv('{closure_file}', sep='\t', names=['subject_id', 'predicate_id', 'object_id'], AUTO_DETECT=TRUE) + """) - db.sql(""" - create or replace table closure_label as select subject_id as id, array_agg(name) as closure_label from closure join nodes on object_id = id - group by subject_id - """) + db.sql(""" + create or replace table closure_id as select subject_id as id, string_agg(object_id, '|') as closure from closure group by subject_id + """) + + db.sql(""" + create or replace table closure_label as select subject_id as id, string_agg(name, '|') as closure_label from closure join nodes on object_id = id + group by subject_id + """) query = f""" create or replace table denormalized_edges as @@ -112,16 +115,16 @@ def add_closure(kg_archive: str, """ print(query) - db.query(query) - - db.query(f""" - -- write denormalized_edges as tsv - copy (select * from denormalized_edges) to '{output_file}' (header, delimiter '\t') - """) - - # Clean up extracted node & edge files - if os.path.exists(f"{node_file}"): - os.remove(f"{node_file}") - if os.path.exists(f"{edge_file}"): - os.remove(f"{edge_file}") + if not dry_run: + db.query(query) + db.query(f""" + -- write denormalized_edges as tsv + copy (select * from denormalized_edges) to '{output_file}' (header, delimiter '\t') + """) + + # Clean up extracted node & edge files + if os.path.exists(f"{node_file}"): + os.remove(f"{node_file}") + if os.path.exists(f"{edge_file}"): + os.remove(f"{edge_file}") From e20509310b5156ee2f3f5f4c5c6daf7f97f4b87f Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Wed, 1 Nov 2023 14:27:50 -0700 Subject: [PATCH 2/2] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 85b639d..4e7e3bd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "closurizer" -version = "0.4.0" +version = "0.4.1" description = "Add closure expansion fields to kgx files following the Golr pattern" authors = ["Kevin Schaper "]