diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 9ac1d66a1a1f..95086c06efe7 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -189,6 +189,13 @@ RUN /byoc-mock && rm /byoc-mock ################################# +FROM base as keycloak + +COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/keycloak / +RUN /keycloak && rm /keycloak + +################################# + FROM librdkafka as final COPY --chown=0:0 --chmod=0755 tests/docker/ducktape-deps/teleport / @@ -263,6 +270,7 @@ COPY --from=kaf /usr/local/bin/kaf /usr/local/bin/ COPY --from=kcl /usr/local/bin/kcl /usr/local/bin/ COPY --from=kgo-verifier /opt/kgo-verifier /opt/kgo-verifier COPY --from=byoc-mock /opt/redpanda-tests/go/byoc-mock/.rpk.managed-byoc /root/.local/bin/.rpk.managed-byoc +COPY --from=keycloak /opt/keycloak/ /opt/keycloak/ RUN ldconfig diff --git a/tests/docker/ducktape-deps/keycloak b/tests/docker/ducktape-deps/keycloak new file mode 100644 index 000000000000..6cdb4b794ef7 --- /dev/null +++ b/tests/docker/ducktape-deps/keycloak @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +set -e + +# get keycloak binary +KC_VERSION=21.1.2 +wget "https://github.com/keycloak/keycloak/releases/download/${KC_VERSION}/keycloak-${KC_VERSION}.tar.gz" + +tar -xvzf keycloak-${KC_VERSION}.tar.gz +rm keycloak-${KC_VERSION}.tar.gz + +mv keycloak-${KC_VERSION} /opt/keycloak diff --git a/tests/rptest/clients/python_librdkafka.py b/tests/rptest/clients/python_librdkafka.py index c074c4cbb9dd..e210d5d71ea0 100644 --- a/tests/rptest/clients/python_librdkafka.py +++ b/tests/rptest/clients/python_librdkafka.py @@ -6,9 +6,15 @@ # As of the Change Date specified in that file, in accordance with # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +import requests +import time +import functools + +from confluent_kafka import Producer from confluent_kafka.admin import AdminClient, NewTopic from typing import Optional from rptest.services import tls +from rptest.services.keycloak import OAuthConfig class PythonLibrdkafka: @@ -21,12 +27,14 @@ def __init__(self, username=None, password=None, algorithm=None, - tls_cert: Optional[tls.Certificate] = None): + tls_cert: Optional[tls.Certificate] = None, + oauth_config: Optional[OAuthConfig] = None): self._redpanda = redpanda self._username = username self._password = password self._algorithm = algorithm self._tls_cert = tls_cert + self._oauth_config = oauth_config def brokers(self): client = AdminClient(self._get_config()) @@ -56,21 +64,22 @@ def create_topic(self, spec): def get_client(self): return AdminClient(self._get_config()) + def get_producer(self): + producer_conf = self._get_config() + self._redpanda.logger.debug(f"{producer_conf}") + return Producer(producer_conf) + def _get_config(self): conf = { 'bootstrap.servers': self._redpanda.brokers(), } + if self._redpanda.sasl_enabled(): - if self._username: - c = (self._username, self._password, self._algorithm) + if self._algorithm == 'OAUTHBEARER': + conf.update(self._get_oauth_config()) else: - c = self._redpanda.SUPERUSER_CREDENTIALS - conf.update({ - 'sasl.mechanism': c[2], - 'security.protocol': 'sasl_plaintext', - 'sasl.username': c[0], - 'sasl.password': c[1], - }) + conf.update(self._get_sasl_config()) + if self._tls_cert: conf.update({ 'ssl.key.location': self._tls_cert.key, @@ -87,3 +96,57 @@ def _get_config(self): }) self._redpanda.logger.info(conf) return conf + + def _get_oauth_config(self): + assert self._oauth_config is not None + return { + 'security.protocol': + 'sasl_plaintext', + 'sasl.mechanisms': + "OAUTHBEARER", + 'oauth_cb': + functools.partial(self._get_oauth_token, self._oauth_config), + 'logger': + self._redpanda.logger, + } + + def _get_sasl_config(self): + if self._username: + c = (self._username, self._password, self._algorithm) + else: + c = self._redpanda.SUPERUSER_CREDENTIALS + return { + 'sasl.mechanism': c[2], + 'security.protocol': 'sasl_plaintext', + 'sasl.username': c[0], + 'sasl.password': c[1], + } + + def _get_oauth_token(self, conf: OAuthConfig, _): + # Better to wrap this whole thing in a try block, since the context where + # librdkafka invokes the callback seems to prevent exceptions from making + # their way back up to ducktape. This way we get a log and librdkafka will + # barf when we return the wrong thing. + try: + payload = { + 'client_id': conf.client_id, + 'client_secret': conf.client_secret, + 'audience': 'redpanda', + 'grant_type': 'client_credentials', + 'scope': ' '.join(conf.scopes), + } + self._redpanda.logger.info( + f"GETTING TOKEN: {conf.token_endpoint}, payload: {payload}") + + resp = requests.post( + conf.token_endpoint, + headers={'content-type': 'application/x-www-form-urlencoded'}, + auth=(conf.client_id, conf.client_secret), + data=payload) + self._redpanda.logger.info( + f"response status: {resp.status_code}, body: {resp.content}") + token = resp.json() + return token['access_token'], time.time() + float( + token['expires_in']) + except Exception as e: + self._redpanda.logger.error(f"Exception: {e}") diff --git a/tests/rptest/services/keycloak.py b/tests/rptest/services/keycloak.py new file mode 100644 index 000000000000..d8a3e7a19b46 --- /dev/null +++ b/tests/rptest/services/keycloak.py @@ -0,0 +1,237 @@ +import json +import os +import requests + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until + +from keycloak import KeycloakAdmin + +KC_INSTALL_DIR = os.path.join('/', 'opt', 'keycloak') +KC_DATA_DIR = os.path.join(KC_INSTALL_DIR, 'data') +KC_VAULT_DIR = os.path.join(KC_DATA_DIR, 'vault') +KC_BIN_DIR = os.path.join(KC_INSTALL_DIR, 'bin') +KC = os.path.join(KC_BIN_DIR, 'kc.sh') +KCADM = os.path.join(KC_BIN_DIR, 'kcadm.sh') +KC_ADMIN = 'admin' +KC_ADMIN_PASSWORD = 'admin' +KC_ROOT_LOG_LEVEL = 'INFO' +KC_LOG_HANDLER = 'console,file' +KC_LOG_FILE = '/var/log/kc.log' +KC_PORT = 8080 + +DEFAULT_REALM = 'demorealm' + +START_CMD_TMPL = """ +LAUNCH_JBOSS_IN_BACKGROUND=1 \ +KEYCLOAK_ADMIN={admin} \ +KEYCLOAK_ADMIN_PASSWORD={pw} \ +{kc} start-dev --http-port={port} \ +--log="{log_handler}" --log-file="{logfile}" --log-level="{log_level}" & +""" + +OIDC_CONFIG_TMPL = """\ +http://{host}:{port}/realms/{realm}/.well-known/openid-configuration\ +""" + + +class OAuthConfig: + def __init__(self, + client_id, + client_secret, + token_endpoint, + scopes=['openid']): + self.client_id = client_id + self.client_secret = client_secret + self.token_endpoint = token_endpoint + self.scopes = scopes + + +class KeycloakAdminClient: + def __init__(self, + logger, + server_url, + realm, + username=KC_ADMIN, + password=KC_ADMIN_PASSWORD): + self.logger = logger + self.logger.debug(f"KeycloakAdminClient for {server_url}") + self.kc_admin = KeycloakAdmin( + server_url=server_url, + username=username, + password=password, + realm_name='master', + ) + self.kc_admin.create_realm(payload={ + 'realm': realm, + 'enabled': True, + }) + self.kc_admin.realm_name = realm + + def config(self, server_url, username, password, realm): + self.kc_admin = KeycloakAdmin(server_url=server_url, + username=username, + password=password, + realm_name=realm) + + def create_client(self, client_id, **kwargs): + rep = { + 'clientId': client_id, + 'enabled': True, + 'serviceAccountsEnabled': True, # for client credentials grant + } + rep.update(kwargs) + id = self.kc_admin.create_client(payload=rep) + self.logger.debug(f'client_id: {id}') + return id + + def generate_client_secret(self, client_id): + id = self.kc_admin.get_client_id(client_id) + self.kc_admin.generate_client_secrets(id) + + def get_client_secret(self, client_id): + id = self.kc_admin.get_client_id(client_id) + secret = self.kc_admin.get_client_secrets(id) + return secret['value'] + + def create_user(self, username, password, realm_admin=False, **kwargs): + rep = { + 'username': + username, + 'credentials': [{ + 'type': 'password', + 'value': password, + 'temporary': False, + }], + 'enabled': + True + } + rep.update(kwargs) + + user_id = self.kc_admin.create_user(rep) + + if realm_admin: + client_id = self.kc_admin.get_client_id('realm-management') + role_id = self.kc_admin.get_client_role_id(client_id=client_id, + role_name='realm-admin') + self.kc_admin.assign_client_role(user_id=user_id, + client_id=client_id, + roles={ + 'name': 'realm-admin', + 'id': role_id + }) + return user_id + + def update_user(self, username, **kwargs): + user_id = self.kc_admin.get_user_id(username) + if user_id is None: + raise Exception(f"User {username} not found") + + self.kc_admin.update_user(user_id=user_id, payload=kwargs) + + +class KeycloakService(Service): + logs = { + 'keycloak_log': { + 'path': f"{KC_LOG_FILE}", + "collect_default": True, + }, + } + + def __init__(self, + context, + port=KC_PORT, + realm=DEFAULT_REALM, + log_level=KC_ROOT_LOG_LEVEL): + super(KeycloakService, self).__init__(context, num_nodes=1) + self.realm = realm + self.http_port = port + self.log_level = log_level + self._admin = None + + @property + def admin(self): + assert self._admin is not None + return self._admin + + @property + def admin_ll(self): + return self.admin.kc_admin + + def _start_cmd(self): + cmd = START_CMD_TMPL.format(admin=KC_ADMIN, + pw=KC_ADMIN_PASSWORD, + kc=KC, + port=self.http_port, + log_handler=KC_LOG_HANDLER, + logfile=KC_LOG_FILE, + log_level=self.log_level) + return cmd + + def host(self, node): + return node.account.hostname + + def get_token_endpoint(self, node): + oidc_config = requests.get( + OIDC_CONFIG_TMPL.format(host=self.host(node), + port=self.http_port, + realm=self.realm)).json() + return oidc_config['token_endpoint'] + + def login_admin_user(self, node, username, password): + self.admin.config( + server_url=f'http://{self.host(node)}:{self.http_port}', + username=username, + password=password, + realm=self.realm) + + def generate_oauth_config(self, node, client_id): + secret = self.admin.get_client_secret(client_id) + token_endpoint = self.get_token_endpoint(node) + return OAuthConfig(client_id, secret, token_endpoint) + + def pids(self, node): + return node.account.java_pids('quarkus') + + def alive(self, node): + return len(self.pids(node)) > 0 + + def start_node(self, node, **kwargs): + self.logger.debug("Starting Keycloak service") + + node.account.ssh(f"touch {KC_LOG_FILE}", allow_fail=False) + + with node.account.monitor_log(KC_LOG_FILE) as monitor: + node.account.ssh_capture(self._start_cmd(), allow_fail=False) + monitor.wait_until("Running the server in", timeout_sec=120) + + self.logger.debug(f"Keycloak PIDs: {self.pids(node)}") + + self._admin = KeycloakAdminClient( + self.logger, + server_url=f'http://{self.host(node)}:{self.http_port}', + realm=self.realm, + username=f'{KC_ADMIN}', + password=f'{KC_ADMIN_PASSWORD}', + ) + + def stop_node(self, node, clean_shutdown=True): + s = "TERM" if clean_shutdown else "KILL" + self.logger.warn(f"Stopping node {node.name}") + + for p in self.pids(node): + node.account.ssh(f"kill -s {s} {p}", allow_fail=not clean_shutdown) + + wait_until(lambda: not self.alive(node), + timeout_sec=30, + backoff_sec=.5, + err_msg="Keycloak took too long to stop.") + + def clean_node(self, node, **kwargs): + self.logger.warn(f"Cleaning Keycloak node {node.name}") + if self.alive(node): + self.stop_node(node) + + # TODO: this might be overly aggressive + node.account.ssh(f"rm -rf {KC_LOG_FILE}") + node.account.ssh(f"rm -rf /opt/keycloak/data/*", allow_fail=False) diff --git a/tests/rptest/tests/redpanda_oauth_test.py b/tests/rptest/tests/redpanda_oauth_test.py new file mode 100644 index 000000000000..3c5cd47012ed --- /dev/null +++ b/tests/rptest/tests/redpanda_oauth_test.py @@ -0,0 +1,111 @@ +# Copyright 2023 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import time +import functools +import json + +from rptest.clients.python_librdkafka import PythonLibrdkafka +from rptest.services.redpanda import SecurityConfig, make_redpanda_service +from rptest.services.keycloak import KeycloakService +from rptest.services.cluster import cluster + +from ducktape.tests.test import Test +from rptest.services.redpanda import make_redpanda_service +from rptest.clients.rpk import RpkTool +from rptest.util import expect_exception + +from confluent_kafka import KafkaException + +CLIENT_ID = 'myapp' +EXAMPLE_TOPIC = 'foo' + + +class RedpandaOIDCTestBase(Test): + """ + Base class for tests that use the Redpanda service with OIDC + """ + def __init__(self, + test_context, + num_nodes=5, + sasl_mechanisms=['SCRAM', 'OAUTHBEARER'], + **kwargs): + super(RedpandaOIDCTestBase, self).__init__(test_context, **kwargs) + self.produce_messages = [] + self.produce_errors = [] + num_brokers = num_nodes - 1 + self.keycloak = KeycloakService(test_context) + + security = SecurityConfig() + security.enable_sasl = True + security.sasl_mechanisms = sasl_mechanisms + + self.redpanda = make_redpanda_service(test_context, num_brokers) + self.redpanda.set_security_settings(security) + + self.su_username, self.su_password, self.su_algorithm = self.redpanda.SUPERUSER_CREDENTIALS + + self.rpk = RpkTool(self.redpanda, + username=self.su_username, + password=self.su_password, + sasl_mechanism=self.su_algorithm) + + def setUp(self): + self.produce_messages.clear() + self.produce_errors.clear() + self.redpanda.logger.info("Starting Redpanda") + self.redpanda.start() + + +class RedpandaOIDCTest(RedpandaOIDCTestBase): + @cluster(num_nodes=5) + def test_init(self): + kc_node = self.keycloak.nodes[0] + try: + self.keycloak.start_node(kc_node) + except Exception as e: + self.logger.error(f"{e}") + self.keycloak.clean_node(kc_node) + assert False, "Keycloak failed to start" + + self.keycloak.admin.create_user('norma', + 'desmond', + realm_admin=True, + email='10086@sunset.blvd') + self.keycloak.login_admin_user(kc_node, 'norma', 'desmond') + self.keycloak.admin.create_client(CLIENT_ID) + + # add an email address to myapp client's service user. this should + # appear alongside the access token. + self.keycloak.admin.update_user(f'service-account-{CLIENT_ID}', + email='myapp@customer.com') + + self.rpk.create_topic(EXAMPLE_TOPIC) + service_user_id = self.keycloak.admin_ll.get_user_id( + f'service-account-{CLIENT_ID}') + result = self.rpk.sasl_allow_principal(f'User:{service_user_id}', + ['all'], 'topic', EXAMPLE_TOPIC, + self.su_username, + self.su_password, + self.su_algorithm) + + cfg = self.keycloak.generate_oauth_config(kc_node, CLIENT_ID) + assert cfg.client_secret is not None + assert cfg.token_endpoint is not None + k_client = PythonLibrdkafka(self.redpanda, + algorithm='OAUTHBEARER', + oauth_config=cfg) + producer = k_client.get_producer() + + # Expclicit poll triggers OIDC token flow. Required for librdkafka + # metadata requests to behave nicely. + producer.poll(0.0) + + with expect_exception(KafkaException, lambda _: True): + producer.list_topics(timeout=5) diff --git a/tests/setup.py b/tests/setup.py index 0c7a2b62d74b..3a299f00b128 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -23,7 +23,8 @@ 'grpcio==1.57.0', 'grpcio-tools==1.57', 'grpcio-status==1.57.0', 'cachetools==5.3.1', 'google-api-core==2.11.1', 'google-auth==2.22.0', 'googleapis-common-protos==1.60.0', 'google.cloud.compute==1.14.0', - 'proto-plus==1.22.3', 'rsa==4.9' + 'proto-plus==1.22.3', 'rsa==4.9', + 'python-keycloak@git+https://github.com/redpanda-data/python-keycloak.git@10b822cb0320c54dbf5bf4fd00435afb1487415d' ], scripts=[], )