Skip to content

Commit c72fcda

Browse files
authored
MINOR: Cleanup TestUtils.scala (#22091)
Some methods are only used in a single class; they should be moved into that class. Java tests should not use TestUtils.scala Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent f9037bc commit c72fcda

13 files changed

Lines changed: 406 additions & 411 deletions

core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4189,11 +4189,11 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
41894189
}
41904190

41914191
def removeAllClientAcls(): Unit = {
4192-
val authorizerForWrite = TestUtils.pickAuthorizerForWrite(brokers, controllerServers)
4192+
val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
41934193
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY)
41944194
val aclFilter = new AclBindingFilter(ResourcePatternFilter.ANY, aclEntryFilter)
41954195

4196-
authorizerForWrite.deleteAcls(TestUtils.anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala.
4196+
authorizerForWrite.deleteAcls(anonymousAuthorizableContext, java.util.List.of(aclFilter)).asScala.
41974197
map(_.toCompletableFuture.get).flatMap { deletion =>
41984198
deletion.aclBindingDeleteResults().asScala.map(_.aclBinding.pattern).toSet
41994199
}.foreach { resource =>

core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
5858
override def setUp(testInfo: TestInfo): Unit = {
5959
this.testInfo = testInfo
6060
super.setUp(testInfo)
61-
waitUntilBrokerMetadataIsPropagated(brokers)
61+
val expectedBrokerIds = brokers.map(_.config.brokerId).toSet
62+
waitUntilTrue(() => brokers.forall(server =>
63+
expectedBrokerIds.forall(server.dataPlaneRequestProcessor.metadataCache.hasAliveBroker(_))
64+
), "Timed out waiting for broker metadata to propagate to all servers", 15000)
6265
}
6366

6467
@AfterEach

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

Lines changed: 135 additions & 57 deletions
Large diffs are not rendered by default.

core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
2424
import org.apache.kafka.common.network.ListenerName
2525
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
2626
import org.apache.kafka.common.resource.ResourcePattern
27-
import org.apache.kafka.common.security.auth.SecurityProtocol
27+
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
2828
import org.apache.kafka.common.security.scram.ScramCredential
2929
import org.apache.kafka.common.utils.Time
3030
import org.apache.kafka.common.{KafkaException, Uuid}
3131
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
32+
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer => JAuthorizer}
3233
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
3334

3435
import java.io.File
36+
import java.net.InetAddress
3537
import java.time.Duration
3638
import java.util
3739
import java.util.Properties
@@ -214,6 +216,17 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
214216
}
215217
}
216218

219+
val anonymousAuthorizableContext = new AuthorizableRequestContext() {
220+
override def listenerName(): String = ""
221+
override def securityProtocol(): SecurityProtocol = SecurityProtocol.PLAINTEXT
222+
override def principal(): KafkaPrincipal = KafkaPrincipal.ANONYMOUS
223+
override def clientAddress(): InetAddress = null
224+
override def requestType(): Int = 0
225+
override def requestVersion(): Int = 0
226+
override def clientId(): String = ""
227+
override def correlationId(): Int = 0
228+
}
229+
217230
def addAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
218231
val authorizerForWrite = pickAuthorizerForWrite(brokers, controllerServers)
219232
val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
@@ -371,4 +384,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
371384
}
372385
}
373386
}
387+
388+
/**
389+
* Find an Authorizer that we can call createAcls or deleteAcls on.
390+
*/
391+
def pickAuthorizerForWrite[B <: KafkaBroker](brokers: Seq[B], controllers: Seq[ControllerServer]): JAuthorizer = {
392+
if (controllers.isEmpty) {
393+
brokers.head.authorizerPlugin.get.get
394+
} else {
395+
var result: JAuthorizer = null
396+
TestUtils.retry(120000) {
397+
val active = controllers.filter(_.controller.isActive).head
398+
result = active.authorizerPlugin.get.get
399+
}
400+
result
401+
}
402+
}
374403
}

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,7 +1229,7 @@ class LogCleanerTest extends Logging {
12291229

12301230
// the last (active) segment has just one message
12311231

1232-
def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
1232+
def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => Utils.utf8(record.value())).toSet.size).toSeq
12331233

