Kafka / KAFKA-576

SimpleConsumer throws UnsupportedOperationException: empty.head

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: None
    • Labels:
      None

      Description

      • In this case, there are 15 log segment files in broker-1 data dir:

      ls -l /tmp/kafka_server_1_logs/test_1-0/
      total 240
      -rw-r--r-- 1 jfung eng 16 Oct 16 10:41 00000000000000000000.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000000.log
      -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000020.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:40 00000000000000000020.log
      . . .
      -rw-r--r-- 1 jfung eng 8 Oct 16 10:41 00000000000000000280.index
      -rw-r--r-- 1 jfung eng 10440 Oct 16 10:41 00000000000000000280.log
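      For context, each segment file is named for the offset of its first message (00000000000000000020.log starts at offset 20), so resolving a fetch offset to a segment means picking the largest base offset not exceeding it. A minimal sketch of that lookup (illustrative only, not the broker's actual code):

      ```scala
      // Illustrative: map a fetch offset to the log segment that contains it.
      // Segments are identified by the base offset encoded in their file name,
      // so offset 19 lives in segment 0 and offset 20 starts segment 20.
      def segmentFor(baseOffsets: Seq[Long], target: Long): Option[Long] =
        baseOffsets.filter(_ <= target).sorted.lastOption

      val bases = Seq(0L, 20L, 40L, 260L, 280L)
      segmentFor(bases, 19) // Some(0)
      segmentFor(bases, 20) // Some(20)
      ```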

      • The following is the DumpLogSegments output for the first log segment file:

      bin/kafka-run-class.sh kafka.tools.DumpLogSegments /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
      Dumping /tmp/kafka_server_1_logs/test_1-0/00000000000000000000.log
      Starting offset: 0
      offset: 0 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1663889063
      offset: 1 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 2803454828
      offset: 2 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 683347625
      . . .
      offset: 18 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 1892511043
      offset: 19 isvalid: true payloadsize: 500 magic: 2 compresscodec: NoCompressionCodec crc: 601297044

      • Output of SimpleConsumerShell:
        . . .
        next offset = 16
        Topic:test_1:ThreadID:2:MessageID:0000000043:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 17
        Topic:test_1:ThreadID:3:MessageID:0000000063:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 18
        Topic:test_1:ThreadID:4:MessageID:0000000083:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        next offset = 19
        Topic:test_1:ThreadID:0:MessageID:0000000003:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        . . .
      • It appears that SimpleConsumerShell doesn't advance to the next log segment file.
      • It should probably block inside the while loop to prevent infinite looping.
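      The non-advancing fetch can be sketched as follows; this is an illustrative model of the consume loop, not the actual SimpleConsumerShell code:

      ```scala
      // Illustrative model of the stuck fetch loop. If the next fetch offset is
      // taken from the last message's own offset rather than one past it, every
      // fetch re-reads the same message and the loop never reaches the next
      // log segment.
      case class MessageAndOffset(offset: Long) {
        def nextOffset: Long = offset + 1 // first offset past this message
      }

      def advance(batch: Seq[MessageAndOffset], fetchOffset: Long): Long = {
        var offset = fetchOffset
        for (m <- batch) {
          // Buggy: offset = m.offset  -- stalls on the batch's last message
          offset = m.nextOffset        // Fixed: move past the consumed message
        }
        offset
      }
      ```

      With the buggy assignment, a batch ending at offset 19 leaves the fetch offset at 19, so the broker keeps returning the same final message, which matches the repeated "next offset = 19" lines above.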
      1. kafka_server_9093.log.gz
        3 kB
        John Fung
      2. kafka_576_v1.diff
        3 kB
        Yang Ye

        Issue Links

          Activity

          Jun Rao made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Jun Rao made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Fix Version/s 0.8 [ 12317244 ]
          Resolution Fixed [ 1 ]
          Jun Rao added a comment -

          Thanks for the patch. Committed to 0.8 with the following minor changes:

          1. Used nextOffset instead of offset + 1 to advance the offset.
          2. Removed unused imports and variables.
          3. Broke long lines into multiple lines.

          Yang Ye made changes -
          Attachment kafka_576_v1.diff [ 12550565 ]
          Yang Ye added a comment -

          1. Added a --no-wait-logend command line option.

          2. Found a bug in SimpleConsumer, line 40. Before:

          val request = if(isFromOrdinaryConsumer)
          OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
          else
          OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)

          In the else branch we intended to use a different constructor, but written this way the call resolves to the same one. Using "new OffsetRequest(..." invokes the constructor we want, as below:

          val request = if(isFromOrdinaryConsumer)
          OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
          else
          new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)), Request.DebuggingConsumerId)
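          The pitfall here is a general Scala one: `OffsetRequest(...)` is sugar for the companion object's `apply` method and never selects a class constructor directly, so an extra argument can silently match a defaulted `apply` parameter instead of the intended constructor. Only `new` bypasses the companion object. A minimal illustration with a hypothetical class (not Kafka's actual OffsetRequest signature):

          ```scala
          // Hypothetical miniature of the bug: the companion apply's second
          // parameter is a versionId, not a replicaId, and apply always builds
          // the request with the ordinary-consumer replica id (0 here).
          class Req(val info: String, val replicaId: Int)

          object Req {
            def apply(info: String, versionId: Int = 1): Req =
              new Req(info, replicaId = 0)
          }

          val viaApply = Req("t", -2)     // -2 lands in versionId; replicaId stays 0
          val viaNew   = new Req("t", -2) // replicaId is -2, as intended
          ```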

          John Fung made changes -
          Attachment kafka_server_9093.log.gz [ 12550331 ]
          John Fung added a comment -

          Attached a server log4j messages file

          John Fung made changes -
          Summary SimpleConsumerShell throws Exception SimpleConsumer throws UnsupportedOperationException: empty.head
          John Fung made changes -
          Summary SimpleConsumerShell runs into an infinite loop SimpleConsumerShell throws Exception
          John Fung made changes -
          Assignee Yang Ye [ yeyangever ]
          John Fung added a comment - - edited

          1. SimpleConsumerShell will receive data from 1 of the brokers in a 3-broker cluster with this change:

          $ svn diff core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
          Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
          ===================================================================
          --- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (revision 1400944)
          +++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (working copy)
          @@ -186,7 +186,7 @@
          var consumed = 0
          for(messageAndOffset <- messageSet) {
            try {
          -   offset = messageAndOffset.offset
          +   offset = messageAndOffset.nextOffset
              if(printOffsets)
                System.out.println("next offset = " + offset)
              formatter.writeTo(messageAndOffset.message, System.out)

          2. By printing out the producedOffset in SimpleConsumer, which shows -1:

          $ svn diff core/src/main/scala/kafka/consumer/SimpleConsumer.scala
          Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
          ===================================================================
          --- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1400944)
          +++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
          @@ -50,6 +50,7 @@
              if (simpleConsumer != null)
                simpleConsumer.close()
            }
          + System.out.println("====> producedOffset : " + producedOffset)
            producedOffset
          }

          3. Exception is thrown from SimpleConsumer.earliestOrLatestOffset():

          $ bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9093 --topic test_1 --partition 0 --replica 3

          [2012-10-22 10:41:56,525] INFO Getting topic metatdata... (kafka.tools.SimpleConsumerShell$)
          [2012-10-22 10:41:56,583] INFO Fetching metadata for topic Set(test_1) (kafka.client.ClientUtils$)
          [2012-10-22 10:41:56,589] INFO Connected to localhost:9093 for producing (kafka.producer.SyncProducer)
          [2012-10-22 10:41:56,751] INFO Disconnecting from localhost:9093 (kafka.producer.SyncProducer)
          [2012-10-22 10:41:56,784] ERROR error in earliestOrLatestOffset() (kafka.consumer.SimpleConsumer$)
          java.lang.UnsupportedOperationException: empty.head
          at scala.collection.immutable.Vector.head(Vector.scala:162)
          at kafka.consumer.SimpleConsumer$.earliestOrLatestOffset(SimpleConsumer.scala:45)
          at kafka.tools.SimpleConsumerShell$.main(SimpleConsumerShell.scala:169)
          at kafka.tools.SimpleConsumerShell.main(SimpleConsumerShell.scala)
          ====> producedOffset : -1
          [2012-10-22 10:41:56,786] INFO Starting simple consumer shell to partition [test_1, 0], replica [3], host and port: [127.0.0.1, 9093], from offset [-1] (kafka.tools.SimpleConsumerShell$)
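          The "empty.head" exception itself comes from calling .head on an empty collection: when the offset request fails, the response carries no offsets and offsets.head throws. A defensive pattern, sketched here rather than taken from the committed patch, is headOption:

          ```scala
          // Sketch of the failure mode: .head on an empty Vector throws
          // UnsupportedOperationException: empty.head.
          val offsets: Vector[Long] = Vector.empty

          // offsets.head  // would throw UnsupportedOperationException: empty.head
          val safe: Long = offsets.headOption.getOrElse(-1L) // -1, as in the log above
          ```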

          John Fung added a comment -

          This is a minor fix which is needed in kafka-571-v1.patch. Therefore, it is included in that patch.

          John Fung made changes -
          Field Original Value New Value
          Link This issue Is contained by KAFKA-571 [ KAFKA-571 ]
          John Fung created issue -

            People

            • Assignee:
              Yang Ye
              Reporter:
              John Fung
            • Votes:
              0
              Watchers:
              3

              Dates

              • Created:
                Updated:
                Resolved:

                Development