Skip to content

Commit

Permalink
[BUG] Fix Cassandra Connect Session (#316)
Browse files Browse the repository at this point in the history
* Fix Cassandra Connect Session.

* Apply style.
  • Loading branch information
moromimay authored Apr 23, 2021
1 parent 378f3a5 commit 3b18b5a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 61 deletions.
59 changes: 29 additions & 30 deletions butterfree/clients/cassandra_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,35 +61,36 @@ def __init__(
@property
def conn(self, *, ssl_path: str = None) -> Session: # type: ignore
"""Establishes a Cassandra connection."""
auth_provider = (
PlainTextAuthProvider(username=self.user, password=self.password)
if self.user is not None
else None
)
ssl_opts = (
{
"ca_certs": ssl_path,
"ssl_version": PROTOCOL_TLSv1,
"cert_reqs": CERT_REQUIRED,
}
if ssl_path is not None
else None
)
if not self._session:
auth_provider = (
PlainTextAuthProvider(username=self.user, password=self.password)
if self.user is not None
else None
)
ssl_opts = (
{
"ca_certs": ssl_path,
"ssl_version": PROTOCOL_TLSv1,
"cert_reqs": CERT_REQUIRED,
}
if ssl_path is not None
else None
)

execution_profiles = {
EXEC_PROFILE_DEFAULT: ExecutionProfile(
load_balancing_policy=DCAwareRoundRobinPolicy(),
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
row_factory=dict_factory,
execution_profiles = {
EXEC_PROFILE_DEFAULT: ExecutionProfile(
load_balancing_policy=DCAwareRoundRobinPolicy(),
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
row_factory=dict_factory,
)
}
cluster = Cluster(
contact_points=self.host,
auth_provider=auth_provider,
ssl_options=ssl_opts,
execution_profiles=execution_profiles,
)
}
cluster = Cluster(
contact_points=self.host,
auth_provider=auth_provider,
ssl_options=ssl_opts,
execution_profiles=execution_profiles,
)
self._session = cluster.connect(self.keyspace)
self._session = cluster.connect(self.keyspace)
return self._session

def sql(self, query: str) -> ResponseFuture:
Expand All @@ -99,9 +100,7 @@ def sql(self, query: str) -> ResponseFuture:
query: desired query.
"""
if not self._session:
raise RuntimeError("There's no session available for this query.")
return self._session.execute(query)
return self.conn.execute(query)

def get_schema(self, table: str, database: str = None) -> List[Dict[str, str]]:
"""Returns desired table schema.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import find_packages, setup

__package_name__ = "butterfree"
__version__ = "1.2.0.dev12"
__version__ = "1.2.0.dev13"
__repository_url__ = "https://github.com/quintoandar/butterfree"

with open("requirements.txt") as f:
Expand Down
30 changes: 0 additions & 30 deletions tests/unit/butterfree/clients/test_cassandra_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Any, Dict, List
from unittest.mock import MagicMock

import pytest

from butterfree.clients import CassandraClient
from butterfree.clients.cassandra_client import CassandraColumn

Expand Down Expand Up @@ -88,31 +86,3 @@ def test_cassandra_create_table(
query = cassandra_client.sql.call_args[0][0]

assert sanitize_string(query) == sanitize_string(expected_query)

def test_cassandra_without_session(self, cassandra_client: CassandraClient) -> None:
cassandra_client = cassandra_client

with pytest.raises(
RuntimeError, match="There's no session available for this query."
):
cassandra_client.sql(
query="select feature1, feature2 from cassandra_feature_set"
)
with pytest.raises(
RuntimeError, match="There's no session available for this query."
):
cassandra_client.create_table(
[
{"column_name": "id", "type": "int", "primary_key": True},
{
"column_name": "rent_per_month",
"type": "float",
"primary_key": False,
},
],
"test",
)
with pytest.raises(
RuntimeError, match="There's no session available for this query."
):
cassandra_client.get_schema("test")

0 comments on commit 3b18b5a

Please sign in to comment.