12341234
val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
12351235
assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
@@ -1927,7 +1927,7 @@ class LogCleanerTest extends Logging {
19271927

19281928
for (segment <- log.logSegments.asScala; batch <- segment.log.batches.asScala; record <- batch.asScala) {
19291929
assertTrue(record.hasMagic(batch.magic))
1930-
val value = TestUtils.readString(record.value).toLong
1930+
val value = Utils.utf8(record.value()).toLong
19311931
assertEquals(record.offset, value)
19321932
}
19331933
}
@@ -1947,7 +1947,7 @@ class LogCleanerTest extends Logging {
19471947

19481948
for (logEntry <- records.records.asScala) {
19491949
val offset = logEntry.offset
1950-
val value = TestUtils.readString(logEntry.value).toLong
1950+
val value = Utils.utf8(logEntry.value()).toLong
19511951
assertEquals(offset, value)
19521952
}
19531953
}

core/src/test/scala/unit/kafka/log/LogTestUtils.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package kafka.log
1919

2020
import java.io.File
2121
import java.util.Properties
22-
import kafka.utils.TestUtils
2322
import org.apache.kafka.common.Uuid
2423
import org.apache.kafka.common.compress.Compression
2524
import org.apache.kafka.common.record.internal.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
@@ -38,7 +37,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation
3837
import org.apache.kafka.server.util.Scheduler
3938
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}
4039
import org.apache.kafka.common.message.AbortedTxn
41-
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, VerificationGuard, UnifiedLog}
40+
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LazyIndex, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetsListener, LogSegment, ProducerStateManager, ProducerStateManagerConfig, TransactionIndex, UnifiedLog, VerificationGuard}
4241
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
4342

4443
import scala.jdk.CollectionConverters._
@@ -202,7 +201,7 @@ object LogTestUtils {
202201
for (logSegment <- log.logSegments.asScala;
203202
batch <- logSegment.log.batches.asScala if !batch.isControlBatch;
204203
record <- batch.asScala if record.hasValue && record.hasKey)
205-
yield TestUtils.readString(record.key).toLong
204+
yield Utils.utf8(record.key()).toLong
206205
}
207206

208207
def recoverAndCheck(logDir: File, config: LogConfig, expectedKeys: Iterable[Long], brokerTopicStats: BrokerTopicStats, time: Time, scheduler: Scheduler): UnifiedLog = {
@@ -308,4 +307,5 @@ object LogTestUtils {
308307
sequence += numRecords
309308
}
310309
}
310+
311311
}

core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ import org.apache.kafka.common.metrics.JmxReporter
3636
import org.apache.kafka.common.utils.Time
3737
import org.apache.kafka.server.config.ServerLogConfigs
3838
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
39+
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
40+
import org.apache.kafka.storage.internals.log.UnifiedLog
3941
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics
4042
import org.junit.jupiter.api.{Test, Timeout}
4143
import org.junit.jupiter.params.ParameterizedTest
4244
import org.junit.jupiter.params.provider.MethodSource
4345

46+
import java.io.File
4447
import scala.jdk.OptionConverters.RichOptional
4548

4649
@Timeout(120)
@@ -64,7 +67,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
6467
val topic = "test-topic-metric"
6568
createTopic(topic)
6669
deleteTopic(topic)
67-
TestUtils.verifyTopicDeletion(topic, 1, brokers)
70+
verifyTopicDeletion(topic, brokers)
6871
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
6972
}
7073

@@ -78,7 +81,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
7881
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
7982
brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
8083
deleteTopic(topic)
81-
TestUtils.verifyTopicDeletion(topic, 1, brokers)
84+
verifyTopicDeletion(topic, brokers)
8285
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
8386
}
8487

