Skip to content

Commit

Permalink
few more changes
Browse files Browse the repository at this point in the history
  • Loading branch information
skarakuzu committed Jan 28, 2025
1 parent 057e074 commit 918200f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 7 deletions.
1 change: 0 additions & 1 deletion tiled/adapters/arrow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import copy
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
from typing import Callable, Dict, Iterator, List, Optional, Tuple, Union
from urllib.parse import quote_plus

import pandas
Expand Down
1 change: 0 additions & 1 deletion tiled/adapters/awkward_buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from ..structures.awkward import AwkwardStructure
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import Asset, DataSource, Storage

from ..type_aliases import JSON
from ..utils import path_from_uri
from .awkward import AwkwardAdapter
Expand Down
17 changes: 13 additions & 4 deletions tiled/adapters/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,25 @@ def init_storage(
data_source.structure.arrow_schema_decoded
) # based on hash of Arrow schema
encoded = schema.serialize()

default_table_name = "table_" + hashlib.md5(encoded).hexdigest()
data_source.parameters.setdefault("table_name", default_table_name)
table_name = data_source.parameters.setdefault("table_name", default_table_name)

data_source.parameters["dataset_id"] = secrets.randbits(63)
data_uri = storage.get("sql") # TODO scrub credential

schema_new = schema.insert(0, pyarrow.field("dataset_id", pyarrow.int64()))
statement = schema_to_pg_create_table(schema_new, default_table_name)
create_table_statement = schema_to_pg_create_table(schema_new, table_name)

create_index_statement = f"""create
index if not exists
dataset_id_index
on
{table_name}(dataset_id);"""

conn = create_connection(data_uri)
conn.cursor().execute(statement)
conn.cursor().execute(create_table_statement)
conn.cursor().execute(create_index_statement)
conn.commit()

data_source.assets.append(
Expand Down Expand Up @@ -232,8 +241,8 @@ def read_partition(
raise NotImplementedError
return self.read(fields)

def create_connection(uri: str):

def create_connection(uri):
if uri.startswith("sqlite:"):
# Ensure this path is writable to avoid a confusing error message
# from adbc_driver_sqlite.
Expand Down
3 changes: 2 additions & 1 deletion tiled/structures/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import io
from dataclasses import dataclass
from typing import List, Tuple, Union

import pyarrow

B64_ENCODED_PREFIX = "data:application/vnd.apache.arrow.file;base64,"
Expand Down Expand Up @@ -64,9 +65,9 @@ def from_arrays(cls, arr, names):
schema_b64 = base64.b64encode(schema_bytes).decode("utf-8")
data_uri = B64_ENCODED_PREFIX + schema_b64
return cls(arrow_schema=data_uri, npartitions=1, columns=list(names))

@classmethod
def from_schema(cls, schema: pyarrow.Schema, npartitions: int = 1):

schema_bytes = schema.serialize()
schema_b64 = base64.b64encode(schema_bytes).decode("utf-8")
data_uri = B64_ENCODED_PREFIX + schema_b64
Expand Down

0 comments on commit 918200f

Please sign in to comment.