Skip to content

Commit e7f8466

Browse files
committed
Merge branch 'trunk' into KAFKA-20167
# Conflicts:
#   streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
2 parents 1826efa + c72fcda commit e7f8466

636 files changed

Lines changed: 3838 additions & 2270 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/scripts/pr-format.py

Lines changed: 82 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -107,86 +107,102 @@ def split_paragraphs(text: str):
107107
def resolve_reviewer(login: str) -> tuple:
108108
"""Map a GitHub login to (name, email).
109109
110-
Tries three tiers in order: repo commit history, GitHub user profile,
111-
and past `Reviewers:` trailers in git log (matched by name).
112-
Noreply emails (@users.noreply.github.com) are treated as missing since
113-
they are GitHub privacy placeholders that do not identify the reviewer.
114-
Returns (name, None) when no usable email is found; the caller falls
115-
back to the '(@login)' form in the Reviewers trailer.
110+
Tries reviewer email sources in order: repo commit author email, past
111+
`Reviewers:` trailers searched via GitHub commit search API (matched
112+
by name and verified by PR review login), and GitHub user profile
113+
public email. Noreply emails (@users.noreply.github.com) are treated
114+
as missing since they are GitHub privacy placeholders that do not
115+
identify the reviewer. Returns (name, None) when no usable email is
116+
found; the caller falls back to the '(github:login)' form in the
117+
Reviewers trailer.
116118
"""
117119
def _usable_email(e):
118120
if not e or e.endswith("@users.noreply.github.com"):
119121
return None
120122
return e
121123

122-
name = None
123-
email = None
124-
125-
# Tier 1: find from repo commit history. Misses when the reviewer has no
126-
# merged commit in apache/kafka, or had "Keep my email private" enabled
127-
# at commit time (GitHub rewrites the author to the noreply form).
128-
try:
129-
cmd = f"gh api repos/apache/kafka/commits?author={login}&per_page=1"
130-
p = subprocess.run(shlex.split(cmd), capture_output=True, text=True)
131-
if p.returncode == 0:
132-
commits = json.loads(p.stdout)
133-
if commits:
134-
author = commits[0].get("commit", {}).get("author", {})
135-
name = author.get("name")
136-
email = _usable_email(author.get("email"))
137-
except Exception as e:
138-
logger.debug(f"Failed to resolve {login} from commit history: {e}")
139-
140-
# Tier 2: GitHub user profile. Only exposes an email when the reviewer
141-
# has set a Public email in their profile settings.
142-
if not name or not email:
143-
try:
144-
cmd = f"gh api users/{login}"
145-
p = subprocess.run(shlex.split(cmd), capture_output=True, text=True)
146-
if p.returncode == 0:
147-
user = json.loads(p.stdout)
148-
if not name:
149-
name = user.get("name")
150-
if not email:
151-
email = _usable_email(user.get("email"))
152-
except Exception as e:
153-
logger.debug(f"Failed to resolve {login} from GitHub profile: {e}")
154-
155-
# Tier 3: past Reviewers: trailers in git log, matched by name. Catches
156-
# pure reviewers (no commits in apache/kafka, no public profile email)
157-
# who have been credited with a real email in an earlier merged PR.
158-
# git log is newest-first, so the first usable match is the most recent.
159-
if name and not email:
124+
def _run_json(cmd, source):
160125
try:
161-
p = subprocess.run(
162-
["git", "log",
163-
"--pretty=format:%(trailers:key=Reviewers,valueonly=true,unfold=true)"],
164-
capture_output=True, text=True,
165-
)
126+
p = subprocess.run(cmd, capture_output=True, text=True)
166127
if p.returncode == 0:
167-
pattern = re.compile(rf"{re.escape(name)}\s*<([^>]+)>")
168-
for line in p.stdout.splitlines():
169-
for m in pattern.finditer(line):
170-
candidate = _usable_email(m.group(1))
171-
if candidate:
172-
email = candidate
173-
break
174-
if email:
175-
break
128+
return json.loads(p.stdout)
129+
logger.debug(f"Failed to resolve {login} from {source}: {p.stderr}")
176130
except Exception as e:
177-
logger.debug(f"Failed to resolve {login} from past Reviewers trailers: {e}")
178-
179-
if not name:
180-
name = login
131+
logger.debug(f"Failed to resolve {login} from {source}: {e}")
132+
return None
181133

