Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18472: Remove MetadataSupport #18483

Merged
merged 14 commits into from
Jan 16, 2025
12 changes: 6 additions & 6 deletions core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import kafka.server.AutoTopicCreationManager;
import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager;
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.MetadataSupport;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
import kafka.server.metadata.ConfigRepository;
Expand All @@ -47,7 +47,7 @@

public class KafkaApisBuilder {
private RequestChannel requestChannel = null;
private MetadataSupport metadataSupport = null;
private ForwardingManager forwardingManager = null;
private ReplicaManager replicaManager = null;
private GroupCoordinator groupCoordinator = null;
private TransactionCoordinator txnCoordinator = null;
Expand All @@ -74,8 +74,8 @@ public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
return this;
}

public KafkaApisBuilder setMetadataSupport(MetadataSupport metadataSupport) {
this.metadataSupport = metadataSupport;
public KafkaApisBuilder setForwardingManager(ForwardingManager forwardingManager) {
this.forwardingManager = forwardingManager;
return this;
}

Expand Down Expand Up @@ -182,7 +182,7 @@ public KafkaApisBuilder setClientMetricsManager(ClientMetricsManager clientMetri
@SuppressWarnings({"CyclomaticComplexity"})
public KafkaApis build() {
if (requestChannel == null) throw new RuntimeException("you must set requestChannel");
if (metadataSupport == null) throw new RuntimeException("you must set metadataSupport");
if (forwardingManager == null) throw new RuntimeException("you must set forwardingManager");
if (replicaManager == null) throw new RuntimeException("You must set replicaManager");
if (groupCoordinator == null) throw new RuntimeException("You must set groupCoordinator");
if (txnCoordinator == null) throw new RuntimeException("You must set txnCoordinator");
Expand All @@ -200,7 +200,7 @@ public KafkaApis build() {
if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager");

return new KafkaApis(requestChannel,
metadataSupport,
forwardingManager,
replicaManager,
groupCoordinator,
txnCoordinator,
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,9 @@ class BrokerServer(
metrics
)

// Create the request processor objects.
val raftSupport = RaftSupport(forwardingManager, metadataCache)
dataPlaneRequestProcessor = new KafkaApis(
requestChannel = socketServer.dataPlaneRequestChannel,
metadataSupport = raftSupport,
forwardingManager = forwardingManager,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
txnCoordinator = transactionCoordinator,
Expand Down
57 changes: 12 additions & 45 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ import scala.jdk.CollectionConverters._
* Logic to handle the various Kafka requests
*/
class KafkaApis(val requestChannel: RequestChannel,
val metadataSupport: MetadataSupport,
val forwardingManager: ForwardingManager,
val replicaManager: ReplicaManager,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
Expand Down Expand Up @@ -132,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

metadataSupport.forward(request, responseCallback)
forwardingManager.forwardRequest(request, responseCallback)
}

private def handleInvalidVersionsDuringForwarding(request: RequestChannel.Request): Unit = {
Expand Down Expand Up @@ -2108,7 +2108,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (remaining.resources().isEmpty) {
sendResponse(Some(new AlterConfigsResponseData()))
} else {
metadataSupport.forwardingManager.get.forwardRequest(request,
forwardingManager.forwardRequest(request,
new AlterConfigsRequest(remaining, request.header.apiVersion()),
response => sendResponse(response.map(_.data())))
}
Expand All @@ -2135,7 +2135,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (remaining.resources().isEmpty) {
sendResponse(Some(new IncrementalAlterConfigsResponseData()))
} else {
metadataSupport.forwardingManager.get.forwardRequest(request,
forwardingManager.forwardRequest(request,
new IncrementalAlterConfigsRequest(remaining, request.header.apiVersion()),
response => sendResponse(response.map(_.data())))
}
Expand Down Expand Up @@ -2368,39 +2368,11 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
metadataSupport match {
case ZkSupport(adminManager, controller, zkClient, forwardingManager, metadataCache, _) =>
val result = adminManager.describeClientQuotas(describeClientQuotasRequest.filter)

val entriesData = result.iterator.map { case (quotaEntity, quotaValues) =>
val entityData = quotaEntity.entries.asScala.iterator.map { case (entityType, entityName) =>
new DescribeClientQuotasResponseData.EntityData()
.setEntityType(entityType)
.setEntityName(entityName)
}.toBuffer

val valueData = quotaValues.iterator.map { case (key, value) =>
new DescribeClientQuotasResponseData.ValueData()
.setKey(key)
.setValue(value)
}.toBuffer

new DescribeClientQuotasResponseData.EntryData()
.setEntity(entityData.asJava)
.setValues(valueData.asJava)
}.toBuffer

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeClientQuotasResponse(new DescribeClientQuotasResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setEntries(entriesData.asJava)))
case RaftSupport(_, metadataCache) =>
val result = metadataCache.describeClientQuotas(describeClientQuotasRequest.data())
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
result.setThrottleTimeMs(requestThrottleMs)
new DescribeClientQuotasResponse(result)
})
}
val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeClientQuotas(describeClientQuotasRequest.data())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A note for someone who sees this - there is a plan to address these casts separately. My suggestion elsewhere is to simply move the relevant methods to the base interface and then the cast is no longer needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @ijuma reminder, cc @FrankYang0529

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
result.setThrottleTimeMs(requestThrottleMs)
new DescribeClientQuotasResponse(result)
})
}
}

Expand All @@ -2411,14 +2383,9 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
metadataSupport match {
case RaftSupport(_, metadataCache) =>
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
case _ =>
throw KafkaApis.shouldNeverReceive(request)
}
val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeScramCredentials(describeUserScramCredentialsRequest.data())
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
}
}

Expand Down
106 changes: 0 additions & 106 deletions core/src/main/scala/kafka/server/MetadataSupport.scala

This file was deleted.

8 changes: 1 addition & 7 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ class KafkaApisTest extends Logging {
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
val config = new KafkaConfig(properties)

val metadataSupport = metadataCache match {
case cache: KRaftMetadataCache => RaftSupport(forwardingManager, cache)
case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")
}


val listenerType = ListenerType.BROKER
val enabledApis = ApiKeys.apisForListener(listenerType).asScala

Expand All @@ -190,7 +184,7 @@ class KafkaApisTest extends Logging {

new KafkaApis(
requestChannel = requestChannel,
metadataSupport = metadataSupport,
forwardingManager = forwardingManager,
replicaManager = replicaManager,
groupCoordinator = groupCoordinator,
txnCoordinator = txnCoordinator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.RaftSupport;
import kafka.server.ReplicaManager;
import kafka.server.ReplicationQuotaManager;
import kafka.server.SimpleApiVersionManager;
Expand Down Expand Up @@ -191,7 +190,7 @@ private KafkaApis createKafkaApis() {
KafkaConfig config = new KafkaConfig(kafkaProps);
return new KafkaApisBuilder().
setRequestChannel(requestChannel).
setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)).
setForwardingManager(forwardingManager).
setReplicaManager(replicaManager).
setGroupCoordinator(groupCoordinator).
setTxnCoordinator(transactionCoordinator).
Expand Down
Loading