Skip to content
This repository has been archived by the owner on Jul 3, 2024. It is now read-only.
/ kafka-embedded-env Public archive

Simple API for starting up a Kafka environment in Kotlin or Java

License

Notifications You must be signed in to change notification settings

navikt/kafka-embedded-env

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Use Kafka with testcontainers https://testcontainers.com/

Build Status Maven Central

kafka-embedded-env

A simple API for creating an embedded Kafka environment with the KafkaEnvironment class, typically used for running integration tests.

Based on the Confluent Open Source distribution v7.2.x.

Instead of using the classic ports (2181, 9092, ...) for each server, the class will get the required number of available ports and use those in configurations for each server.

class KafkaEnvironment(
    noOfBrokers: Int = 1,
    topicNames: List<String> = emptyList(),
    topicInfos: List<TopicInfo> = emptyList(),
    withSchemaRegistry: Boolean = false,
    val withSecurity: Boolean = false,
    users: List<JAASCredential> = emptyList(),
    autoStart: Boolean = false,
    brokerConfigOverrides: Properties = Properties()
) : AutoCloseable {
    data class TopicInfo(val name: String, val partitions: Int = 2, val config: Map<String, String>? = null)
}
 
fun start() // start servers in correct order
 
fun stop() // stop servers in correct order - session data are available
 
fun tearDown() // when finished with the kafka environment, stops servers and remove session data                    

Maximum allowed number of brokers is 2.

Security

The 'withSecurity' parameter gives a kafka cluster with security, thus authentication and authorization

  • Secured Zookeeper, kafka broker must authenticate and be authorized
  • Secured Kafka broker (and inter broker), clients (including Schema registry) must authenticate and be authorized
  • No security for Schema Registry clients
  • No TLS

See JAASContext.kt for details and predefined set of producer and consumer credentials. Observe that auto creation of topics is disabled when security is enabled.

The 'users' parameter is an option for custom set of producer and consumer credentials. Only relevant when security is enabled.

Getting Started

Add the dependency:

Gradle

dependencies {
    testImplementation "no.nav:kafka-embedded-env:3.2.4"
}

Maven

<dependency>
    <groupId>no.nav</groupId>
    <artifactId>kafka-embedded-env</artifactId>
    <version>3.2.4</version>
    <scope>test</scope>
</dependency>

Note: It is recommended that you use the Confluent version matching this library - currently v7.2.x

Examples

Default

val kafkaEnv = KafkaEnvironment()
 
kafkaEnv.start()
// do stuff
kafkaEnv.tearDown()

The default settings gives

  • 1 Zookeeper
  • 1 Kafka broker

Custom 1

val kafkaEnv = KafkaEnvironment(
    noOfBrokers = 2,
    topicNames = listOf("test1", "test2", "test3"),
    withSchemaRegistry = true,
    autoStart = true
)
// do stuff
kafkaEnv.tearDown()

The above configuration gives

  • 1 Zookeeper instance
  • 2 Kafka brokers
  • 1 Schema Registry instance

Given topics are automatically created and all servers are started in correct order - ready to use. Each topic will have number of partitions equal to number of brokers.

Custom 2

val kafkaEnv = KafkaEnvironment(
    noOfBrokers = 2,
    topicNames = listOf("custom1"),
    withSecurity = true,
    users = listOf(JAASCredential("myP1", "myP1p"),JAASCredential("myC1", "myC1p")),
    autoStart = true
)
// do stuff
kafkaEnv.tearDown()

The above configuration gives

  • 1 Zookeeper instance
  • 2 Kafka brokers

Given users are added to Kafka brokers JAAS context (authentication) and the topic is automatically created. Observe that relevant authorization must be given before produce and consume scenario is activated.

See Confluent authorization.

For 'crash course' approach, see relevant test cases with security in KafkaEnvironmentSpec.kt

ServerPark

An instance of KafkaEnvironment has a serverPark (ServerPark) property, giving access to details depending on the state. Each server (ServerBase) has a few relevant properties and start/stop functions.

data class ServerPark(
    val zookeeper: ServerBase,
    val brokerStatus: BrokerStatus,
    val schemaRegStatus: SchemaRegistryStatus,
    val status: ServerParkStatus
)
        
abstract class ServerBase {
    protected var status: ServerStatus = NotRunning

    open val host: String = "localhost"
    abstract val port: Int
    abstract val url: String

    abstract fun start()
    abstract fun stop()
}

Thus each server can be stopped and started independently.

In order to ease the state handling, some properties are available.

val zookeeper get() = serverPark.zookeeper as ZKServer
val brokers get() = serverPark.getBrokers()
val brokersURL get() = serverPark.getBrokersURL()
val adminClient get() = serverPark.getAdminClient()
val schemaRegistry get() = serverPark.getSchemaReg()

Be aware of what you are doing

  • kafka environment without brokers (noOfBrokers = 0), gives empty list for 'brokers'
  • non started kafka environment gives 'null' for 'adminClient'
  • a started kafka environment without brokers still gives 'null' for 'adminClient'
  • ...

The 'adminClient' property creates an instance of AdminClient with super user authorization. Thus, feel free to play with it. See Kafka AdminClient API for the set of available operations.

Please close adminClient after use.

Breaking changes from 2.0.x

class KafkaEnvironment:

  • The parameter topics should be renamed to topicNames if you wish to keep the behaviour from previous versions.

Local development

Build and test

./mvnw clean install

Contact

Create an issue here on the GitHub issue tracker. Pull requests are also welcome.

Internal resources may reach us on Slack in the #kafka channel.