Partition is an internal representation of a TopicPartition for ReplicaManager (to manage all partitions).
Partition knows the host broker (by local broker ID), which is one of the partition replicas (replicated across brokers of a Kafka cluster).
Partition can be the leader of the TopicPartition or a follower.
Partition can be elected the partition leader (when ReplicaManager is requested to makeLeaders).
Partition is under-replicated if it is the leader and the number of in-sync replicas is below the number of all replicas. Use the UnderReplicated metric to monitor it (or the UnderReplicatedPartitions metric of the ReplicaManager).
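The under-replicated check boils down to comparing two set sizes on the leader. The following is a minimal Scala sketch over a hypothetical, simplified PartitionView case class (not Kafka's Partition class); it assumes the local broker is the leader exactly when the leader ID equals the local broker ID, and that the metric reports 1 or 0.

```scala
// Hypothetical, simplified view of a partition (not Kafka's Partition class).
final case class PartitionView(
    localBrokerId: Int,
    leaderReplicaIdOpt: Option[Int],
    allReplicaIds: Seq[Int],
    inSyncReplicaIds: Set[Int]
) {
  // The local broker is the leader when the leader ID is the local broker ID.
  def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)

  // Under-replicated: this broker leads, but not every replica is in sync.
  def isUnderReplicated: Boolean =
    isLeader && inSyncReplicaIds.size < allReplicaIds.size

  // A 1-or-0 reading, the kind of value a gauge like UnderReplicated reports.
  def underReplicatedValue: Int = if (isUnderReplicated) 1 else 0
}

// Broker 1 leads replicas 1, 2 and 3, but replica 3 has dropped out of the ISR.
val onLeader = PartitionView(1, Some(1), Seq(1, 2, 3), Set(1, 2))
// The same partition as seen from follower broker 2.
val onFollower = onLeader.copy(localBrokerId = 2)

println(onLeader.underReplicatedValue)   // 1
println(onFollower.underReplicatedValue) // 0 (only the leader reports it)
```

Only the leader reports the partition as under-replicated, so summing the readings across brokers counts every under-replicated partition once.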
Partition uses the remoteReplicasMap internal registry to track remote replicas (and is requested to update partition replicas and the in-sync replicas when…FIXME).
A Kafka topic is spread across a Kafka cluster as a logical group of one or more partitions.
Kafka producers publish messages to the partition leaders, and Kafka consumers consume messages from them.
In-Sync Replicas (ISR) are brokers that…FIXME
Offline Replicas are…FIXME
Partition is created when ReplicaManager is requested to add a TopicPartition to allPartitions (indirectly using apply) and for the OfflinePartition.
Partition uses [Partition [topicPartition] broker=[localBrokerId]] as the logging prefix (aka logIdent).
Tip: Enable logging for the kafka.cluster.Partition logger to see what happens inside. Refer to Logging.
Partition is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
|
One of the two possible values:
|
|
|
|
One of the two possible values:
|
|
One of the two possible values:
|
|
One of the two possible values:
|
The performance metrics are registered in kafka.cluster:type=Partition group (only when the partition is not offline).
log: Option[Log] = None
log is the partition log.
log is assigned a Log when Partition is requested to createLogIfNotExists and maybeReplaceCurrentWithFutureReplica.
log is available until Partition is requested to delete.
log is used when:
- Partition is requested for the LastStableOffsetLag metric, getLocalLog, leaderLogIfLocal, localLogOrException, and maybeReplaceCurrentWithFutureReplica
- ReplicaManager is requested to localLog, becomeLeaderOrFollower, checkpointHighWatermarks, and handleLogDirFailure
When created, a Partition is given a PartitionStateStore (a ZkPartitionStateStore by default).
PartitionStateStore is used when Partition is requested for the following:
apply(
topicPartition: TopicPartition,
time: Time,
replicaManager: ReplicaManager): Partition
apply creates a new Partition for the given TopicPartition, Time, and ReplicaManager with the following:
- Creates a new ZkPartitionStateStore for the given TopicPartition and ReplicaManager
- Creates a new DelayedOperations for the given TopicPartition and ReplicaManager
- Uses the replica.lag.time.max.ms and inter.broker.protocol.version configuration properties
Note: apply is used when ReplicaManager is requested to add a partition to the allPartitions registry.
leaderIsrUpdateLock: ReentrantReadWriteLock
leaderIsrUpdateLock is a Java ReentrantReadWriteLock, i.e. a pair of locks: one for read-only operations and one for writing. (The read lock may be held simultaneously by multiple reader threads as long as there are no writers. The write lock is exclusive.)
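A read-write lock like this is usually applied through small "loan pattern" helpers that acquire the lock, run a block, and always release it. Below is a minimal Scala sketch of such helpers, hand-written here and merely named after inReadLock and inWriteLock (they are not Kafka's own code), guarding a hypothetical inSyncReplicaIds variable.

```scala
import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock}

// Hypothetical helper: run `body` under the read lock, releasing it afterwards.
def inReadLock[T](lock: ReadWriteLock)(body: => T): T = {
  lock.readLock().lock()
  try body
  finally lock.readLock().unlock()
}

// Hypothetical helper: run `body` under the exclusive write lock.
def inWriteLock[T](lock: ReadWriteLock)(body: => T): T = {
  lock.writeLock().lock()
  try body
  finally lock.writeLock().unlock()
}

val leaderIsrUpdateLock = new ReentrantReadWriteLock()
var inSyncReplicaIds: Set[Int] = Set(1, 2, 3)

// Readers (e.g. checking the ISR size) may run concurrently with each other.
val isrSize = inReadLock(leaderIsrUpdateLock) { inSyncReplicaIds.size }

// Writers (e.g. shrinking the ISR) get exclusive access.
inWriteLock(leaderIsrUpdateLock) { inSyncReplicaIds = inSyncReplicaIds - 3 }
```

The try/finally ensures the lock is released even when the block throws, which is why such helpers are preferred over manual lock/unlock calls scattered through the code.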
A read lock (inReadLock) is used for the following:
A write lock (inWriteLock) is used for the following:
maybeExpandIsr(
replicaId: Int,
logReadResult: LogReadResult): Boolean
maybeExpandIsr…FIXME
Note: maybeExpandIsr is used when Partition is requested to updateFollowerFetchState.
updateFollowerFetchState(
followerId: Int,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
lastSentHighwatermark: Long): Boolean
updateFollowerFetchState…FIXME
Note: updateFollowerFetchState is used when ReplicaManager is requested to updateFollowerFetchState.
maybeShrinkIsr(
replicaMaxLagTimeMs: Long): Unit
maybeShrinkIsr…
Note: maybeShrinkIsr is used when ReplicaManager is requested to maybeShrinkIsr.
updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean
updateReplicaLogReadResult…FIXME
Note: updateReplicaLogReadResult is used exclusively when ReplicaManager is requested to updateFollowerLogReadResults.
updateIsr(newIsr: Set[Replica]): Unit
updateIsr…FIXME
makeFollower(
controllerId: Int,
partitionStateInfo: LeaderAndIsrRequest.PartitionState,
correlationId: Int): Boolean
makeFollower…FIXME
Note: makeFollower is used exclusively when ReplicaManager is requested to makeFollowers.
leaderReplicaIfLocal: Option[Replica]
leaderReplicaIfLocal returns a Replica when the leaderReplicaIdOpt is the localBrokerId. Otherwise, leaderReplicaIfLocal returns None (i.e. undefined).
Note: leaderReplicaIfLocal is used…FIXME
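In other words, leaderReplicaIfLocal is an Option that is defined only on the leader broker. A minimal Scala sketch, assuming hypothetical stand-ins (a Replica case class with only a broker ID and a PartitionSketch class holding replicas by broker ID), could express it with Option combinators:

```scala
// Hypothetical stand-in for Kafka's Replica (only the broker ID matters here).
final case class Replica(brokerId: Int)

// Hypothetical, simplified partition: replicas by broker ID plus the leader ID.
final class PartitionSketch(
    localBrokerId: Int,
    var leaderReplicaIdOpt: Option[Int],
    allReplicasMap: Map[Int, Replica]
) {
  // Some(replica) only when the leader is the local broker; None otherwise.
  def leaderReplicaIfLocal: Option[Replica] =
    leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(allReplicasMap.get)

  // Whether the leader replica is hosted on this broker.
  def isLeaderReplicaLocal: Boolean = leaderReplicaIfLocal.isDefined
}

val partition = new PartitionSketch(
  localBrokerId = 1,
  leaderReplicaIdOpt = Some(1),
  allReplicasMap = Map(1 -> Replica(1), 2 -> Replica(2)))
println(partition.leaderReplicaIfLocal) // Some(Replica(1))

partition.leaderReplicaIdOpt = Some(2) // broker 2 takes over leadership
println(partition.leaderReplicaIfLocal) // None
```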
isUnderMinIsr: Boolean
isUnderMinIsr is true only if the leader replica is local (isLeaderReplicaLocal) and the number of in-sync replicas is below the min.insync.replicas configuration property (as configured for the Log of the leader replica).
Note: isUnderMinIsr is used when…FIXME
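The condition is a plain conjunction. A minimal Scala sketch, assuming the three inputs (whether the leader replica is local, the current ISR, and min.insync.replicas of the leader's log) are already known and passed in as hypothetical parameters:

```scala
// Hypothetical inputs: whether the leader replica is local, the current ISR,
// and min.insync.replicas as configured for the leader's log.
def isUnderMinIsr(isLeaderReplicaLocal: Boolean,
                  inSyncReplicaIds: Set[Int],
                  minInSyncReplicas: Int): Boolean =
  isLeaderReplicaLocal && inSyncReplicaIds.size < minInSyncReplicas

// A leader with a 1-broker ISR and min.insync.replicas=2 is under min ISR.
val underMin = isUnderMinIsr(
  isLeaderReplicaLocal = true, inSyncReplicaIds = Set(1), minInSyncReplicas = 2)

// A follower never reports it, whatever the ISR size.
val followerUnderMin = isUnderMinIsr(
  isLeaderReplicaLocal = false, inSyncReplicaIds = Set(1), minInSyncReplicas = 2)
```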
checkEnoughReplicasReachOffset(
requiredOffset: Long): (Boolean, Errors)
checkEnoughReplicasReachOffset…FIXME
Note: checkEnoughReplicasReachOffset is used when…FIXME
makeLeader(
controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean
makeLeader returns true if this broker has just been elected as the leader for the partition.
Note: makeLeader could be executed for a broker that is the partition leader already.
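Since makeLeader may run on a broker that already leads the partition, the return value has to be computed before the leader registry is updated. A minimal Scala sketch of that bookkeeping, using a hypothetical LeaderState class that models only the leader ID and the leader epoch (not Kafka's makeLeader):

```scala
// Hypothetical, simplified leader bookkeeping (not Kafka's makeLeader).
final class LeaderState(val localBrokerId: Int) {
  var leaderReplicaIdOpt: Option[Int] = None
  var leaderEpoch: Int = -1

  // Returns true only when this call turns the local broker into the leader,
  // so repeating it on a broker that already leads returns false.
  def makeLeader(newLeaderEpoch: Int): Boolean = {
    val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId)
    leaderEpoch = newLeaderEpoch
    if (isNewLeader) leaderReplicaIdOpt = Some(localBrokerId)
    isNewLeader
  }
}

val state = new LeaderState(localBrokerId = 1)
val first = state.makeLeader(newLeaderEpoch = 5)  // elected: true
val second = state.makeLeader(newLeaderEpoch = 6) // already leader: false
```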
Internally, makeLeader starts by acquiring the write lock of leaderIsrUpdateLock (which makes it a single-thread-exclusive operation).
makeLeader changes the internal controller epoch based on the given LeaderAndIsrPartitionState.
makeLeader requests the given LeaderAndIsrPartitionState for the partition replicas (their broker IDs) and the ISR (as a list of broker IDs) and updates the internal registries for replica assignments and in-sync replicas (updateAssignmentAndIsr).
makeLeader creates a Log (unless available already) for the local broker ID and the given OffsetCheckpoints (with the isFutureReplica flag off).
makeLeader requests the leader log for the logEndOffset.
makeLeader prints out the following INFO message to the logs:
[topicPartition] starts at Leader Epoch [leaderEpoch] from offset [leaderEpochStartOffset]. Previous Leader Epoch was: [leaderEpoch]
makeLeader updates the internal registries: leaderEpoch, leaderEpochStartOffsetOpt and zkVersion.
makeLeader requests the leader log to maybeAssignEpochStartOffset with the current leaderEpoch and the log end offset (which is now considered the start offset of the leader epoch).
makeLeader requests the remoteReplicas to resetLastCaughtUpTime. For replicas in inSyncReplicaIds, the last caught-up time is the current time, while for the others it is 0.
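The reset rule can be sketched in a few lines of Scala. RemoteReplica and its one-argument resetLastCaughtUpTime below are hypothetical simplifications (Kafka's own Replica tracks more state); the sketch only shows how the ISR membership selects between the current time and 0.

```scala
// Hypothetical stand-in for a remote (follower) replica.
final class RemoteReplica(val brokerId: Int) {
  var lastCaughtUpTimeMs: Long = -1L
  def resetLastCaughtUpTime(timeMs: Long): Unit = {
    lastCaughtUpTimeMs = timeMs
  }
}

// In-sync followers are treated as caught up "now"; the others get 0, so
// unless they catch up they look as if they have lagged for a very long time.
def resetLastCaughtUpTimes(remoteReplicas: Seq[RemoteReplica],
                           inSyncReplicaIds: Set[Int],
                           curTimeMs: Long): Unit =
  remoteReplicas.foreach { replica =>
    val lastCaughtUpTimeMs =
      if (inSyncReplicaIds.contains(replica.brokerId)) curTimeMs else 0L
    replica.resetLastCaughtUpTime(lastCaughtUpTimeMs)
  }

// Broker 1 (local) leads; broker 2 is in sync, broker 3 is not.
val followers = Seq(new RemoteReplica(2), new RemoteReplica(3))
resetLastCaughtUpTimes(followers, inSyncReplicaIds = Set(1, 2), curTimeMs = 1000L)
```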
When the local broker has just been elected the new leader of the partition, makeLeader updates the leaderReplicaIdOpt internal registry and requests the remoteReplicas to updateFetchState.
makeLeader tries to increment the high watermark of the leader log and, if incremented, calls tryCompleteDelayedRequests.
Note: makeLeader is used when ReplicaManager is requested to makeLeaders.
Updating Internal Registries for Replica Assignments and In-Sync Replicas: updateAssignmentAndIsr Method
updateAssignmentAndIsr(
assignment: Seq[Int],
isr: Set[Int]): Unit
updateAssignmentAndIsr uses the remoteReplicasMap internal registry and the given assignment to find the broker IDs that are no longer replicas of the partition.
updateAssignmentAndIsr creates new partition replicas for the broker IDs (in the given assignment) that have not been registered in the remoteReplicasMap internal registry before.
updateAssignmentAndIsr updates the allReplicaIds and the inSyncReplicaIds internal registries with the given assignment and isr, respectively.
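The three steps map onto set operations over the registries. Below is a minimal Scala sketch with a hypothetical Registries class and Replica case class; two points are assumptions not stated above: replicas that are no longer assigned are dropped from remoteReplicasMap, and the local broker is never registered as a remote replica.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's Replica.
final case class Replica(brokerId: Int)

// Hypothetical, simplified registries (names follow the text above).
final class Registries(val localBrokerId: Int) {
  val remoteReplicasMap = mutable.Map.empty[Int, Replica]
  var allReplicaIds: Seq[Int] = Seq.empty
  var inSyncReplicaIds: Set[Int] = Set.empty

  def updateAssignmentAndIsr(assignment: Seq[Int], isr: Set[Int]): Unit = {
    // Broker IDs that are no longer replicas of the partition.
    // Assumption: they are removed from the registry.
    val removed = remoteReplicasMap.keySet.toSet -- assignment
    removed.foreach(id => remoteReplicasMap.remove(id))
    // Register only replicas not seen before (existing ones are kept as-is).
    // Assumption: the local broker is not a remote replica.
    assignment.filterNot(_ == localBrokerId).foreach { id =>
      remoteReplicasMap.getOrElseUpdate(id, Replica(id))
    }
    allReplicaIds = assignment
    inSyncReplicaIds = isr
  }
}

val registries = new Registries(localBrokerId = 1)
registries.updateAssignmentAndIsr(Seq(1, 2, 3), Set(1, 2))
val replicaTwo = registries.remoteReplicasMap(2)
// Reassignment: broker 3 leaves, broker 4 joins, broker 2 stays.
registries.updateAssignmentAndIsr(Seq(1, 2, 4), Set(1))
```

Using getOrElseUpdate keeps the existing Replica instance for brokers that stay assigned, so any per-replica state they carry survives a reassignment.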
getOrCreateReplica(
replicaId: Int,
isNew: Boolean = false): Replica
getOrCreateReplica simply looks up the Replica in the allReplicasMap internal registry (by the given replicaId).
If not found, getOrCreateReplica…FIXME
maybeCreateFutureReplica(logDir: String): Boolean
maybeCreateFutureReplica…FIXME
Note: maybeCreateFutureReplica is used exclusively when ReplicaManager is requested to alterReplicaLogDirs.
appendRecordsToLeader(
records: MemoryRecords,
isFromClient: Boolean,
requiredAcks: Int = 0): LogAppendInfo
appendRecordsToLeader basically requests the Log (of the leader Replica) to appendAsLeader.
Internally, appendRecordsToLeader…FIXME
doAppendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
doAppendRecordsToFollowerOrFutureReplica…FIXME
Note: doAppendRecordsToFollowerOrFutureReplica is used exclusively when Partition is requested to appendRecordsToFollowerOrFutureReplica.
appendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
appendRecordsToFollowerOrFutureReplica…FIXME
truncateTo(
offset: Long,
isFuture: Boolean): Unit
truncateTo…FIXME
truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit
truncateFullyAndStartAt…FIXME
maybeReplaceCurrentWithFutureReplica(): Boolean
maybeReplaceCurrentWithFutureReplica…FIXME
Note: maybeReplaceCurrentWithFutureReplica is used exclusively when ReplicaAlterLogDirsThread is requested to processPartitionData.
delete(): Unit
delete…FIXME
Note: delete is used exclusively when ReplicaManager is requested to stopReplica.
removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit
removeFutureLocalReplica…FIXME
Note: removeFutureLocalReplica is used when ReplicaManager is requested to alterReplicaLogDirs and handleLogDirFailure.
isLeaderReplicaLocal: Boolean
isLeaderReplicaLocal is positive (true) when the optional Replica (leaderReplicaIfLocal) is defined. Otherwise, isLeaderReplicaLocal is false.
Note: isLeaderReplicaLocal is used when Partition is requested for the performance metrics (InSyncReplicasCount and ReplicasCount), isUnderReplicated, and lowWatermarkIfLeader.
localReplicaOrException: Replica
localReplicaOrException uses localReplica and returns the local replica if available. Otherwise, localReplicaOrException throws a ReplicaNotAvailableException:
Replica for partition [topicPartition] is not available on broker [localBrokerId]
localReplica: Option[Replica]
localReplica simply gets the partition replica for the local broker ID.
getReplica(replicaId: Int): Option[Replica]
getReplica returns the replica by the given replicaId (in the allReplicasMap registry) or None.
addReplicaIfNotExists(replica: Replica): Replica
addReplicaIfNotExists…FIXME
Note: addReplicaIfNotExists is used when…FIXME
assignedReplicas: Set[Replica]
assignedReplicas…FIXME
Note: assignedReplicas is used when…FIXME
allReplicas: Set[Replica]
allReplicas…FIXME
Note: allReplicas is used when…FIXME
removeReplica(replicaId: Int): Unit
removeReplica…FIXME
Note: removeReplica is used when…FIXME
toString(): String
Note: toString is part of the java.lang.Object contract for a string representation of the object.
toString…FIXME
readRecords(
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
fetchOnlyFromLeader: Boolean,
minOneMessage: Boolean): LogReadInfo
readRecords…FIXME
Note: readRecords is used when…FIXME
deleteRecordsOnLeader(
offset: Long): LogDeleteRecordsResult
deleteRecordsOnLeader…FIXME
Note: deleteRecordsOnLeader is used when…FIXME
createLogIfNotExists(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Unit
createLogIfNotExists branches off per the given isFutureReplica flag:
For all other cases, createLogIfNotExists simply prints out the following TRACE message (with the Future prefix when the isFutureReplica flag is enabled):
[Future] Log already exists.
getOutOfSyncReplicas(
maxLagMs: Long): Set[Int]
getOutOfSyncReplicas requests the Log for the logEndOffset.
getOutOfSyncReplicas then uses isFollowerOutOfSync for every broker ID in inSyncReplicaIds (except the local broker ID) and returns the ones that are out of sync.
Note: getOutOfSyncReplicas is used when Partition is requested to maybeShrinkIsr.
localLogOrException: Log
localLogOrException gives the Log if defined or throws a ReplicaNotAvailableException:
Log for partition [topicPartition] is not available on broker [localBrokerId]
Note: localLogOrException is used when…FIXME
leaderLogIfLocal: Option[Log]
Note: leaderLogIfLocal is used when…FIXME
isLeader: Boolean
isLeader is positive (true) when the leaderReplicaIdOpt is the local broker ID.
lastOffsetForLeaderEpoch(
currentLeaderEpoch: Optional[Integer],
leaderEpoch: Int,
fetchOnlyFromLeader: Boolean): EpochEndOffset
lastOffsetForLeaderEpoch…FIXME
Note: lastOffsetForLeaderEpoch is used when…FIXME
tryCompleteDelayedRequests(): Unit
tryCompleteDelayedRequests…FIXME
Note: tryCompleteDelayedRequests is used when Partition is requested to makeLeader, updateReplicaLogReadResult, maybeShrinkIsr, and appendRecordsToLeader (when leaderHWIncremented).
createLog(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Log
createLog requests the LogManager to initializingLog for the TopicPartition.
createLog requests the LogManager to look up or create a new partition log for the TopicPartition. The LogConfig passed in is created by requesting the PartitionStateStore for the fetchTopicConfig to override the currentDefaultConfig of the LogManager.
createLog requests the given OffsetCheckpoints to fetch the checkpointed high watermark for the TopicPartition. Unless found, createLog prints out the following INFO message to the logs and assumes 0:
No checkpointed highwatermark is found for partition [topicPartition]
createLog requests the Log to update the high watermark.
createLog prints out the following INFO message to the logs:
Log loaded for partition [topicPartition] with initial high watermark [initialHighWatermark]
In the end, createLog requests the LogManager to finishedInitializingLog for the TopicPartition.
Note: createLog is used when Partition is requested to create a log or future log (unless available already).
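The high-watermark step of createLog is a lookup with a logged fallback to 0. A minimal Scala sketch, assuming the checkpointed high watermarks are a hypothetical Map keyed by a topic-partition name (standing in for the given OffsetCheckpoints) and that logging is a plain String => Unit callback:

```scala
// Hypothetical: checkpointed high watermarks keyed by a topic-partition name,
// standing in for the given OffsetCheckpoints; `info` stands in for INFO logging.
def initialHighWatermark(checkpoints: Map[String, Long],
                         topicPartition: String,
                         info: String => Unit): Long =
  checkpoints.get(topicPartition) match {
    case Some(hw) => hw
    case None =>
      info(s"No checkpointed highwatermark is found for partition $topicPartition")
      0L
  }

// No checkpoint for orders-1, so the fallback message is logged and 0 is used.
val hw = initialHighWatermark(Map("orders-0" -> 42L), "orders-1", println)
println(s"Log loaded for partition orders-1 with initial high watermark $hw")
```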
expandIsr(
newIsr: Set[Int]): Unit
expandIsr…FIXME
Note: expandIsr is used when Partition is requested to maybeExpandIsr.
shrinkIsr(
newIsr: Set[Int]): Unit
shrinkIsr…FIXME
Note: shrinkIsr is used when Partition is requested to maybeShrinkIsr.
isFollowerOutOfSync(
replicaId: Int,
leaderEndOffset: Long,
currentTimeMs: Long,
maxLagMs: Long): Boolean
isFollowerOutOfSync gets the follower replica for the given replicaId (or throws an exception if not available, using getReplicaOrException).
isFollowerOutOfSync is positive (true) when both of the following hold:
- logEndOffset of the follower replica is different from the given leaderEndOffset
- Time since the lastCaughtUpTimeMs of the follower replica is greater than the given maxLagMs (i.e. the replica.lag.time.max.ms configuration property)
Note: isFollowerOutOfSync is used when Partition is requested to getOutOfSyncReplicas.
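Together with getOutOfSyncReplicas, the two conditions can be sketched in Scala as follows. Follower is a hypothetical case class carrying only the fields the check needs, and looking up a missing follower in the Map throws, which loosely mirrors getReplicaOrException.

```scala
// Hypothetical follower view: only the fields the check needs.
final case class Follower(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

// Out of sync: the follower has not reached the leader's log end offset AND
// it last caught up more than maxLagMs ago (replica.lag.time.max.ms).
def isFollowerOutOfSync(follower: Follower,
                        leaderEndOffset: Long,
                        currentTimeMs: Long,
                        maxLagMs: Long): Boolean =
  follower.logEndOffset != leaderEndOffset &&
    (currentTimeMs - follower.lastCaughtUpTimeMs) > maxLagMs

// Candidates are the in-sync replicas except the local broker (the leader).
// followers(id) throws for an unknown ID, much like getReplicaOrException.
def getOutOfSyncReplicas(inSyncReplicaIds: Set[Int],
                         localBrokerId: Int,
                         followers: Map[Int, Follower],
                         leaderEndOffset: Long,
                         currentTimeMs: Long,
                         maxLagMs: Long): Set[Int] =
  (inSyncReplicaIds - localBrokerId).filter { id =>
    isFollowerOutOfSync(followers(id), leaderEndOffset, currentTimeMs, maxLagMs)
  }

val followers = Map(
  2 -> Follower(2, logEndOffset = 100L, lastCaughtUpTimeMs = 9000L), // caught up
  3 -> Follower(3, logEndOffset = 80L, lastCaughtUpTimeMs = 1000L),  // lagging for long
  4 -> Follower(4, logEndOffset = 80L, lastCaughtUpTimeMs = 9500L))  // lagging briefly

val outOfSync = getOutOfSyncReplicas(Set(1, 2, 3, 4), localBrokerId = 1, followers,
  leaderEndOffset = 100L, currentTimeMs = 10000L, maxLagMs = 5000L)
println(outOfSync) // Set(3)
```

Requiring both conditions means a follower that is briefly behind (broker 4) keeps its ISR membership, while one that has not caught up for longer than the allowed lag (broker 3) is reported.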
getReplicaOrException(
replicaId: Int): Replica
getReplicaOrException finds the replica for the given replica ID or throws a ReplicaNotAvailableException:
Replica with id [replicaId] is not available on broker [localBrokerId]
Note: getReplicaOrException is used when Partition is requested to checkEnoughReplicasReachOffset, maybeShrinkIsr, and isFollowerOutOfSync.
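A minimal Scala sketch of the lookup-or-throw behavior, assuming a hypothetical Replica case class, a Map of replicas by ID, and a local ReplicaNotAvailable exception class standing in for ReplicaNotAvailableException:

```scala
// Hypothetical stand-ins for Kafka's Replica and ReplicaNotAvailableException.
final case class Replica(brokerId: Int)
final class ReplicaNotAvailable(message: String) extends RuntimeException(message)

// Look the replica up by ID or fail with the message shown above.
def getReplicaOrException(allReplicasMap: Map[Int, Replica],
                          replicaId: Int,
                          localBrokerId: Int): Replica =
  allReplicasMap.getOrElse(replicaId,
    throw new ReplicaNotAvailable(
      s"Replica with id $replicaId is not available on broker $localBrokerId"))

val replicas = Map(1 -> Replica(1), 2 -> Replica(2))
val found = getReplicaOrException(replicas, replicaId = 2, localBrokerId = 1)
```

The throw sits in the by-name default of getOrElse, so it is only evaluated when the replica is missing.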
getLocalLog(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Either[Log, Errors]
getLocalLog uses checkCurrentLeaderEpoch with the given currentLeaderEpoch and branches off per the result:
- For no errors (Errors.NONE), if the requireLeader flag is on (true) but the broker is not the leader, getLocalLog returns Errors.NOT_LEADER_FOR_PARTITION.
- For no errors (Errors.NONE), if the requireLeader flag is off (false) or the broker is the leader, getLocalLog branches off per the optional log. getLocalLog gives the log if defined or the following errors:
  - Errors.NOT_LEADER_FOR_PARTITION when the requireLeader flag is on (true)
  - Errors.REPLICA_NOT_AVAILABLE when the requireLeader flag is off (false)
- For any other error, getLocalLog simply returns it.
Note: getLocalLog is used when Partition is requested to localLogWithEpochOrException and lastOffsetForLeaderEpoch.
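The branching above fits a single pattern match over an Either. Below is a minimal Scala sketch; ErrorCode is a hypothetical stand-in for Kafka's Errors limited to the codes mentioned here (plus an Other case for any remaining error), and epochCheck is assumed to be the already computed result of checkCurrentLeaderEpoch, whose logic is not covered here.

```scala
// Hypothetical error codes standing in for Kafka's Errors (only those used here).
sealed trait ErrorCode
object ErrorCode {
  case object NONE extends ErrorCode
  case object NOT_LEADER_FOR_PARTITION extends ErrorCode
  case object REPLICA_NOT_AVAILABLE extends ErrorCode
  // Any other error reported by the leader epoch check.
  final case class Other(name: String) extends ErrorCode
}
import ErrorCode._

// epochCheck is the (assumed) result of checkCurrentLeaderEpoch.
// Left carries the log, Right carries the error, as in Either[Log, Errors].
def getLocalLog[Log](epochCheck: ErrorCode,
                     requireLeader: Boolean,
                     isLeader: Boolean,
                     log: Option[Log]): Either[Log, ErrorCode] =
  epochCheck match {
    case NONE if requireLeader && !isLeader => Right(NOT_LEADER_FOR_PARTITION)
    case NONE =>
      log match {
        case Some(partitionLog) => Left(partitionLog)
        case None if requireLeader => Right(NOT_LEADER_FOR_PARTITION)
        case None => Right(REPLICA_NOT_AVAILABLE)
      }
    case error => Right(error)
  }

// A follower asked for its log without requiring leadership, but no log exists.
val noLog = getLocalLog[String](NONE, requireLeader = false, isLeader = false, log = None)
```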
checkCurrentLeaderEpoch(
remoteLeaderEpochOpt: Optional[Integer]): Errors
checkCurrentLeaderEpoch…FIXME
Note: checkCurrentLeaderEpoch is used when Partition is requested to getLocalLog.
localLogWithEpochOrException(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Log
localLogWithEpochOrException…FIXME
Note: localLogWithEpochOrException is used when Partition is requested to…FIXME
isFollowerInSync(
followerReplica: Replica,
highWatermark: Long): Boolean
isFollowerInSync…FIXME
Note: isFollowerInSync is used when Partition is requested to maybeExpandIsr.
maybeUpdateIsrAndVersion(
isr: Set[Int],
zkVersionOpt: Option[Int]): Unit
maybeUpdateIsrAndVersion…FIXME
Name | Description
---|---
 | In-sync replicas of the topic partition, i.e. broker IDs that are known to be in sync with the leader. Updated when maybeUpdateIsrAndVersion, updateAssignmentAndIsr, and delete
 | Default:
 | All replica broker IDs that were assigned to this topic partition
 | Replicas by ID. Used in getReplica, assignedReplicas, allReplicas, and toString
 | Remote replicas by broker ID. Used for getReplica, remoteReplicas, and maybeIncrementLeaderHW. All of the pairs are removed on delete