@@ -244,4 +247,41 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
244247
val pattern = (".*BrokerTopicMetrics.*" + topic.map(t => s"($t)$$").getOrElse("")).r.pattern
245248
metrics.filter(pattern.matcher(_).matches())
246249
}
250+
251+
private def verifyTopicDeletion[B <: KafkaBroker](topic: String, brokers: Seq[B]): Unit = {
252+
val topicPartitions = (0 until 1).map(new TopicPartition(topic, _))
253+
// ensure that the topic-partition has been deleted from all brokers' replica managers
254+
TestUtils.waitUntilTrue(() =>
255+
brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)),
256+
"Replica manager's should have deleted all of this topic's partitions")
257+
// ensure that logs from all replicas are deleted
258+
TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall(tp => broker.logManager.getLog(tp).isEmpty)),
259+
"Replica logs not deleted after delete topic is complete")
260+
// ensure that topic is removed from all cleaner offsets
261+
TestUtils.waitUntilTrue(() => brokers.forall(broker => topicPartitions.forall { tp =>
262+
val checkpoints = broker.logManager.liveLogDirs.asScala.map { logDir =>
263+
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"), null).read()
264+
}
265+
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.containsKey(tp))
266+
}), "Cleaner offset for deleted partition should have been removed")
267+
TestUtils.waitUntilTrue(() => brokers.forall(broker =>
268+
broker.config.logDirs.stream().allMatch { logDir =>
269+
topicPartitions.forall { tp =>
270+
!new File(logDir, tp.topic + "-" + tp.partition).exists()
271+
}
272+
}
273+
), "Failed to soft-delete the data to a delete directory")
274+
TestUtils.waitUntilTrue(() => brokers.forall(broker =>
275+
broker.config.logDirs.stream().allMatch { logDir =>
276+
topicPartitions.forall { tp =>
277+
!util.List.of(new File(logDir).list()).asScala.exists { partitionDirectoryNames =>
278+
partitionDirectoryNames.exists { directoryName =>
279+
directoryName.startsWith(tp.topic + "-" + tp.partition) &&
280+
directoryName.endsWith(UnifiedLog.DELETE_DIR_SUFFIX)
281+
}
282+
}
283+
}
284+
}
285+
), "Failed to hard-delete the delete directory")
286+
}
247287
}

core/src/test/scala/unit/kafka/network/RequestChannelTest.scala

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package kafka.network
2020
import com.fasterxml.jackson.databind.ObjectMapper
2121
import kafka.network
2222
import kafka.server.EnvelopeUtils
23-
import kafka.utils.TestUtils
2423
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
2524
import org.apache.kafka.common.config.types.Password
2625
import org.apache.kafka.common.config.{ConfigResource, SaslConfigs, SslConfigs, TopicConfig}
@@ -29,7 +28,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
2928
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._
3029
import org.apache.kafka.common.message.{CreateTopicsRequestData, CreateTopicsResponseData, IncrementalAlterConfigsRequestData}
3130
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
32-
import org.apache.kafka.common.protocol.Errors
31+
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
3332
import org.apache.kafka.common.requests.AlterConfigsRequest._
3433
import org.apache.kafka.common.requests._
3534
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
@@ -48,6 +47,7 @@ import java.io.IOException
4847
import java.net.InetAddress
4948
import java.nio.ByteBuffer
5049
import java.util
50+
import java.util.Optional
5151
import java.util.concurrent.atomic.AtomicReference
5252
import scala.collection.Map
5353
import scala.jdk.CollectionConverters._
@@ -260,7 +260,7 @@ class RequestChannelTest {
260260
}
261261

