Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-10208

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.OffsetFetchResponseHandler return null when Broker unexpectedly doesn't support requireStable flag on version while not any information

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 2.7.0
    • None
    • clients
    • None

    Description

      When the 2.7.0 client try to request the broker whose version is 2.3.0, the OffsetAndMetadata will be null and miss the Key information.

      I have create the test case as below :

      @Test
      public void testCreateTopicAndCheckTheOffsite() throws ExecutionException, InterruptedException {
      String topicName = UUID.randomUUID().toString();
      String groupId = "DEMO_" + topicName;
      final Properties props = new Properties();
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
      String serializer = StringSerializer.class.getName();
      String deserializer = StringDeserializer.class.getName();
      props.put("auto.offset.reset", "latest");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", deserializer);
      props.put("value.deserializer", deserializer);
      props.put("key.serializer", serializer);
      props.put("value.serializer", serializer);
      props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
      AdminClient adminClient = AdminClient.create(props);
      boolean topicExist = false;
      try

      { NewTopic newTopic = new NewTopic(topicName, 1, (short) 1); CreateTopicsOptions createTopicsOptions = new CreateTopicsOptions(); createTopicsOptions.timeoutMs(3000000); final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic), createTopicsOptions); createTopicsResult.values().get(topicName).get(); }

      catch (TopicExistsException e)

      { topicExist = true; }

      try {
      List<TopicPartition> topicPartitions = new ArrayList<>();
      KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(props);
      Field kafkaClientField = kafkaConsumer.getClass().getDeclaredField("client");
      kafkaClientField.setAccessible(true);
      ConsumerNetworkClient client = (ConsumerNetworkClient) kafkaClientField.get(kafkaConsumer);

      FindCoordinatorRequest.Builder findCoordinatorRequest =
      new FindCoordinatorRequest.Builder(
      new FindCoordinatorRequestData()
      .setKeyType(FindCoordinatorRequest.CoordinatorType.GROUP.id())
      .setKey(groupId));
      Node node = client.leastLoadedNode();
      Node coordinator;
      RequestFuture<Node> requestCoordinatorFuture = client.send(node, findCoordinatorRequest)
      .compose(new RequestFutureAdapter<ClientResponse, Node>() {

      @Override
      public void onFailure(RuntimeException e, RequestFuture<Node> future)

      { super.onFailure(e, future); }

      @Override
      public void onSuccess(ClientResponse value, RequestFuture<Node> future) {
      Node coordinator;
      FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) value.responseBody();
      Errors error = findCoordinatorResponse.error();
      if (error == Errors.NONE) { // use MAX_VALUE - node.id as the coordinator id to allow separate connections // for the coordinator in the underlying network client layer int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId(); coordinator = new Node( coordinatorConnectionId, findCoordinatorResponse.data().host(), findCoordinatorResponse.data().port()); client.tryConnect(coordinator); future.complete(coordinator); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { Assert.fail(error.message()); } else { future.raise(error); }
      }
      });

      client.poll(requestCoordinatorFuture);
      if (requestCoordinatorFuture.succeeded()) { coordinator = requestCoordinatorFuture.value(); } else { throw requestCoordinatorFuture.exception(); }

      OffsetFetchRequest.Builder requestBuilder =
      new OffsetFetchRequest.Builder(groupId, true, topicPartitions, true);

      RequestFuture<Map<TopicPartition, OffsetAndMetadata>> topicPartitionMetadataRequestFuture = client.send(coordinator, requestBuilder)
      .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndMetadata>>() {
      @Override
      public void onSuccess(ClientResponse value, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
      OffsetFetchResponse response = (OffsetFetchResponse) value.responseBody();

      if (response.hasError()) {
      Errors error = response.error();
      if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry future.raise(error); } else if (error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { Assert.fail(Errors.GROUP_AUTHORIZATION_FAILED + ""); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); }
      return;
      }

      Set<String> unauthorizedTopics = null;
      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
      Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
      for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
      TopicPartition tp = entry.getKey();
      OffsetFetchResponse.PartitionData partitionData = entry.getValue();
      if (partitionData.hasError()) {
      Errors error = partitionData.error;
      if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { future.raise(new KafkaException("Topic or Partition " + tp + " does not exist")); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
      if (unauthorizedTopics == null) { unauthorizedTopics = new HashSet<>(); }
      unauthorizedTopics.add(tp.topic());
      } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) { unstableTxnOffsetTopicPartitions.add(tp); } else { future.raise(new KafkaException("Unexpected error in fetch offset response for partition " + tp + ": " + error.message())); return; }
      } else if (partitionData.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch); // if there's no committed offset, record as null offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata)); } else {
      try { HashMap<TopicPartition, OffsetSpec> offsetMap = new HashMap<>(); offsetMap.put(tp, OffsetSpec.earliest()); ListOffsetsResult listOffsetsResult = adminClient.listOffsets(offsetMap); Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = listOffsetsResult.all().get(); ListOffsetsResult.ListOffsetsResultInfo offsetsResultInfo = topicPartitionListOffsetsResultInfoMap.get(tp); offsets.put(tp, new OffsetAndMetadata(offsetsResultInfo.offset(), offsetsResultInfo.leaderEpoch(), "")); } catch (Exception e) { Assert.fail(e.getMessage()); }
      }
      Assert.fail("not found the topic and partition");
      }

      if (unauthorizedTopics != null) { future.raise(new TopicAuthorizationException(unauthorizedTopics)); } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) { // just retry future.raise(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions")); } else { future.complete(offsets); }
      }

      @Override
      public void onFailure(RuntimeException e, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { super.onFailure(e, future); }

      });

      client.poll(topicPartitionMetadataRequestFuture);

      if(topicPartitionMetadataRequestFuture.succeeded())

      { Map<TopicPartition, OffsetAndMetadata> value = topicPartitionMetadataRequestFuture.value(); Assert.assertNotNull(value); }

      else

      { Assert.fail(topicPartitionMetadataRequestFuture.exception().getMessage()); }

      }catch (Exception e)

      { Assert.fail(e.getMessage()); }

      finally {
      if(topicExist)

      { List<String> topicToDeleted = new ArrayList<>(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicToDeleted); deleteTopicsResult.all().get(); }

      }
      }

      Attachments

        Activity

          People

            Unassigned Unassigned
            Jack-Lee lqjacklee
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: