Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-14421

OffsetFetchRequest throws NPE Exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Resolved
    • 0.10.2.0, 0.10.2.1, 0.10.2.2
    • None
    • clients
    • None

    Description

      when I use 0.10.2 client send Metadata request to 0.10.0 server, NPE exception happens,

      the NPE exception quite confused me, because if just send Metadata request doest not cause the NPE exception occurs, after troubleshooting the problem, It is the NetworkClient#poll call ConsumerNetworkClient#trySend and further call NetworkClient#doSend when trying to build OffsetFetchRequest, because the 0.10.0 server doest not support fetch all TopicPartitions, it throw UnsupportedVersionException,

      private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
              String nodeId = clientRequest.destination();
              ......
              AbstractRequest request = null;
              AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
              try {
                  NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
                  // Note: if versionInfo is null, we have no server version information. This would be
                  // the case when sending the initial ApiVersionRequest which fetches the version
                  // information itself.  It is also the case when discoverBrokerVersions is set to false.
                  if (versionInfo == null) {
                      if (discoverBrokerVersions && log.isTraceEnabled())
                          log.trace("No version information found when sending message of type {} to node {}. " +
                                  "Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
                  } else {
                      short version = versionInfo.usableVersion(clientRequest.apiKey());
                      builder.setVersion(version);
                  }
                  // The call to build may also throw UnsupportedVersionException, if there are essential
                  // fields that cannot be represented in the chosen version.
                  request = builder.build();
              } catch (UnsupportedVersionException e) {
                  // If the version is not supported, skip sending the request over the wire.
                  // Instead, simply add it to the local queue of aborted requests.
                  log.debug("Version mismatch when attempting to send {} to {}",
                          clientRequest.toString(), clientRequest.destination(), e);
                  ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
                          clientRequest.callback(), clientRequest.destination(), now, now,
                          false, e, null);
                  abortedSends.add(clientResponse);
                  return;
              }
      

      until now, all are expected, but unfortunately, in catch UnsupportedVersionException code block, clientRequest.toString need to call requestBuilder#toString, that is OffsetFetchRequest's Builder#toString, when partition is ALL_TOPIC_PARTITIONS, it is null, therefore it cause the unexpected NPE, and make the normal MetadataRequest failed..

      catch (UnsupportedVersionException e) {
               
                  log.debug("Version mismatch when attempting to send {} to {}",
                          clientRequest.toString(), clientRequest.destination(), e);
                  ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
                          clientRequest.callback(), clientRequest.destination(), now, now,
                          false, e, null);
                  abortedSends.add(clientResponse);
                  return;
              }
      
      ClientRequest#toString()
         public String toString() {
              return "ClientRequest(expectResponse=" + expectResponse +
                  ", callback=" + callback +
                  ", destination=" + destination +
                  ", correlationId=" + correlationId +
                  ", clientId=" + clientId +
                  ", createdTimeMs=" + createdTimeMs +
                  ", requestBuilder=" + requestBuilder +
                  ")";
          }
      
        OffsetFetchRequest's Builder#toString
              public String toString() {
                  StringBuilder bld = new StringBuilder();
                  bld.append("(type=OffsetFetchRequest, ").
                          append("groupId=").append(groupId).
                          append(", partitions=").append(Utils.join(partitions, ",")). // cause NPE
                          append(")");
                  return bld.toString();
              }
      
      

      I think the NPE is unexpected, when broker doest not support specific protocal, It should not throw NPE instead of UnsupportedVersionException, and I find in 0.11 or later version
      it is fixed, but the OffsetFetchRequest support ALL_TOPIC_PARTITIONS is introduced in 0.10.2 kIP88, Therefore, I think it should also be fixed in 0.10.2 client

      look forward to any reply , Thanks~

      Attachments

        Issue Links

          Activity

            People

              ws yws
              ws yws
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: