Skip to content

Commit

Permalink
feat: add oauth support (#1632)
Browse files Browse the repository at this point in the history
* added oauthbearer support

* ruff missing newilne

* adding test

* username and password are not necessary for all sasl_ssl variants

* added one more test

* chore: Update security documentation for SASLOAuthBearer

* docs: run API generator

* tests: separate docs tests

* docs: update References

* docs: generate API

* docs: remove useless API file

* docs: correct styles

---------

Co-authored-by: Pastukhov Nikita <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
  • Loading branch information
3 people authored Aug 4, 2024
1 parent a36d282 commit 35aac56
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 106 deletions.
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ search:
- security
- [BaseSecurity](api/faststream/security/BaseSecurity.md)
- [SASLGSSAPI](api/faststream/security/SASLGSSAPI.md)
- [SASLOAuthBearer](api/faststream/security/SASLOAuthBearer.md)
- [SASLPlaintext](api/faststream/security/SASLPlaintext.md)
- [SASLScram256](api/faststream/security/SASLScram256.md)
- [SASLScram512](api/faststream/security/SASLScram512.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/security/SASLOAuthBearer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.security.SASLOAuthBearer
18 changes: 15 additions & 3 deletions docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,31 @@ This chapter discusses the security options available in **FastStream** and how
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-6.25,7-] !}
```

### 4. Other security related usecases
### 4. SASLOAuthBearer Object with SSL/TLS

**Purpose:** The `SASLOAuthBearer` is used for authentication using the Oauth sasl.mechanism. While using it you additionaly need to provide necessary `sasl.oauthbearer.*` values in config and provide it to `KafkaBroker`, eg. `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`. Full list is available in the [confluent doc](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md){.external-link target="_blank"}

**Usage:**

=== "OauthBearer"
```python linenums="1"
{!> docs_src/confluent/security/sasl_oauthbearer.py [ln:1-8] !}
```


### 5. Other security related usecases

**Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file:

**Usage:**

```python
```python linenums="1"
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

security = SASLPlaintext(
username="admin",
password="password", # pragma: allowlist secret
password="password",
)

config = {"ssl.ca.location": "~/my_certs/CRT_cacerts.pem"}
Expand Down
8 changes: 8 additions & 0 deletions docs/docs_src/confluent/security/sasl_oauthbearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from faststream.confluent import KafkaBroker
from faststream.security import SASLOAuthBearer

security = SASLOAuthBearer(
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
4 changes: 2 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def __init__(
}
self.config = {**self.config, **config_from_params}

if sasl_mechanism:
if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
self.config.update(
{
"sasl.mechanism": sasl_mechanism,
Expand Down Expand Up @@ -365,7 +365,7 @@ def __init__(
}
self.config = {**self.config, **config_from_params}

if sasl_mechanism:
if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
self.config.update(
{
"sasl.mechanism": sasl_mechanism,
Expand Down
10 changes: 10 additions & 0 deletions faststream/confluent/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from faststream.exceptions import SetupError
from faststream.security import (
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
SASLScram256,
SASLScram512,
Expand All @@ -27,6 +28,8 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
return _parse_sasl_scram256(security)
elif isinstance(security, SASLScram512):
return _parse_sasl_scram512(security)
elif isinstance(security, SASLOAuthBearer):
return _parse_sasl_oauthbearer(security)
elif isinstance(security, BaseSecurity):
return _parse_base_security(security)
else:
Expand Down Expand Up @@ -64,3 +67,10 @@ def _parse_sasl_scram512(security: SASLScram512) -> "AnyDict":
"sasl_plain_username": security.username,
"sasl_plain_password": security.password,
}


def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> "AnyDict":
return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"sasl_mechanism": "OAUTHBEARER",
}
33 changes: 21 additions & 12 deletions faststream/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,37 @@ def get_schema(self) -> Dict[str, Dict[str, str]]:
return {"scram512": {"type": "scramSha512"}}


class SASLOAuthBearer(BaseSecurity):
"""Security configuration for SASL/OAUTHBEARER authentication.
This class defines basic security configuration for SASL/OAUTHBEARER authentication.
"""

__slots__ = (
"use_ssl",
"ssl_context"
)

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/OAUTHBEARER authentication."""
return [{"oauthbearer": []}]

def get_schema(self) -> Dict[str, Dict[str, str]]:
"""Get the security schema for SASL/OAUTHBEARER authentication."""
return {"oauthbearer": {"type": "oauthBearer"}}


class SASLGSSAPI(BaseSecurity):
"""Security configuration for SASL/GSSAPI authentication.
This class defines security configuration for SASL/GSSAPI authentication.
"""

# TODO: mv to SecretStr
__slots__ = (
"use_ssl",
"ssl_context",
"ssl_context"
)

def __init__(
self,
ssl_context: Optional["SSLContext"] = None,
use_ssl: Optional[bool] = None,
) -> None:
super().__init__(
ssl_context=ssl_context,
use_ssl=use_ssl,
)

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/GSSAPI authentication."""
return [{"gssapi": []}]
Expand Down
26 changes: 26 additions & 0 deletions tests/asyncapi/confluent/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
SASLPlaintext,
SASLScram256,
SASLScram512,
SASLOAuthBearer,
)

basic_schema = {
Expand Down Expand Up @@ -167,3 +168,28 @@ async def test_topic(msg: str) -> str:
}

assert schema == sasl512_security_schema


def test_oauthbearer_security_schema():
ssl_context = ssl.create_default_context()
security = SASLOAuthBearer(
ssl_context=ssl_context,
)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

sasl_oauthbearer_security_schema = deepcopy(basic_schema)
sasl_oauthbearer_security_schema["servers"]["development"]["security"] = [{"oauthbearer": []}]
sasl_oauthbearer_security_schema["components"]["securitySchemes"] = {
"oauthbearer": {"type": "oauthBearer"}
}

assert schema == sasl_oauthbearer_security_schema
89 changes: 0 additions & 89 deletions tests/brokers/confluent/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@ def patch_aio_consumer_and_producer() -> Tuple[MagicMock, MagicMock]:
pass


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_base_security():
from docs.docs_src.confluent.security.basic import broker as basic_broker

with patch_aio_consumer_and_producer() as producer:
async with basic_broker:
producer_call_kwargs = producer.call_args.kwargs

call_kwargs = {}

assert call_kwargs.items() <= producer_call_kwargs.items()


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_base_security_pass_ssl_context():
Expand All @@ -58,78 +44,3 @@ async def test_base_security_pass_ssl_context():
str(e.value)
== "ssl_context in not supported by confluent-kafka-python, please use config instead."
)


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_scram256():
from docs.docs_src.confluent.security.sasl_scram256 import (
broker as scram256_broker,
)

with patch_aio_consumer_and_producer() as producer:
async with scram256_broker:
producer_call_kwargs = producer.call_args.kwargs

call_kwargs = {}
call_kwargs["sasl_mechanism"] = "SCRAM-SHA-256"
call_kwargs["sasl_plain_username"] = "admin"
call_kwargs["sasl_plain_password"] = "password" # pragma: allowlist secret
call_kwargs["security_protocol"] = "SASL_SSL"

assert call_kwargs.items() <= producer_call_kwargs.items()

assert (
producer_call_kwargs["security_protocol"]
== call_kwargs["security_protocol"]
)


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_scram512():
from docs.docs_src.confluent.security.sasl_scram512 import (
broker as scram512_broker,
)

with patch_aio_consumer_and_producer() as producer:
async with scram512_broker:
producer_call_kwargs = producer.call_args.kwargs

call_kwargs = {}
call_kwargs["sasl_mechanism"] = "SCRAM-SHA-512"
call_kwargs["sasl_plain_username"] = "admin"
call_kwargs["sasl_plain_password"] = "password" # pragma: allowlist secret
call_kwargs["security_protocol"] = "SASL_SSL"

assert call_kwargs.items() <= producer_call_kwargs.items()

assert (
producer_call_kwargs["security_protocol"]
== call_kwargs["security_protocol"]
)


@pytest.mark.asyncio()
@pytest.mark.confluent()
async def test_plaintext():
from docs.docs_src.confluent.security.plaintext import (
broker as plaintext_broker,
)

with patch_aio_consumer_and_producer() as producer:
async with plaintext_broker:
producer_call_kwargs = producer.call_args.kwargs

call_kwargs = {}
call_kwargs["sasl_mechanism"] = "PLAIN"
call_kwargs["sasl_plain_username"] = "admin"
call_kwargs["sasl_plain_password"] = "password" # pragma: allowlist secret
call_kwargs["security_protocol"] = "SASL_SSL"

assert call_kwargs.items() <= producer_call_kwargs.items()

assert (
producer_call_kwargs["security_protocol"]
== call_kwargs["security_protocol"]
)
Loading

0 comments on commit 35aac56

Please sign in to comment.