skip kafka
AstrakhantsevaAA committed Mar 15, 2024
1 parent 7abf591 commit 8b9633d
Showing 3 changed files with 87 additions and 1 deletion.
82 changes: 82 additions & 0 deletions sources/kafka/README.md
@@ -0,0 +1,82 @@
# Kafka

> **Warning!**
>
> This source is a Community source and has been tested only once. Currently, we **don't test** it on a regular basis.
> If you run into any problems with this source, ask for help in our [Slack Community](https://dlthub.com/community).

[Kafka](https://www.confluent.io/) is an open-source distributed event streaming platform, organized
as a log with message publishers and subscribers.
The Kafka `dlt` verified source loads data using the Confluent Kafka API to the destination of your choice;
see the [pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/kafka_pipeline.py).

The following resource can be loaded:

| Name | Description |
| ----------------- |--------------------------------------------|
| kafka_consumer | Extracts messages from Kafka topics |
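
For orientation, below is a minimal sketch of how the `kafka_consumer` resource could be wired into a
pipeline once the source is initialized and credentials are configured (see the steps below). The topic
names `books` and `orders` are placeholders; replace them with topics from your own cluster.

```python
import dlt

from kafka import kafka_consumer

# Credentials are read from .dlt/secrets.toml (see "Add credentials" below).
pipeline = dlt.pipeline(
    pipeline_name="kafka_pipeline",
    destination="duckdb",
    dataset_name="kafka_messages",
)

# Placeholder topic names; use the topics that exist in your cluster.
data = kafka_consumer(["books", "orders"])

info = pipeline.run(data)
print(info)
```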


## Initialize the pipeline

```bash
dlt init kafka duckdb
```

Here, we chose `duckdb` as the destination. Alternatively, you can choose `redshift`,
`bigquery`, or any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).

## Setup verified source

To grab Kafka credentials and configure the verified source, please refer to the
[full documentation](https://dlthub.com/docs/dlt-ecosystem/verified-sources/kafka#grab-kafka-cluster-credentials).

## Add credentials

1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive
information securely, like access tokens. Keep this file safe.

Use the following format for your Kafka cluster credentials:

```toml
[sources.kafka.credentials]
bootstrap_servers="web.address.gcp.confluent.cloud:9092"
group_id="test_group"
security_protocol="SASL_SSL"
sasl_mechanisms="PLAIN"
sasl_username="example_username"
sasl_password="example_secret"
```

1. Next, follow the instructions in [Destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/) to add credentials for
your chosen destination. This will ensure that your data is properly routed to its final
destination.

## Run the pipeline

1. Before running the pipeline, ensure that you have installed all the necessary dependencies by
running the command:

```bash
pip install -r requirements.txt
```

1. You're now ready to run the pipeline! To get started, run the following command:

```bash
python kafka_pipeline.py
```

1. Once the pipeline has finished running, you can verify that everything loaded correctly by using
the following command:

```bash
dlt pipeline <pipeline_name> show
```

For example, the `pipeline_name` for the above pipeline is `kafka_pipeline`, so the command becomes
`dlt pipeline kafka_pipeline show`. You may also use any custom name instead.

💡 To explore additional customizations for this pipeline, we recommend referring to the official
`dlt` [Kafka](https://dlthub.com/docs/dlt-ecosystem/verified-sources/kafka) documentation. It
provides comprehensive information and guidance on how to further customize and tailor the pipeline
to suit your specific needs.
2 changes: 1 addition & 1 deletion sources/personio/README.md
@@ -54,7 +54,7 @@ To grab Personio credentials and configure the verified source, please refer to
1. Replace the value of `client_id` and `client_secret`. This will ensure that you can access
Personio API securely.

1. Next, follow the instructions in [Destinations](../destinations/duckdb) to add credentials for
1. Next, follow the instructions in [Destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/) to add credentials for
your chosen destination. This will ensure that your data is properly routed to its final
destination.

4 changes: 4 additions & 0 deletions tests/kafka/test_kafka.py
@@ -113,6 +113,7 @@ def kafka_timed_messages(kafka_admin, kafka_producer):
_await(kafka_admin.delete_topics([topic]))


@pytest.mark.skip("We don't have a Kafka instance to test this source.")
def test_kafka_read(kafka_topics, kafka_messages):
"""Test simple messages reading."""
pipeline = dlt.pipeline(
@@ -143,6 +144,7 @@ def test_kafka_read(kafka_topics, kafka_messages):
)


@pytest.mark.skip("We don't have a Kafka instance to test this source.")
def test_kafka_read_custom_msg_processor(kafka_topics, kafka_messages):
"""
Test messages reading and processing with a
@@ -178,6 +180,7 @@ def _custom_msg_processor(msg):
)


@pytest.mark.skip("We don't have a Kafka instance to test this source.")
def test_kafka_read_with_timestamp(kafka_timed_messages):
"""Test if offset is set correctly from a timestamp."""
topic, ts = kafka_timed_messages
@@ -215,6 +218,7 @@ def test_kafka_read_now():
assert tracker["topic1"]["0"] == {"cur": 9, "max": 10}


@pytest.mark.skip("We don't have a Kafka instance to test this source.")
def test_kafka_incremental_read(kafka_producer, kafka_topics):
"""Test incremental messages reading.
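
For context on the change above: the commit adds an explicit `@pytest.mark.skip` marker to the Kafka
tests. A more compact alternative (not what this commit does) would be a single module-level marker; a
minimal sketch:

```python
import pytest

# Hypothetical alternative: skip every test in the module at once instead of
# decorating each test function individually, as this commit does.
pytestmark = pytest.mark.skip(
    reason="We don't have a Kafka instance to test this source."
)
```

Per-function markers, as used in this commit, make it easy to re-enable tests selectively once a Kafka
test instance becomes available.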
