Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase sorting scalability via CytoTable metadata columns #204

Merged
merged 15 commits into from
Jun 12, 2024
7 changes: 7 additions & 0 deletions cytotable/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@
],
}

# Metadata column names and types for internal use within CytoTable.
# Each value is the SQL type name used in the CAST expression that builds the
# corresponding column in cytotable/convert.py. The key order is also the
# per-table order in which these columns are listed when an ORDER BY clause
# is generated for joins.
# NOTE: the constant is spelled "CYOTABLE" (not "CYTOTABLE"); convert.py
# imports it under this exact name.
CYOTABLE_META_COLUMN_TYPES = {
# source path of the data (suffixed with the table name when one is present)
"cytotable_meta_source_path": "VARCHAR",
# chunk offset used when the source data was queried
"cytotable_meta_offset": "BIGINT",
# row number assigned via row_number() OVER () in the chunk query
"cytotable_meta_rownum": "BIGINT",
}

CYTOTABLE_DEFAULT_PARQUET_METADATA = {
"data-producer": "https://github.com/cytomining/CytoTable",
"data-producer-version": str(_get_cytotable_version()),
Expand Down
120 changes: 103 additions & 17 deletions cytotable/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@


@python_app
def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]:
def _get_table_columns_and_types(
source: Dict[str, Any], sort_output: bool
) -> List[Dict[str, str]]:
"""
Gather column data from table through duckdb.

Args:
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind.
sort_output: bool
Specifies whether to sort cytotable output or not.

Returns:
List[Dict[str, str]]
Expand Down Expand Up @@ -110,6 +114,8 @@ def _get_table_columns_and_types(source: Dict[str, Any]) -> List[Dict[str, str]]
# offset is set to 0 to start at the first row
# of results from the table
offset=0,
add_cytotable_meta=False,
sort_output=sort_output,
)
with _duckdb_reader() as ddb_reader:
return (
Expand Down Expand Up @@ -276,6 +282,7 @@ def _source_chunk_to_parquet(
chunk_size: int,
offset: int,
dest_path: str,
sort_output: bool,
) -> str:
"""
Export source data to chunked parquet file using chunk size and offsets.
Expand All @@ -292,6 +299,8 @@ def _source_chunk_to_parquet(
The offset for chunking the data from source.
dest_path: str
Path to store the output data.
sort_output: bool
Specifies whether to sort cytotable output or not.

Returns:
str
Expand All @@ -304,6 +313,7 @@ def _source_chunk_to_parquet(
from cloudpathlib import AnyPath
from pyarrow import parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import (
_duckdb_reader,
_sqlite_mixed_type_query_to_parquet,
Expand All @@ -317,13 +327,39 @@ def _source_chunk_to_parquet(
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

source_path_str = (
source["source_path"]
if "table_name" not in source.keys()
else f"{source['source_path']}_table_{source['table_name']}"
)
# build the column selection block of query

# add cytotable metadata columns
cytotable_metadata_cols = [
(
f"CAST( '{source_path_str}' "
f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})"
' AS "cytotable_meta_source_path"'
),
f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"",
(
f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})"
' AS "cytotable_meta_rownum"'
),
]
# add source table columns
casted_source_cols = [
# here we cast the column to the specified type and ensure the column name remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]

# create selection statement from lists above
select_columns = ",".join(
[
# here we cast the column to the specified type ensure the colname remains the same
f"CAST(\"{column['column_name']}\" AS {column['column_dtype']}) AS \"{column['column_name']}\""
for column in source["columns"]
]
# if we should sort the output, add the metadata_cols
cytotable_metadata_cols + casted_source_cols
if sort_output
else casted_source_cols
)

# build output query and filepath base
Expand Down Expand Up @@ -353,6 +389,11 @@ def _source_chunk_to_parquet(
ORDER BY ALL
LIMIT {chunk_size} OFFSET {offset}
"""
if sort_output
else f"""
{base_query}
LIMIT {chunk_size} OFFSET {offset}
"""
).arrow(),
where=result_filepath,
)
Expand All @@ -375,6 +416,8 @@ def _source_chunk_to_parquet(
table_name=str(source["table_name"]),
chunk_size=chunk_size,
offset=offset,
add_cytotable_meta=True if sort_output else False,
sort_output=sort_output,
),
where=result_filepath,
)
Expand Down Expand Up @@ -423,7 +466,10 @@ def _prepend_column_name(

import pyarrow.parquet as parquet

from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING
from cytotable.constants import (
CYOTABLE_META_COLUMN_TYPES,
CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
)
from cytotable.utils import _write_parquet_table_with_metadata

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -471,8 +517,10 @@ def _prepend_column_name(
# source_group_name_stem: 'Cells'
# column_name: 'AreaShape_Area'
# updated_column_name: 'Cells_AreaShape_Area'
if column_name not in identifying_columns and not column_name.startswith(
source_group_name_stem.capitalize()
if (
column_name not in identifying_columns
and not column_name.startswith(source_group_name_stem.capitalize())
and column_name not in CYOTABLE_META_COLUMN_TYPES
):
updated_column_names.append(f"{source_group_name_stem}_{column_name}")
# if-condition for prepending 'Metadata_' to column name
Expand Down Expand Up @@ -680,6 +728,7 @@ def _concat_source_group(
def _prepare_join_sql(
sources: Dict[str, List[Dict[str, Any]]],
joins: str,
sort_output: bool,
) -> str:
"""
Prepare join SQL statement with actual locations of data based on the sources.
Expand All @@ -691,22 +740,39 @@ def _prepare_join_sql(
joins: str:
DuckDB-compatible SQL which will be used to perform the join
operations using the join_group keys as a reference.
sort_output: bool
Specifies whether to sort cytotable output or not.

Returns:
str:
String representing the SQL to be used in later join work.
"""
import pathlib

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES

# replace with real location of sources for join sql
order_by_tables = []
for key, val in sources.items():
if pathlib.Path(key).stem.lower() in joins.lower():
table_name = str(pathlib.Path(key).stem.lower())
joins = joins.replace(
f"'{str(pathlib.Path(key).stem.lower())}.parquet'",
f"'{table_name}.parquet'",
str([str(table) for table in val[0]["table"]]),
)
order_by_tables.append(table_name)

# create an ORDER BY statement from all joined tables using the cytotable metadata columns
order_by_sql = "ORDER BY " + ", ".join(
[
f"{table}.{meta_column}"
for table in order_by_tables
for meta_column in CYOTABLE_META_COLUMN_TYPES
]
)

return joins
# add the order by statements to the join
return joins + order_by_sql if sort_output else joins


@python_app
Expand Down Expand Up @@ -740,21 +806,29 @@ def _join_source_chunk(

import pathlib

import pyarrow.parquet as parquet

from cytotable.constants import CYOTABLE_META_COLUMN_TYPES
from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata

# Attempt to read the data to parquet file
# using duckdb for extraction and pyarrow for
# writing data to a parquet file.
# read data with chunk size + offset
# and export to parquet
exclude_meta_cols = [
f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
]
with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
WITH joined AS (
{joins}
{"ORDER BY ALL" if "ORDER BY" not in joins.upper() else ""}
LIMIT {chunk_size} OFFSET {offset}
)
SELECT
/* exclude metadata columns from the results
by using a lambda on column names based on exclude_meta_cols. */
COLUMNS (c -> ({" AND ".join(exclude_meta_cols)}))
FROM joined;
"""
).arrow()

Expand Down Expand Up @@ -974,6 +1048,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
chunk_size: Optional[int],
infer_common_schema: bool,
drop_null: bool,
sort_output: bool,
data_type_cast_map: Optional[Dict[str, str]] = None,
**kwargs,
) -> Union[Dict[str, List[Dict[str, Any]]], str]:
Expand Down Expand Up @@ -1012,6 +1087,8 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
Whether to infer a common schema when concatenating sources.
drop_null: bool:
Whether to drop null results.
sort_output: bool
Specifies whether to sort cytotable output or not.
data_type_cast_map: Dict[str, str]
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
Expand Down Expand Up @@ -1083,7 +1160,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
**{
"columns": _prep_cast_column_data_types(
columns=_get_table_columns_and_types(
source=source,
source=source, sort_output=sort_output
),
data_type_cast_map=data_type_cast_map,
)
Expand All @@ -1109,6 +1186,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
chunk_size=chunk_size,
offset=offset,
dest_path=expanded_dest_path,
sort_output=sort_output,
),
source_group_name=source_group_name,
identifying_columns=identifying_columns,
Expand Down Expand Up @@ -1169,7 +1247,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
evaluated_results = evaluate_futures(results)

prepared_joins_sql = _prepare_join_sql(
sources=evaluated_results, joins=joins
sources=evaluated_results, joins=joins, sort_output=sort_output
).result()

# map joined results based on the join groups gathered above
Expand Down Expand Up @@ -1222,6 +1300,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema: bool = True,
drop_null: bool = False,
data_type_cast_map: Optional[Dict[str, str]] = None,
sort_output: bool = True,
preset: Optional[str] = "cellprofiler_csv",
parsl_config: Optional[parsl.Config] = None,
**kwargs,
Expand Down Expand Up @@ -1263,8 +1342,14 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
DuckDB-compatible SQL which will be used to perform the join operations.
chunk_size: Optional[int] (Default value = None)
Size of join chunks which is used to limit data size during join ops
infer_common_schema: bool: (Default value = True)
infer_common_schema: bool (Default value = True)
Whether to infer a common schema when concatenating sources.
data_type_cast_map: Dict[str, str], (Default value = None)
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html
sort_output: bool (Default value = True)
Specifies whether to sort cytotable output or not.
drop_null: bool (Default value = False)
Whether to drop nan/null values from results
preset: str (Default value = "cellprofiler_csv")
Expand Down Expand Up @@ -1379,6 +1464,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
infer_common_schema=infer_common_schema,
drop_null=drop_null,
data_type_cast_map=data_type_cast_map,
sort_output=sort_output,
**kwargs,
)

Expand Down
Loading
Loading