From baa594ba9543daa021451470a6b713e7895b7726 Mon Sep 17 00:00:00 2001 From: Felipe Victorino Caputo <13631451+fvcaputo@users.noreply.github.com> Date: Fri, 16 Apr 2021 12:21:28 -0300 Subject: [PATCH] Add local dc property (#312) * add local dc property * update version --- butterfree/configs/db/cassandra_config.py | 12 ++++++++++ butterfree/configs/environment.py | 1 + setup.py | 2 +- .../configs/db/test_cassandra_config.py | 22 +++++++++++++++++++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/butterfree/configs/db/cassandra_config.py b/butterfree/configs/db/cassandra_config.py index 3f9e129d..3d94e756 100644 --- a/butterfree/configs/db/cassandra_config.py +++ b/butterfree/configs/db/cassandra_config.py @@ -43,6 +43,7 @@ def __init__( stream_checkpoint_path: str = None, read_consistency_level: str = None, write_consistency_level: str = None, + local_dc: str = None, ): self.username = username self.password = password @@ -55,6 +56,7 @@ def __init__( self.stream_checkpoint_path = stream_checkpoint_path self.read_consistency_level = read_consistency_level self.write_consistency_level = write_consistency_level + self.local_dc = local_dc @property def database(self) -> str: @@ -178,6 +180,15 @@ def write_consistency_level(self, value: str) -> None: "CASSANDRA_WRITE_CONSISTENCY_LEVEL", "LOCAL_QUORUM" ) + @property + def local_dc(self) -> Optional[str]: + """Local DC for Cassandra connection.""" + return self.__local_dc + + @local_dc.setter + def local_dc(self, value: str) -> None: + self.__local_dc = value or environment.get_variable("CASSANDRA_LOCAL_DC") + def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]: """Get options for connect to Cassandra DB. @@ -197,6 +208,7 @@ def get_options(self, table: str) -> Dict[Optional[str], Optional[str]]: "spark.cassandra.auth.username": self.username, "spark.cassandra.auth.password": self.password, "spark.cassandra.connection.host": self.host, + "spark.cassandra.connection.localDC": self.local_dc, "spark.cassandra.input.consistency.level": self.read_consistency_level, "spark.cassandra.output.consistency.level": self.write_consistency_level, } diff --git a/butterfree/configs/environment.py b/butterfree/configs/environment.py index 5d8bb4e9..f56efc5d 100644 --- a/butterfree/configs/environment.py +++ b/butterfree/configs/environment.py @@ -14,6 +14,7 @@ "STREAM_CHECKPOINT_PATH": None, "CASSANDRA_READ_CONSISTENCY_LEVEL": None, "CASSANDRA_WRITE_CONSISTENCY_LEVEL": None, + "CASSANDRA_LOCAL_DC": None, } diff --git a/setup.py b/setup.py index 2f04f794..264d9e0d 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ from setuptools import find_packages, setup __package_name__ = "butterfree" -__version__ = "1.2.0.dev10" +__version__ = "1.2.0.dev11" __repository_url__ = "https://github.com/quintoandar/butterfree" with open("requirements.txt") as f: diff --git a/tests/unit/butterfree/configs/db/test_cassandra_config.py b/tests/unit/butterfree/configs/db/test_cassandra_config.py index 9af4c42b..d34c8e9f 100644 --- a/tests/unit/butterfree/configs/db/test_cassandra_config.py +++ b/tests/unit/butterfree/configs/db/test_cassandra_config.py @@ -203,6 +203,28 @@ def test_write_consistency_level_custom_env_var(self, mocker, cassandra_config): # then assert cassandra_config.write_consistency_level == value + def test_local_dc(self, cassandra_config): + # expecting + default = None + assert cassandra_config.local_dc == default + + def test_local_dc_custom(self, cassandra_config): + # given + value = "VPC_1" + cassandra_config.local_dc = value + + # then + assert cassandra_config.local_dc == value + + def test_local_dc_custom_env_var(self, mocker, cassandra_config): + # given + value = "VPC_1" + mocker.patch("butterfree.configs.environment.get_variable", return_value=value) + cassandra_config.local_dc = value + + # then + assert cassandra_config.local_dc == value + def test_set_credentials_on_instantiation(self): cassandra_config = CassandraConfig( # noqa: S106 username="username", password="password", host="host", keyspace="keyspace"