ReplicaManager manages log replicas using the LogManager.
ReplicaManager is given all the required services (e.g. Metrics, KafkaZkClient, Scheduler, LogManager, QuotaManagers, MetadataCache) from the KafkaServer.
Note: KafkaServer creates a ReplicaManager mainly to pass it on to KafkaApis, GroupCoordinator, and TransactionCoordinator.
When started, ReplicaManager schedules the isr-expiration and isr-change-propagation recurring tasks (every half of the replica.lag.time.max.ms property and every 2500 ms, respectively).
ReplicaManager manages a registry of Partitions by TopicPartition.
ReplicaManager makes the broker the leader or a follower of partitions when KafkaApis is requested to handle a LeaderAndIsr request (from the KafkaController).
ReplicaManager uses the MetadataCache for the following:
- …FIXME
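Tip: Enable TRACE logging level for the kafka.server.ReplicaManager logger to see what happens inside. Add the following line to config/log4j.properties:
log4j.logger.kafka.server.ReplicaManager=TRACE
Refer to Logging. Please note that … That means that the logs of …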
ReplicaManager creates a ReplicaFetcherManager while being created (via createReplicaFetcherManager).
ReplicaManager uses the ReplicaFetcherManager for the following: …
The ReplicaFetcherManager is also used when:
- DynamicThreadPool is requested to reconfigure (resize) the thread pool
- KafkaApis is requested to handle a StopReplica request
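ReplicaManager is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
LeaderCount | Number of leader partitions (leaderPartitionsIterator) |
PartitionCount | Number of allPartitions |
UnderMinIsrPartitionCount | Number of leader partitions (leaderPartitionsIterator) that are under the minimum ISR |
UnderReplicatedPartitions | Number of under-replicated partitions (underReplicatedPartitionCount) |
The performance metrics are registered in the kafka.server:type=ReplicaManager group.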
allPartitions: Pool[TopicPartition, Partition]
allPartitions is a registry of Partitions by TopicPartition.
The number of partitions in allPartitions is available as the PartitionCount performance metric.
A new TopicPartition is added exclusively when getOrCreatePartition looks up a Partition by a TopicPartition that is not registered yet.
A TopicPartition is removed when stopping a replica (and marked as the OfflinePartition).
Note: allPartitions is used in getPartition, nonOfflinePartitionsIterator, offlinePartitionsIterator, becomeLeaderOrFollower, maybeShrinkIsr, and handleLogDirFailure.
createReplicaFetcherManager(
metrics: Metrics,
time: Time,
threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager): ReplicaFetcherManager
createReplicaFetcherManager simply creates a ReplicaFetcherManager.
Note: createReplicaFetcherManager is used when ReplicaManager is created.
createReplicaAlterLogDirsManager(
quotaManager: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats): ReplicaAlterLogDirsManager
createReplicaAlterLogDirsManager simply creates a ReplicaAlterLogDirsManager.
Note: createReplicaAlterLogDirsManager is used when ReplicaManager is created.
shutdown(
checkpointHW: Boolean = true): Unit
shutdown …FIXME
Note: shutdown is used when KafkaServer is requested to shut down.
alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors]
alterReplicaLogDirs …FIXME
Note: alterReplicaLogDirs is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.
becomeLeaderOrFollower(
correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse
becomeLeaderOrFollower prints out the following TRACE message to the logs:
Received LeaderAndIsr request [stateInfo] correlation id [correlationId] from controller [controllerId] epoch [controllerEpoch] for partition [topicPartition]
becomeLeaderOrFollower records the current controller epoch (of the LeaderAndIsrRequest) in the controllerEpoch internal registry.
For all valid partition states, becomeLeaderOrFollower finds the partition states with the leader being the (local) broker, i.e. the partitions for which the broker becomes the leader. All other partition states are for partitions for which the broker becomes a follower.
For all partitions for which the broker becomes the leader, becomeLeaderOrFollower calls makeLeaders.
For all partitions for which the broker becomes a follower, becomeLeaderOrFollower calls makeFollowers.
With the hwThreadInitialized internal flag off (false), becomeLeaderOrFollower calls startHighWaterMarksCheckPointThread and turns the flag on (true).
For every new partition, becomeLeaderOrFollower …FIXME
becomeLeaderOrFollower …FIXME
becomeLeaderOrFollower calls the given onLeadershipChange callback with the partitions for which the broker became the leader and a follower (respectively).
In the end, becomeLeaderOrFollower creates a new LeaderAndIsrResponse to "announce" a successful request processing.
Note: becomeLeaderOrFollower is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.
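The control flow of becomeLeaderOrFollower can be sketched as below. This is a simplified model under stated assumptions: PartitionState, LeadershipSketch and the use of plain String topic-partition names are hypothetical stand-ins (not Kafka classes), makeLeaders/makeFollowers merely return the partitions they were given, and the FIXME steps and error handling are left out. It shows the split on "leader is the local broker", the one-time start of the high-watermark checkpoint thread, and the final callback.

```scala
// Hypothetical, simplified model of becomeLeaderOrFollower (not Kafka's classes).
final case class PartitionState(topicPartition: String, leader: Int)

final class LeadershipSketch(localBrokerId: Int) {
  private var controllerEpoch: Int = -1       // plays the role of the controllerEpoch registry
  private var hwThreadInitialized = false     // plays the role of the hwThreadInitialized flag
  var hwThreadStarts = 0                      // counts checkpoint-thread starts (illustration only)

  def currentControllerEpoch: Int = controllerEpoch

  def becomeLeaderOrFollower(
      epoch: Int,
      states: Seq[PartitionState],
      onLeadershipChange: (Seq[String], Seq[String]) => Unit): Unit = {
    controllerEpoch = epoch
    // States with the local broker as the leader are partitions to lead; all others are followed.
    val (toLead, toFollow) = states.partition(_.leader == localBrokerId)
    val leaders = makeLeaders(toLead)
    val followers = makeFollowers(toFollow)
    // The high-watermark checkpoint thread is started only once.
    if (!hwThreadInitialized) {
      startHighWaterMarksCheckPointThread()
      hwThreadInitialized = true
    }
    onLeadershipChange(leaders, followers)
  }

  // Stubs: the real makeLeaders/makeFollowers do the actual state transitions.
  private def makeLeaders(states: Seq[PartitionState]): Seq[String] = states.map(_.topicPartition)
  private def makeFollowers(states: Seq[PartitionState]): Seq[String] = states.map(_.topicPartition)
  private def startHighWaterMarksCheckPointThread(): Unit = hwThreadStarts += 1
}
```

Calling it twice starts the checkpoint thread only on the first call, which is what the hwThreadInitialized flag is for.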
makeFollowers(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition]
makeFollowers …FIXME
Note: makeFollowers is used when ReplicaManager is requested to becomeLeaderOrFollower.
recordIsrChange(topicPartition: TopicPartition): Unit
recordIsrChange adds the input topicPartition to the isrChangeSet internal registry and sets lastIsrChangeMs to the current time.
Note: recordIsrChange is used exclusively when Partition is requested to updateIsr.
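A minimal sketch of recordIsrChange, assuming the set is guarded by its own lock and the clock is injectable for testing. IsrChangeRecorder, the injected clock and the String stand-in for TopicPartition are all hypothetical.

```scala
import scala.collection.mutable

// Hypothetical sketch: a set of partitions with ISR changes plus the time of the last change.
final class IsrChangeRecorder(clock: () => Long) {
  private val isrChangeSet = mutable.Set.empty[String] // String stands in for TopicPartition
  @volatile private var lastIsrChangeMs: Long = clock()

  // Adds the partition and records "now" as the time of the last ISR change.
  def recordIsrChange(topicPartition: String): Unit = isrChangeSet.synchronized {
    isrChangeSet += topicPartition
    lastIsrChangeMs = clock()
  }

  // Consistent view of both values (used by the test below).
  def snapshot: (Set[String], Long) = isrChangeSet.synchronized((isrChangeSet.toSet, lastIsrChangeMs))
}
```

Recording the same partition twice keeps a single entry in the set but refreshes the timestamp.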
updateFollowerLogReadResults(
replicaId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerLogReadResults …FIXME
Note: updateFollowerLogReadResults is used exclusively when ReplicaManager is requested to fetch messages from the leader replica.
fetchMessages(
timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit
fetchMessages determines the fetchIsolation:
- FetchLogEnd for a request from a follower or when the replicaId is Request.FutureLocalReplicaId
- FetchTxnCommitted for the isolationLevel being READ_COMMITTED
- FetchHighWatermark otherwise
fetchMessages then calls readFromLocalLog (with the fetchIsolation).
fetchMessages …FIXME
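The fetchIsolation decision above can be sketched as a small function. The types here are hypothetical stand-ins for Kafka's FetchIsolation and IsolationLevel, and two values are assumptions of this sketch: that a request comes from a follower when its replicaId is a valid (non-negative) broker ID, and that Request.FutureLocalReplicaId is -3.

```scala
// Hypothetical stand-ins for Kafka's FetchIsolation and IsolationLevel types.
sealed trait FetchIsolation
case object FetchLogEnd extends FetchIsolation
case object FetchHighWatermark extends FetchIsolation
case object FetchTxnCommitted extends FetchIsolation

sealed trait IsolationLevelSketch
case object ReadUncommitted extends IsolationLevelSketch
case object ReadCommitted extends IsolationLevelSketch

object FetchIsolationSketch {
  // Assumed values: followers use their (non-negative) broker ID, FutureLocalReplicaId is -3.
  val FutureLocalReplicaId: Int = -3
  def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

  def fetchIsolation(replicaId: Int, isolationLevel: IsolationLevelSketch): FetchIsolation =
    if (isFromFollower(replicaId) || replicaId == FutureLocalReplicaId) FetchLogEnd
    else if (isolationLevel == ReadCommitted) FetchTxnCommitted
    else FetchHighWatermark
}
```

The order of the checks matters: a follower fetch gets FetchLogEnd regardless of the isolation level it carries.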
maybePropagateIsrChanges(): Unit
maybePropagateIsrChanges …FIXME
Note: maybePropagateIsrChanges is used exclusively when the isr-change-propagation task is executed (every 2500 milliseconds).
ReplicaManager takes the following when created: …
ReplicaManager initializes the internal registries and counters.
startup(): Unit
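startup requests the Scheduler to schedule the ISR-related tasks:
- isr-expiration (maybeShrinkIsr) every half of replica.lag.time.max.ms
- isr-change-propagation (maybePropagateIsrChanges) every 2500 ms
startup then creates a LogDirFailureHandler and requests it to start.
Note: startup uses the Scheduler that was specified when ReplicaManager was created.
Note: startup is used exclusively when KafkaServer starts up.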
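The two recurring ISR tasks can be approximated with a plain JDK ScheduledExecutorService. This is a sketch only: it swaps Kafka's Scheduler for the JDK executor, IsrTasksSketch is a hypothetical name, and the zero initial delay is an assumption. The task periods follow the text (half of replica.lag.time.max.ms, and 2500 ms).

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

// Hypothetical sketch of the ISR-related recurring tasks on a JDK scheduler
// (used in place of Kafka's Scheduler). The zero initial delay is an assumption.
object IsrTasksSketch {
  val IsrChangePropagationIntervalMs: Long = 2500L

  // isr-expiration runs every half of replica.lag.time.max.ms
  def isrExpirationPeriodMs(replicaLagTimeMaxMs: Long): Long = replicaLagTimeMaxMs / 2

  def schedule(
      scheduler: ScheduledExecutorService,
      replicaLagTimeMaxMs: Long,
      maybeShrinkIsr: () => Unit,
      maybePropagateIsrChanges: () => Unit): Unit = {
    val expirationMs = isrExpirationPeriodMs(replicaLagTimeMaxMs)
    // "isr-expiration"
    scheduler.scheduleAtFixedRate(() => maybeShrinkIsr(), 0L, expirationMs, TimeUnit.MILLISECONDS)
    // "isr-change-propagation"
    scheduler.scheduleAtFixedRate(() => maybePropagateIsrChanges(), 0L,
      IsrChangePropagationIntervalMs, TimeUnit.MILLISECONDS)
    ()
  }
}
```

With replica.lag.time.max.ms set to 30000, for example, isr-expiration would run every 15000 ms.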
maybeShrinkIsr(): Unit
maybeShrinkIsr prints out the following TRACE message to the logs:
Evaluating ISR list of partitions to see which replicas can be removed from the ISR
maybeShrinkIsr requests the partitions (in the allPartitions pool that are not offline) to maybeShrinkIsr (with the replica.lag.time.max.ms property).
Note: maybeShrinkIsr is used exclusively for the isr-expiration recurring task that is scheduled when ReplicaManager starts up.
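The fan-out in maybeShrinkIsr can be sketched as follows. StubPartition and ShrinkIsrSketch are hypothetical: the stub only records the lag time it was called with, standing in for Partition.maybeShrinkIsr, whose actual ISR logic is out of scope here.

```scala
// Hypothetical stub standing in for Partition: it records the lag time it was called with.
final class StubPartition(val name: String, val isOffline: Boolean) {
  var lastMaxLagTimeMs: Option[Long] = None
  def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit = lastMaxLagTimeMs = Some(replicaMaxLagTimeMs)
}

object ShrinkIsrSketch {
  // Every partition that is not offline is asked to maybeShrinkIsr with replica.lag.time.max.ms.
  def maybeShrinkIsr(allPartitions: Iterable[StubPartition], replicaLagTimeMaxMs: Long): Unit = {
    // (TRACE) Evaluating ISR list of partitions to see which replicas can be removed from the ISR
    allPartitions.iterator.filterNot(_.isOffline).foreach(_.maybeShrinkIsr(replicaLagTimeMaxMs))
  }
}
```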
makeLeaders(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]
makeLeaders …FIXME
Note: makeLeaders is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo]
describeLogDirs …FIXME
Note: describeLogDirs is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
getLog(topicPartition: TopicPartition): Option[Log]
getLog …FIXME
startHighWaterMarksCheckPointThread(): Unit
startHighWaterMarksCheckPointThread …FIXME
Note: startHighWaterMarksCheckPointThread is used when ReplicaManager is requested to becomeLeaderOrFollower.
checkpointHighWatermarks(): Unit
checkpointHighWatermarks …FIXME
Note: checkpointHighWatermarks is used when…FIXME
shutdownIdleReplicaAlterLogDirsThread(): Unit
shutdownIdleReplicaAlterLogDirsThread …FIXME
Note: shutdownIdleReplicaAlterLogDirsThread is used when…FIXME
handleLogDirFailure(
dir: String,
sendZkNotification: Boolean = true): Unit
handleLogDirFailure …FIXME
Note: handleLogDirFailure is used when LogDirFailureHandler is requested to do the work.
maybeUpdateMetadataCache(
correlationId: Int,
updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition]
maybeUpdateMetadataCache …FIXME
Note: maybeUpdateMetadataCache is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
appendRecords(
timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit
appendRecords …FIXME
isValidRequiredAcks(requiredAcks: Short): Boolean
isValidRequiredAcks returns true when the given requiredAcks is one of the following:
- -1
- 1
- 0
Otherwise, isValidRequiredAcks returns false.
Note: isValidRequiredAcks is used exclusively when ReplicaManager is requested to appendRecords.
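The check is small enough to show in full as a sketch. The comments use the usual producer acks meanings (-1 for all in-sync replicas, 1 for the leader only, 0 for no acknowledgement).

```scala
// Sketch of the acks check: only -1 (all in-sync replicas), 1 (leader only) and 0 (no ack) are valid.
def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
```

Any other value, e.g. 2 or -2, is rejected.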
appendToLocalLog(
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult]
appendToLocalLog processes (maps over) the given Map[TopicPartition, MemoryRecords] (entriesPerPartition), so that the leader partition (of every TopicPartition) is requested to appendRecordsToLeader.
Internally, appendToLocalLog prints out the following TRACE message to the logs:
Append [[entriesPerPartition]] to local log
For every tuple in the given entriesPerPartition (Map[TopicPartition, MemoryRecords]), appendToLocalLog does the following steps:
- Requests the BrokerTopicStats to mark the occurrence of an event for the totalProduceRequestRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Gets the partition (or throws an exception) with the expectLeader flag enabled
- Requests the Partition to appendRecordsToLeader (with the MemoryRecords, the isFromClient flag, and the requiredAcks)
- Requests the BrokerTopicStats to mark the sizeInBytes of the MemoryRecords for the bytesInRate for the topic (of the TopicPartition) and for all topics
- Requests the BrokerTopicStats to mark the number of messages appended for the messagesInRate for the topic (of the TopicPartition) and for all topics
- Prints out the following TRACE message to the logs: [sizeInBytes] written to log [topicPartition] beginning at offset [firstOffset] and ending at offset [lastOffset]
In case Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed, appendToLocalLog …FIXME
In case of exceptions, appendToLocalLog …FIXME
Note: appendToLocalLog is used exclusively when ReplicaManager is requested to append records.
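The per-partition steps above can be sketched as one map over entriesPerPartition. Everything here is a hypothetical stand-in: Records and AppendInfo replace MemoryRecords and the append result, TopicStatsSketch replaces BrokerTopicStats with a counter keyed by (scope, metric) where "all-topics" is an invented scope name, and appendToLeader replaces getting the partition and calling appendRecordsToLeader. The internal-topic check and the exception handling are omitted.

```scala
import scala.collection.mutable

// Hypothetical stand-ins (NOT Kafka's classes).
final case class Records(sizeInBytes: Int, messageCount: Int)
final case class AppendInfo(firstOffset: Long, lastOffset: Long, numMessages: Int)

final class TopicStatsSketch {
  // (scope, metric) -> accumulated count; scope is a topic name or "all-topics"
  val meters: mutable.Map[(String, String), Long] =
    mutable.Map.empty[(String, String), Long].withDefaultValue(0L)
  // Marks the metric for the topic and for all topics
  def mark(topic: String, metric: String, n: Long = 1L): Unit =
    Seq(topic, "all-topics").foreach(scope => meters((scope, metric)) += n)
}

// appendToLeader stands in for getting the partition (expectLeader on) and appendRecordsToLeader
def appendToLocalLogSketch(
    entriesPerPartition: Map[(String, Int), Records],
    stats: TopicStatsSketch,
    appendToLeader: ((String, Int), Records) => AppendInfo): Map[(String, Int), AppendInfo] =
  entriesPerPartition.map { case (tp @ (topic, _), records) =>
    stats.mark(topic, "totalProduceRequestRate")
    val info = appendToLeader(tp, records)
    stats.mark(topic, "bytesInRate", records.sizeInBytes.toLong)
    stats.mark(topic, "messagesInRate", info.numMessages.toLong)
    // TRACE: [sizeInBytes] written to log [tp] beginning at offset [firstOffset] and ending at offset [lastOffset]
    tp -> info
  }
```

Appending 100 and 50 bytes to two partitions of the same topic leaves 150 in the all-topics bytesInRate counter.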
getPartitionOrException(
topicPartition: TopicPartition,
expectLeader: Boolean): Partition
getPartitionOrException gets the partition if available or throws one of the following exceptions:
- KafkaStorageException (Partition [topicPartition] is in an offline log directory) when the partition is offline
- NotLeaderForPartitionException (Broker [localBrokerId] is not a replica of [topicPartition])
- ReplicaNotAvailableException (Partition [topicPartition] is not available)
- UnknownTopicOrPartitionException (Partition [topicPartition] doesn't exist)
Note: getPartitionOrException is used when…FIXME
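A sketch of how the four outcomes could be told apart. The exception classes below are local stand-ins named after Kafka's, and the Hosted lookup result is hypothetical. The conditions for the last three exceptions are assumptions not stated above: the partition is not hosted locally but is known to the metadata cache (NotLeaderForPartitionException when a leader is expected, ReplicaNotAvailableException otherwise), or it is unknown everywhere (UnknownTopicOrPartitionException).

```scala
// Hypothetical lookup result and local stand-ins for Kafka's exception classes.
sealed trait Hosted[+P]
final case class Online[P](partition: P) extends Hosted[P]
case object Offline extends Hosted[Nothing]
case object NotHosted extends Hosted[Nothing]

final class KafkaStorageException(msg: String) extends RuntimeException(msg)
final class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
final class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

def getPartitionOrException[P](
    topicPartition: String,
    hosted: Hosted[P],
    knownToMetadataCache: Boolean, // assumption: separates "not a replica here" from "doesn't exist"
    expectLeader: Boolean,
    localBrokerId: Int): P =
  hosted match {
    case Online(p) => p
    case Offline =>
      throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
    case NotHosted if knownToMetadataCache && expectLeader =>
      throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition")
    case NotHosted if knownToMetadataCache =>
      throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
    case NotHosted =>
      throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
  }
```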
getPartition(topicPartition: TopicPartition): Option[Partition]
getPartition gets the partition for the given TopicPartition (from the allPartitions registry).
stopReplica(
topicPartition: TopicPartition,
deletePartition: Boolean): Unit
stopReplica …FIXME
Note: stopReplica is used exclusively when ReplicaManager is requested to stopReplicas.
underReplicatedPartitionCount: Int
underReplicatedPartitionCount …FIXME
Note: underReplicatedPartitionCount is used exclusively for the UnderReplicatedPartitions performance metric.
leaderPartitionsIterator: Iterator[Partition]
leaderPartitionsIterator …FIXME
Note: leaderPartitionsIterator is used exclusively for the performance metrics: LeaderCount, UnderMinIsrPartitionCount, and UnderReplicatedPartitions (indirectly using underReplicatedPartitionCount).
nonOfflinePartitionsIterator: Iterator[Partition]
nonOfflinePartitionsIterator …FIXME
Note: nonOfflinePartitionsIterator is used when ReplicaManager is requested to leaderPartitionsIterator, checkpointHighWatermarks, and handleLogDirFailure.
getOrCreatePartition(topicPartition: TopicPartition): Partition
getOrCreatePartition simply looks up a Partition by the TopicPartition (in the allPartitions internal registry). If not found, getOrCreatePartition adds a new Partition.
Note: getOrCreatePartition is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
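The allPartitions registry and getOrCreatePartition can be sketched with a ConcurrentHashMap, whose computeIfAbsent gives the atomic "look up or add" behaviour. This swaps in the JDK map for Kafka's Pool. PartitionRegistry and SketchPartition are hypothetical, and the offline field stands in for the OfflinePartition marker.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for Partition; `offline` plays the role of the OfflinePartition marker.
final case class SketchPartition(topicPartition: String, offline: Boolean = false)

// allPartitions as a concurrent map, in place of Kafka's Pool[TopicPartition, Partition].
final class PartitionRegistry {
  private val allPartitions = new ConcurrentHashMap[String, SketchPartition]()

  // Looks up the partition and atomically registers a new one when absent.
  def getOrCreatePartition(tp: String): SketchPartition =
    allPartitions.computeIfAbsent(tp, (key: String) => SketchPartition(key))

  def getPartition(tp: String): Option[SketchPartition] = Option(allPartitions.get(tp))

  // What the PartitionCount metric reports: the number of entries in allPartitions.
  def partitionCount: Int = allPartitions.size

  def nonOfflinePartitionsIterator: Iterator[SketchPartition] =
    allPartitions.values.asScala.iterator.filterNot(_.offline)
}
```

A second getOrCreatePartition for the same key returns the instance registered by the first call.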
offlinePartitionsIterator: Iterator[Partition]
offlinePartitionsIterator …FIXME
Note: offlinePartitionsIterator is used when…FIXME
markPartitionOffline(tp: TopicPartition): Unit
markPartitionOffline …FIXME
Note: markPartitionOffline is used when…FIXME
lastOffsetForLeaderEpoch(
requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]
): Map[TopicPartition, EpochEndOffset]
lastOffsetForLeaderEpoch …FIXME
Note: lastOffsetForLeaderEpoch is used when…FIXME
nonOfflinePartition(topicPartition: TopicPartition): Option[Partition]
nonOfflinePartition …FIXME
Note: nonOfflinePartition is used when…FIXME
deleteRecords(
timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit): Unit
deleteRecords …FIXME
Note: deleteRecords is used when…FIXME
fetchOffsetForTimestamp(
topicPartition: TopicPartition,
timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): TimestampOffset
fetchOffsetForTimestamp …FIXME
Note: fetchOffsetForTimestamp is used when…FIXME
stopReplicas(
stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors)
stopReplicas …FIXME
Note: stopReplicas is used exclusively when KafkaApis is requested to handle a StopReplica request.
getMagic(topicPartition: TopicPartition): Option[Byte]
getMagic …FIXME
Note: getMagic is used when…FIXME
localReplicaOrException(topicPartition: TopicPartition): Replica
localReplicaOrException finds the partition (or throws an exception) for the given TopicPartition (with the expectLeader flag off) and requests the Partition to get the local partition replica (or throw an exception).
Note: A partition replica is local when the replica ID is exactly the local broker ID.
Note: localReplicaOrException is used when…FIXME
shouldLeaderThrottle(
quota: ReplicaQuota,
topicPartition: TopicPartition,
replicaId: Int): Boolean
shouldLeaderThrottle …FIXME
Note: shouldLeaderThrottle is used when…FIXME
readFromLocalLog(
replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)]
readFromLocalLog returns a LogReadResult (with the records as a FetchDataInfo) for every TopicPartition in the given readPartitionInfo.
Internally, readFromLocalLog reads the requested PartitionData for every TopicPartition (in the given readPartitionInfo collection), one partition after another, within a byte budget that starts at fetchMaxBytes and is decremented by the size of the records read from every partition.
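The shared byte budget can be sketched as a loop that hands the remaining limit to every per-partition read. ReadResultSketch and readFromLocalLogSketch are hypothetical, and read stands in for the per-partition read(tp, fetchInfo, limitBytes, minOneMessage). The handling of minOneMessage is an assumption of this sketch (it starts as !hardMaxBytesLimit and is turned off after the first non-empty read), as is never letting the budget go below zero.

```scala
// Hypothetical stand-in for LogReadResult: only the number of bytes that were read.
final case class ReadResultSketch(bytesRead: Int)

// Sketch of one byte budget (fetchMaxBytes) shared across partitions, read in order.
def readFromLocalLogSketch(
    partitions: Seq[String],
    fetchMaxBytes: Int,
    hardMaxBytesLimit: Boolean,
    read: (String, Int, Boolean) => ReadResultSketch): Seq[(String, ReadResultSketch)] = {
  var limitBytes = fetchMaxBytes
  var minOneMessage = !hardMaxBytesLimit // assumption, see the lead-in
  val results = Vector.newBuilder[(String, ReadResultSketch)]
  partitions.foreach { tp =>
    val result = read(tp, limitBytes, minOneMessage)
    if (result.bytesRead > 0) minOneMessage = false
    // The remaining budget never goes below zero.
    limitBytes = math.max(0, limitBytes - result.bytesRead)
    results += (tp -> result)
  }
  results.result()
}
```

With fetchMaxBytes of 100 and partitions that each return up to 60 bytes, the three reads see budgets of 100, 40 and 0.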
read(
tp: TopicPartition,
fetchInfo: PartitionData,
limitBytes: Int,
minOneMessage: Boolean): LogReadResult
read increments the totalFetchRequestRate metric of the topic (of the TopicPartition) and the allTopicsStats in the BrokerTopicStats.
read prints out the following TRACE message to the logs:
Fetching log segment for partition [tp], offset [offset], partition fetch size [partitionFetchSize], remaining response limit [limitBytes]
read finds the Partition for the TopicPartition.
read …FIXME
electPreferredLeaders(
controller: KafkaController,
partitions: Set[TopicPartition],
responseCallback: Map[TopicPartition, ApiError] => Unit,
requestTimeout: Long): Unit
electPreferredLeaders simply requests the KafkaController to electPreferredLeaders for the partitions.
Note: electPreferredLeaders is used exclusively when KafkaApis is requested to handle an ElectPreferredLeaders request.
tryCompleteElection(key: DelayedOperationKey): Unit
tryCompleteElection …FIXME
Note: tryCompleteElection is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
localLog(
topicPartition: TopicPartition): Option[Log]
localLog …FIXME
Note: localLog is used when…FIXME
findPreferredReadReplica(
tp: TopicPartition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int]
findPreferredReadReplica finds the Partition for the TopicPartition (with the expectLeader flag off).
findPreferredReadReplica …FIXME
Note: findPreferredReadReplica is used when ReplicaManager is requested to readFromLocalLog.
localLogOrException(
topicPartition: TopicPartition): Log
localLogOrException finds the Partition for the given TopicPartition (with the expectLeader flag off) and then requests it for the local Log.
localLogOrException throws an exception if either the Partition or its Log is not available locally for the given TopicPartition.
Note: localLogOrException is used when ReplicaFetcherThread is requested for the latest epoch, logEndOffset, endOffsetForEpoch, and buildFetch.
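The two-step lookup can be sketched with Option and getOrElse(throw ...). PartitionStub, LogStub, LocalLogLookupException and both exception messages are hypothetical placeholders, not the exceptions Kafka actually throws. The point is only that a missing partition and a partition without a local log both end in an exception.

```scala
// Hypothetical stand-ins: a partition may or may not have a local log.
final case class LogStub(dir: String)
final case class PartitionStub(localLog: Option[LogStub])

final class LocalLogLookupException(msg: String) extends RuntimeException(msg)

// First the partition (expectLeader off), then its local log; missing either one is an error.
def localLogOrException(
    topicPartition: String,
    findPartition: String => Option[PartitionStub]): LogStub = {
  val partition = findPartition(topicPartition).getOrElse(
    throw new LocalLogLookupException(s"Partition $topicPartition is not available"))
  partition.localLog.getOrElse(
    throw new LocalLogLookupException(s"No local log for partition $topicPartition"))
}
```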
updateFollowerFetchState(
followerId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerFetchState …FIXME
Note: updateFollowerFetchState is used when ReplicaManager is requested to fetchMessages.
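Name | Description |
---|---|
controllerEpoch | Controller epoch of the most recent LeaderAndIsr request (recorded by becomeLeaderOrFollower) |
hwThreadInitialized | Flag that is turned on (true) once becomeLeaderOrFollower has called startHighWaterMarksCheckPointThread |
isrChangeSet | TopicPartitions whose ISR has changed (added by recordIsrChange) |
replicaFetcherManager | ReplicaFetcherManager created immediately with ReplicaManager (via createReplicaFetcherManager). Used when: … |
stateChangeLogger | StateChangeLogger with the broker ID and … |
lastIsrChangeMs | Time when isrChangeSet has a new TopicPartition added (set by recordIsrChange) |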