182-
return (name, email)
134+
def _has_pr_review_from_login(commit_sha):
135+
pulls = _run_json(["gh", "api", f"repos/apache/kafka/commits/{commit_sha}/pulls"],
136+
f"associated PRs for commit {commit_sha}") or []
137+
for pull in pulls:
138+
pr_number = pull.get("number")
139+
if not pr_number:
140+
continue
141+
reviews = _run_json(["gh", "api", f"repos/apache/kafka/pulls/{pr_number}/reviews?per_page=100"],
142+
f"reviews for PR {pr_number}") or []
143+
if any((review.get("user") or {}).get("login", "").lower() == login.lower()
144+
for review in reviews):
145+
return True
146+
return False
147+
148+
commits = _run_json(["gh", "api", f"repos/apache/kafka/commits?author={login}&per_page=1"],
149+
"commit history") or []
150+
author = commits[0].get("commit", {}).get("author", {}) if commits else {}
151+
152+
# Tier 1: latest repo commit authored by this GitHub login. Misses
153+
# when the reviewer has no merged commit in apache/kafka, or had
154+
# "Keep my email private" enabled at commit time (GitHub rewrites
155+
# the author to the noreply form).
156+
email = _usable_email(author.get("email"))
157+
if email:
158+
return (author.get("name") or login, email)
159+
160+
user = _run_json(["gh", "api", f"users/{login}"], "GitHub profile") or {}
161+
162+
name_candidates = []
163+
for candidate in (user.get("name"), author.get("name"), login):
164+
if candidate and candidate not in name_candidates:
165+
name_candidates.append(candidate)
166+
167+
name = name_candidates[0] if name_candidates else login
168+
169+
# Tier 2: past Reviewers: trailers in commit history, matched by name,
170+
# via the GitHub commit search API. Catches pure reviewers (no commits
171+
# in apache/kafka, no public profile email) who have been credited
172+
# with a real email in an earlier merged PR. Sort by committer-date
173+
# desc so the most recent email wins if a reviewer has changed it.
174+
# Full-text search is tokenized (not strict substring), so we re-verify
175+
# with a regex client-side. To avoid same-name matches, we only accept
176+
# a trailer email when the matched commit's associated PR includes a
177+
# review from this GitHub login.
178+
for candidate in name_candidates:
179+
results = _run_json(["gh", "search", "commits",
180+
"--repo", "apache/kafka",
181+
f'"{candidate} <"',
182+
"--limit", "10",
183+
"--sort", "committer-date",
184+
"--order", "desc",
185+
"--json", "sha,commit"],
186+
"commit search") or []
187+
pattern = re.compile(rf"{re.escape(candidate)}\s*<([^>]+)>")
188+
for result in results:
189+
msg = result.get("commit", {}).get("message", "")
190+
commit_sha = result.get("sha")
191+
for match in pattern.finditer(msg):
192+
candidate_email = _usable_email(match.group(1))
193+
if candidate_email and commit_sha and _has_pr_review_from_login(commit_sha):
194+
return (candidate, candidate_email)
195+
196+
# Tier 3: GitHub user profile. Only exposes an email when the reviewer
197+
# has set a Public email in their profile settings.
198+
return (name, _usable_email(user.get("email")))
183199

184200

