Kafka / KAFKA-270

Sync producer/consumer test produces a lot of Kafka server exceptions and does not reach the throughput mentioned at http://incubator.apache.org/kafka/performance.html


    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 0.7
    • Fix Version/s: None
    • Component/s: clients, core
    • Environment:
      Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux
      ext3 file system with raid10


      I am getting ridiculously low producer and consumer throughput.

      I am using the default config values for the producer, consumer, and broker,
      which are supposed to be very good starting points, as they should yield
      sufficient throughput.

      I would appreciate it if you could point out which settings or code changes
      are needed to get higher throughput.

      I changed num.partitions in the server.config to 10.

      Please see below for the exception and error messages from the server.
      BTW: I am running the server, ZooKeeper, producer, and consumer on the same host.

      ====Consumer Code=====
      long startTime = System.currentTimeMillis();
      long endTime = startTime + runDuration * 1000L;

      Properties props = new Properties();
      props.put("zk.connect", "localhost:2181");
      props.put("groupid", subscriptionName); // to support multiple
      props.put("zk.sessiontimeout.ms", "400");
      props.put("zk.synctime.ms", "200");
      props.put("autocommit.interval.ms", "1000");

      consConfig = new ConsumerConfig(props);
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);

      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topicName, new Integer(1)); // has the topic to subscribe to
      Map<String, List<KafkaMessageStream<Message>>> consumerMap =
          consumer.createMessageStreams(topicCountMap);
      KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
      ConsumerIterator<Message> it = stream.iterator();

      while (System.currentTimeMillis() <= endTime) {
          it.next(); // discard data
          consumeMsgCount.incrementAndGet();
      }

      ====End consumer CODE============================

      =====Producer CODE========================
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("zk.connect", "localhost:2181");
      // Use random partitioner. Don't need the key type. Just set it to Integer.
      // The message is of type String.
      producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

      long endTime = startTime + runDuration * 1000L; // run duration is in seconds
      while (System.currentTimeMillis() <= endTime) {
          String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
          producer.send(new ProducerData<Integer, String>(topicName, msg));
          pc.incrementAndGet();
      }

      java.util.Date date = new java.util.Date(System.currentTimeMillis());
      System.out.println(date + " :: stopped producer for topic " + topicName);

      =====END Producer CODE========================
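
To compare against the published numbers, the two time-bounded loops above can be reduced to a small harness that reports messages per second and MB per second. This is only a sketch under stated assumptions: `ThroughputProbe`, `Result`, and `measure` are hypothetical names, and the `Runnable` stands in for one `producer.send(...)` or `it.next()` call; none of them are part of the Kafka API.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ThroughputProbe {

    /** Outcome of one timed run (hypothetical helper type). */
    public static final class Result {
        public final long messages;
        public final long elapsedMillis;
        public final int messageBytes;

        public Result(long messages, long elapsedMillis, int messageBytes) {
            this.messages = messages;
            this.elapsedMillis = elapsedMillis;
            this.messageBytes = messageBytes;
        }

        /** Messages per second; 0 if no time elapsed. */
        public double messagesPerSecond() {
            return elapsedMillis == 0 ? 0.0 : messages * 1000.0 / elapsedMillis;
        }

        /** Payload megabytes per second, assuming a fixed message size. */
        public double megabytesPerSecond() {
            return messagesPerSecond() * messageBytes / (1024.0 * 1024.0);
        }
    }

    /**
     * Runs the action repeatedly until the deadline passes and counts the calls.
     * The action is a stand-in for a single send or a single consume.
     */
    public static Result measure(Runnable action, long runDurationSeconds, int messageBytes) {
        long start = System.currentTimeMillis();
        long end = start + runDurationSeconds * 1000L;
        long count = 0;
        while (System.currentTimeMillis() <= end) {
            action.run();
            count++;
        }
        long elapsed = System.currentTimeMillis() - start;
        return new Result(count, elapsed, messageBytes);
    }

    public static void main(String[] args) {
        // Placeholder workload: increments a counter instead of talking to a broker.
        AtomicLong sink = new AtomicLong();
        Result r = measure(sink::incrementAndGet, 1, 200);
        System.out.printf("%d msgs in %d ms = %.0f msgs/s, %.2f MB/s%n",
                r.messages, r.elapsedMillis, r.messagesPerSecond(), r.megabytesPerSecond());
    }
}
```

Reporting the rate this way for the producer and the consumer separately makes it easier to say which side falls short. Note that the producer loop above also builds a random string per message, so that cost is included in whatever rate is measured.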

      I see a bunch of exceptions like this:

      [2012-02-11 02:44:11,945] ERROR Closing socket for / because of error (kafka.network.Processor)
      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
      at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
      at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
      at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
      at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
      at kafka.network.MultiSend.writeTo(Transmission.scala:95)
      at kafka.network.Processor.write(SocketServer.scala:332)
      at kafka.network.Processor.run(SocketServer.scala:209)
      at java.lang.Thread.run(Thread.java:662)

      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcher.read0(Native Method)
      at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
      at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
      at sun.nio.ch.IOUtil.read(IOUtil.java:171)
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
      at kafka.utils.Utils$.read(Utils.scala:485)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
      at kafka.network.Processor.read(SocketServer.scala:304)
      at kafka.network.Processor.run(SocketServer.scala:207)
      at java.lang.Thread.run(Thread.java:662)

      And many INFO messages, e.g.:
      INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
      INFO: Closed socket connection for client / which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)
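
The `timeout of 6000ms exceeded` message does not match the 400 ms requested through `zk.sessiontimeout.ms`. One possible explanation, offered as an assumption rather than a confirmed diagnosis, is that the ZooKeeper server clamps the requested session timeout to a range that by default spans 2 to 20 times its `tickTime`. The sketch below only illustrates that arithmetic; `SessionTimeoutSketch`, `negotiate`, and the 3000 ms `tickTime` are illustrative assumptions, not values read from this setup.

```java
public class SessionTimeoutSketch {

    /**
     * Clamps a requested session timeout to [2 * tickTime, 20 * tickTime],
     * the default bounds described in the ZooKeeper administrator's guide.
     */
    public static int negotiate(int requestedMs, int tickTimeMs) {
        int minMs = 2 * tickTimeMs;
        int maxMs = 20 * tickTimeMs;
        return Math.max(minMs, Math.min(maxMs, requestedMs));
    }

    public static void main(String[] args) {
        int tickTimeMs = 3000; // assumed server tickTime, not taken from the report
        System.out.println(negotiate(400, tickTimeMs));   // prints 6000
        System.out.println(negotiate(30000, tickTimeMs)); // prints 30000
    }
}
```

If this assumption holds, the sessions in the log were expired after 6000 ms without a heartbeat, not after the 400 ms configured in the consumer.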




            • Assignee:
              praveen27 Praveen Ramachandra