Skip to content

Commit 80d99ea

Browse files
authored
KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state (#19223)
This PR fixes a potential issue where the `FetchResponse` returns `divergingEndOffsets` with an older leader epoch. This can lead to committed records being removed from the follower's log, potentially causing data loss. In detail: `processFetchRequest` gets the requested leader epoch of partition data by `topicPartition` and compares it with the leader epoch of the current fetch state. If they don't match, the response is ignored. Reviewers: Jun Rao <junrao@gmail.com>
1 parent 8c3f16d commit 80d99ea

2 files changed

Lines changed: 33 additions & 5 deletions

File tree

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ abstract class AbstractFetcherThread(name: String,
307307
}
308308
}
309309

310-
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
310+
// visible for testing
311+
private[server] def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
311312
fetchRequest: FetchRequest.Builder): Unit = {
312313
val partitionsWithError = mutable.Set[TopicPartition]()
313314
val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
@@ -333,11 +334,14 @@ abstract class AbstractFetcherThread(name: String,
333334
responseData.foreachEntry { (topicPartition, partitionData) =>
334335
Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
335336
// It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
336-
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
337-
// the current offset is the same as the offset requested.
337+
// In this case, we only want to process the fetch response if:
338+
// - the partition state is ready for fetch
339+
// - the current offset is the same as the offset requested
340+
// - the current leader epoch is the same as the leader epoch requested
338341
val fetchPartitionData = sessionPartitions.get(topicPartition)
339342
if (fetchPartitionData != null &&
340343
fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
344+
fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&
341345
currentFetchState.isReadyForFetch) {
342346
Errors.forCode(partitionData.errorCode) match {
343347
case Errors.NONE =>
@@ -362,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
362366
val logAppendInfoOpt = processPartitionData(
363367
topicPartition,
364368
currentFetchState.fetchOffset,
365-
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch),
369+
currentFetchState.currentLeaderEpoch,
366370
partitionData
367371
)
368372

core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1153,4 +1153,28 @@ class AbstractFetcherThreadTest {
11531153
assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
11541154
}
11551155

1156-
}
1156+
@Test
1157+
def testIgnoreFetchResponseWhenLeaderEpochChanged(): Unit = {
1158+
val newEpoch = 1
1159+
val initEpoch = 0
1160+
1161+
val partition = new TopicPartition("topic", 0)
1162+
val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
1163+
val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
1164+
val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
1165+
val replicaState = PartitionState(leaderEpoch = newEpoch)
1166+
fetcher.setReplicaState(partition, replicaState)
1167+
val initFetchState = initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = newEpoch)
1168+
fetcher.addPartitions(Map(partition -> initFetchState))
1169+
1170+
val batch = mkBatch(baseOffset = 0L, leaderEpoch = initEpoch, new SimpleRecord("a".getBytes))
1171+
val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L)
1172+
fetcher.mockLeader.setLeaderState(partition, leaderState)
1173+
1174+
val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(initEpoch), Optional.of(initEpoch))).asJava
1175+
val fetchRequestOpt = FetchRequest.Builder.forReplica(0, 0, initEpoch, 0, Int.MaxValue, partitionData)
1176+
1177+
fetcher.processFetchRequest(partitionData, fetchRequestOpt)
1178+
assertEquals(0, replicaState.logEndOffset, "FetchResponse should be ignored when leader epoch does not match")
1179+
}
1180+
}

0 commit comments

Comments
 (0)