185201
def already_exists(identity: str, existing_reviewers: List[str]) -> bool:
186202
"""Check if a reviewer identity is already in the existing reviewers list.
187203
188204
identity is the delimited token that uniquely identifies a reviewer, either
189-
'<email>' (for the email form) or '(@login)' (for the login fallback).
205+
'<email>' (for the email form) or '(github:login)' (for the login fallback).
190206
"""
191207
return identity.lower() in ", ".join(existing_reviewers).lower()
192208

@@ -246,7 +262,8 @@ def update_reviewers_trailer(body: str, trailer: str) -> str:
246262
if email:
247263
identity = f"<{email}>"
248264
else:
249-
identity = f"(@{reviewer_login})"
265+
# Tier 4: fall back to the GitHub handle without tagging the reviewer.
266+
identity = f"(github:{reviewer_login})"
250267
resolved = f"{name} {identity}"
251268
existing_reviewers = parse_trailers(title, body).get("Reviewers", [])
252269
if not already_exists(identity, existing_reviewers):

.github/workflows/pr-reviewed.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jobs:
2929
save-pr-number:
3030
name: Save PR Number
3131
runs-on: ubuntu-latest
32+
if: github.event.pull_request.state == 'open'
3233
steps:
3334
- name: Env
3435
run: printenv

build.gradle

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,15 @@ subprojects {
696696

697697
task docsJar(dependsOn: javadocJar)
698698

699+
// Consumed by root aggregatedJavadoc; mirrors compileClasspath.
700+
configurations {
701+
javadocClasspathElements {
702+
canBeConsumed = true
703+
canBeResolved = false
704+
extendsFrom configurations.compileClasspath
705+
}
706+
}
707+
699708
check.dependsOn('javadoc')
700709

701710
task systemTestLibs(dependsOn: jar)
@@ -3991,12 +4000,34 @@ project(':connect:test-plugins') {
39914000
}
39924001
}
39934002

3994-
task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
4003+
// Gradle 9+ forbids resolving a subproject's Configuration from the root.
4004+
// Consume each subproject's javadocClasspathElements via project dependencies instead.
4005+
configurations {
4006+
aggregatedJavadocClasspath {
4007+
canBeConsumed = false
4008+
canBeResolved = true
4009+
}
4010+
}
4011+
4012+
task aggregatedJavadoc(type: Javadoc) {
4013+
destinationDir = file("${layout.buildDirectory.get().asFile.path}/docs/javadoc")
4014+
}
4015+
4016+
gradle.projectsEvaluated {
39954017
def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
3996-
source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
3997-
classpath = files(projectsWithJavadoc.collect { it.sourceSets.main.compileClasspath })
3998-
includes = projectsWithJavadoc.collectMany { it.javadoc.getIncludes() }
3999-
excludes = projectsWithJavadoc.collectMany { it.javadoc.getExcludes() }
4018+
4019+
projectsWithJavadoc.each { sp ->
4020+
dependencies.add('aggregatedJavadocClasspath',
4021+
dependencies.project(path: sp.path, configuration: 'javadocClasspathElements'))
4022+
}
4023+
4024+
tasks.named('aggregatedJavadoc').configure {
4025+
dependsOn projectsWithJavadoc.collect { "${it.path}:compileJava" }
4026+
source projectsWithJavadoc.collect { it.sourceSets.main.allJava }
4027+
classpath = configurations.aggregatedJavadocClasspath
4028+
includes = projectsWithJavadoc.collectMany { it.javadoc.getIncludes() }
4029+
excludes = projectsWithJavadoc.collectMany { it.javadoc.getExcludes() }
4030+
}
40004031
}
40014032

40024033

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@
269269

