KAFKA-270

Sync producer/consumer test produces a lot of Kafka server exceptions and does not reach the throughput quoted at http://incubator.apache.org/kafka/performance.html

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7
    • Fix Version/s: None
    • Component/s: clients, core
    • Environment:
      Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux
      ext3 file system with raid10

      Description

      I am getting ridiculously low producer and consumer throughput.

      I am using the default config values for the producer, consumer, and
      broker; these should be good starting points and yield sufficient
      throughput.

      I would appreciate it if you could point out which settings or code
      changes are needed to get higher throughput.

      I changed num.partitions in the server.config to 10.
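
      For reference, the change is a one-line edit (a sketch, assuming the stock config/server.properties broker config file):

      # server.properties (broker)
      num.partitions=10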

      Please see below for the exception and error messages from the server.
      Note that I am running the server, ZooKeeper, producer, and consumer all on the same host.

      ====Consumer Code====
      long startTime = System.currentTimeMillis();
      long endTime = startTime + runDuration*1000l;

      Properties props = new Properties();
      props.put("zk.connect", "localhost:2181");
      props.put("groupid", subscriptionName); // group id, to support multiple subscribers
      props.put("zk.sessiontimeout.ms", "400");
      props.put("zk.synctime.ms", "200");
      props.put("autocommit.interval.ms", "1000");

      consConfig = new ConsumerConfig(props);
      consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);

      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to, with one stream
      Map<String, List<KafkaMessageStream<Message>>> consumerMap =
          consumer.createMessageStreams(topicCountMap);
      KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
      ConsumerIterator<Message> it = stream.iterator();

      while (System.currentTimeMillis() <= endTime) {
          it.next();                         // discard the message payload
          consumeMsgCount.incrementAndGet(); // count consumed messages
      }
      ====End Consumer Code====

      ====Producer Code====
      Properties props = new Properties();
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("zk.connect", "localhost:2181");
      // Uses the random partitioner; the key type is unused, so it is simply set to Integer.
      // The message is of type String.
      producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

      long startTime = System.currentTimeMillis();
      long endTime = startTime + runDuration*1000l; // run duration is in seconds
      while (System.currentTimeMillis() <= endTime) {
          String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
          producer.send(new ProducerData<Integer, String>(topicName, msg));
          pc.incrementAndGet(); // count produced messages
      }

      java.util.Date date = new java.util.Date(System.currentTimeMillis());
      System.out.println(date + " :: stopped producer for topic " + topicName);
      ====End Producer Code====

      I see a number of exceptions like the following:

      [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
      at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
      at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
      at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
      at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
      at kafka.network.MultiSend.writeTo(Transmission.scala:95)
      at kafka.network.Processor.write(SocketServer.scala:332)
      at kafka.network.Processor.run(SocketServer.scala:209)
      at java.lang.Thread.run(Thread.java:662)

      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcher.read0(Native Method)
      at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
      at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
      at sun.nio.ch.IOUtil.read(IOUtil.java:171)
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
      at kafka.utils.Utils$.read(Utils.scala:485)
      at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
      at kafka.network.Processor.read(SocketServer.scala:304)
      at kafka.network.Processor.run(SocketServer.scala:207)
      at java.lang.Thread.run(Thread.java:662)

      And many INFO messages, e.g.:
      INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
      INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)

        Activity

        Jun Rao added a comment -

        A few things to try:

        1. It seems there is ZK session expiration in the client. This should be rare; if it happens frequently, it is very likely caused by client GC. Please check your GC log.
        2. Enable debug-level logging for FileMessageSet in the broker. You will then see the flush time for each log write, and can check whether it is reasonable (typically in the low tens of ms), since flush time controls how many I/Os a broker can do per second. (A sketch of both steps follows this list.)
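
        A minimal sketch of both steps, assuming the log4j setup shipped with 0.7 and standard HotSpot GC flags (the logger name and log path are assumptions):

        # log4j.properties on the broker: enable DEBUG for FileMessageSet to see per-write flush times
        log4j.logger.kafka.message.FileMessageSet=DEBUG

        # JVM flags for the producer/consumer client, to capture a GC log for step 1
        -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/tmp/client-gc.log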

        wangxu(alvin) added a comment -

        You can also try closing the producer. Without closing the producer, I would get the error below in the broker console; once the producer is closed, the error never appears.
        [2013-11-05 14:07:34,097] ERROR Closing socket for /192.168.30.114 because of error (kafka.network.Processor)
        java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
        at sun.nio.ch.IOUtil.read(IOUtil.java:171)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
        at kafka.utils.Utils$.read(Utils.scala:538)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:311)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:662)
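
        A minimal sketch of the suggested fix, reusing props, endTime, topicName, and msgSizes from the reporter's snippet (the try/finally placement is an assumption):

        kafka.javaapi.producer.Producer<Integer, String> producer =
            new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        try {
            while (System.currentTimeMillis() <= endTime) {
                String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
                producer.send(new ProducerData<Integer, String>(topicName, msg));
            }
        } finally {
            producer.close(); // shut the producer's sockets down cleanly so the broker does not see a reset
        }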


          People

           • Assignee: Unassigned
           • Reporter: Praveen Ramachandra
           • Votes: 0
           • Watchers: 1
