I am getting ridiculously low producer and consumer throughput.
I am using the default config values for the producer, consumer and broker,
which should be good starting points that yield sufficient throughput.
I would appreciate it if you could point out which settings or code changes
are needed to get higher throughput.
I changed num.partitions in the server.config to 10.
Please see the exceptions and error messages from the server below.
Note: I am running the broker, ZooKeeper, producer and consumer on the same host.
====Consumer Code=====
long startTime = System.currentTimeMillis();
long endTime = startTime + runDuration * 1000L; // runDuration is in seconds
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("groupid", subscriptionName); // to support multiple subscribers
props.put("zk.sessiontimeout.ms", "400");
props.put("zk.synctime.ms", "200");
props.put("autocommit.interval.ms", "1000");
consConfig = new ConsumerConfig(props);
consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicName, 1); // the topic to subscribe to, with one stream
Map<String, List<KafkaMessageStream<Message>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
ConsumerIterator<Message> it = stream.iterator();
while (System.currentTimeMillis() <= endTime) {
it.next(); // discard data
consumeMsgCount.incrementAndGet();
}
====End consumer CODE============================
=====Producer CODE========================
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
// Use the random partitioner. The key type isn't needed, so just
// set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer,
String>(new ProducerConfig(props));
long endTime = startTime + runDuration * 1000L; // runDuration is in seconds
while (System.currentTimeMillis() <= endTime) {
// message-sending loop body not included in the original report
}
java.util.Date date = new java.util.Date(System.currentTimeMillis());
System.out.println(date + " :: stopped producer for topic " + topicName);
=====END Producer CODE========================
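Both loops above measure throughput the same way: run until a wall-clock deadline of `runDuration` seconds and count operations. The sketch below isolates that measurement from Kafka so it can be checked on its own. `TimedThroughput`, `measure` and `deadline` are hypothetical names, a `Runnable` stands in for `it.next()` or the producer's send call, and computing a per-second rate from the count is an assumption about how the results are reported. The `L` suffix (written `l` in the original, which is easy to misread as `1`) keeps the seconds-to-milliseconds multiplication in `long`; with an `int` literal and an `int` runDuration, durations above about 24.8 days would overflow.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of Kafka): mirrors the timed loops above by
// running an operation until a deadline and reporting operations per second.
public class TimedThroughput {

    /** Deadline in ms; 1000L forces long arithmetic, so large durations cannot overflow int. */
    public static long deadline(long startMs, int runDurationSec) {
        return startMs + runDurationSec * 1000L;
    }

    /** Runs op repeatedly for runDurationSec seconds and returns the observed ops/sec. */
    public static double measure(Runnable op, int runDurationSec) {
        AtomicLong count = new AtomicLong(); // same role as consumeMsgCount
        long startTime = System.currentTimeMillis();
        long endTime = deadline(startTime, runDurationSec);
        while (System.currentTimeMillis() <= endTime) {
            op.run(); // stands in for it.next() or the producer's send call
            count.incrementAndGet();
        }
        long elapsedMs = Math.max(1L, System.currentTimeMillis() - startTime);
        return count.get() * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        double rate = measure(() -> { }, 1); // no-op: measures loop overhead only
        System.out.printf("%.0f ops/sec%n", rate);
    }
}
```

Calling `measure` with a no-op `Runnable` gives a ceiling for the timing loop itself, since `System.currentTimeMillis()` is called on every iteration.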
I see many exceptions like these:
[2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
at kafka.network.MultiSend.writeTo(Transmission.scala:95)
at kafka.network.Processor.write(SocketServer.scala:332)
at kafka.network.Processor.run(SocketServer.scala:209)
at java.lang.Thread.run(Thread.java:662)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at kafka.utils.Utils$.read(Utils.scala:485)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:304)
at kafka.network.Processor.run(SocketServer.scala:207)
at java.lang.Thread.run(Thread.java:662)
And many INFO messages, e.g.:
INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)
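The INFO line reports a 6000 ms timeout even though the consumer sets zk.sessiontimeout.ms to 400. For reference, a ZooKeeper server does not accept arbitrary session timeouts: it clamps the requested value to [minSessionTimeout, maxSessionTimeout], which default to 2 * tickTime and 20 * tickTime. The hypothetical `SessionTimeout.negotiate` helper below sketches that rule (it is not ZooKeeper's code). Assuming the server runs with ZooKeeper's default tickTime of 3000 ms, a 400 ms request would be raised to 6000 ms; the logs alone do not show which client's session expired.

```java
// Hypothetical helper illustrating ZooKeeper's documented session-timeout
// negotiation with default min/max bounds; this is not ZooKeeper's own code.
public class SessionTimeout {

    /** Clamps a requested timeout to [2 * tickTime, 20 * tickTime]. */
    public static int negotiate(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;  // default minSessionTimeout
        int max = 20 * tickTimeMs; // default maxSessionTimeout
        return Math.max(min, Math.min(max, requestedMs));
    }

    public static void main(String[] args) {
        // zk.sessiontimeout.ms=400 from the consumer config, assuming tickTime=3000 ms
        System.out.println(negotiate(400, 3000) + " ms"); // prints "6000 ms"
    }
}
```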