Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-2496

New consumer from trunk doesn't work with 0.8.2.1 brokers

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Invalid
    • 0.8.2.1
    • None
    • None
    • None

    Description

      I have a 0.8.2.1 broker running with a topic created and some messages in it.
      I also have a consumer built from trunk (commit 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).

      When trying to consume messages from this topic the consumer fails with a following stacktrace:

      Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
      	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
      	at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
      	at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
      	at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
      	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
      	at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
      	at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
      	at Sandbox$.main(Sandbox.scala:38)
      	at Sandbox.main(Sandbox.scala)
      

      What actually happens is broker being unable to handle the JoinGroup request from consumer:

      [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range} (kafka.server.KafkaApis)
      kafka.common.KafkaException: Unknown api code 11
      	at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
      	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
      	at java.lang.Thread.run(Thread.java:745)
      

      The consumer code that leads to this is pretty much straightforward:

      import org.apache.kafka.clients.consumer.KafkaConsumer
      import scala.collection.JavaConverters._
      
      object Sandbox {
        def main(args: Array[String]) {
          val consumerProps = new Properties
          consumerProps.put("bootstrap.servers", "localhost:9092")
          consumerProps.put("group.id", "mirror_maker_group")
          consumerProps.put("enable.auto.commit", "false")
          consumerProps.put("session.timeout.ms", "30000")
          consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
          consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
          val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
          consumer.subscribe(List("mirror").asJava)
      
          val records = consumer.poll(1000)
          for (record <- records.iterator().asScala) {
            println(record.offset())
          }
        }
      }
      

      I looked into the source code of the Kafka server in 0.8.2.1 branch and it does not have the logic to handle JoinGroup request. It does not actually have all the logic related to consumer coordination there so I wonder if there is any way to make the new consumer work with 0.8.2.1 brokers?

      Attachments

        Activity

          People

            ewencp Ewen Cheslack-Postava
            serejja Serhey Novachenko
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: