Skip to content

Commit cff10e6

Browse files
KAFKA-19302 Move ReplicaState and Replica to server module (#19755)
1. Move `ReplicaState` and `Replica` to server module. 2. Rewrite `ReplicaState` and `Replica` in Java. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent 6573b4a commit cff10e6

9 files changed

Lines changed: 636 additions & 596 deletions

File tree

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager
4444
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
4545
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4646
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
47+
import org.apache.kafka.server.replica.Replica
4748
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
4849
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
4950
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
@@ -795,10 +796,10 @@ class Partition(val topicPartition: TopicPartition,
795796
// lastFetchLeaderLogEndOffset.
796797
remoteReplicas.foreach { replica =>
797798
replica.resetReplicaState(
798-
currentTimeMs = currentTimeMs,
799-
leaderEndOffset = leaderEpochStartOffset,
800-
isNewLeader = isNewLeader,
801-
isFollowerInSync = partitionState.isr.contains(replica.brokerId)
799+
currentTimeMs,
800+
leaderEpochStartOffset,
801+
isNewLeader,
802+
partitionState.isr.contains(replica.brokerId)
802803
)
803804
}
804805

@@ -1072,9 +1073,9 @@ class Partition(val topicPartition: TopicPartition,
10721073
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
10731074
}
10741075

1075-
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
1076-
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isPresent() &&
1077-
(storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get())
1076+
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Optional[java.lang.Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
1077+
storedBrokerEpoch.isPresent && cachedBrokerEpoch.isPresent &&
1078+
(storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get)
10781079
}
10791080

10801081
/*
@@ -1802,7 +1803,7 @@ class Partition(val topicPartition: TopicPartition,
18021803
brokerState.setBrokerEpoch(localBrokerEpochSupplier())
18031804
} else {
18041805
val replica = remoteReplicasMap.get(brokerId)
1805-
val brokerEpoch = if (replica == null) Option.empty else replica.stateSnapshot.brokerEpoch
1806+
val brokerEpoch = if (replica == null) Optional.empty else replica.stateSnapshot.brokerEpoch
18061807
if (brokerEpoch.isEmpty) {
18071808
// There are two cases where the broker epoch can be missing:
18081809
// 1. During ISR expansion, we already held lock for the partition and did the broker epoch check, so the new

core/src/main/scala/kafka/cluster/Replica.scala

Lines changed: 0 additions & 207 deletions
This file was deleted.

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Time
3131
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
3232
import org.apache.kafka.metadata.MetadataCache
3333
import org.apache.kafka.server.config.ReplicationConfigs
34+
import org.apache.kafka.server.replica.Replica
3435
import org.apache.kafka.metadata.LeaderRecoveryState
3536
import org.junit.jupiter.api.Assertions._
3637
import org.junit.jupiter.api.Test

0 commit comments

Comments
 (0)