262262
private def buildUnwrappedEnvelopeRequest(request: AbstractRequest): RequestChannel.Request = {
263-
val wrappedRequest = TestUtils.buildEnvelopeRequest(
263+
val wrappedRequest = buildEnvelopeRequest(
264264
request,
265265
principalSerde,
266266
requestChannelMetrics,
@@ -278,6 +278,46 @@ class RequestChannelTest {
278278
unwrappedRequest.get()
279279
}
280280

281+
def buildEnvelopeRequest(
282+
request: AbstractRequest,
283+
principalSerde: KafkaPrincipalSerde,
284+
requestChannelMetrics: RequestChannelMetrics,
285+
startTimeNanos: Long,
286+
dequeueTimeNanos: Long = -1,
287+
fromPrivilegedListener: Boolean = true
288+
): RequestChannel.Request = {
289+
val clientId = "id"
290+
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
291+
292+
val requestHeader = new RequestHeader(request.apiKey, request.version, clientId, 0)
293+
val requestBuffer = request.serializeWithHeader(requestHeader)
294+
295+
val envelopeHeader = new RequestHeader(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion(), clientId, 0)
296+
val envelopeBuffer = new EnvelopeRequest.Builder(
297+
requestBuffer,
298+
principalSerde.serialize(KafkaPrincipal.ANONYMOUS),
299+
InetAddress.getLocalHost.getAddress
300+
).build().serializeWithHeader(envelopeHeader)
301+
302+
RequestHeader.parse(envelopeBuffer)
303+
304+
val envelopeContext = new RequestContext(envelopeHeader, "1", InetAddress.getLocalHost, Optional.empty(),
305+
KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
306+
fromPrivilegedListener, Optional.of(principalSerde))
307+
308+
val envelopRequest = new RequestChannel.Request(
309+
processor = 1,
310+
context = envelopeContext,
311+
startTimeNanos = startTimeNanos,
312+
memoryPool = MemoryPool.NONE,
313+
buffer = envelopeBuffer,
314+
metrics = requestChannelMetrics,
315+
envelope = None
316+
)
317+
envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
318+
envelopRequest
319+
}
320+
281321
private def isValidJson(str: String): Boolean = {
282322
try {
283323
val mapper = new ObjectMapper

core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer
3333
import org.apache.kafka.common.test.ClusterInstance
3434
import org.apache.kafka.common.utils.ProducerIdAndEpoch
3535
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
36+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3637
import org.apache.kafka.server.IntegrationTestUtils
3738
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
3839

@@ -70,10 +71,14 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
7071
protected def createTransactionStateTopic(): Unit = {
7172
val admin = cluster.admin()
7273
try {
73-
TestUtils.createTransactionStateTopicWithAdmin(
74+
TestUtils.createTopicWithAdmin(
7475
admin = admin,
76+
topic = Topic.TRANSACTION_STATE_TOPIC_NAME,
77+
numPartitions = brokers().head.config.getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG),
78+
replicationFactor = brokers().head.config.getShort(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG).toInt,
7579
brokers = brokers(),
76-
controllers = controllerServers()
80+
controllers = controllerServers(),
81+
topicConfig = new Properties()
7782
)
7883
} finally {
7984
admin.close()

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package kafka.server
1919

20+
import com.yammer.metrics.core.{Histogram, Meter}
2021
import kafka.cluster.Partition
2122
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
2223
import kafka.network.RequestChannel
@@ -96,7 +97,7 @@ import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authoriz
9697
import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
9798
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
9899
import org.apache.kafka.server.logger.LoggingController
99-
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
100+
import org.apache.kafka.server.metrics.{ClientMetricsTestUtils, KafkaYammerMetrics}
100101
import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
101102
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager, ReplicaQuota, ReplicationQuotaManager, ThrottleCallback}
102103
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2262,12 +2263,22 @@ class KafkaApisTest extends Logging {
22622263
assertEquals(expectedError, error)
22632264

22642265
val metricName = if (version < 4) ApiKeys.ADD_PARTITIONS_TO_TXN.name else RequestMetrics.VERIFY_PARTITIONS_IN_TXN_METRIC_NAME
2265-
assertEquals(8, TestUtils.metersCount(metricName))
2266+
assertEquals(8, metersCount(metricName))
22662267
} finally {
22672268
requestMetrics.close()
22682269
}
22692270
}
22702271

2272+
private def metersCount(metricName: String): Long = {
2273+
KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
2274+
.filter { case (k, _) => k.getMBeanName.endsWith(metricName) }
2275+
.values.map {
2276+
case histogram: Histogram => histogram.count()
2277+
case meter: Meter => meter.count()
2278+
case _ => 0
2279+
}.sum
2280+
}
2281+
22712282
@ParameterizedTest
22722283
@ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
22732284
def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {

0 commit comments

Comments
 (0)