Skip to content

Commit

Permalink
KAFKA-17614: Remove AclAuthorizer (apache#17424)
Browse files Browse the repository at this point in the history
Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
FrankYang0529 authored Oct 23, 2024
1 parent 14a098b commit 2d896d9
Show file tree
Hide file tree
Showing 29 changed files with 242 additions and 1,998 deletions.
766 changes: 0 additions & 766 deletions core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala

This file was deleted.

5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.zk
import java.util.Properties
import kafka.cluster.Broker
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.security.authorizer.AclAuthorizer.{NoAcls, VersionedAcls}
import kafka.server.KafkaConfig
import kafka.utils.Logging
import kafka.zk.TopicZNode.TopicIdReplicaAssignment
Expand Down Expand Up @@ -1347,12 +1346,12 @@ class KafkaZkClient private[zk] (
* @param resource Resource to get VersionedAcls for
* @return VersionedAcls
*/
def getVersionedAclsForResource(resource: ResourcePattern): VersionedAcls = {
def getVersionedAclsForResource(resource: ResourcePattern): ZkData.VersionedAcls = {
val getDataRequest = GetDataRequest(ResourceZNode.path(resource))
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK => ResourceZNode.decode(getDataResponse.data, getDataResponse.stat)
case Code.NONODE => NoAcls
case Code.NONODE => ZkData.VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
case _ => throw getDataResponse.resultException.get
}
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/zk/ZkData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.JsonProcessingException
import kafka.cluster.{Broker, EndPoint}
import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.security.authorizer.AclAuthorizer.VersionedAcls
import kafka.server.DelegationTokenManagerZk
import kafka.utils.Json
import kafka.utils.json.JsonObject
Expand Down Expand Up @@ -766,7 +765,7 @@ object ResourceZNode {
def path(resource: ResourcePattern): String = ZkAclStore(resource.patternType).path(resource.resourceType, resource.name)

def encode(acls: Set[AclEntry]): Array[Byte] = Json.encodeAsBytes(AclEntry.toJsonCompatibleMap(acls.asJava))
def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = VersionedAcls(AclEntry.fromBytes(bytes).asScala.toSet, stat.getVersion)
def decode(bytes: Array[Byte], stat: Stat): ZkData.VersionedAcls = ZkData.VersionedAcls(AclEntry.fromBytes(bytes).asScala.toSet, stat.getVersion)
}

object ExtendedAclChangeEvent {
Expand Down Expand Up @@ -1085,6 +1084,11 @@ object MigrationZNode {

object ZkData {

case class VersionedAcls(acls: Set[AclEntry], zkVersion: Int) {
def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
}


// Important: it is necessary to add any new top level Zookeeper path to the Seq
val SecureRootPaths: Seq[String] = Seq(AdminZNode.path,
BrokersZNode.path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package kafka.api

import kafka.security.authorizer.AclAuthorizer
import kafka.server.BaseRequestTest
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand Down Expand Up @@ -105,12 +104,8 @@ class AbstractAuthorizerIntegrationTest extends BaseRequestTest {
}

private def addNodeProperties(properties: Properties): Unit = {
if (isKRaftTest()) {
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
} else {
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[AclAuthorizer].getName)
}
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)

properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
Expand Down
286 changes: 106 additions & 180 deletions core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import kafka.security.JaasTestUtils

import java.util.Properties
import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateDelegationTokenOptions, ScramCredentialInfo, UserScramCredentialAlteration, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
Expand Down Expand Up @@ -60,12 +59,6 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest

override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
super.configureSecurityBeforeServersStart(testInfo)

if (!TestInfoUtils.isKRaft(testInfo)) {
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker admin credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
}
}

// Create the admin credentials for KRaft as part of controller initialization
Expand Down Expand Up @@ -107,7 +100,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testCreateUserWithDelegationToken(quorum: String): Unit = {
val privilegedAdminClient = Admin.create(privilegedAdminClientConfig)
try {
Expand All @@ -124,7 +117,7 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), Both))
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism), KafkaSasl))
super.setUp(testInfo)
privilegedAdminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testCreateTokenForOtherUserFails(quorum: String): Unit = {
val thrown = assertThrows(classOf[ExecutionException], () => {
createDelegationTokens(() => new CreateDelegationTokenOptions().owner(otherClientPrincipal), assert = false)
Expand All @@ -104,7 +104,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserFails(quorum: String): Unit = {
Using(createScramAdminClient(kafkaClientSaslMechanism, describeTokenFailPrincipal.getName, describeTokenFailPassword)) { describeTokenFailAdminClient =>
Using(createScramAdminClient(kafkaClientSaslMechanism, otherClientPrincipal.getName, otherClientPassword)) { otherClientAdminClient =>
Expand All @@ -118,7 +118,7 @@ class DelegationTokenEndToEndAuthorizationWithOwnerTest extends DelegationTokenE
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testDescribeTokenForOtherUserPasses(quorum: String): Unit = {
val adminClient = createTokenRequesterAdminClient()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import kafka.security.JaasTestUtils

import java.util
import java.util.Properties
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, CLUSTER_ACTION, DELETE, DESCRIBE}
Expand All @@ -25,11 +24,12 @@ import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}

import scala.jdk.CollectionConverters._
import scala.jdk.javaapi.OptionConverters
Expand Down Expand Up @@ -74,12 +74,13 @@ object DescribeAuthorizedOperationsTest {
}
}

@Disabled("KAFKA-17833: change to use kraft")
class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslSetup {
import DescribeAuthorizedOperationsTest._

override val brokerCount = 1
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[AclAuthorizer].getName)
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)

var client: Admin = _

Expand All @@ -88,7 +89,7 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
val authorizer = CoreUtils.createObject[Authorizer](classOf[StandardAuthorizer].getName)
val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val topicResource = new ResourcePattern(ResourceType.TOPIC, AclEntry.WILDCARD_RESOURCE, PatternType.LITERAL)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.yammer.metrics.core.Gauge

import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException
import kafka.security.authorizer.AclAuthorizer
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import kafka.utils._
import org.apache.kafka.clients.admin.Admin
Expand All @@ -39,7 +38,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
Expand All @@ -56,8 +55,7 @@ import scala.jdk.CollectionConverters._
* This test relies on a chain of test harness traits to set up. It directly
* extends IntegrationTestHarness. IntegrationTestHarness creates producers and
* consumers, and it extends KafkaServerTestHarness. KafkaServerTestHarness starts
* brokers, but first it initializes a ZooKeeper server and client, which happens
* in QuorumTestHarness.
* brokers.
*
* To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
* The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
Expand All @@ -81,7 +79,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val tp = new TopicPartition(topic, part)

override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
protected def authorizerClass: Class[_] = classOf[AclAuthorizer]

val topicResource = new ResourcePattern(TOPIC, topic, LITERAL)
val groupResource = new ResourcePattern(GROUP, group, LITERAL)
Expand Down Expand Up @@ -148,21 +145,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
*/
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {

if (TestInfoUtils.isKRaft(testInfo)) {
this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + "User:ANONYMOUS")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
} else {
// The next two configuration parameters enable ZooKeeper secure ACLs
// and sets the Kafka authorizer, both necessary to enable security.
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, authorizerClass.getName)

// Set the specific principal that can update ACLs.
this.serverConfig.setProperty(AclAuthorizer.SuperUsersProp, kafkaPrincipal.toString)
}
this.serverConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString)
this.controllerConfig.setProperty(StandardAuthorizer.SUPER_USERS_CONFIG, kafkaPrincipal.toString + ";" + "User:ANONYMOUS")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
this.controllerConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)

super.setUp(testInfo)

Expand All @@ -185,7 +171,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* Tests the ability of producing and consuming with the appropriate ACLs set.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testProduceConsumeViaAssign(quorum: String): Unit = {
setAclsAndProduce(tp)
val consumer = createConsumer()
Expand Down Expand Up @@ -214,7 +200,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testProduceConsumeViaSubscribe(quorum: String): Unit = {
setAclsAndProduce(tp)
val consumer = createConsumer()
Expand All @@ -224,7 +210,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testProduceConsumeWithWildcardAcls(quorum: String): Unit = {
setWildcardResourceAcls()
val producer = createProducer()
Expand All @@ -236,7 +222,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testProduceConsumeWithPrefixedAcls(quorum: String): Unit = {
setPrefixedResourceAcls()
val producer = createProducer()
Expand All @@ -248,7 +234,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testProduceConsumeTopicAutoCreateTopicCreateAcl(quorum: String): Unit = {
// topic2 is not created on setup()
val tp2 = new TopicPartition("topic2", 0)
Expand Down Expand Up @@ -319,8 +305,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
@CsvSource(value = Array(
"kraft, true",
"kraft, false",
"zk, true",
"zk, false"
))
def testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
// Set consumer group acls since we are testing topic authorization
Expand Down Expand Up @@ -390,8 +374,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
@CsvSource(value = Array(
"kraft, true",
"kraft, false",
"zk, true",
"zk, false"
))
def testNoProduceWithDescribeAcl(quorum:String, isIdempotenceEnabled:Boolean): Unit = {
val superuserAdminClient = createSuperuserAdminClient()
Expand Down Expand Up @@ -420,7 +402,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testNoConsumeWithoutDescribeAclViaAssign(quorum: String): Unit = {
noConsumeWithoutDescribeAclSetup()
val consumer = createConsumer()
Expand All @@ -431,7 +413,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testNoConsumeWithoutDescribeAclViaSubscribe(quorum: String): Unit = {
noConsumeWithoutDescribeAclSetup()
val consumer = createConsumer()
Expand Down Expand Up @@ -472,7 +454,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testNoConsumeWithDescribeAclViaAssign(quorum: String): Unit = {
noConsumeWithDescribeAclSetup()
val consumer = createConsumer()
Expand All @@ -484,7 +466,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}

@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testNoConsumeWithDescribeAclViaSubscribe(quorum: String): Unit = {
noConsumeWithDescribeAclSetup()
val consumer = createConsumer()
Expand Down Expand Up @@ -513,7 +495,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
* ACL set.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
@ValueSource(strings = Array("kraft"))
def testNoGroupAcl(quorum: String): Unit = {
val superuserAdminClient = createSuperuserAdminClient()
superuserAdminClient.createAcls(List(AclTopicWrite(), AclTopicCreate(), AclTopicDescribe()).asJava).values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.authorizer.AclAuthorizer
import kafka.server.BaseRequestTest
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand Down Expand Up @@ -82,12 +81,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
}

private def addNodeProperties(properties: Properties): Unit = {
if (isKRaftTest()) {
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)
} else {
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[AclAuthorizer].getName)
}
properties.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)
properties.put(StandardAuthorizer.SUPER_USERS_CONFIG, BrokerPrincipal.toString)

properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1")
Expand Down Expand Up @@ -117,7 +112,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testUnauthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
Expand All @@ -138,7 +133,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
@ValueSource(strings = Array("kraft"))
def testAuthorizedProduceAndConsume(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
Expand Down
Loading

0 comments on commit 2d896d9

Please sign in to comment.