Skip to content

Latest commit

 

History

History
134 lines (102 loc) · 4.57 KB

kafka-admin-AdminUtils.adoc

File metadata and controls

134 lines (102 loc) · 4.57 KB

AdminUtils Helper Object

AdminUtils is an utility (a Scala object) with methods for…​FIXME

Table 1. AdminUtils API
Name Description

assignReplicasToBrokers

assignReplicasToBrokers(
  brokerMetadatas: Seq[BrokerMetadata],
  nPartitions: Int,
  replicationFactor: Int,
  fixedStartIndex: Int = -1,
  startPartitionId: Int = -1): Map[Int, Seq[Int]]

AdminUtils uses __admin_client for…​FIXME

assignReplicasToBrokers Method

assignReplicasToBrokers(
  brokerMetadatas: Seq[BrokerMetadata],
  nPartitions: Int,
  replicationFactor: Int,
  fixedStartIndex: Int = -1,
  startPartitionId: Int = -1): Map[Int, Seq[Int]]

assignReplicasToBrokers branches off per whether all the brokers have rack information or not:

assignReplicasToBrokers throws a InvalidPartitionsException if the given nPartitions is 0 or less.

Number of partitions must be larger than 0.

assignReplicasToBrokers throws a InvalidReplicationFactorException if the given replicationFactor is 0 or less.

Replication factor must be larger than 0.

assignReplicasToBrokers throws a InvalidReplicationFactorException if the given replicationFactor is greater than the number of all brokers in the cluster.

Replication factor: [replicationFactor] larger than available brokers: [brokerMetadatas.size].

assignReplicasToBrokers throws an AdminOperationException if there is at least one broker (in brokerMetadatas) with no rack information (when it is assumed either all brokers have it or none).

Not all brokers have rack information for replica rack aware assignment.
Note

assignReplicasToBrokers is used when:

assignReplicasToBrokersRackUnaware Internal Method

assignReplicasToBrokersRackUnaware(
  nPartitions: Int,
  replicationFactor: Int,
  brokerList: Seq[Int],
  fixedStartIndex: Int,
  startPartitionId: Int): Map[Int, Seq[Int]]

assignReplicasToBrokersRackUnaware performs the replicas to brokers assignment in a fairly random manner (i.e. includes two random numbers). No additional information is used to make the decision (except the input parameters).

import kafka.admin.{AdminUtils, BrokerMetadata}
// assignReplicasToBrokersRackUnaware is a private method
// Using assignReplicasToBrokers instead as the entry point
val brokerMetadatas = Seq(
  BrokerMetadata(0, None),
  BrokerMetadata(1, None),
  BrokerMetadata(2, None))
val assignment = AdminUtils.assignReplicasToBrokers(
  brokerMetadatas,
  nPartitions = 3,
  replicationFactor = 2)
val output = assignment.toSeq.sortBy(_._1).map { case (brokerId, replicas) => s"$brokerId => $replicas" }
scala> output.foreach(println)
0 => ArrayBuffer(2, 1)
1 => ArrayBuffer(0, 2)
2 => ArrayBuffer(1, 0)

assignReplicasToBrokersRackUnaware…​FIXME

Note
assignReplicasToBrokersRackUnaware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have no rack information assigned).

assignReplicasToBrokersRackAware Internal Method

assignReplicasToBrokersRackAware(
  nPartitions: Int,
  replicationFactor: Int,
  brokerMetadatas: Seq[BrokerMetadata],
  fixedStartIndex: Int,
  startPartitionId: Int): Map[Int, Seq[Int]]

assignReplicasToBrokersRackAware…​FIXME

Note
assignReplicasToBrokersRackAware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have rack information assigned).