Skip to content

Commit

Permalink
Fix rebase error
Browse files Browse the repository at this point in the history
  • Loading branch information
danielballan committed Jan 28, 2025
1 parent a02c43e commit 057e074
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 72 deletions.
134 changes: 64 additions & 70 deletions tiled/adapters/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ def __init__(
self.table_name = table_name
self.dataset_id = dataset_id



def metadata(self) -> JSON:
"""
The metadata representing the actual data.
Expand Down Expand Up @@ -101,7 +99,6 @@ def init_storage(
data_source.parameters["dataset_id"] = secrets.randbits(63)
data_uri = storage.get("sql") # TODO scrub credential


schema_new = schema.insert(0, pyarrow.field("dataset_id", pyarrow.int64()))
statement = schema_to_pg_create_table(schema_new, default_table_name)

Expand Down Expand Up @@ -192,7 +189,9 @@ def append_partition(
table = pyarrow.Table.from_batches(batches)
table_with_dataset_id = add_dataset_column(table, self.dataset_id)

self.cur.adbc_ingest(self.table_name, table_with_dataset_id, mode="create_append")
self.cur.adbc_ingest(
self.table_name, table_with_dataset_id, mode="create_append"
)
self.conn.commit()

def read(self, fields: Optional[Union[str, List[str]]] = None) -> pandas.DataFrame:
Expand Down Expand Up @@ -233,6 +232,7 @@ def read_partition(
raise NotImplementedError
return self.read(fields)


def create_connection(uri):
if uri.startswith("sqlite:"):
# Ensure this path is writable to avoid a confusing error message
Expand All @@ -258,105 +258,96 @@ def create_connection(uri):

return conn


def add_dataset_column(table: pyarrow.Table, dataset_id: int) -> pyarrow.Table:
column = dataset_id * numpy.ones(len(table), dtype=numpy.int64)
return table.add_column(0, pyarrow.field("dataset_id", pyarrow.int64()), [column])


def schema_to_pg_create_table(schema: pyarrow.Schema, table_name: str) -> str:
# Comprehensive mapping of PyArrow types to PostgreSQL types
type_mapping = {
# Numeric Types
'int8': 'SMALLINT', # Could also use "TINYINT" but not native to PG
'int16': 'SMALLINT',
'int32': 'INTEGER',
'int64': 'BIGINT',
'uint8': 'SMALLINT',
'uint16': 'INTEGER',
'uint32': 'BIGINT',
'uint64': 'NUMERIC', # No unsigned in PG, so use NUMERIC
'float16': 'REAL',
'float32': 'REAL',
'float64': 'DOUBLE PRECISION',
'decimal128': 'DECIMAL',
'decimal256': 'DECIMAL',

"int8": "SMALLINT", # Could also use "TINYINT" but not native to PG
"int16": "SMALLINT",
"int32": "INTEGER",
"int64": "BIGINT",
"uint8": "SMALLINT",
"uint16": "INTEGER",
"uint32": "BIGINT",
"uint64": "NUMERIC", # No unsigned in PG, so use NUMERIC
"float16": "REAL",
"float32": "REAL",
"float64": "DOUBLE PRECISION",
"decimal128": "DECIMAL",
"decimal256": "DECIMAL",
# String Types
'string': 'TEXT',
'large_string': 'TEXT',

"string": "TEXT",
"large_string": "TEXT",
# Binary Types
'binary': 'BYTEA',
'large_binary': 'BYTEA',

"binary": "BYTEA",
"large_binary": "BYTEA",
# Boolean Type
'bool': 'BOOLEAN',

"bool": "BOOLEAN",
# Temporal Types
'date32': 'DATE',
'date64': 'DATE',
'timestamp[s]': 'TIMESTAMP',
'timestamp[ms]': 'TIMESTAMP',
'timestamp[us]': 'TIMESTAMP',
'timestamp[ns]': 'TIMESTAMP',
'time32[s]': 'TIME',
'time32[ms]': 'TIME',
'time64[us]': 'TIME',
'time64[ns]': 'TIME',

"date32": "DATE",
"date64": "DATE",
"timestamp[s]": "TIMESTAMP",
"timestamp[ms]": "TIMESTAMP",
"timestamp[us]": "TIMESTAMP",
"timestamp[ns]": "TIMESTAMP",
"time32[s]": "TIME",
"time32[ms]": "TIME",
"time64[us]": "TIME",
"time64[ns]": "TIME",
# Interval Types
'interval[s]': 'INTERVAL',
'interval[ms]': 'INTERVAL',
'interval[us]': 'INTERVAL',
'interval[ns]': 'INTERVAL',
'interval_month_day_nano': 'INTERVAL',

"interval[s]": "INTERVAL",
"interval[ms]": "INTERVAL",
"interval[us]": "INTERVAL",
"interval[ns]": "INTERVAL",
"interval_month_day_nano": "INTERVAL",
# List Types - mapped to ARRAY
'list': 'ARRAY',
'large_list': 'ARRAY',
'fixed_size_list': 'ARRAY',

"list": "ARRAY",
"large_list": "ARRAY",
"fixed_size_list": "ARRAY",
# Struct Type
'struct': 'JSONB', # Best approximate in PG

"struct": "JSONB", # Best approximate in PG
# Dictionary Type (usually used for categorical data)
'dictionary': 'TEXT', # Stored as its target type

"dictionary": "TEXT", # Stored as its target type
# Fixed Size Types
'fixed_size_binary': 'BYTEA',

"fixed_size_binary": "BYTEA",
# Map Type
'map': 'JSONB', # Best approximate in PG

"map": "JSONB", # Best approximate in PG
# Duration Types
'duration[s]': 'INTERVAL',
'duration[ms]': 'INTERVAL',
'duration[us]': 'INTERVAL',
'duration[ns]': 'INTERVAL',
"duration[s]": "INTERVAL",
"duration[ms]": "INTERVAL",
"duration[us]": "INTERVAL",
"duration[ns]": "INTERVAL",
}

def get_sql_type(field: pyarrow.Field) -> str:
base_type = str(field.type).lower()

# Handle list types (arrays)
if base_type.startswith(('list', 'large_list', 'fixed_size_list')):
if base_type.startswith(("list", "large_list", "fixed_size_list")):
value_type = field.type.value_type
sql_value_type = type_mapping.get(str(value_type).lower(), 'TEXT')
sql_value_type = type_mapping.get(str(value_type).lower(), "TEXT")
return f"{sql_value_type}[]"

# Handle dictionary types
if base_type.startswith('dictionary'):
if base_type.startswith("dictionary"):
# Use the dictionary value type
value_type = field.type.value_type
return type_mapping.get(str(value_type).lower(), 'TEXT')
return type_mapping.get(str(value_type).lower(), "TEXT")

# Handle decimal types with precision and scale
if base_type.startswith('decimal'):
if base_type.startswith("decimal"):
precision = field.type.precision
scale = field.type.scale
return f"DECIMAL({precision}, {scale})"

# Default handling
return type_mapping.get(base_type, 'TEXT')
return type_mapping.get(base_type, "TEXT")

# Build column definitions
columns = []
Expand All @@ -367,10 +358,13 @@ def get_sql_type(field: pyarrow.Field) -> str:
columns.append(f"{field.name} {sql_type} {nullable}")

# Construct the CREATE TABLE statement
create_statement = (f"""
create_statement = (
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
""" + ',\n '.join(columns)
+ """)
""")
"""
+ ",\n ".join(columns)
+ """)
"""
)

return create_statement
return create_statement
4 changes: 2 additions & 2 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,8 @@ async def create_node(
),
)
adapter = STORAGE_ADAPTERS_BY_MIMETYPE[data_source.mimetype]
assets = await ensure_awaitable(
init_storage,
data_source = await ensure_awaitable(
adapter.init_storage,
self.context.writable_storage,
data_source,
self.segments + [key],
Expand Down

0 comments on commit 057e074

Please sign in to comment.