270270
<!-- group coordinator -->
271271
<suppress checks="CyclomaticComplexity"
272-
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext).java"/>
272+
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext|GroupConfigTest).java"/>
273273
<suppress checks="(NPathComplexity|MethodLength)"
274274
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorShard).java"/>
275275
<suppress checks="ClassFanOutComplexity"

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.kafka.common.test.api.ClusterTestDefaults;
3434
import org.apache.kafka.common.test.api.ClusterTests;
3535
import org.apache.kafka.common.test.api.Type;
36-
import org.apache.kafka.common.utils.LogContext;
36+
import org.apache.kafka.common.utils.internals.LogContext;
3737
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
3838
import org.apache.kafka.raft.KRaftConfigs;
3939
import org.apache.kafka.server.IntegrationTestUtils;

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
240240
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
241241
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
242242
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
243+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
244+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
243245
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor"),
244246
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
245247
}
@@ -251,6 +253,8 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
251253
@ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
252254
@ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
253255
@ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
256+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
257+
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "1000"),
254258
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor"),
255259
@ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
256260
}
@@ -263,16 +267,19 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
263267
Consumer<byte[], byte[]> consumer0 = clusterInstance.consumer(Map.of(
264268
ConsumerConfig.GROUP_ID_CONFIG, "group0",
265269
ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
270+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
266271
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
267272
));
268273
Consumer<byte[], byte[]> consumer1 = clusterInstance.consumer(Map.of(
269274
ConsumerConfig.GROUP_ID_CONFIG, "group0",
270275
ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
276+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
271277
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
272278
));
273279
Consumer<byte[], byte[]> consumer2 = clusterInstance.consumer(Map.of(
274280
ConsumerConfig.GROUP_ID_CONFIG, "group0",
275281
ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
282+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
276283
ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
277284
))
278285
) {
@@ -288,9 +295,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
288295
consumer2.subscribe(List.of(topic));
289296

290297
TestUtils.waitForCondition(() -> {
291-
consumer0.poll(Duration.ofMillis(1000));
292-
consumer1.poll(Duration.ofMillis(1000));
293-
consumer2.poll(Duration.ofMillis(1000));
298+
consumer0.poll(Duration.ofMillis(100));
299+
consumer1.poll(Duration.ofMillis(100));
300+
consumer2.poll(Duration.ofMillis(100));
294301
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
295302
consumer1.assignment().isEmpty() &&
296303
consumer2.assignment().isEmpty();
@@ -305,9 +312,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
305312
);
306313
clusterInstance.waitTopicCreation(topic, 3);
307314
TestUtils.waitForCondition(() -> {
308-
consumer0.poll(Duration.ofMillis(1000));
309-
consumer1.poll(Duration.ofMillis(1000));
310-
consumer2.poll(Duration.ofMillis(1000));
315+
consumer0.poll(Duration.ofMillis(100));
316+
consumer1.poll(Duration.ofMillis(100));
317+
consumer2.poll(Duration.ofMillis(100));
311318
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
312319
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
313320
consumer2.assignment().isEmpty();
@@ -322,9 +329,9 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
322329
);
323330
clusterInstance.waitTopicCreation(topic, 6);
324331
TestUtils.waitForCondition(() -> {
325-
consumer0.poll(Duration.ofMillis(1000));
326-
consumer1.poll(Duration.ofMillis(1000));
327-
consumer2.poll(Duration.ofMillis(1000));
332+
consumer0.poll(Duration.ofMillis(100));
333+
consumer1.poll(Duration.ofMillis(100));
334+
consumer2.poll(Duration.ofMillis(100));
328335
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
329336
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
330337
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
@@ -346,13 +353,13 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
346353
new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
347354
)).all().get();
348355
TestUtils.waitForCondition(() -> {
349-
consumer0.poll(Duration.ofMillis(1000));
350-
consumer1.poll(Duration.ofMillis(1000));
351-
consumer2.poll(Duration.ofMillis(1000));
356+
consumer0.poll(Duration.ofMillis(100));
357+
consumer1.poll(Duration.ofMillis(100));
358+
consumer2.poll(Duration.ofMillis(100));
352359
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
353360
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
354361
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
355-
}, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
362+
}, 30000, "Consumer with topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
356363
}
357364
}
358365

0 commit comments

Comments (0)