Skip to content

Commit

Permalink
support kms_key_name feature for seeds
Browse files Browse the repository at this point in the history
  • Loading branch information
b0lle committed May 25, 2022
1 parent 69d1bf2 commit e4ba036
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 2 deletions.
6 changes: 5 additions & 1 deletion dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import google.cloud.bigquery

from google.cloud.bigquery import AccessEntry, SchemaField
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

import time
import agate
Expand Down Expand Up @@ -610,16 +611,19 @@ def alter_table_add_columns(self, relation, columns):
client.update_table(new_table, ["schema"])

@available.parse_none
def load_dataframe(self, database, schema, table_name, agate_table, column_override):
def load_dataframe(self, database, schema, table_name, agate_table, column_override, kms_key_name):
bq_schema = self._agate_to_schema(agate_table, column_override)
conn = self.connections.get_thread_connection()
client = conn.handle

table_ref = self.connections.table_ref(database, schema, table_name)

destination_encryption_configuration = EncryptionConfiguration(kms_key_name=kms_key_name)

load_config = google.cloud.bigquery.LoadJobConfig()
load_config.skip_leading_rows = 1
load_config.schema = bq_schema
load_config.destination_encryption_configuration = destination_encryption_configuration

with open(agate_table.original_abspath, "rb") as f:
job = client.load_table_from_file(f, table_ref, rewind=True, job_config=load_config)
Expand Down
3 changes: 2 additions & 1 deletion dbt/include/bigquery/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
{% macro bigquery__load_csv_rows(model, agate_table) %}

{%- set column_override = model['config'].get('column_types', {}) -%}
{%- set kms_key_name = model['config'].get('kms_key_name', {}) -%}
{{ adapter.load_dataframe(model['database'], model['schema'], model['alias'],
agate_table, column_override) }}
agate_table, column_override, kms_key_name) }}
{% if config.persist_relation_docs() and 'description' in model %}

{{ adapter.update_table_description(model['database'], model['schema'], model['alias'], model['description']) }}
Expand Down
5 changes: 5 additions & 0 deletions tests/integration/simple_seed_test/models-bq/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ seeds:
tests:
- column_type:
type: STRING

- name: seed_kms
columns:
- name: birthday
- name: id
21 changes: 21 additions & 0 deletions tests/integration/simple_seed_test/seeds-config/seed_kms.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
id,first_name,email,ip_address,birthday
1,Larry,[email protected],69.135.206.194,2008-09-12 19:08:31
2,Larry,[email protected],64.210.133.162,1978-05-09 04:15:14
3,Anna,[email protected],168.104.64.114,2011-10-16 04:07:57
4,Sandra,[email protected],229.235.252.98,1973-07-19 10:52:43
5,Fred,[email protected],78.229.170.124,2012-09-30 16:38:29
6,Stephen,[email protected],182.227.157.105,1995-11-07 21:40:50
7,William,[email protected],135.139.249.50,1982-09-05 03:11:59
8,Jessica,[email protected],203.62.178.210,1991-10-16 11:03:15
9,Douglas,[email protected],178.187.247.1,1979-10-01 09:49:48
10,Lisa,[email protected],168.234.128.249,2011-05-26 07:45:49
11,Ralph,[email protected],55.152.163.149,1972-11-18 19:06:11
12,Louise,[email protected],141.116.153.154,2014-11-25 20:56:14
13,Clarence,[email protected],81.171.31.133,2011-11-17 07:02:36
14,Daniel,[email protected],8.204.211.37,1980-09-13 00:09:04
15,Katherine,[email protected],176.96.134.59,1997-08-22 19:36:56
16,Billy,[email protected],214.108.78.85,2003-10-19 02:14:47
17,Annie,[email protected],190.108.42.70,1988-10-28 15:12:35
18,Shirley,[email protected],109.251.164.84,1988-08-24 10:50:57
19,Roger,[email protected],38.145.218.108,1985-12-31 15:17:15
20,Lillian,[email protected],47.57.236.17,1970-06-08 02:09:05
160 changes: 160 additions & 0 deletions tests/integration/simple_seed_test/test_seed_kms_key_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import json
import os

from google.oauth2 import service_account
from google.api_core.exceptions import AlreadyExists
from google.cloud import bigquery
from google.cloud import kms

from tests.integration.base import DBTIntegrationTest, use_profile


