From a7245419754e3d38657de93c61de986a302e6355 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Wed, 7 Jun 2023 16:33:05 +0200 Subject: [PATCH 1/9] Adding sample for ddb and kinesis integration --- dynamodb-kinesis-stream/Makefile | 44 +++++++++++++++++++ dynamodb-kinesis-stream/README.md | 43 ++++++++++++++++++ dynamodb-kinesis-stream/ddb-data.sh | 18 ++++++++ dynamodb-kinesis-stream/main.tf | 35 +++++++++++++++ .../test_stream_consumer.py | 39 ++++++++++++++++ 5 files changed, 179 insertions(+) create mode 100644 dynamodb-kinesis-stream/Makefile create mode 100644 dynamodb-kinesis-stream/README.md create mode 100644 dynamodb-kinesis-stream/ddb-data.sh create mode 100644 dynamodb-kinesis-stream/main.tf create mode 100644 dynamodb-kinesis-stream/test_stream_consumer.py diff --git a/dynamodb-kinesis-stream/Makefile b/dynamodb-kinesis-stream/Makefile new file mode 100644 index 0000000..e88e4c1 --- /dev/null +++ b/dynamodb-kinesis-stream/Makefile @@ -0,0 +1,44 @@ +export AWS_ACCESS_KEY_ID ?= test +export AWS_SECRET_ACCESS_KEY ?= test +export AWS_DEFAULT_REGION=us-east-1 + +usage: ## Show this help + @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' + +install: ## Install dependencies + @which localstack || pip install localstack + @which awslocal || pip install awscli-local + @which tflocal || pip install terraform-local + +run: ## Deploy and run the sample locally + @(test -d .terraform || tflocal init) && tflocal apply --auto-approve + pip install boto3 + python test_stream_consumer.py & + ./ddb-data.sh + +start: + localstack start -d + +clear: ## remove remnants from older deployments + @rm -f terraform.tfstate terraform.tfstate.backup + +clean: clear ## remove all project related files and reverse terraform init + @rm -f -r .terraform .terraform.lock.hcl + +stop: + @echo + localstack stop + +ready: + @echo Waiting on the LocalStack container... + @localstack wait -t 30 && echo Localstack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1) + +logs: + @localstack logs > logs.txt + +test-ci: + make start install ready run; return_code=`echo $$?`;\ + make logs; make stop; exit $$return_code; + +.PHONY: usage install start run stop ready logs test-ci + diff --git a/dynamodb-kinesis-stream/README.md b/dynamodb-kinesis-stream/README.md new file mode 100644 index 0000000..f756329 --- /dev/null +++ b/dynamodb-kinesis-stream/README.md @@ -0,0 +1,43 @@ +# DynamoDB and Kinesis Stream Integration + +Simple demo illustrating the integration between DynamoDB and Kinesis streams. + +## Prerequisites + +- LocalStack +- Docker +- `make` +- Python >= 3.7 +- `tflocal` + + +## Running + +Make sure that LocalStack is started: + +``` +DEBUG=1 localstack start +``` + +Deploy the app with Terraform: + +``` +tflocal init +tflocal apply --auto-approve +``` + +You can now start the Python script that subscribes to the Kinesis shard, listen, and prints to the changes happening in the DynamoDB table: + +``` +pip install boto3 +python test_stream_consumer.py +``` + +You can now populate the DynamoDB table with: + +``` +./ddb-data.sh +``` + +The Python script will start printing the records the shards receive to the console. + diff --git a/dynamodb-kinesis-stream/ddb-data.sh b/dynamodb-kinesis-stream/ddb-data.sh new file mode 100644 index 0000000..d8dc46f --- /dev/null +++ b/dynamodb-kinesis-stream/ddb-data.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +artists=("Queen" "Queen" "Queen" "The Beatles" "The Beatles" "The Beatles" "The Rolling Stones" "The Rolling Stones" "The Rolling Stones") +songs=("Bohemian Rapsody" "We Will Rock You" "Radio Gaga" "Come Together" "Let it Be" "Here Comes the Sun" "Sympathy For The Devil" "Angie" "Satisfaction") + +for i in "${!artists[@]}"; do + artist="${artists[i]}" + song="${songs[i]}" + + awslocal dynamodb put-item \ + --table-name MusicTable \ + --item '{ + "Artist": {"S": "'"$artist"'"}, + "Song": {"S": "'"$song"'"} + }' \ + --return-consumed-capacity TOTAL + sleep 1 +done diff --git a/dynamodb-kinesis-stream/main.tf b/dynamodb-kinesis-stream/main.tf new file mode 100644 index 0000000..fa98475 --- /dev/null +++ b/dynamodb-kinesis-stream/main.tf @@ -0,0 +1,35 @@ +resource "aws_dynamodb_table" "demo_table" { + name = "MusicTable" + billing_mode = "PROVISIONED" + read_capacity = 20 + write_capacity = 20 + hash_key = "Artist" + range_key = "Song" + + attribute { + name = "Artist" + type = "S" + } + + attribute { + name = "Song" + type = "S" + } + + stream_enabled = true + stream_view_type = "NEW_AND_OLD_IMAGES" +} + +resource "aws_kinesis_stream" "demo_stream" { + name = "demo_stream" + shard_count = 1 + + retention_period = 24 + + shard_level_metrics = ["IncomingBytes", "OutgoingBytes"] +} + +resource "aws_dynamodb_kinesis_streaming_destination" "streaming_destination" { + stream_arn = aws_kinesis_stream.demo_stream.arn + table_name = aws_dynamodb_table.demo_table.name +} diff --git a/dynamodb-kinesis-stream/test_stream_consumer.py b/dynamodb-kinesis-stream/test_stream_consumer.py new file mode 100644 index 0000000..871506f --- /dev/null +++ b/dynamodb-kinesis-stream/test_stream_consumer.py @@ -0,0 +1,39 @@ +import boto3 +import time + + +endpoint_url = "http://localhost.localstack.cloud:4566" +stream_name = "demo_stream" + + +kinesis_client = boto3.client("kinesis", endpoint_url=endpoint_url, + region_name='us-east-1', + aws_access_key_id="test", + aws_secret_access_key="test") + +response = kinesis_client.describe_stream( + StreamName=stream_name, +) +stream_arn = response["StreamDescription"]["StreamARN"] +shard_id = response["StreamDescription"]["Shards"][0]["ShardId"] + +consumer_name = "ls_consumer" +response = kinesis_client.register_stream_consumer( + StreamARN=stream_arn, + ConsumerName=consumer_name +) + +consumer_arn = response["Consumer"]["ConsumerARN"] + +response = kinesis_client.subscribe_to_shard( + ConsumerARN=consumer_arn, + ShardId=shard_id, + StartingPosition={"Type": "TRIM_HORIZON"}, +) + +for record in response["EventStream"]: + try: + print("****************") + print(record) + except Exception as e: + print(f"Error reading stream: {str(e)}") From 35ef51e44be53a74f3986d2563699ff4f4ff9984 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Wed, 7 Jun 2023 18:42:29 +0200 Subject: [PATCH 2/9] fix permission for ddb data script --- dynamodb-kinesis-stream/ddb-data.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 dynamodb-kinesis-stream/ddb-data.sh diff --git a/dynamodb-kinesis-stream/ddb-data.sh b/dynamodb-kinesis-stream/ddb-data.sh old mode 100644 new mode 100755 From e34c02fa20baa7f9d2e3511746aa04ca2fa86280 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Fri, 9 Jun 2023 19:32:41 +0200 Subject: [PATCH 3/9] black format --- dynamodb-kinesis-stream/test_stream_consumer.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/dynamodb-kinesis-stream/test_stream_consumer.py b/dynamodb-kinesis-stream/test_stream_consumer.py index 871506f..296f557 100644 --- a/dynamodb-kinesis-stream/test_stream_consumer.py +++ b/dynamodb-kinesis-stream/test_stream_consumer.py @@ -6,10 +6,13 @@ stream_name = "demo_stream" -kinesis_client = boto3.client("kinesis", endpoint_url=endpoint_url, - region_name='us-east-1', - aws_access_key_id="test", - aws_secret_access_key="test") +kinesis_client = boto3.client( + "kinesis", + endpoint_url=endpoint_url, + region_name="us-east-1", + aws_access_key_id="test", + aws_secret_access_key="test", +) response = kinesis_client.describe_stream( StreamName=stream_name, @@ -19,8 +22,7 @@ consumer_name = "ls_consumer" response = kinesis_client.register_stream_consumer( - StreamARN=stream_arn, - ConsumerName=consumer_name + StreamARN=stream_arn, ConsumerName=consumer_name ) consumer_arn = response["Consumer"]["ConsumerARN"] From 1a1562942a259cd4efd7f53f7ace75ceee27c291 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Thu, 15 Jun 2023 22:27:38 +0200 Subject: [PATCH 4/9] addressed PR --- dynamodb-kinesis-stream/Makefile | 4 ++-- dynamodb-kinesis-stream/requirements.xtx | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 dynamodb-kinesis-stream/requirements.xtx diff --git a/dynamodb-kinesis-stream/Makefile b/dynamodb-kinesis-stream/Makefile index e88e4c1..83f1ee5 100644 --- a/dynamodb-kinesis-stream/Makefile +++ b/dynamodb-kinesis-stream/Makefile @@ -9,11 +9,11 @@ install: ## Install dependencies @which localstack || pip install localstack @which awslocal || pip install awscli-local @which tflocal || pip install terraform-local + @test -e .venv || (virtualenv .venv; . .venv/bin/activate; pip install -r requirements.txt) run: ## Deploy and run the sample locally @(test -d .terraform || tflocal init) && tflocal apply --auto-approve - pip install boto3 - python test_stream_consumer.py & + @source .venv/bin/activate && python test_stream_consumer.py & ./ddb-data.sh start: diff --git a/dynamodb-kinesis-stream/requirements.xtx b/dynamodb-kinesis-stream/requirements.xtx new file mode 100644 index 0000000..2b8737e --- /dev/null +++ b/dynamodb-kinesis-stream/requirements.xtx @@ -0,0 +1 @@ +boto3==1.26.72 From 75645f228349bed5a8540cc8cbeccbfc2f377bdd Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Thu, 15 Jun 2023 23:29:53 +0200 Subject: [PATCH 5/9] fixed typo --- dynamodb-kinesis-stream/{requirements.xtx => requirements.txt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename dynamodb-kinesis-stream/{requirements.xtx => requirements.txt} (100%) diff --git a/dynamodb-kinesis-stream/requirements.xtx b/dynamodb-kinesis-stream/requirements.txt similarity index 100% rename from dynamodb-kinesis-stream/requirements.xtx rename to dynamodb-kinesis-stream/requirements.txt From 5fe838a438b35317cb4ae0a801bf8eff40f14239 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Fri, 16 Jun 2023 08:23:55 +0200 Subject: [PATCH 6/9] fix activation venv --- dynamodb-kinesis-stream/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dynamodb-kinesis-stream/Makefile b/dynamodb-kinesis-stream/Makefile index 83f1ee5..1aa2d66 100644 --- a/dynamodb-kinesis-stream/Makefile +++ b/dynamodb-kinesis-stream/Makefile @@ -13,7 +13,7 @@ install: ## Install dependencies run: ## Deploy and run the sample locally @(test -d .terraform || tflocal init) && tflocal apply --auto-approve - @source .venv/bin/activate && python test_stream_consumer.py & + @ .venv/bin/activate; python test_stream_consumer.py & ./ddb-data.sh start: From b79b326bae45538e907839d1ec879c7a3fe8f3ba Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Fri, 16 Jun 2023 10:17:24 +0200 Subject: [PATCH 7/9] fix path for venv --- dynamodb-kinesis-stream/Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dynamodb-kinesis-stream/Makefile b/dynamodb-kinesis-stream/Makefile index 1aa2d66..c52fe71 100644 --- a/dynamodb-kinesis-stream/Makefile +++ b/dynamodb-kinesis-stream/Makefile @@ -2,6 +2,7 @@ export AWS_ACCESS_KEY_ID ?= test export AWS_SECRET_ACCESS_KEY ?= test export AWS_DEFAULT_REGION=us-east-1 + usage: ## Show this help @fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//' @@ -13,7 +14,7 @@ install: ## Install dependencies run: ## Deploy and run the sample locally @(test -d .terraform || tflocal init) && tflocal apply --auto-approve - @ .venv/bin/activate; python test_stream_consumer.py & + @. .venv/bin/activate; python test_stream_consumer.py & ./ddb-data.sh start: From 64b3e4473e5b715683bc5b24f5528f6c66112de8 Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Fri, 16 Jun 2023 11:21:55 +0200 Subject: [PATCH 8/9] better exception handling --- dynamodb-kinesis-stream/test_stream_consumer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dynamodb-kinesis-stream/test_stream_consumer.py b/dynamodb-kinesis-stream/test_stream_consumer.py index 296f557..be5de0e 100644 --- a/dynamodb-kinesis-stream/test_stream_consumer.py +++ b/dynamodb-kinesis-stream/test_stream_consumer.py @@ -33,9 +33,9 @@ StartingPosition={"Type": "TRIM_HORIZON"}, ) -for record in response["EventStream"]: - try: +try: + for record in response["EventStream"]: print("****************") print(record) - except Exception as e: - print(f"Error reading stream: {str(e)}") +except Exception as e: + print(f"Error reading stream: {str(e)}") From ed095d161ee7cf5d4348cf60b6205a728036ddce Mon Sep 17 00:00:00 2001 From: Giovanni Grano Date: Fri, 16 Jun 2023 14:56:13 +0200 Subject: [PATCH 9/9] longer timeout --- .github/workflows/makefile.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/makefile.yml b/.github/workflows/makefile.yml index 79297d2..1d0ef96 100644 --- a/.github/workflows/makefile.yml +++ b/.github/workflows/makefile.yml @@ -47,5 +47,5 @@ jobs: LOCALSTACK_API_KEY: ${{ secrets.TEST_LOCALSTACK_API_KEY }} DNS_ADDRESS: 127.0.0.1 DEBUG: 1 - timeout-minutes: 50 + timeout-minutes: 60 run: make test-ci-all