2222import org .apache .kafka .clients .consumer .internals .events .ErrorEvent ;
2323import org .apache .kafka .clients .consumer .internals .metrics .HeartbeatMetricsManager ;
2424import org .apache .kafka .common .TopicPartition ;
25+ import org .apache .kafka .common .errors .GroupAuthorizationException ;
26+ import org .apache .kafka .common .errors .RetriableException ;
27+ import org .apache .kafka .common .errors .UnsupportedVersionException ;
2528import org .apache .kafka .common .message .StreamsGroupHeartbeatRequestData ;
2629import org .apache .kafka .common .message .StreamsGroupHeartbeatResponseData ;
2730import org .apache .kafka .common .metrics .Metrics ;
5962 */
6063public class StreamsGroupHeartbeatRequestManager implements RequestManager {
6164
65+ private static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "The cluster does not support the STREAMS group " +
66+ "protocol or does not support the versions of the STREAMS group protocol used by this client " +
67+ "(used versions: " + StreamsGroupHeartbeatRequestData .LOWEST_SUPPORTED_VERSION + " to " +
68+ StreamsGroupHeartbeatRequestData .HIGHEST_SUPPORTED_VERSION + ")." ;
69+
6270 static class HeartbeatState {
6371
6472 // Fields of StreamsGroupHeartbeatRequest sent in the most recent request
@@ -409,6 +417,8 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequestAndHandleRespons
409417 if (response != null ) {
410418 metricsManager .recordRequestLatency (response .requestLatencyMs ());
411419 onResponse ((StreamsGroupHeartbeatResponse ) response .responseBody (), completionTimeMs );
420+ } else {
421+ onFailure (exception , completionTimeMs );
412422 }
413423 });
414424 }
@@ -428,6 +438,8 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr
428438 private void onResponse (final StreamsGroupHeartbeatResponse response , long currentTimeMs ) {
429439 if (Errors .forCode (response .data ().errorCode ()) == Errors .NONE ) {
430440 onSuccessResponse (response , currentTimeMs );
441+ } else {
442+ onErrorResponse (response , currentTimeMs );
431443 }
432444 }
433445
@@ -451,6 +463,146 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
451463 membershipManager .onHeartbeatSuccess (response );
452464 }
453465
466+ private void onErrorResponse (final StreamsGroupHeartbeatResponse response , final long currentTimeMs ) {
467+ final Errors error = Errors .forCode (response .data ().errorCode ());
468+ final String errorMessage = response .data ().errorMessage ();
469+
470+ heartbeatState .reset ();
471+ this .heartbeatRequestState .onFailedAttempt (currentTimeMs );
472+
473+ switch (error ) {
474+ case NOT_COORDINATOR :
475+ logInfo (
476+ String .format ("StreamsGroupHeartbeatRequest failed because the group coordinator %s is incorrect. " +
477+ "Will attempt to find the coordinator again and retry" , coordinatorRequestManager .coordinator ()),
478+ response ,
479+ currentTimeMs
480+ );
481+ coordinatorRequestManager .markCoordinatorUnknown (errorMessage , currentTimeMs );
482+ // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
483+ heartbeatRequestState .reset ();
484+ break ;
485+
486+ case COORDINATOR_NOT_AVAILABLE :
487+ logInfo (
488+ String .format ("StreamsGroupHeartbeatRequest failed because the group coordinator %s is not available. " +
489+ "Will attempt to find the coordinator again and retry" , coordinatorRequestManager .coordinator ()),
490+ response ,
491+ currentTimeMs
492+ );
493+ coordinatorRequestManager .markCoordinatorUnknown (errorMessage , currentTimeMs );
494+ // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
495+ heartbeatRequestState .reset ();
496+ break ;
497+
498+ case COORDINATOR_LOAD_IN_PROGRESS :
499+ logInfo (
500+ String .format ("StreamsGroupHeartbeatRequest failed because the group coordinator %s is still loading. " +
501+ "Will retry" , coordinatorRequestManager .coordinator ()),
502+ response ,
503+ currentTimeMs
504+ );
505+ break ;
506+
507+ case GROUP_AUTHORIZATION_FAILED :
508+ GroupAuthorizationException exception =
509+ GroupAuthorizationException .forGroupId (membershipManager .groupId ());
510+ logger .error ("StreamsGroupHeartbeatRequest failed due to group authorization failure: {}" ,
511+ exception .getMessage ());
512+ handleFatalFailure (error .exception (exception .getMessage ()));
513+ break ;
514+
515+ case TOPIC_AUTHORIZATION_FAILED :
516+ logger .error ("StreamsGroupHeartbeatRequest failed for member {} with state {} due to {}: {}" ,
517+ membershipManager .memberId (), membershipManager .state (), error , errorMessage );
518+ // Propagate auth error received in HB so that it's returned on poll.
519+ // Member should stay in its current state so it can recover if ever the missing ACLs are added.
520+ backgroundEventHandler .add (new ErrorEvent (error .exception ()));
521+ break ;
522+
523+ case INVALID_REQUEST :
524+ case GROUP_MAX_SIZE_REACHED :
525+ case STREAMS_INVALID_TOPOLOGY :
526+ case STREAMS_INVALID_TOPOLOGY_EPOCH :
527+ case STREAMS_TOPOLOGY_FENCED :
528+ logger .error ("StreamsGroupHeartbeatRequest failed due to {}: {}" , error , errorMessage );
529+ handleFatalFailure (error .exception (errorMessage ));
530+ break ;
531+
532+ case FENCED_MEMBER_EPOCH :
533+ logInfo (
534+ String .format ("StreamsGroupHeartbeatRequest failed for member %s because epoch %s is fenced." ,
535+ membershipManager .memberId (), membershipManager .memberEpoch ()),
536+ response ,
537+ currentTimeMs
538+ );
539+ membershipManager .onFenced ();
540+ // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
541+ heartbeatRequestState .reset ();
542+ break ;
543+
544+ case UNKNOWN_MEMBER_ID :
545+ logInfo (
546+ String .format ("StreamsGroupHeartbeatRequest failed because member %s is unknown." ,
547+ membershipManager .memberId ()),
548+ response ,
549+ currentTimeMs
550+ );
551+ membershipManager .onFenced ();
552+ // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
553+ heartbeatRequestState .reset ();
554+ break ;
555+
556+ case UNSUPPORTED_VERSION :
557+ logger .error ("StreamsGroupHeartbeatRequest failed due to {}: {}" , error , UNSUPPORTED_VERSION_ERROR_MESSAGE );
558+ handleFatalFailure (error .exception (UNSUPPORTED_VERSION_ERROR_MESSAGE ));
559+ break ;
560+
561+ default :
562+ logger .error ("StreamsGroupHeartbeatRequest failed due to unexpected error {}: {}" , error , errorMessage );
563+ handleFatalFailure (error .exception (errorMessage ));
564+ }
565+ membershipManager .onFatalHeartbeatFailure ();
566+ }
567+
568+ private void logInfo (final String message ,
569+ final StreamsGroupHeartbeatResponse response ,
570+ final long currentTimeMs ) {
571+ logger .info ("{} in {}ms: {}" ,
572+ message ,
573+ heartbeatRequestState .remainingBackoffMs (currentTimeMs ),
574+ response .data ().errorMessage ());
575+ }
576+
577+ private void onFailure (final Throwable exception , final long responseTimeMs ) {
578+ heartbeatRequestState .onFailedAttempt (responseTimeMs );
579+ heartbeatState .reset ();
580+ if (exception instanceof RetriableException ) {
581+ coordinatorRequestManager .handleCoordinatorDisconnect (exception , responseTimeMs );
582+ String message = String .format ("StreamsGroupHeartbeatRequest failed because of a retriable exception. Will retry in %s ms: %s" ,
583+ heartbeatRequestState .remainingBackoffMs (responseTimeMs ),
584+ exception .getMessage ());
585+ logger .debug (message );
586+ membershipManager .onRetriableHeartbeatFailure ();
587+ } else {
588+ if (exception instanceof UnsupportedVersionException ) {
589+ logger .error ("StreamsGroupHeartbeatRequest failed because of an unsupported version exception: {}" ,
590+ exception .getMessage ());
591+ handleFatalFailure (new UnsupportedVersionException (UNSUPPORTED_VERSION_ERROR_MESSAGE ));
592+ } else {
593+ logger .error ("StreamsGroupHeartbeatRequest failed because of a fatal exception while sending request: {}" ,
594+ exception .getMessage ());
595+ handleFatalFailure (exception );
596+ }
597+ membershipManager .onFatalHeartbeatFailure ();
598+ }
599+ }
600+
601+ private void handleFatalFailure (Throwable error ) {
602+ backgroundEventHandler .add (new ErrorEvent (error ));
603+ membershipManager .transitionToFatal ();
604+ }
605+
454606 private static Map <StreamsRebalanceData .HostInfo , List <TopicPartition >> convertHostInfoMap (final StreamsGroupHeartbeatResponseData data ) {
455607 Map <StreamsRebalanceData .HostInfo , List <TopicPartition >> partitionsByHost = new HashMap <>();
456608 data .partitionsByUserEndpoint ().forEach (endpoint -> {
0 commit comments