class GcpKmsAdapter:
    """Helper that provisions a Cloud KMS key ring and key for the tests.

    The adapter authenticates with the supplied service-account info and
    takes the GCP project id from those same credentials.
    """

    def __init__(self, service_account_info, location_id) -> None:
        """Build the KMS client and remember the project and location.

        Args:
            service_account_info: Parsed service-account JSON (a mapping)
                passed to ``from_service_account_info``.
            location_id: KMS location used in every resource path.
        """
        self.location_id = location_id
        # The credentials object is only needed to discover the project id.
        self.project_id = service_account.Credentials.from_service_account_info(
            info=service_account_info
        ).project_id
        self.kms_client = kms.KeyManagementServiceClient.from_service_account_info(
            info=service_account_info
        )

    def get_or_create_key(self, key_ring_id, key_id):
        """Ensure the key ring and key exist and return the key's resource name.

        Both creation calls tolerate ``AlreadyExists``, so this can be called
        repeatedly with the same ids.
        """
        self._create_key_ring(name=key_ring_id)
        self._create_key_symmetric_encrypt_decrypt(
            key_ring_id=key_ring_id, key_id=key_id
        )
        return f"projects/{self.project_id}/locations/{self.location_id}/keyRings/{key_ring_id}/cryptoKeys/{key_id}"

    def _create_key_ring(self, name):
        """Create key ring ``name`` in the configured location if it is missing."""
        request = {
            "parent": f"projects/{self.project_id}/locations/{self.location_id}",
            "key_ring_id": name,
            "key_ring": {},
        }
        try:
            self.kms_client.create_key_ring(request=request)
        except AlreadyExists:
            # A ring left behind by an earlier run is fine; reuse it.
            pass

    def _create_key_symmetric_encrypt_decrypt(self, key_ring_id, key_id):
        """Create a symmetric encrypt/decrypt key in the ring if it is missing."""
        parent_ring = self.kms_client.key_ring_path(
            self.project_id, self.location_id, key_ring_id
        )
        crypto_key = {
            "purpose": kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT,
            "version_template": {
                "algorithm": (
                    kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.GOOGLE_SYMMETRIC_ENCRYPTION
                ),
            },
        }
        request = {
            "parent": parent_ring,
            "crypto_key_id": key_id,
            "crypto_key": crypto_key,
        }
        try:
            self.kms_client.create_crypto_key(request=request)
        except AlreadyExists:
            # A key left behind by an earlier run is fine; reuse it.
            pass


class TestSimpleSeedKmsKeyName(DBTIntegrationTest):
    """Base integration test that seeds a table with ``kms_key_name`` configured.

    ``setUp`` makes sure a KMS key exists (creating it when necessary) and
    stores its resource name so ``project_config`` can apply it to the seeds.
    """

    def setUp(self):
        """Provision the KMS key, then run the regular integration set-up."""
        self.gcs_kms_adapter = GcpKmsAdapter(
            service_account_info=self.gcs_service_account_info, location_id="us"
        )
        self.kms_key_name = self.gcs_kms_adapter.get_or_create_key(
            key_ring_id="dbt-integration-test", key_id="dbt-integration-test-key"
        )
        return super().setUp()

    @property
    def schema(self):
        """Schema name used by this test."""
        return "simple_seed"

    @property
    def project_id(self):
        """GCP project read from the ``default2`` output of the BigQuery profile.

        Raises:
            Exception: if the profile does not define a project there.
        """
        project_id = (
            self.bigquery_profile()
            .get("test", {})
            .get("outputs", {})
            .get("default2", {})
            .get("project")
        )
        if project_id is None:
            raise Exception("unable to get gcp project")
        return project_id

    @property
    def gcs_service_account_info(self):
        """Service-account info parsed from ``BIGQUERY_TEST_SERVICE_ACCOUNT_JSON``.

        Raises:
            Exception: if the environment variable is not set.
        """
        credentials_json_str = os.getenv("BIGQUERY_TEST_SERVICE_ACCOUNT_JSON")
        if credentials_json_str is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise Exception(
                "BIGQUERY_TEST_SERVICE_ACCOUNT_JSON environment variable is not set"
            )
        # Every single quote is removed before parsing; presumably the value
        # may arrive wrapped in shell quotes -- confirm against the CI set-up.
        return json.loads(credentials_json_str.replace("'", ""))

    @property
    def bigquery_client(self):
        """A new BigQuery client authenticated with the test service account."""
        credentials = service_account.Credentials.from_service_account_info(
            info=self.gcs_service_account_info
        )
        client = bigquery.Client(
            credentials=credentials, project=credentials.project_id
        )
        return client

    @property
    def project_config(self):
        """dbt project config: only ``seed_kms`` is enabled, with the KMS key set."""
        return {
            "config-version": 2,
            "seed-paths": ["seeds-config"],
            "macro-paths": ["macros"],
            "seeds": {
                "+kms_key_name": self.kms_key_name,
                "test": {
                    "enabled": False,
                    "quote_columns": True,
                    "seed_kms": {
                        "enabled": True,
                    },
                },
            },
        }


class TestSimpleSeedKmsKeyNameBq(TestSimpleSeedKmsKeyName):
    """BigQuery run of the KMS seed test, using the ``models-bq`` directory."""

    @property
    def models(self):
        """Directory holding the models/schema files for this test."""
        return "models-bq"

    @property
    def bigquery_table_metadata(self):
        """Table metadata for the ``seed_kms`` table, fetched from BigQuery."""
        table_id = f"{self.project_id}.{self.unique_schema()}.seed_kms"
        table = self.bigquery_client.get_table(table_id)
        return table

    @property
    def profile_config(self):
        """Profile used for the run."""
        return self.bigquery_profile()

    @use_profile("bigquery")
    def test_bigquery_simple_seed_with_kms_key_name_bigquery(self):
        """Seed the table and verify it is encrypted with the configured key."""
        results = self.run_dbt(["seed", "--show"])
        self.assertEqual(len(results), 1)

        # Fetch once; each property access issues a new get_table request.
        encryption_configuration = (
            self.bigquery_table_metadata.encryption_configuration
        )
        # Fail with a readable message rather than an AttributeError on None
        # when the table was created without customer-managed encryption.
        self.assertIsNotNone(
            encryption_configuration,
            "seed_kms table has no encryption configuration",
        )
        # Check the actual key, not merely that some key is present.
        self.assertEqual(encryption_configuration.kms_key_name, self.kms_key_name)

0 comments on commit e4ba036

Please sign in to comment.