Kafka / KAFKA-6662

Consumer offsetsForTimes() returns no offset (null) in some cases.

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Not A Problem
    • Affects Version/s: 0.10.2.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      When we use the consumer's offsetsForTimes() method to get the offset of a topic-partition, it sometimes returns null. The client log:

      [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop interval 256 upload 0 retry 0 fail 0 (com.meituan.mtrace.collector.sg.AbstractCollector)
      [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
      [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
      [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
      [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. (org.apache.kafka.clients.NetworkClient)
      [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) (org.apache.kafka.clients.NetworkClient)
      [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for org.matt_test2-0. Fetched offset -1, timestamp -1 (org.apache.kafka.clients.consumer.internals.Fetcher)

      The log shows that the broker does return an offset, but its value is -1, and Fetcher.handleListOffsetResponse() discards that value:

      // Handle v1 and later response
      log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
              topicPartition, partitionData.offset, partitionData.timestamp);
      if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
          OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
          timestampOffsetMap.put(topicPartition, offsetData);
      }
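      To make the effect of that check concrete, here is a minimal, dependency-free Java sketch (not Kafka's code). It assumes a simplified response in which each partition maps to an {offset, timestamp} pair; the class name, the second partition, and its values are hypothetical. A -1 entry never enters the result map, so looking that partition up yields null, which matches the null the consumer reports for it.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: mimics the UNKNOWN_OFFSET check in Fetcher.handleListOffsetResponse().
final class ListOffsetFilterSketch {
    // Same value as ListOffsetResponse.UNKNOWN_OFFSET.
    static final long UNKNOWN_OFFSET = -1L;

    // Each value is a simplified {offset, timestamp} pair (hypothetical format).
    static Map<String, long[]> filter(Map<String, long[]> responses) {
        Map<String, long[]> timestampOffsetMap = new HashMap<>();
        for (Map.Entry<String, long[]> e : responses.entrySet()) {
            // Entries whose offset is -1 are skipped, so their partition never appears in the result.
            if (e.getValue()[0] != UNKNOWN_OFFSET) {
                timestampOffsetMap.put(e.getKey(), e.getValue());
            }
        }
        return timestampOffsetMap;
    }

    public static void main(String[] args) {
        Map<String, long[]> responses = new HashMap<>();
        responses.put("org.matt_test2-0", new long[] {-1L, -1L});   // the response from the log
        responses.put("org.matt_test2-1", new long[] {42L, 1000L}); // hypothetical partition
        Map<String, long[]> result = filter(responses);
        System.out.println(result.get("org.matt_test2-0"));    // null
        System.out.println(result.get("org.matt_test2-1")[0]); // 42
    }
}
```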

      We tested several situations and found two cases in which no offset is returned:

      1. The topic-partition contains no messages: offsetsForTimes() gets offset -1.
      2. The target time is larger than the largestTimestamp of the partition's active segment: the offset is -1.
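      Both cases fall out of a search for the first message whose timestamp is greater than or equal to the target. The Java sketch below models the behaviour observed on 0.10.2.0 with a simple linear scan; it is not the broker's implementation, and Message, offsetForTime(), and the sample data are hypothetical.

```java
import java.util.List;

// Sketch only: models the partition-level lookup as a linear scan.
final class PartitionTimeSearchSketch {
    static final long UNKNOWN_OFFSET = -1L;

    // Hypothetical in-memory message: offset plus timestamp in ms.
    static final class Message {
        final long offset;
        final long timestamp;
        Message(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Offset of the first message (in offset order) whose timestamp >= targetTime,
    // or UNKNOWN_OFFSET when there is no such message.
    static long offsetForTime(List<Message> partition, long targetTime) {
        for (Message m : partition) {
            if (m.timestamp >= targetTime) {
                return m.offset;
            }
        }
        // Case 1: the partition is empty. Case 2: targetTime is later than every timestamp.
        return UNKNOWN_OFFSET;
    }

    public static void main(String[] args) {
        List<Message> log = List.of(new Message(0, 1000), new Message(1, 2000), new Message(2, 3000));
        System.out.println(offsetForTime(log, 1500));       // 1
        System.out.println(offsetForTime(log, 5000));       // -1 (case 2)
        System.out.println(offsetForTime(List.of(), 1500)); // -1 (case 1)
    }
}
```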

      When the offset is -1, nothing is returned to the consumer client. I think that in these situations the latest offset should be returned instead, which is also what the comment in kafka/core describes:

      /**
       * Search the message offset based on timestamp.
       * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
       * greater than or equals to the target timestamp.
       *
       * If all the message in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
       * timestamp will be max timestamp in the segment.
       *
       * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
       * the returned the offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
       *
       * This methods only returns None when the log is not empty but we did not see any messages when scanning the log
       * from the indexed position. This could happen if the log is truncated after we get the indexed position but
       * before we scan the log from there. In this case we simply return None and the caller will need to check on
       * the truncated log and maybe retry or even do the search on another log segment.
       *
       * @param timestamp The timestamp to search for.
       * @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
       *         target timestamp. None will be returned if there is no such message.
       */
      def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
        // Get the index entry with a timestamp less than or equal to the target timestamp
        val timestampOffset = timeIndex.lookup(timestamp)
        val position = index.lookup(timestampOffset.offset).position
        // Search the timestamp
        log.searchForTimestamp(timestamp, position)
      }
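      findOffsetByTimestamp() works in three steps: take the last time-index entry at or below the target, translate its offset to a file position through the offset index, then scan the log forward from that position. The Java sketch below imitates those steps, with TreeMap.floorEntry() standing in for both sparse indexes and a List standing in for the segment file. It assumes timestamps increase with offset and that every indexInterval-th entry is indexed; all names are hypothetical. It returns Optional.empty() when the scan finds no qualifying message, the analogue of None.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Sketch only: imitates the lookup steps of findOffsetByTimestamp() on an in-memory segment.
final class SegmentTimestampSearchSketch {
    static final class LogEntry {
        final long offset;
        final long timestamp;
        LogEntry(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    private final long baseOffset;
    private final List<LogEntry> log;                                   // position = list index
    private final TreeMap<Long, Long> timeIndex = new TreeMap<>();      // timestamp -> offset
    private final TreeMap<Long, Integer> offsetIndex = new TreeMap<>(); // offset -> position

    // Assumes timestamps increase with offset; indexes every indexInterval-th entry.
    SegmentTimestampSearchSketch(long baseOffset, List<LogEntry> log, int indexInterval) {
        this.baseOffset = baseOffset;
        this.log = log;
        for (int i = 0; i < log.size(); i += indexInterval) {
            LogEntry e = log.get(i);
            timeIndex.put(e.timestamp, e.offset);
            offsetIndex.put(e.offset, i);
        }
    }

    Optional<LogEntry> findOffsetByTimestamp(long target) {
        // 1. Time-index entry with timestamp <= target (base offset if there is none).
        Map.Entry<Long, Long> t = timeIndex.floorEntry(target);
        long startOffset = (t == null) ? baseOffset : t.getValue();
        // 2. Translate that offset to a position via the offset index.
        Map.Entry<Long, Integer> p = offsetIndex.floorEntry(startOffset);
        int position = (p == null) ? 0 : p.getValue();
        // 3. Scan forward for the first entry whose timestamp >= target.
        for (int i = position; i < log.size(); i++) {
            if (log.get(i).timestamp >= target) {
                return Optional.of(log.get(i));
            }
        }
        return Optional.empty(); // analogue of None
    }
}
```

      For a segment holding offsets 100 to 104 with timestamps 1000 to 5000 and an index interval of 2, a target of 2500 starts the scan at offset 100 and returns offset 102, while a target of 9000 returns Optional.empty().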

People

    • Assignee: Unassigned
    • Reporter: Matt Wang (wangzzu)
    • Votes: 0
    • Watchers: 3