Kafka / KAFKA-576

SimpleConsumer throws UnsupportedOperationException: empty.head

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None

      Description

      • In this case, there are 15 log segment files in broker-1 data dir:

      ls -l /tmp/kafka_server_1_logs/test_1-0/
      total 240
      -rw-r--r-- 1 jfung eng 16 Oct 16 10:41 00000000000000000000.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000000.log
      -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000020.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000020.log
      . . .
      -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000280.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:41 00000000000000000280.log

      • The following is the DumpLogSegments output for the first log segment file:

      bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
      Dumping /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
      Starting offset: 0
      offset: 0 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1663889063
      offset: 1 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 2803454828
      offset: 2 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 683347625
      . . .
      offset: 18 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1892511043
      offset: 19 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 601297044

      • Output of SimpleConsumerShell:
        . . .
        next offset = 16
        Topic:test_1:ThreadID:2:MessageID:0000000043:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 17
        Topic:test_1:ThreadID:3:MessageID:0000000063:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 18
        Topic:test_1:ThreadID:4:MessageID:0000000083:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        . . .
      • It appears that SimpleConsumerShell doesn't advance to the next log segment file
      • It should probably block inside the while loop to prevent infinite looping
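
The repeated "next offset = 19" lines above are what one would expect if the shell re-requests the offset of the last message it consumed instead of the offset after it. The following is a minimal sketch of that behaviour, not Kafka code: `MessageAndOffset`, `fetch`, and `consume` are hypothetical stand-ins, and the toy `fetch` assumes a request is served from a single 20-message segment.

```scala
// Hypothetical stand-in for a consumed message; not Kafka's real class.
final case class MessageAndOffset(offset: Long, payload: String) {
  def nextOffset: Long = offset + 1
}

object AdvanceOffsetSketch {
  val SegmentSize = 20L

  // A toy log holding offsets 0..39, i.e. two segments of 20 messages.
  val log: Vector[MessageAndOffset] =
    Vector.tabulate(40)(i => MessageAndOffset(i.toLong, s"msg-$i"))

  // Assumption of this sketch: a fetch only returns messages from the
  // segment that contains the requested offset.
  def fetch(from: Long): Vector[MessageAndOffset] = {
    val segmentEnd = (from / SegmentSize + 1) * SegmentSize
    log.filter(m => m.offset >= from && m.offset < segmentEnd)
  }

  // Runs a bounded number of fetch rounds and returns the offsets consumed.
  // `advance` decides which offset the next fetch starts from.
  def consume(advance: MessageAndOffset => Long, maxFetches: Int): Vector[Long] = {
    var offset = 0L
    val seen = Vector.newBuilder[Long]
    var round = 0
    while (round < maxFetches) {
      for (m <- fetch(offset)) {
        offset = advance(m)
        seen += m.offset
      }
      round += 1
    }
    seen.result()
  }
}
```

In this sketch, `consume(_.offset, 4)` keeps re-reading offset 19 after the first segment, while `consume(_.nextOffset, 4)` crosses into the second segment and then receives nothing further.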
      1. kafka_server_9093.log.gz
        3 kB
        John Fung
      2. kafka_576_v1.diff
        3 kB
        Yang Ye

          Activity

          Jun Rao added a comment -

          Thanks for the patch. Committed to 0.8 with the following minor changes:

          1. Use nextOffset instead of offset +1 to advance the offset.
          2. Removed unused imports and variables.
          3. Break long lines into multiple lines.

          Yang Ye added a comment -

          1. Added a --no-wait-logend command line option.

          2. Found a bug. Previously, at SimpleConsumer line 40:

          val request = if(isFromOrdinaryConsumer)
            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
          else
            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)

          In the else branch we intended to use a different constructor, but written this way it resolves to the same one. Writing "new OffsetRequest(..." selects the constructor we want, as below:

          val request = if(isFromOrdinaryConsumer)
            OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
          else
            new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)
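
One way this kind of mix-up can arise in Scala is sketched below. `SketchRequest` is a hypothetical type, not Kafka's OffsetRequest, and its fields, defaults, and the `DebuggingConsumerId` value are assumptions made for illustration. For a case class, a call without `new` goes through the companion's generated `apply`, which mirrors the primary constructor, so an auxiliary constructor is only reachable with `new`.

```scala
// Hypothetical request type; NOT Kafka's real OffsetRequest.
final case class SketchRequest(info: Map[String, Int],
                               versionId: Int = 0,
                               replicaId: Int = -1) {
  // Auxiliary constructor: here the second argument is a replica id.
  def this(info: Map[String, Int], replicaId: Int) = this(info, 0, replicaId)
}

object ConstructorSketch {
  val DebuggingConsumerId = -2 // assumed value, for illustration only
  val info: Map[String, Int] = Map("test_1-0" -> 1)

  // No `new`: the companion apply is used, so the second positional
  // argument binds to versionId and replicaId keeps its default.
  val viaApply: SketchRequest = SketchRequest(info, DebuggingConsumerId)

  // With `new`: the two-argument auxiliary constructor is selected,
  // so the second argument binds to replicaId.
  val viaNew: SketchRequest = new SketchRequest(info, DebuggingConsumerId)
}
```

Both expressions compile, which is why a slip like this is easy to miss: the only difference is which field silently receives the id.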

          John Fung added a comment -

          Attached a server log4j messages file

          John Fung added a comment (edited) -

          1. With this change, SimpleConsumerShell receives data from one of the brokers in a 3-broker cluster:

          $ svn diff core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
          Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
          ===================================================================
          --- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1400944)
          +++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy)
          @@ -186,7 +186,7 @@
               var consumed = 0
               for(messageAndOffset <- messageSet) {
                 try {
          -        offset = messageAndOffset.offset
          +        offset = messageAndOffset.nextOffset
                   if(printOffsets)
                     System.out.println("next offset = " + offset)
                   formatter.writeTo(messageAndOffset.message, System.out)

          2. Printing producedOffset in SimpleConsumer shows -1:

          $ svn diff core/src/main/scala/kafka/consumer/SimpleConsumer.scala
          Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
          ===================================================================
          --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1400944)
          +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
          @@ -50,6 +50,7 @@
                 if (simpleConsumer != null)
                   simpleConsumer.close()
               }
          +    System.out.println("====> producedOffset : " + producedOffset)
               producedOffset
             }

          3. An exception is thrown from SimpleConsumer.earliestOrLatestOffset():

          $ bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9093 --topic test_1 --partition 0 --replica 3

          [2012-10-22 10:41:56,525] INFO Getting topic metatdata... (kafka.tools.SimpleConsumerShell$)
          [2012-10-22 10:41:56,583] INFO Fetching metadata for topic Set(test_1) (kafka.client.ClientUtils$)
          [2012-10-22 10:41:56,589] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
          [2012-10-22 10:41:56,751] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
          [2012-10-22 10:41:56,784] ERROR error in earliestOrLatestOffset() (kafka.consumer.SimpleConsumer$)
          java.lang.UnsupportedOperationException: empty.head
          at scala.collection.immutable.Vector.head(Vector.scala:162)
          at kafka.consumer.SimpleConsumer$.earliestOrLatestOffset(SimpleConsumer.scala:45)
          at kafka.tools.SimpleConsumerShell$.main(SimpleConsumerShell.scala:169)
          at kafka.tools.SimpleConsumerShell.main(SimpleConsumerShell.scala)
          ====> producedOffset : -1
          [2012-10-22 10:41:56,786] INFO Starting simple consumer shell to partition [test_1, 0], replica [3], host and port: [127.0.0.1, 9093], from offset [-1] (kafka.tools.SimpleConsumerShell$)
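
The stack trace above shows `head` being called on an empty collection, after which the shell starts from offset -1. As a minimal sketch of how an empty result can be handled explicitly (the helper names here are hypothetical and this is not the committed fix), `headOption` makes the empty case visible to the caller:

```scala
object EmptyHeadSketch {
  // Hypothetical helper: returns the first offset, or a caller-supplied
  // fallback when the list of offsets is empty.
  def firstOffsetOr(offsets: Seq[Long], fallback: Long): Long =
    offsets.headOption.getOrElse(fallback)

  // Demonstrates that calling .head on an empty collection throws.
  def headThrows(offsets: Seq[Long]): Boolean =
    scala.util.Try(offsets.head).isFailure
}
```

A caller could then decide whether a fallback such as -1 is acceptable or whether the tool should stop with an error instead of starting the consumer from an invalid offset.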

          John Fung added a comment -

          This is a minor fix which is needed in kafka-571-v1.patch. Therefore, it is included in that patch.


            People

            • Assignee:
              Yang Ye
              Reporter:
              John Fung
            • Votes:
              0
              Watchers:
              3
