Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backends): support creation from a DB-API con #9603

Merged
merged 23 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
09e3abb
feat(duckdb): enable backend create from dbapi con
deepyaman Jul 16, 2024
d62142c
feat(clickhouse): enable backend create from ClickHouse Connect Client
deepyaman Jul 16, 2024
d93f0c2
feat(druid): enable backend create from dbapi con
deepyaman Jul 16, 2024
63aba4e
feat(exasol): enable backend create from dbapi con
deepyaman Jul 16, 2024
6b863ce
feat(flink): add `from_connection` API for backend
deepyaman Jul 18, 2024
1939c2a
feat(impala): enable backend create from dbapi con
deepyaman Jul 19, 2024
54ffc4d
feat(mssql): support backend create from dbapi con
deepyaman Jul 19, 2024
fa68df7
feat(mysql): support backend create from dbapi con
deepyaman Jul 19, 2024
5609c8f
feat(oracle): enable backend create from dbapi con
deepyaman Jul 19, 2024
eaa1ba4
feat(postgres): construct backend from a dbapi con
deepyaman Jul 19, 2024
172be31
feat(datafusion): add `from_connection` to backend
deepyaman Jul 19, 2024
415ebca
feat(datafusion): add `from_connection` to backend
deepyaman Jul 20, 2024
42fb876
test: don't check from connection where irrelevant
deepyaman Jul 19, 2024
fb1534d
feat(pyspark): implement the `from_connection` API
deepyaman Jul 20, 2024
97d5a95
feat(sqlite): enable backend create from dbapi con
deepyaman Jul 20, 2024
251c81f
feat(trino): support backend create from dbapi con
deepyaman Jul 20, 2024
0a9cc32
feat(bigquery): create a backend `from_connection`
deepyaman Jul 20, 2024
88d7214
feat(snowflake): create from `SnowflakeConnection`
deepyaman Jul 22, 2024
4f3c4b3
docs(datafusion): capitalize the 'F' in DataFusion
deepyaman Jul 20, 2024
442c9f6
feat(backends): add `from_connection` to top level
deepyaman Jul 22, 2024
c8267e6
chore(backends): mark from_connection experimental
deepyaman Jul 22, 2024
ff776cb
test: change test to not require schema information
cpcloud Jul 22, 2024
2184b77
feat(bigquery): implement `from_connection`
cpcloud Jul 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ jobs:
extras:
- sqlite
- name: datafusion
title: Datafusion
title: DataFusion
extras:
- datafusion
- name: polars
Expand Down
9 changes: 6 additions & 3 deletions gen_redirects.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"/backends/{version}/BigQuery/": "/backends/bigquery/",
"/backends/{version}/Clickhouse/": "/backends/clickhouse/",
"/backends/{version}/Dask/": "/backends/dask/",
"/backends/{version}/Datafusion/": "/backends/datafusion/",
"/backends/{version}/DataFusion/": "/backends/datafusion/",
"/backends/{version}/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/backends/{version}/Druid/": "/backends/druid/",
"/backends/{version}/DuckDB/": "/backends/duckdb/",
"/backends/{version}/Impala/": "/backends/impala/",
Expand All @@ -30,7 +31,8 @@
"/docs/{version}/backends/BigQuery/": "/backends/bigquery/",
"/docs/{version}/backends/Clickhouse/": "/backends/clickhouse/",
"/docs/{version}/backends/Dask/": "/backends/dask/",
"/docs/{version}/backends/Datafusion/": "/backends/datafusion/",
"/docs/{version}/backends/DataFusion/": "/backends/datafusion/",
"/docs/{version}/backends/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/docs/{version}/backends/Druid/": "/backends/druid/",
"/docs/{version}/backends/DuckDB/": "/backends/duckdb/",
"/docs/{version}/backends/Impala/": "/backends/impala/",
Expand Down Expand Up @@ -73,7 +75,8 @@
"/backends/BigQuery/": "/backends/bigquery/",
"/backends/Clickhouse/": "/backends/clickhouse/",
"/backends/Dask/": "/backends/dask/",
"/backends/Datafusion/": "/backends/datafusion/",
"/backends/DataFusion/": "/backends/datafusion/",
"/backends/Datafusion/": "/backends/datafusion/", # For backwards compatibility
"/backends/Druid/": "/backends/druid/",
"/backends/DuckDB/": "/backends/duckdb/",
"/backends/Impala/": "/backends/impala/",
Expand Down
6 changes: 5 additions & 1 deletion ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@
def __init__(self, *args, **kwargs):
self._con_args: tuple[Any] = args
self._con_kwargs: dict[str, Any] = kwargs
self._can_reconnect: bool = True
# expression cache
self._query_cache = RefCountedCache(
populate=self._load_into_cache,
Expand Down Expand Up @@ -856,7 +857,10 @@
# TODO(kszucs): should call self.connect(*self._con_args, **self._con_kwargs)
def reconnect(self) -> None:
    """Reconnect to the database already configured with connect.

    Raises
    ------
    IbisError
        If this backend instance was created from an existing connection
        (e.g. via `from_connection`), the original `connect` arguments are
        unknown and reconnection is impossible.
    """
    if not self._can_reconnect:
        # Fix: the message interpolated {self.name} but was not an f-string,
        # so the literal text "{self.name}" was emitted to the user.
        raise exc.IbisError(f"Cannot reconnect to unconfigured {self.name} backend")
    self.do_connect(*self._con_args, **self._con_kwargs)

Check warning on line 863 in ibis/backends/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/__init__.py#L863

Added line #L863 was not covered by tests

def do_connect(self, *args, **kwargs) -> None:
"""Connect to database specified by `args` and `kwargs`."""
Expand Down
31 changes: 31 additions & 0 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,37 @@ def do_connect(

self.partition_column = partition_column

@util.experimental
@classmethod
def from_connection(
    cls,
    client: bq.Client,
    partition_column: str | None = "PARTITIONTIME",
    storage_client: bqstorage.BigQueryReadClient | None = None,
    dataset_id: str = "",
) -> Backend:
    """Build a BigQuery `Backend` around an already-constructed ``Client``.

    Parameters
    ----------
    client
        A `Client` from the `google.cloud.bigquery` package.
    partition_column
        Identifier to use instead of default `_PARTITIONTIME` partition
        column. Defaults to `'PARTITIONTIME'`.
    storage_client
        A `BigQueryReadClient` from the `google.cloud.bigquery_storage_v1`
        package.
    dataset_id
        A dataset id that lives inside of the project attached to `client`.
    """
    # The public connect entry point already accepts a pre-built client,
    # so simply forward every argument to it.
    kwargs = {
        "client": client,
        "partition_column": partition_column,
        "storage_client": storage_client,
        "dataset_id": dataset_id,
    }
    return ibis.bigquery.connect(**kwargs)

def disconnect(self) -> None:
self.client.close()

Expand Down
15 changes: 15 additions & 0 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,21 @@ def do_connect(
**kwargs,
)

@util.experimental
@classmethod
def from_connection(cls, con: cc.driver.Client) -> Backend:
    """Create an Ibis client from an existing ClickHouse Connect Client instance.

    Parameters
    ----------
    con
        An existing ClickHouse Connect Client instance.
    """
    backend = cls()
    # Adopt the caller's client directly; since the original connect()
    # arguments were never seen here, reconnect() must be disabled.
    backend.con = con
    backend._can_reconnect = False
    return backend

@property
def version(self) -> str:
return self.con.server_version
Expand Down
29 changes: 22 additions & 7 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import sqlglot as sg
import sqlglot.expressions as sge

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import DataFusionCompiler
Expand Down Expand Up @@ -77,12 +79,13 @@ def version(self):
def do_connect(
self, config: Mapping[str, str | Path] | SessionContext | None = None
) -> None:
"""Create a Datafusion backend for use with Ibis.
"""Create a DataFusion `Backend` for use with Ibis.

Parameters
----------
config
Mapping of table names to files.
Mapping of table names to files or a `SessionContext`
instance.

Examples
--------
Expand Down Expand Up @@ -112,6 +115,18 @@ def do_connect(
for name, path in config.items():
self.register(path, table_name=name)

@util.experimental
@classmethod
def from_connection(cls, con: SessionContext) -> Backend:
    """Create a DataFusion `Backend` from an existing `SessionContext` instance.

    Parameters
    ----------
    con
        A `SessionContext` instance.
    """
    # do_connect accepts a SessionContext directly, so the public
    # connect entry point can do all of the work here.
    backend = ibis.datafusion.connect(con)
    return backend

def disconnect(self) -> None:
    """Disconnect from the backend; nothing to tear down, so this is a no-op."""

Expand Down Expand Up @@ -329,7 +344,7 @@ def register(
table_name
The name of the table
kwargs
Datafusion-specific keyword arguments
DataFusion-specific keyword arguments

Examples
--------
Expand Down Expand Up @@ -423,7 +438,7 @@ def read_csv(
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to Datafusion loading function.
Additional keyword arguments passed to DataFusion loading function.

Returns
-------
Expand Down Expand Up @@ -451,7 +466,7 @@ def read_parquet(
An optional name to use for the created table. This defaults to
a sequentially generated name.
**kwargs
Additional keyword arguments passed to Datafusion loading function.
Additional keyword arguments passed to DataFusion loading function.

Returns
-------
Expand Down Expand Up @@ -576,7 +591,7 @@ def create_table(
temp: bool = False,
overwrite: bool = False,
):
"""Create a table in Datafusion.
"""Create a table in DataFusion.

Parameters
----------
Expand Down Expand Up @@ -697,7 +712,7 @@ def truncate_table(
def _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
"""Workaround inability to overwrite tables in dataframe API.

Datafusion has helper methods for loading in-memory data, but these methods
DataFusion has helper methods for loading in-memory data, but these methods
don't allow overwriting tables.
The SQL interface allows creating tables from existing tables, so we register
the data as a table using the dataframe API, then run a
Expand Down
16 changes: 16 additions & 0 deletions ibis/backends/druid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis import util
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import DruidCompiler
from ibis.backends.sql.compilers.base import STAR
Expand Down Expand Up @@ -81,6 +82,21 @@ def do_connect(self, **kwargs: Any) -> None:
header = kwargs.pop("header", True)
self.con = pydruid.db.connect(**kwargs, header=header)

@util.experimental
@classmethod
def from_connection(cls, con: pydruid.db.api.Connection) -> Backend:
    """Create an Ibis client from an existing connection to a Druid database.

    Parameters
    ----------
    con
        An existing connection to a Druid database.
    """
    backend = cls()
    backend.con = con
    # The original connect() kwargs are unknown for an adopted connection,
    # so reconnect() is disabled on this instance.
    backend._can_reconnect = False
    return backend

@contextlib.contextmanager
def _safe_raw_sql(self, query, *args, **kwargs):
with contextlib.suppress(AttributeError):
Expand Down
25 changes: 25 additions & 0 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,31 @@ def do_connect(

self.con = duckdb.connect(str(database), config=config, read_only=read_only)

self._post_connect(extensions)

@util.experimental
@classmethod
def from_connection(
    cls,
    con: duckdb.DuckDBPyConnection,
    extensions: Sequence[str] | None = None,
) -> Backend:
    """Create an Ibis client from an existing connection to a DuckDB database.

    Parameters
    ----------
    con
        An existing connection to a DuckDB database.
    extensions
        A list of duckdb extensions to install/load upon connection.
    """
    backend = cls(extensions=extensions)
    # Adopt the caller's connection; the original connect() arguments are
    # unknown, so reconnect() is disabled for this instance.
    backend.con = con
    backend._can_reconnect = False
    backend._post_connect(extensions)
    return backend

def _post_connect(self, extensions: Sequence[str] | None = None) -> None:
# Load any pre-specified extensions
if extensions is not None:
self._load_extensions(extensions)
Expand Down
26 changes: 26 additions & 0 deletions ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ def do_connect(
quote_ident=True,
**kwargs,
)
self._post_connect(timezone)

@util.experimental
@classmethod
def from_connection(
    cls, con: pyexasol.ExaConnection, timezone: str | None = None
) -> Backend:
    """Create an Ibis client from an existing connection to an Exasol database.

    Parameters
    ----------
    con
        An existing connection to an Exasol database.
    timezone
        The session timezone.
    """
    if timezone is None:
        # Preserve whatever time zone the session already uses, falling
        # back to UTC when the server returns no row.
        row = con.execute("SELECT SESSIONTIMEZONE").fetchone()
        timezone = row[0] if row else "UTC"

    backend = cls(timezone=timezone)
    backend._can_reconnect = False
    backend.con = con
    backend._post_connect(timezone)
    return backend

def _post_connect(self, timezone: str = "UTC") -> None:
    # Shared session setup run after a connection is established; called
    # from both `do_connect` and `from_connection`.
    with self.begin() as con:
        # Apply the requested session time zone on the server side.
        con.execute(f"ALTER SESSION SET TIME_ZONE = {timezone!r}")
Expand Down
13 changes: 13 additions & 0 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase, NoUrl
from ibis.backends.flink.ddl import (
CreateDatabase,
Expand Down Expand Up @@ -71,6 +72,18 @@ def do_connect(self, table_env: TableEnvironment) -> None:
"""
self._table_env = table_env

@util.experimental
@classmethod
def from_connection(cls, table_env: TableEnvironment) -> Backend:
    """Create a Flink `Backend` from an existing table environment.

    Parameters
    ----------
    table_env
        A table environment.
    """
    # do_connect merely stores the table environment, so the public
    # connect entry point handles this case directly.
    backend = ibis.flink.connect(table_env)
    return backend

def disconnect(self) -> None:
    """Disconnect from the backend; nothing to tear down, so this is a no-op."""

Expand Down
20 changes: 20 additions & 0 deletions ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from pathlib import Path
from urllib.parse import ParseResult

import impala.hiveserver2 as hs2
import pandas as pd
import polars as pl
import pyarrow as pa
Expand Down Expand Up @@ -183,6 +184,25 @@ def do_connect(
cur.ping()

self.con = con
self._post_connect()

@util.experimental
@classmethod
def from_connection(cls, con: hs2.HiveServer2Connection) -> Backend:
    """Create an Impala `Backend` from an existing HS2 connection.

    Parameters
    ----------
    con
        An existing connection to HiveServer2 (HS2).
    """
    backend = cls()
    backend.con = con
    # An adopted connection cannot be re-established from stored connect()
    # arguments, so reconnect() is disabled for this instance.
    backend._can_reconnect = False
    backend._post_connect()
    return backend

def _post_connect(self) -> None:
    """Initialize per-connection state shared by `do_connect` and `from_connection`."""
    # Session options start out empty.
    self.options = dict()

@cached_property
Expand Down
Loading
Loading