Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -28,7 +28,9 @@ import kafka.producer._ import kafka.message._ import org.I0Itec.zkclient.ZkClient -import kafka.consumer.ConsumerConfig +import kafka.consumer.{KafkaMessageStream, ConsumerConfig} +import collection.mutable.ListBuffer +import scala.collection.Map /** * Utility functions to help with testing @@ -301,6 +303,28 @@ } } + def getConsumedMessages[T](nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[T]]]): List[T]= { + var messages: List[T] = Nil + for ((topic, messageStreams) <- topicMessageStreams) { + for (messageStream <- messageStreams) { + val iterator = messageStream.iterator + for (i <- 0 until nMessagesPerThread) { + assertTrue(iterator.hasNext) + val message = iterator.next + messages ::= message + } + } + } + messages + } + + def getMsgStrings(n: Int): Seq[String] = { + val buffer = new ListBuffer[String] + for (i <- 0 until n) + buffer += ("msg" + i) + buffer + } + } object TestZKUtils { Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy) @@ -52,7 +52,7 @@ val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) - stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message"))) + stringProducer1.send(new ProducerData[String, String](topic, Array("test-message"))) Thread.sleep(200) var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) @@ -73,7 +73,7 @@ val config = new ProducerConfig(props) val 
stringProducer1 = new Producer[String, String](config) - stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message"))) + stringProducer1.send(new ProducerData[String, String](topic, Array("test-message"))) Thread.sleep(200) var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -17,287 +17,385 @@ package kafka.producer -import junit.framework.{Assert, TestCase} -import java.util.Properties import org.easymock.EasyMock import kafka.api.ProducerRequest -import org.apache.log4j.{Logger, Level} import org.junit.Test import org.scalatest.junit.JUnitSuite import kafka.producer.async._ -import kafka.serializer.Encoder +import java.util.concurrent.LinkedBlockingQueue +import junit.framework.Assert._ +import collection.SortedSet +import kafka.cluster.{Broker, Partition} +import collection.mutable.{HashMap, ListBuffer} +import collection.Map import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} +import kafka.serializer.{StringEncoder, StringDecoder, Encoder} +import java.util.{LinkedList, Properties} +import kafka.utils.{TestZKUtils, TestUtils} +import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException} class AsyncProducerTest extends JUnitSuite { - private val messageContent1 = "test" - private val topic1 = "test-topic" - private val message1: Message = new Message(messageContent1.getBytes) - - private val messageContent2 = "test1" - private val topic2 = "test1$topic" - private val message2: Message = new Message(messageContent2.getBytes) - val asyncProducerLogger = Logger.getLogger(classOf[AsyncProducer[String]]) - @Test def testProducerQueueSize() { - val 
basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 10))))) - EasyMock.expectLastCall - basicProducer.close - EasyMock.expectLastCall - EasyMock.replay(basicProducer) + // a mock event handler that blocks + val mockEventHandler = new EventHandler[String,String] { + def handle(events: Seq[ProducerData[String,String]]) { + Thread.sleep(1000000) + } + + def close {} + } + val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("broker.list", "0:localhost:9092") + props.put("producer.type", "async") props.put("queue.size", "10") - props.put("serializer.class", "kafka.producer.StringSerializer") - val config = new AsyncProducerConfig(props) + props.put("batch.size", "1") - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) - - //temporarily set log4j to a higher level to avoid error in the output - producer.setLoggerLevel(Level.FATAL) - + val config = new ProducerConfig(props) + val produceData = getProduceData(12) + val producer = new Producer[String, String](config, mockEventHandler) try { - for(i <- 0 until 11) { - producer.send(messageContent1 + "-topic", messageContent1) - } - Assert.fail("Queue should be full") + // send all 10 messages, should hit the batch size and then reach broker + producer.send(produceData: _*) + fail("Queue should be full") } catch { - case e: QueueFullException => println("Queue is full..") + case e: QueueFullException => //expected } - producer.start - producer.close - Thread.sleep(2000) - EasyMock.verify(basicProducer) - producer.setLoggerLevel(Level.ERROR) } @Test - def testAddAfterQueueClosed() { - val basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, 
ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 10))))) - EasyMock.expectLastCall - basicProducer.close - EasyMock.expectLastCall - EasyMock.replay(basicProducer) - + def testProduceAfterClosed() { val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("queue.size", "10") - props.put("serializer.class", "kafka.producer.StringSerializer") - val config = new AsyncProducerConfig(props) + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("broker.list", "0:localhost:9092") + props.put("producer.type", "async") + props.put("batch.size", "1") - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) - - producer.start - for(i <- 0 until 10) { - producer.send(messageContent1 + "-topic", messageContent1) - } + val config = new ProducerConfig(props) + val produceData = getProduceData(10) + val producer = new Producer[String, String](config) producer.close try { - producer.send(messageContent1 + "-topic", messageContent1) - Assert.fail("Queue should be closed") - } catch { - case e: QueueClosedException => + producer.send(produceData: _*) + fail("should complain that producer is already closed") } - EasyMock.verify(basicProducer) + catch { + case e: ProducerClosedException => //expected + } } + def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = { + val producerDataList = new ListBuffer[ProducerData[String,String]] + for (i <- 0 until nEvents) + producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i))) + producerDataList + } + @Test def testBatchSize() { - val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 5))))) - EasyMock.expectLastCall.times(2) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, 
ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 1))))) + /** + * Send a total of 10 messages with batch size of 5. Expect 2 calls to the handler, one for each batch. + */ + val producerDataList = getProduceData(10) + val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]]) + mockHandler.handle(producerDataList.take(5)) EasyMock.expectLastCall - basicProducer.close + mockHandler.handle(producerDataList.takeRight(5)) EasyMock.expectLastCall - EasyMock.replay(basicProducer) + EasyMock.replay(mockHandler) - val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("queue.size", "10") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("batch.size", "5") + val queue = new LinkedBlockingQueue[ProducerData[String,String]](10) + val producerSendThread = + new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5) + producerSendThread.start() - val config = new AsyncProducerConfig(props) + for (producerData <- producerDataList) + queue.put(producerData) - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) + producerSendThread.shutdown + EasyMock.verify(mockHandler) + } - producer.start - for(i <- 0 until 10) { - producer.send(messageContent1 + "-topic", messageContent1) - } + @Test + def testQueueTimeExpired() { + /** + * Send a total of 2 messages with batch size of 5 and queue time of 200ms. + * Expect 1 calls to the handler after 200ms. 
+ */ + val producerDataList = getProduceData(2) + val mockHandler = EasyMock.createStrictMock(classOf[DefaultEventHandler[String,String]]) + mockHandler.handle(producerDataList) + EasyMock.expectLastCall + EasyMock.replay(mockHandler) - Thread.sleep(100) - try { - producer.send(messageContent1 + "-topic", messageContent1) - } catch { - case e: QueueFullException => - Assert.fail("Queue should not be full") - } + val queue = new LinkedBlockingQueue[ProducerData[String,String]](10) + val producerSendThread = + new ProducerSendThread[String,String]("thread1", queue, mockHandler, 200, 5) + producerSendThread.start() - producer.close - EasyMock.verify(basicProducer) + for (producerData <- producerDataList) + queue.put(producerData) + + Thread.sleep(300) + producerSendThread.shutdown + EasyMock.verify(mockHandler) } @Test - def testQueueTimeExpired() { - val basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic1, ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 3))))) - EasyMock.expectLastCall - basicProducer.close - EasyMock.expectLastCall - EasyMock.replay(basicProducer) + def testPartitionAndCollateEvents() { + val producerDataList = new ListBuffer[ProducerData[Int,Message]] + producerDataList.append(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes))) + producerDataList.append(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))) + producerDataList.append(new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))) + producerDataList.append(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))) + producerDataList.append(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))) val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("queue.size", "10") - props.put("serializer.class", "kafka.producer.StringSerializer") - 
props.put("queue.time", "200") + props.put("broker.list", "0:localhost:9092,1:localhost:9092") - val config = new AsyncProducerConfig(props) + val intPartitioner = new Partitioner[Int] { + def partition(key: Int, numPartitions: Int): Int = key % numPartitions + } + val config = new ProducerConfig(props) + val handler = new DefaultEventHandler[Int,String](config, + partitioner = intPartitioner, + encoder = null.asInstanceOf[Encoder[String]], + producerPool = null, + populateProducerPool = false, + brokerPartitionInfo = null) - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) - val serializer = new StringSerializer + val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]] + topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)), + new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))) + val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]] + topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))) + val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]] + topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))) + val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]] + topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))) + val expectedResult = Map( + 0 -> Map( + ("topic1", -1) -> topic1Broker1Data, + ("topic2", -1) -> topic2Broker1Data), + 1 -> Map( + ("topic1", -1) -> topic1Broker2Data, + ("topic2", -1) -> topic2Broker2Data) + ) - producer.start - for(i <- 0 until 3) { - producer.send(serializer.getTopic(messageContent1), messageContent1, ProducerRequest.RandomPartition) - } + val actualResult = handler.partitionAndCollate(producerDataList) + assertEquals(expectedResult, actualResult) + } - Thread.sleep(300) - producer.close - EasyMock.verify(basicProducer) + 
@Test + def testSerializeEvents() { + val produceData = TestUtils.getMsgStrings(5).map(m => new ProducerData[String,String]("topic1",m)) + val props = new Properties() + props.put("broker.list", "0:localhost:9092,1:localhost:9092") + val config = new ProducerConfig(props) + val handler = new DefaultEventHandler[String,String](config, + partitioner = null.asInstanceOf[Partitioner[String]], + encoder = new StringEncoder, + producerPool = null, + populateProducerPool = false, + brokerPartitionInfo = null) + + val serializedData = handler.serialize(produceData) + val decoder = new StringDecoder + val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m)))) + TestUtils.checkEquals(produceData.iterator, deserializedData.iterator) } @Test - def testSenderThreadShutdown() { - val syncProducerProps = new Properties() - syncProducerProps.put("host", "localhost") - syncProducerProps.put("port", "9092") - syncProducerProps.put("buffer.size", "1000") - syncProducerProps.put("connect.timeout.ms", "1000") - syncProducerProps.put("reconnect.interval", "1000") - val basicProducer = new MockProducer(new SyncProducerConfig(syncProducerProps)) + def testInvalidPartition() { + val producerDataList = new ListBuffer[ProducerData[String,Message]] + producerDataList.append(new ProducerData[String,Message]("topic1", "key1", new Message("msg1".getBytes))) + val props = new Properties() + props.put("broker.list", "0:localhost:9092,1:localhost:9092") + val config = new ProducerConfig(props) + val handler = new DefaultEventHandler[String,String](config, + partitioner = new NegativePartitioner, + encoder = null.asInstanceOf[Encoder[String]], + producerPool = null, + populateProducerPool = false, + brokerPartitionInfo = null) + try { + handler.partitionAndCollate(producerDataList) + fail("Should fail with InvalidPartitionException") + } + catch { + case e: InvalidPartitionException => // expected, do nothing + } + } - val 
asyncProducerProps = new Properties() - asyncProducerProps.put("host", "localhost") - asyncProducerProps.put("port", "9092") - asyncProducerProps.put("queue.size", "10") - asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer") - asyncProducerProps.put("queue.time", "100") + private def getMockBrokerPartitionInfo(): BrokerPartitionInfo ={ + new BrokerPartitionInfo { + def getBrokerPartitionInfo(topic: String = null): SortedSet[Partition] = SortedSet.empty[Partition] - val config = new AsyncProducerConfig(asyncProducerProps) - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) - producer.start - producer.send(messageContent1 + "-topic", messageContent1) - producer.close + def getBrokerInfo(brokerId: Int): Option[Broker] = None + + def getAllBrokerInfo: Map[Int, Broker] = new HashMap[Int, Broker] + + def updateInfo = {} + + def close = {} + } } @Test - def testCollateEvents() { - val basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message2), 5)), - new ProducerRequest(topic1, ProducerRequest.RandomPartition, - getMessageSetOfSize(List(message1), 5))))) - EasyMock.expectLastCall - basicProducer.close - EasyMock.expectLastCall - EasyMock.replay(basicProducer) + def testNoBroker() { + val producerDataList = new ListBuffer[ProducerData[String,String]] + producerDataList.append(new ProducerData[String,String]("topic1", "msg1")) + val props = new Properties() + val config = new ProducerConfig(props) + val handler = new DefaultEventHandler[String,String](config, + partitioner = null.asInstanceOf[Partitioner[String]], + encoder = new StringEncoder, + producerPool = null, + populateProducerPool = false, + brokerPartitionInfo = getMockBrokerPartitionInfo) + try { + handler.handle(producerDataList) + fail("Should fail with NoBrokersForPartitionException") + } + catch { + case e: 
NoBrokersForPartitionException => // expected, do nothing + } + } + @Test + def testIncompatibleEncoder() { val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("queue.size", "50") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("batch.size", "10") + props.put("broker.list", "0:localhost:9092,1:localhost:9092") + val config = new ProducerConfig(props) - val config = new AsyncProducerConfig(props) + val producer=new Producer[String, String](config) + try { + producer.send(getProduceData(1): _*) + fail("Should fail with ClassCastException due to incompatible Encoder") + } catch { + case e: ClassCastException => + } + } - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) + @Test + def testRandomPartitioner() { + val props = new Properties() + props.put("broker.list", "0:localhost:9092,1:localhost:9092") + val config = new ProducerConfig(props) + val handler = new DefaultEventHandler[String,String](config, + partitioner = null.asInstanceOf[Partitioner[String]], + encoder = null.asInstanceOf[Encoder[String]], + producerPool = null, + populateProducerPool = false, + brokerPartitionInfo = null) + val producerDataList = new ListBuffer[ProducerData[String,Message]] + producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes))) + producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes))) + producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes))) - producer.start - val serializer = new StringSerializer - for(i <- 0 until 5) { - producer.send(messageContent1 + "-topic", messageContent1) - producer.send(messageContent2 + "$topic", messageContent2, ProducerRequest.RandomPartition) + val partitionedData = handler.partitionAndCollate(producerDataList) + for ((brokerId, dataPerBroker) <- partitionedData) { + for ( ((topic, partitionId), dataPerTopic) <- 
dataPerBroker) + assertTrue(partitionId == ProducerRequest.RandomPartition) } - - producer.close - EasyMock.verify(basicProducer) - } @Test - def testCollateAndSerializeEvents() { - val basicProducer = EasyMock.createMock(classOf[SyncProducer]) - basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1, - getMessageSetOfSize(List(message2), 5)), - new ProducerRequest(topic1, 0, - getMessageSetOfSize(List(message1), 5)), - new ProducerRequest(topic1, 1, - getMessageSetOfSize(List(message1), 5)), - new ProducerRequest(topic2, 0, - getMessageSetOfSize(List(message2), 5))))) - + def testBrokerListAndAsync() { + val topic = "topic1" + val msgs = TestUtils.getMsgStrings(10) + val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) + mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.take(5)))))) EasyMock.expectLastCall - basicProducer.close + mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.takeRight(5)))))) EasyMock.expectLastCall - EasyMock.replay(basicProducer) + mockSyncProducer.close + EasyMock.expectLastCall + EasyMock.replay(mockSyncProducer) val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("queue.size", "50") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("batch.size", "20") + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("producer.type", "async") + props.put("batch.size", "5") + props.put("broker.list", "0:localhost:9092") - val config = new AsyncProducerConfig(props) + val config = new ProducerConfig(props) + val producerPool = new ProducerPool(config) + producerPool.addProducer(0, mockSyncProducer) - val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer) + val handler = new DefaultEventHandler[String,String](config, + partitioner = 
null.asInstanceOf[Partitioner[String]], + encoder = new StringEncoder, + producerPool = producerPool, + populateProducerPool = false, + brokerPartitionInfo = null) - producer.start - val serializer = new StringSerializer - for(i <- 0 until 5) { - producer.send(topic2, messageContent2, 0) - producer.send(topic2, messageContent2, 1) - producer.send(topic1, messageContent1, 0) - producer.send(topic1, messageContent1, 1) + val producer = new Producer[String, String](config, handler) + try { + // send all 10 messages, should create 2 batches and 2 syncproducer calls + producer.send(msgs.map(m => new ProducerData[String,String](topic, List(m))): _*) + producer.close + + } catch { + case e: Exception => fail("Not expected", e) } - producer.close - EasyMock.verify(basicProducer) + EasyMock.verify(mockSyncProducer) + } + @Test + def testJavaProducer() { + val topic = "topic1" + val msgs = TestUtils.getMsgStrings(5) + val scalaProducerData = msgs.map(m => new ProducerData[String, String](topic, List(m))) + val javaProducerData = scala.collection.JavaConversions.asList(msgs.map(m => { + val javaList = new LinkedList[String]() + javaList.add(m) + new kafka.javaapi.producer.ProducerData[String, String](topic, javaList) + })) + + val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]]) + mockScalaProducer.send(scalaProducerData.head) + EasyMock.expectLastCall() + mockScalaProducer.send(scalaProducerData: _*) + EasyMock.expectLastCall() + EasyMock.replay(mockScalaProducer) + + val javaProducer = new kafka.javaapi.producer.Producer[String, String](mockScalaProducer) + javaProducer.send(javaProducerData.get(0)) + javaProducer.send(javaProducerData) + + EasyMock.verify(mockScalaProducer) } - private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = { - var messageList = new Array[Message](counts) - for(message <- messages) { - for(i <- 0 until counts) { - messageList(i) = message - } + @Test + def 
testInvalidConfiguration() { + val props = new Properties() + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("broker.list", "0:localhost:9092") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + props.put("producer.type", "async") + + try { + new ProducerConfig(props) + fail("should complain about wrong config") } - new ByteBufferMessageSet(NoCompressionCodec, messageList: _*) + catch { + case e: InvalidConfigException => //expected + } } - class StringSerializer extends Encoder[String] { - def toMessage(event: String):Message = new Message(event.getBytes) - def getTopic(event: String): String = event.concat("-topic") + private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = { + val encoder = new StringEncoder + new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*) } class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) { Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) @@ -17,24 +17,18 @@ package kafka.producer -import async.{AsyncProducerConfig, AsyncProducer} -import java.util.Properties import org.apache.log4j.{Logger, Level} import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} import kafka.zk.EmbeddedZookeeper import org.junit.{After, Before, Test} -import junit.framework.Assert -import collection.mutable.HashMap -import org.easymock.EasyMock -import java.util.concurrent.ConcurrentHashMap -import kafka.cluster.Partition +import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite -import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException} import kafka.utils.{TestUtils, TestZKUtils, Utils} -import kafka.serializer.{StringEncoder, 
Encoder} -import kafka.consumer.SimpleConsumer import kafka.api.FetchRequest -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} +import kafka.message.Message +import kafka.serializer.Encoder +import kafka.consumer.SimpleConsumer +import java.util.Properties class ProducerTest extends JUnitSuite { private val topic = "test-topic" @@ -44,8 +38,6 @@ private val (port1, port2) = (ports(0), ports(1)) private var server1: KafkaServer = null private var server2: KafkaServer = null - private var producer1: SyncProducer = null - private var producer2: SyncProducer = null private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null private var zkServer:EmbeddedZookeeper = null @@ -72,16 +64,6 @@ props.put("host", "localhost") props.put("port", port1.toString) - producer1 = new SyncProducer(new SyncProducerConfig(props)) - producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test".getBytes()))) - - producer2 = new SyncProducer(new SyncProducerConfig(props) { - override val port = port2 - }) - producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test".getBytes()))) - consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024) consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024) @@ -105,293 +87,6 @@ } @Test - def testSend() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer2 = 
EasyMock.createMock(classOf[SyncProducer]) - // it should send to partition 0 (first partition) on second broker i.e broker2 - syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false, null) - - producer.send(new ProducerData[String, String](topic, "test", Array("test1"))) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testSendSingleMessage() { - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("broker.list", "0:localhost:9092") - - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - // it should send to a random partition due to use of broker.list - syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes()))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - - syncProducers.put(brokerId1, syncProducer1) - - val producerPool = new ProducerPool[String](config, serializer, syncProducers, - new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, 
partitioner, producerPool, false, null) - - producer.send(new ProducerData[String, String](topic, "t")) - producer.close - - EasyMock.verify(syncProducer1) - } - - @Test - def testInvalidPartition() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - val config = new ProducerConfig(props) - - val richProducer = new Producer[String, String](config) - try { - richProducer.send(new ProducerData[String, String](topic, "test", Array("test"))) - Assert.fail("Should fail with InvalidPartitionException") - }catch { - case e: InvalidPartitionException => // expected, do nothing - } - } - - @Test - def testDefaultEncoder() { - val props = new Properties() - props.put("zk.connect", TestZKUtils.zookeeperConnect) - val config = new ProducerConfig(props) - - val stringProducer1 = new Producer[String, String](config) - try { - stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test"))) - fail("Should fail with ClassCastException due to incompatible Encoder") - } catch { - case e: ClassCastException => - } - - props.put("serializer.class", "kafka.serializer.StringEncoder") - val stringProducer2 = new Producer[String, String](new ProducerConfig(props)) - stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test"))) - - val messageProducer1 = new Producer[String, Message](config) - try { - messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes)))) - } catch { - case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder") - } - } - - @Test - def testSyncProducerPool() { - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer2 = 
EasyMock.createMock(classOf[SyncProducer]) - syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - // default for producer.type is "sync" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) - - producerPool.close - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testAsyncProducerPool() { - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) - asyncProducer1.send(topic, "test1", 0) - EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - asyncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - EasyMock.replay(asyncProducer2) - - asyncProducers.put(brokerId1, asyncProducer1) - asyncProducers.put(brokerId2, asyncProducer2) - - // change producer.type to "async" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new 
StringSerializer, - new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) - producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) - - producerPool.close - EasyMock.verify(asyncProducer1) - EasyMock.verify(asyncProducer2) - } - - @Test - def testSyncUnavailableProducerException() { - val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId2, syncProducer2) - - // default for producer.type is "sync" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - try { - producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) - Assert.fail("Should fail with UnavailableProducerException") - }catch { - case e: UnavailableProducerException => // expected - } - - producerPool.close - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testAsyncUnavailableProducerException() { - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) - asyncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - EasyMock.replay(asyncProducer2) - - asyncProducers.put(brokerId2, asyncProducer2) - - // change producer.type to "async" - val props = new Properties() - props.put("partitioner.class", 
"kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) - try { - producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) - Assert.fail("Should fail with UnavailableProducerException") - }catch { - case e: UnavailableProducerException => // expected - } - - producerPool.close - EasyMock.verify(asyncProducer1) - EasyMock.verify(asyncProducer2) - } - - @Test - def testConfigBrokerPartitionInfoWithPartitioner { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," + - brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4) - - var config: ProducerConfig = null - try { - config = new ProducerConfig(props) - fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list") - }catch { - case e: InvalidConfigException => // expected - } - } - - @Test - def testConfigBrokerPartitionInfo() { - val props = new Properties() - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1) - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - // it should send to a random partition due to use of broker.list - asyncProducer1.send(topic, "test1", -1) - 
EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - - asyncProducers.put(brokerId1, asyncProducer1) - - val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) - val producer = new Producer[String, String](config, partitioner, producerPool, false, null) - - producer.send(new ProducerData[String, String](topic, "test1", Array("test1"))) - producer.close - - EasyMock.verify(asyncProducer1) - } - - @Test def testZKSendToNewTopic() { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") @@ -399,7 +94,6 @@ props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new ProducerConfig(props) - val serializer = new StringEncoder val producer = new Producer[String, String](config) try { @@ -413,11 +107,11 @@ Thread.sleep(100) // cross check if brokers got the messages val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) + assertTrue("Message set should have 1 message", messageSet1.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1.next.message) val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message) + assertTrue("Message set should have 1 message", messageSet2.hasNext) + assertEquals(new Message("test1".getBytes), messageSet2.next.message) } catch { case e: Exception => fail("Not expected", e) } @@ -432,7 +126,6 @@ props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new ProducerConfig(props) - val serializer = new StringEncoder val producer = new Producer[String, String](config) try { @@ -450,10 +143,10 @@ 
Thread.sleep(100) // cross check if brokers got the messages val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - Assert.assertTrue("Message set should have another message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) + assertTrue("Message set should have 1 message", messageSet1.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1.next.message) + assertTrue("Message set should have another message", messageSet1.hasNext) + assertEquals(new Message("test1".getBytes), messageSet1.next.message) } catch { case e: Exception => fail("Not expected") } @@ -468,7 +161,6 @@ props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new ProducerConfig(props) - val serializer = new StringEncoder val producer = new Producer[String, String](config) var server: KafkaServer = null @@ -483,8 +175,8 @@ Thread.sleep(100) // cross check if brokers got the messages val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message) + assertTrue("Message set should have 1 message", messageSet1.hasNext) + assertEquals(new Message("test".getBytes), messageSet1.next.message) // shutdown server2 server2.shutdown @@ -506,8 +198,8 @@ // cross check if brokers got the messages val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message) + assertTrue("Message set should have 1 message", messageSet2.hasNext) + assertEquals(new Message("test".getBytes), messageSet2.next.message) } 
catch { case e: Exception => fail("Not expected", e) @@ -517,152 +209,6 @@ } } - @Test - def testPartitionedSendToNewTopic() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) - syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test1".getBytes))) - EasyMock.expectLastCall - syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test1".getBytes))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false, null) - - producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1"))) - Thread.sleep(100) - - // now send again to this topic using a real producer, this time all brokers would have registered - // their partitions in zookeeper - producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test".getBytes()))) - Thread.sleep(100) - - // wait for zookeeper to register the new topic - 
producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1"))) - Thread.sleep(100) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testPartitionedSendToNewBrokerInExistingTopic() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) - val syncProducer3 = EasyMock.createMock(classOf[SyncProducer]) - syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test1".getBytes))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - syncProducer3.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - EasyMock.replay(syncProducer3) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - syncProducers.put(2, syncProducer3) - - val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false, null) - - val port = TestUtils.choosePort - val serverProps = TestUtils.createBrokerConfig(2, port) - val serverConfig = new KafkaConfig(serverProps) { - override val numPartitions = 4 - } - - val server3 = TestUtils.createServer(serverConfig) - Thread.sleep(500) - // send a message to the new broker to register it under topic 
"test-topic" - val tempProps = new Properties() - tempProps.put("host", "localhost") - tempProps.put("port", port.toString) - val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps)) - tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = new Message("test".getBytes()))) - - Thread.sleep(500) - producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1"))) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - EasyMock.verify(syncProducer3) - - server3.shutdown - Utils.rm(server3.config.logDir) - } - - @Test - def testDefaultPartitioner() { - val props = new Properties() - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1) - val config = new ProducerConfig(props) - val partitioner = new DefaultPartitioner[String] - val serializer = new StringSerializer - - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - // it should send to a random partition due to use of broker.list - asyncProducer1.send(topic, "test1", -1) - EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - - asyncProducers.put(brokerId1, asyncProducer1) - - val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) - val producer = new Producer[String, String](config, partitioner, producerPool, false, null) - - producer.send(new ProducerData[String, String](topic, "test", Array("test1"))) - producer.close - - EasyMock.verify(asyncProducer1) - } } class StringSerializer extends Encoder[String] { Index: core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala 
=================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala (working copy) @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package unit.kafka.producer - -import collection.immutable.SortedSet -import java.util._ -import junit.framework.Assert._ -import kafka.cluster.Partition -import kafka.common.NoBrokersForPartitionException -import kafka.producer._ -import org.easymock.EasyMock -import org.junit.Test -import org.scalatest.junit.JUnitSuite -import scala.collection.immutable.List - -class ProducerMethodsTest extends JUnitSuite { - - @Test - def producerThrowsNoBrokersException() = { - val props = new Properties - props.put("broker.list", "placeholder") // Need to fake out having specified one - val config = new ProducerConfig(props) - val mockPartitioner = EasyMock.createMock(classOf[Partitioner[String]]) - val mockProducerPool = EasyMock.createMock(classOf[ProducerPool[String]]) - val mockBrokerPartitionInfo = EasyMock.createMock(classOf[kafka.producer.BrokerPartitionInfo]) - - EasyMock.expect(mockBrokerPartitionInfo.getBrokerPartitionInfo("the_topic")).andReturn(SortedSet[Partition]()) - EasyMock.replay(mockBrokerPartitionInfo) - - val producer = new Producer[String, String](config,mockPartitioner, mockProducerPool,false, mockBrokerPartitionInfo) - - try { - val producerData = new ProducerData[String, String]("the_topic", "the_key", List("the_datum")) - producer.send(producerData) - fail("Should have thrown a NoBrokersForPartitionException.") - } catch { - case nb: NoBrokersForPartitionException => assertTrue(nb.getMessage.contains("the_key")) - } - - } -} Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -68,7 +68,7 @@ // also the iterator should support re-entrant, so loop it twice for (i <- 0 until 2) { try { - getMessages(nMessages*2, topicMessageStreams0) + 
getMessagesSortedByChecksum(nMessages*2, topicMessageStreams0) fail("should get an exception") } catch { @@ -86,7 +86,7 @@ TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2)) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1) assertEquals(sentMessages1, receivedMessages1) // commit consumed offsets zkConsumerConnector1.commitOffsets @@ -99,8 +99,8 @@ // send some messages to each broker val sentMessages2 = sendMessages(nMessages, "batch2") Thread.sleep(200) - val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) + val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1) + val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2) val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum) assertEquals(sentMessages2, receivedMessages2) @@ -113,8 +113,8 @@ Thread.sleep(200) val sentMessages3 = sendMessages(nMessages, "batch3") Thread.sleep(200) - val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) + val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1) + val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2) val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) assertEquals(sentMessages3, receivedMessages3) @@ -137,7 +137,7 @@ TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new 
ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2)) - val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) + val receivedMessages1 = getMessagesSortedByChecksum(nMessages*2, topicMessageStreams1) assertEquals(sentMessages1, receivedMessages1) // commit consumed offsets zkConsumerConnector1.commitOffsets @@ -151,8 +151,8 @@ // send some messages to each broker val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec) Thread.sleep(200) - val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) + val receivedMessages2_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1) + val receivedMessages2_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2) val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum) assertEquals(sentMessages2, receivedMessages2) @@ -166,8 +166,8 @@ Thread.sleep(200) val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec) Thread.sleep(200) - val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) + val receivedMessages3_1 = getMessagesSortedByChecksum(nMessages, topicMessageStreams1) + val receivedMessages3_2 = getMessagesSortedByChecksum(nMessages, topicMessageStreams2) val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) assertEquals(sentMessages3, receivedMessages3) @@ -197,13 +197,13 @@ } val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true) val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1)) - getMessages(100, topicMessageStreams0) + getMessagesSortedByChecksum(100, topicMessageStreams0) 
zkConsumerConnector0.shutdown // at this point, only some part of the message set was consumed. So consumed offset should still be 0 // also fetched offset should be 0 val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) - val receivedMessages = getMessages(400, topicMessageStreams1) + val receivedMessages = getMessagesSortedByChecksum(400, topicMessageStreams1) val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum) val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum) assertEquals(sortedSentMessages, sortedReceivedMessages) @@ -270,19 +270,8 @@ messages.sortWith((s,t) => s.checksum < t.checksum) } - def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= { - var messages: List[Message] = Nil - for ((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for (i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next - messages ::= message - debug("received message: " + Utils.toString(message.payload, "UTF-8")) - } - } - } + def getMessagesSortedByChecksum(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= { + val messages = TestUtils.getConsumedMessages(nMessagesPerThread, topicMessageStreams) messages.sortWith((s,t) => s.checksum < t.checksum) } } Index: core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (working copy) @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.javaapi.producer - -import junit.framework.{Assert, TestCase} -import kafka.utils.SystemTime -import kafka.utils.TestUtils -import kafka.server.{KafkaServer, KafkaConfig} -import org.apache.log4j.{Logger, Level} -import org.scalatest.junit.JUnitSuite -import org.junit.{After, Before, Test} -import java.util.Properties -import kafka.producer.SyncProducerConfig -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.javaapi.ProducerRequest -import kafka.message.{NoCompressionCodec, Message} - -class SyncProducerTest extends JUnitSuite { - private var messageBytes = new Array[Byte](2); - private var server: KafkaServer = null - val simpleProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer]) - - @Before - def setUp() { - server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092)) - { - override val enableZookeeper = false - }) - } - - @After - def tearDown() { - server.shutdown - } - - @Test - def testReachableServer() { - val props = new Properties() - props.put("host", "localhost") - props.put("port", "9092") - props.put("buffer.size", "102400") - props.put("connect.timeout.ms", "500") - props.put("reconnect.interval", "1000") - val producer = new 
SyncProducer(new SyncProducerConfig(props)) - var failed = false - val firstStart = SystemTime.milliseconds - try { - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message(messageBytes)))) - }catch { - case e: Exception => failed=true - } - Assert.assertFalse(failed) - failed = false - val firstEnd = SystemTime.milliseconds - Assert.assertTrue((firstEnd-firstStart) < 500) - val secondStart = SystemTime.milliseconds - try { - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message(messageBytes)))) - }catch { - case e: Exception => failed = true - } - Assert.assertFalse(failed) - val secondEnd = SystemTime.milliseconds - Assert.assertTrue((secondEnd-secondEnd) < 500) - - try { - producer.multiSend(Array(new ProducerRequest("test", 0, - new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message(messageBytes)))))) - }catch { - case e: Exception => failed=true - } - Assert.assertFalse(failed) - } - - private def getMessageList(message: Message): java.util.List[Message] = { - val messageList = new java.util.ArrayList[Message]() - messageList.add(message) - messageList - } -} Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (working copy) @@ -1,630 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.javaapi.producer - -import java.util.Properties -import org.apache.log4j.{Logger, Level} -import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} -import kafka.zk.EmbeddedZookeeper -import kafka.utils.{TestZKUtils, TestUtils} -import org.junit.{After, Before, Test} -import junit.framework.Assert -import collection.mutable.HashMap -import org.easymock.EasyMock -import kafka.utils.Utils -import java.util.concurrent.ConcurrentHashMap -import kafka.cluster.Partition -import kafka.common.{UnavailableProducerException, InvalidPartitionException, InvalidConfigException} -import org.scalatest.junit.JUnitSuite -import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner} -import kafka.producer.ProducerPool -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.producer.async.{AsyncProducer, AsyncProducerConfig} -import kafka.javaapi.Implicits._ -import kafka.serializer.{StringEncoder, Encoder} -import kafka.javaapi.consumer.SimpleConsumer -import kafka.api.FetchRequest -import kafka.message.{NoCompressionCodec, Message} - -class ProducerTest extends JUnitSuite { - private val topic = "test-topic" - private val brokerId1 = 0 - private val brokerId2 = 1 - private val port1 = 9092 - private val port2 = 9093 - private var server1: KafkaServer = null - private var server2: KafkaServer = null - private var producer1: SyncProducer 
= null - private var producer2: SyncProducer = null - private var consumer1: SimpleConsumer = null - private var consumer2: SimpleConsumer = null - private var zkServer:EmbeddedZookeeper = null - private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers]) - - @Before - def setUp() { - // set up 2 brokers with 4 partitions each - zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect) - - val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - val config1 = new KafkaConfig(props1) { - override val numPartitions = 4 - } - server1 = TestUtils.createServer(config1) - - val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - val config2 = new KafkaConfig(props2) { - override val numPartitions = 4 - } - server2 = TestUtils.createServer(config2) - - val props = new Properties() - props.put("host", "localhost") - props.put("port", port1.toString) - - producer1 = new SyncProducer(new SyncProducerConfig(props)) - val messages1 = new java.util.ArrayList[Message] - messages1.add(new Message("test".getBytes())) - producer1.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages1)) - - producer2 = new SyncProducer(new SyncProducerConfig(props) { - override val port = port2 - }) - val messages2 = new java.util.ArrayList[Message] - messages2.add(new Message("test".getBytes())) - - producer2.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages2)) - - consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024) - consumer2 = new SimpleConsumer("localhost", port2, 1000000, 64*1024) - - // temporarily set request handler logger to a higher level - requestHandlerLogger.setLevel(Level.FATAL) - - Thread.sleep(500) - } - - @After - def tearDown() { - // restore set request handler logger to a higher level - requestHandlerLogger.setLevel(Level.ERROR) - server1.shutdown - server2.shutdown - Utils.rm(server1.config.logDir) - 
Utils.rm(server2.config.logDir) - Thread.sleep(500) - zkServer.shutdown - } - - @Test - def testSend() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - // it should send to partition 0 (first partition) on second broker i.e broker2 - val messageList = new java.util.ArrayList[Message] - messageList.add(new Message("test1".getBytes())) - syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - val producerPool = new ProducerPool[String](config, serializer, syncProducers, - new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - val messagesContent = new java.util.ArrayList[String] - messagesContent.add("test1") - producer.send(new ProducerData[String, String](topic, "test", messagesContent)) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testSendSingleMessage() { - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("broker.list", "0:localhost:9092") - - - val config = 
new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - // it should send to a random partition due to use of broker.list - val messageList = new java.util.ArrayList[Message] - messageList.add(new Message("t".getBytes())) - syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - - syncProducers.put(brokerId1, syncProducer1) - - val producerPool = new ProducerPool[String](config, serializer, syncProducers, - new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - producer.send(new ProducerData[String, String](topic, "t")) - producer.close - - EasyMock.verify(syncProducer1) - } - - @Test - def testInvalidPartition() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - val config = new ProducerConfig(props) - - val richProducer = new Producer[String, String](config) - val messagesContent = new java.util.ArrayList[String] - messagesContent.add("test") - try { - richProducer.send(new ProducerData[String, String](topic, "test", messagesContent)) - Assert.fail("Should fail with InvalidPartitionException") - }catch { - case e: InvalidPartitionException => // expected, do nothing - } - } - - @Test - def testSyncProducerPool() { - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val 
syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val messageList = new java.util.ArrayList[Message] - messageList.add(new Message("test1".getBytes())) - syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - // default for producer.type is "sync" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) - - producerPool.close - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testAsyncProducerPool() { - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) - asyncProducer1.send(topic, "test1", 0) - EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - asyncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - EasyMock.replay(asyncProducer2) - - asyncProducers.put(brokerId1, asyncProducer1) - asyncProducers.put(brokerId2, asyncProducer2) - - // change producer.type to "async" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") 
- props.put("producer.type", "async") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) - producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) - - producerPool.close - EasyMock.verify(asyncProducer1) - EasyMock.verify(asyncProducer2) - } - - @Test - def testSyncUnavailableProducerException() { - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId2, syncProducer2) - - // default for producer.type is "sync" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - try { - producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) - Assert.fail("Should fail with UnavailableProducerException") - }catch { - case e: UnavailableProducerException => // expected - } - - producerPool.close - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testAsyncUnavailableProducerException() { - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) - asyncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - EasyMock.replay(asyncProducer2) - - 
asyncProducers.put(brokerId2, asyncProducer2) - - // change producer.type to "async" - val props = new Properties() - props.put("partitioner.class", "kafka.producer.NegativePartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, - new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers) - try { - producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) - Assert.fail("Should fail with UnavailableProducerException") - }catch { - case e: UnavailableProducerException => // expected - } - - producerPool.close - EasyMock.verify(asyncProducer1) - EasyMock.verify(asyncProducer2) - } - - @Test - def testConfigBrokerPartitionInfoWithPartitioner { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," + - brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4) - - var config: ProducerConfig = null - try { - config = new ProducerConfig(props) - fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list") - }catch { - case e: InvalidConfigException => // expected - } - } - - @Test - def testConfigBrokerPartitionInfo() { - val props = new Properties() - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1) - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringSerializer - - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = 
EasyMock.createMock(classOf[AsyncProducer[String]]) - // it should send to a random partition due to use of broker.list - asyncProducer1.send(topic, "test1", -1) - EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - - asyncProducers.put(brokerId1, asyncProducer1) - - val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), - asyncProducers) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - val messagesContent = new java.util.ArrayList[String] - messagesContent.add("test1") - producer.send(new ProducerData[String, String](topic, "test1", messagesContent)) - producer.close - - EasyMock.verify(asyncProducer1) - } - - @Test - def testZKSendToNewTopic() { - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val serializer = new StringEncoder - - val producer = new Producer[String, String](config) - try { - import scala.collection.JavaConversions._ - producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) - Thread.sleep(100) - producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) - Thread.sleep(100) - // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message) - } 
catch { - case e: Exception => fail("Not expected") - } - producer.close - } - - @Test - def testZKSendWithDeadBroker() { - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val serializer = new StringEncoder - - val producer = new Producer[String, String](config) - try { - import scala.collection.JavaConversions._ - producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) - Thread.sleep(100) - // kill 2nd broker - server2.shutdown - Thread.sleep(100) - producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) - Thread.sleep(100) - // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - Assert.assertTrue("Message set should have another message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - } catch { - case e: Exception => fail("Not expected") - } - producer.close - } - - @Test - def testPartitionedSendToNewTopic() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val partitioner = new StaticPartitioner - val serializer = new StringEncoder - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val syncProducer2 = 
EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - import scala.collection.JavaConversions._ - syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = asList(Array(new Message("test1".getBytes))))) - EasyMock.expectLastCall - syncProducer1.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = asList(Array(new Message("test1".getBytes))))) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - - val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - producer.send(new ProducerData[String, String]("test-topic1", "test", asList(Array("test1")))) - Thread.sleep(100) - - // now send again to this topic using a real producer, this time all brokers would have registered - // their partitions in zookeeper - producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = asList(Array(new Message("test".getBytes()))))) - Thread.sleep(100) - - // wait for zookeeper to register the new topic - producer.send(new ProducerData[String, String]("test-topic1", "test1", asList(Array("test1")))) - Thread.sleep(100) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - } - - @Test - def testPartitionedSendToNewBrokerInExistingTopic() { - val props = new Properties() - props.put("partitioner.class", "kafka.producer.StaticPartitioner") - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("zk.connect", TestZKUtils.zookeeperConnect) - - val config = new ProducerConfig(props) - val partitioner = 
new StaticPartitioner - val serializer = new StringSerializer - - // 2 sync producers - val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() - val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val syncProducer2 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val syncProducer3 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - val messages1 = new java.util.ArrayList[Message] - messages1.add(new Message("test1".getBytes())) - syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messages1)) - EasyMock.expectLastCall - syncProducer1.close - EasyMock.expectLastCall - syncProducer2.close - EasyMock.expectLastCall - syncProducer3.close - EasyMock.expectLastCall - EasyMock.replay(syncProducer1) - EasyMock.replay(syncProducer2) - EasyMock.replay(syncProducer3) - - syncProducers.put(brokerId1, syncProducer1) - syncProducers.put(brokerId2, syncProducer2) - syncProducers.put(2, syncProducer3) - - val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - val serverProps = TestUtils.createBrokerConfig(2, 9094) - val serverConfig = new KafkaConfig(serverProps) { - override val numPartitions = 4 - } - val server3 = TestUtils.createServer(serverConfig) - - // send a message to the new broker to register it under topic "test-topic" - val tempProps = new Properties() - tempProps.put("host", "localhost") - tempProps.put("port", "9094") - val tempProducer = new kafka.producer.SyncProducer(new SyncProducerConfig(tempProps)) - val messageList = new java.util.ArrayList[Message] - messageList.add(new Message("test".getBytes())) - tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) - Thread.sleep(500) - - val messagesContent = new 
java.util.ArrayList[String] - messagesContent.add("test1") - producer.send(new ProducerData[String, String]("test-topic", "test-topic", messagesContent)) - producer.close - - EasyMock.verify(syncProducer1) - EasyMock.verify(syncProducer2) - EasyMock.verify(syncProducer3) - - server3.shutdown - Utils.rm(server3.config.logDir) - } - - @Test - def testDefaultPartitioner() { - val props = new Properties() - props.put("serializer.class", "kafka.producer.StringSerializer") - props.put("producer.type", "async") - props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1) - val config = new ProducerConfig(props) - val partitioner = new DefaultPartitioner[String] - val serializer = new StringSerializer - - // 2 async producers - val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() - val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) - val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) - // it should send to a random partition due to use of broker.list - asyncProducer1.send(topic, "test1", -1) - EasyMock.expectLastCall - asyncProducer1.close - EasyMock.expectLastCall - EasyMock.replay(asyncProducer1) - - asyncProducers.put(brokerId1, asyncProducer1) - - val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), - asyncProducers) - val producer = new Producer[String, String](config, partitioner, producerPool, false) - - val messagesContent = new java.util.ArrayList[String] - messagesContent.add("test1") - producer.send(new ProducerData[String, String](topic, "test", messagesContent)) - producer.close - - EasyMock.verify(asyncProducer1) - } -} - -class StringSerializer extends Encoder[String] { - def toEvent(message: Message):String = message.toString - def toMessage(event: String):Message = new Message(event.getBytes) - def getTopic(event: String): String = event.concat("-topic") -} - -class NegativePartitioner extends Partitioner[String] { - def 
partition(data: String, numPartitions: Int): Int = { - -1 - } -} - -class StaticPartitioner extends Partitioner[String] { - def partition(data: String, numPartitions: Int): Int = { - (data.length % numPartitions) - } -} - -class HashPartitioner extends Partitioner[String] { - def partition(data: String, numPartitions: Int): Int = { - (data.hashCode % numPartitions) - } -} Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (revision 1239972) +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -70,7 +70,7 @@ def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= { var messages: List[Message] = Nil - val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port)) + val producer = new kafka.javaapi.producer.SyncProducer(TestUtils.createProducer("localhost", conf.port)) for (partition <- 0 until numParts) { val ms = 0.until(messagesPerNode).map(x => new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala =================================================================== --- core/src/main/scala/kafka/tools/ReplayLogProducer.scala (revision 1239972) +++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala (working copy) @@ -20,9 +20,7 @@ import joptsimple.OptionParser import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties -import kafka.producer.async.DefaultEventHandler -import kafka.serializer.DefaultEncoder -import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer} +import kafka.producer.{ProducerData, ProducerConfig, Producer} import kafka.consumer._ import 
kafka.utils.{ZKStringSerializer, Logging} import kafka.api.OffsetRequest @@ -171,9 +169,7 @@ props.put("producer.type", "async") val producerConfig = new ProducerConfig(props) - val producer = new Producer[Message, Message](producerConfig, new DefaultEncoder, - new DefaultEventHandler[Message](producerConfig, null), - null, new DefaultPartitioner[Message]) + val producer = new Producer[Message, Message](producerConfig) override def run() { info("Starting consumer thread..") Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1239972) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -24,17 +24,22 @@ import java.lang.management._ import java.util.zip.CRC32 import javax.management._ -import java.util.Properties import scala.collection._ import scala.collection.mutable import kafka.message.{NoCompressionCodec, CompressionCodec} import org.I0Itec.zkclient.ZkClient +import java.util.{Random, Properties} /** * Helper functions! 
*/ object Utils extends Logging { + val random = new Random + + def getNextRandomInt(): Int = random.nextInt + def getNextRandomInt(upper: Int): Int = random.nextInt(upper) + /** * Wrap the given function in a java.lang.Runnable * @param fun A function Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -37,7 +37,7 @@ throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") /** the partitioner class for partitioning events amongst sub-topics */ - val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner") + val partitionerClass = Utils.getString(props, "partitioner.class", null) /** this parameter specifies whether the messages are sent asynchronously * * or not. Valid values are - async for asynchronous send * @@ -71,5 +71,5 @@ * ZK cache needs to be updated. * This parameter specifies the number of times the producer attempts to refresh this ZK cache. 
*/ - val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3) + val producerRetries = Utils.getInt(props, "producer.num.retries", 3) } Index: core/src/main/scala/kafka/producer/ProducerData.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerData.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/ProducerData.scala (working copy) @@ -23,14 +23,16 @@ * @param key the key used by the partitioner to pick a broker partition * @param data variable length data to be published as Kafka messages under topic */ -class ProducerData[K, V](private val topic: String, - private val key: K, - private val data: Seq[V]) { +case class ProducerData[K,V](topic: String, + key: K, + data: Seq[V]) { def this(t: String, d: Seq[V]) = this(topic = t, key = null.asInstanceOf[K], data = d) def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = List(d)) + def this(t: String, k: K, d: V) = this(topic = t, key = k, data = List(d)) + def getTopic: String = topic def getKey: K = key Index: core/src/main/scala/kafka/producer/ProducerPool.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerPool.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/ProducerPool.scala (working copy) @@ -17,163 +17,42 @@ package kafka.producer -import async._ import java.util.Properties -import kafka.serializer.Encoder -import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap} -import kafka.cluster.{Partition, Broker} -import kafka.api.ProducerRequest -import kafka.common.{UnavailableProducerException, InvalidConfigException} -import kafka.utils.{Utils, Logging} -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} +import kafka.cluster.Broker +import kafka.utils.Logging +import java.util.concurrent.ConcurrentHashMap -class ProducerPool[V](private val config: ProducerConfig, - private val serializer: Encoder[V], 
- private val syncProducers: ConcurrentMap[Int, SyncProducer], - private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]], - private val inputEventHandler: EventHandler[V] = null, - private val cbkHandler: CallbackHandler[V] = null) extends Logging { +class ProducerPool(private val config: ProducerConfig) extends Logging { + private val syncProducers = new ConcurrentHashMap[Int, SyncProducer] - private var eventHandler = inputEventHandler - if(eventHandler == null) - eventHandler = new DefaultEventHandler(config, cbkHandler) - - if(serializer == null) - throw new InvalidConfigException("serializer passed in is null!") - - private var sync: Boolean = true - config.producerType match { - case "sync" => - case "async" => sync = false - case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async") - } - - def this(config: ProducerConfig, serializer: Encoder[V], - eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) = - this(config, serializer, - new ConcurrentHashMap[Int, SyncProducer](), - new ConcurrentHashMap[Int, AsyncProducer[V]](), - eventHandler, cbkHandler) - - def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer, - new ConcurrentHashMap[Int, SyncProducer](), - new ConcurrentHashMap[Int, AsyncProducer[V]](), - Utils.getObject(config.eventHandler), - Utils.getObject(config.cbkHandler)) - /** - * add a new producer, either synchronous or asynchronous, connecting - * to the specified broker - * @param bid the id of the broker - * @param host the hostname of the broker - * @param port the port of the broker - */ def addProducer(broker: Broker) { val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) props.putAll(config.props) - if(sync) { - val producer = new SyncProducer(new SyncProducerConfig(props)) - info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) - syncProducers.put(broker.id, 
producer) - } else { - val producer = new AsyncProducer[V](new AsyncProducerConfig(props), - new SyncProducer(new SyncProducerConfig(props)), - serializer, - eventHandler, config.eventHandlerProps, - cbkHandler, config.cbkHandlerProps) - producer.start - info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) - asyncProducers.put(broker.id, producer) - } + val producer = new SyncProducer(new SyncProducerConfig(props)) + info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port) + syncProducers.put(broker.id, producer) } /** - * selects either a synchronous or an asynchronous producer, for - * the specified broker id and calls the send API on the selected - * producer to publish the data to the specified broker partition - * @param poolData the producer pool request object + * For testing purpose */ - def send(poolData: ProducerPoolData[V]*) { - val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct - var remainingRequests = poolData.toSeq - distinctBrokers.foreach { bid => - val requestsForThisBid = remainingRequests partition (_.getBidPid.brokerId == bid) - remainingRequests = requestsForThisBid._2 + def addProducer(brokerId: Int, syncProducer: SyncProducer) { + syncProducers.put(brokerId, syncProducer) + } - if(sync) { - val producerRequests = requestsForThisBid._1.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId, - new ByteBufferMessageSet(compressionCodec = config.compressionCodec, - messages = req.getData.map(d => serializer.toMessage(d)): _*))) - debug("Fetching sync producer for broker id: " + bid) - val producer = syncProducers.get(bid) - if(producer != null) { - if(producerRequests.size > 1) - producer.multiSend(producerRequests.toArray) - else - producer.send(topic = producerRequests(0).topic, - partition = producerRequests(0).partition, - messages = producerRequests(0).messages) - config.compressionCodec match { - case 
NoCompressionCodec => debug("Sending message to broker " + bid) - case _ => debug("Sending compressed messages to broker " + bid) - } - }else - throw new UnavailableProducerException("Producer pool has not been initialized correctly. " + - "Sync Producer for broker " + bid + " does not exist in the pool") - }else { - debug("Fetching async producer for broker id: " + bid) - val producer = asyncProducers.get(bid) - if(producer != null) { - requestsForThisBid._1.foreach { req => - req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId)) - } - if(logger.isDebugEnabled) - config.compressionCodec match { - case NoCompressionCodec => debug("Sending message") - case _ => debug("Sending compressed messages") - } - } - else - throw new UnavailableProducerException("Producer pool has not been initialized correctly. " + - "Async Producer for broker " + bid + " does not exist in the pool") - } - } + def getProducer(brokerId: Int) : SyncProducer = { + syncProducers.get(brokerId) } /** * Closes all the producers in the pool */ def close() = { - config.producerType match { - case "sync" => - info("Closing all sync producers") - val iter = syncProducers.values.iterator - while(iter.hasNext) - iter.next.close - case "async" => - info("Closing all async producers") - val iter = asyncProducers.values.iterator - while(iter.hasNext) - iter.next.close - } + info("Closing all sync producers") + val iter = syncProducers.values.iterator + while(iter.hasNext) + iter.next.close } - - /** - * This constructs and returns the request object for the producer pool - * @param topic the topic to which the data should be published - * @param bidPid the broker id and partition id - * @param data the data to be published - */ - def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = { - new ProducerPoolData[V](topic, bidPid, data) - } - - class ProducerPoolData[V](topic: String, - bidPid: Partition, - data: Seq[V]) { - def getTopic: String = 
topic - def getBidPid: Partition = bidPid - def getData: Seq[V] = data - } } Index: core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala (working copy) @@ -17,10 +17,11 @@ package kafka.producer import collection.mutable.HashMap -import collection.mutable.Map +import collection.Map import collection.SortedSet import kafka.cluster.{Broker, Partition} import kafka.common.InvalidConfigException +import kafka.api.ProducerRequest private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extends BrokerPartitionInfo { private val brokerPartitions: SortedSet[Partition] = getConfigTopicPartitionInfo @@ -66,13 +67,11 @@ val brokerInfo = bInfo.split(":") if(brokerInfo.size < 3) throw new InvalidConfigException("broker.list has invalid value") } - val brokerPartitions = brokerInfoList.map(bInfo => (bInfo.split(":").head.toInt, 1)) + val brokerIds = brokerInfoList.map(bInfo => bInfo.split(":").head.toInt) var brokerParts = SortedSet.empty[Partition] - brokerPartitions.foreach { bp => - for(i <- 0 until bp._2) { - val bidPid = new Partition(bp._1, i) - brokerParts = brokerParts + bidPid - } + brokerIds.foreach { bid => + val bidPid = new Partition(bid, ProducerRequest.RandomPartition) + brokerParts += bidPid } brokerParts } Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -16,82 +16,51 @@ */ package kafka.producer -import async.{CallbackHandler, EventHandler} -import kafka.serializer.Encoder +import async._ import kafka.utils._ -import java.util.Properties -import kafka.cluster.{Partition, Broker} -import 
java.util.concurrent.atomic.AtomicBoolean -import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException} -import kafka.api.ProducerRequest +import kafka.common.InvalidConfigException +import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} +import kafka.serializer.Encoder +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} class Producer[K,V](config: ProducerConfig, - partitioner: Partitioner[K], - producerPool: ProducerPool[V], - populateProducerPool: Boolean, - private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */ - /* use the other constructor*/ + private val eventHandler: EventHandler[K,V]) // for testing only extends Logging { private val hasShutdown = new AtomicBoolean(false) if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList)) throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList)) - warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") - private val random = new java.util.Random - // check if zookeeper based auto partition discovery is enabled - private val zkEnabled = Utils.propertyExists(config.zkConnect) - if(brokerPartitionInfo == null) { - zkEnabled match { - case true => - val zkProps = new Properties() - zkProps.put("zk.connect", config.zkConnect) - zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString) - zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString) - zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString) - brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk) - case false => - brokerPartitionInfo = new ConfigBrokerPartitionInfo(config) - } + throw new InvalidConfigException("Only one of zk.connect and broker.list should be provided") + if (config.batchSize > 
config.queueSize) + throw new InvalidConfigException("Batch size can't be larger than queue size.") + + private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize) + private var sync: Boolean = true + private var producerSendThread: ProducerSendThread[K,V] = null + config.producerType match { + case "sync" => + case "async" => + sync = false + val asyncProducerID = Utils.getNextRandomInt + producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue, + eventHandler, config.queueTime, config.batchSize) + producerSendThread.start + case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async") } - // pool of producers, one per broker - if(populateProducerPool) { - val allBrokers = brokerPartitionInfo.getAllBrokerInfo - allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port))) - } -/** - * This constructor can be used when all config parameters will be specified through the - * ProducerConfig object - * @param config Producer Configuration object - */ - def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass), - new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null) - /** - * This constructor can be used to provide pre-instantiated objects for all config parameters - * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and - * callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner - * will not be picked up from the config. + * This constructor can be used when all config parameters will be specified through the + * ProducerConfig object * @param config Producer Configuration object - * @param encoder Encoder used to convert an object of type V to a kafka.message.Message. 
If this is null it - * throws an InvalidConfigException - * @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to - * dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it - * uses the DefaultEventHandler - * @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject - * callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does - * not use the callback handler and hence does not invoke any callbacks - * @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom - * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T] - * object in the send API. If this is null, producer uses DefaultPartitioner */ - def this(config: ProducerConfig, - encoder: Encoder[V], - eventHandler: EventHandler[V], - cbkHandler: CallbackHandler[V], - partitioner: Partitioner[K]) = - this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner, - new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null) + def this(config: ProducerConfig) = + this(config, + new DefaultEventHandler[K,V](config, + Utils.getObject[Partitioner[K]](config.partitionerClass), + Utils.getObject[Encoder[V]](config.serializerClass), + new ProducerPool(config), + populateProducerPool= true, + brokerPartitionInfo= null)) /** * Sends the data, partitioned by key to the topic using either the @@ -99,119 +68,90 @@ * @param producerData the producer data object that encapsulates the topic, key and message data */ def send(producerData: ProducerData[K,V]*) { - zkEnabled match { - case true => zkSend(producerData: _*) - case false => configSend(producerData: _*) + if (hasShutdown.get) + throw new ProducerClosedException + recordStats(producerData: _*) + sync match { + case true => eventHandler.handle(producerData) + case false 
=> asyncSend(producerData: _*) } } - private def zkSend(producerData: ProducerData[K,V]*) { - val producerPoolRequests = producerData.map { pd => - var brokerIdPartition: Option[Partition] = None - var brokerInfoOpt: Option[Broker] = None + private def recordStats(producerData: ProducerData[K,V]*) { + for (data <- producerData) + ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size) + } - var numRetries: Int = 0 - while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) { - if(numRetries > 0) { - info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again") - brokerPartitionInfo.updateInfo - } - - val topicPartitionsList = getPartitionListForTopic(pd) - val totalNumPartitions = topicPartitionsList.length - - val partitionId = getPartition(pd.getKey, totalNumPartitions) - brokerIdPartition = Some(topicPartitionsList(partitionId)) - brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId) - numRetries += 1 + private def asyncSend(producerData: ProducerData[K,V]*) { + for (data <- producerData) { + val added = config.enqueueTimeoutMs match { + case 0 => + queue.offer(data) + case _ => + try { + config.enqueueTimeoutMs < 0 match { + case true => + queue.put(data) + true + case _ => + queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS) + } + } + catch { + case e: InterruptedException => + false + } } - - brokerInfoOpt match { - case Some(brokerInfo) => - debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + - " on partition " + brokerIdPartition.get.partId) - case None => - throw new NoBrokersForPartitionException("Invalid Zookeeper state. 
Failed to get partition for topic: " + - pd.getTopic + " and key: " + pd.getKey) + if(!added) { + AsyncProducerStats.recordDroppedEvents + error("Event queue is full of unsent messages, could not send event: " + data.toString) + throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString) + }else { + trace("Added to send queue an event: " + data.toString) + trace("Remaining queue size: " + queue.remainingCapacity) } - producerPool.getProducerPoolData(pd.getTopic, - new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId), - pd.getData) } - producerPool.send(producerPoolRequests: _*) } - private def configSend(producerData: ProducerData[K,V]*) { - val producerPoolRequests = producerData.map { pd => - // find the broker partitions registered for this topic - val topicPartitionsList = getPartitionListForTopic(pd) - val totalNumPartitions = topicPartitionsList.length - - val randomBrokerId = random.nextInt(totalNumPartitions) - val brokerIdPartition = topicPartitionsList(randomBrokerId) - val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get - - debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + - " on a randomly chosen partition") - val partition = ProducerRequest.RandomPartition - debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " + - brokerIdPartition.partId) - producerPool.getProducerPoolData(pd.getTopic, - new Partition(brokerIdPartition.brokerId, partition), - pd.getData) - } - producerPool.send(producerPoolRequests: _*) - } - - private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = { - debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) - val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq - debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList) - val 
totalNumPartitions = topicPartitionsList.length - if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) - topicPartitionsList - } - /** - * Retrieves the partition id and throws an InvalidPartitionException if - * the value of partition is not between 0 and numPartitions-1 - * @param key the partition key - * @param numPartitions the total number of available partitions - * @returns the partition id - */ - private def getPartition(key: K, numPartitions: Int): Int = { - if(numPartitions <= 0) - throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions + - "\n Valid values are > 0") - val partition = if(key == null) random.nextInt(numPartitions) - else partitioner.partition(key , numPartitions) - if(partition < 0 || partition >= numPartitions) - throw new InvalidPartitionException("Invalid partition id : " + partition + - "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") - partition - } - - /** - * Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo - * on registration of new broker in zookeeper - * @param bid the id of the broker - * @param host the hostname of the broker - * @param port the port of the broker - */ - private def producerCbk(bid: Int, host: String, port: Int) = { - if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port)) - else debug("Skipping the callback since populateProducerPool = false") - } - - /** * Close API to close the producer pool connections to all Kafka brokers. 
Also closes * the zookeeper client connection if one exists */ def close() = { val canShutdown = hasShutdown.compareAndSet(false, true) if(canShutdown) { - producerPool.close - brokerPartitionInfo.close + if (producerSendThread != null) + producerSendThread.shutdown + eventHandler.close } } } + +trait ProducerTopicStatMBean { + def getMessagesPerTopic: Long +} + +@threadsafe +class ProducerTopicStat extends ProducerTopicStatMBean { + private val numCumulatedMessagesPerTopic = new AtomicLong(0) + + def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get + + def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) +} + +object ProducerTopicStat extends Logging { + private val stats = new Pool[String, ProducerTopicStat] + + def getProducerTopicStat(topic: String): ProducerTopicStat = { + var stat = stats.get(topic) + if (stat == null) { + stat = new ProducerTopicStat + if (stats.putIfNotExists(topic, stat) == null) + Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." 
+ topic) + else + stat = stats.get(topic) + } + return stat + } +} Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala =================================================================== --- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (working copy) @@ -16,7 +16,7 @@ */ package kafka.producer -import collection.mutable.Map +import collection.Map import collection.SortedSet import kafka.cluster.{Broker, Partition} Index: core/src/main/scala/kafka/producer/async/QueueClosedException.scala =================================================================== --- core/src/main/scala/kafka/producer/async/QueueClosedException.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/QueueClosedException.scala (working copy) @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.producer.async - -/* Indicates that client is sending event to a closed queue */ -class QueueClosedException(message: String) extends RuntimeException(message) { - def this() = this(null) -} Index: core/src/main/scala/kafka/producer/async/EventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/EventHandler.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/EventHandler.scala (working copy) @@ -16,30 +16,21 @@ */ package kafka.producer.async -import java.util.Properties -import kafka.producer.SyncProducer -import kafka.serializer.Encoder +import kafka.producer.ProducerData /** - * Handler that dispatches the batched data from the queue of the - * asynchronous producer. + * Handler that dispatches the batched data from the queue. */ -trait EventHandler[T] { - /** - * Initializes the event handler using a Properties object - * @param props the properties used to initialize the event handler - */ - def init(props: Properties) {} +trait EventHandler[K,V] { /** * Callback to dispatch the batched data and send it to a Kafka server * @param events the data sent to the producer - * @param producer the low-level producer used to send the data */ - def handle(events: Seq[QueueItem[T]], producer: SyncProducer, encoder: Encoder[T]) + def handle(events: Seq[ProducerData[K,V]]) /** * Cleans up and shuts down the event handler */ - def close {} + def close } Index: core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (working copy) @@ -18,33 +18,21 @@ package kafka.producer.async import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.BlockingQueue -import org.apache.log4j.Logger import kafka.utils.Utils 
class AsyncProducerStats extends AsyncProducerStatsMBean { val droppedEvents = new AtomicInteger(0) - val numEvents = new AtomicInteger(0) - def getAsyncProducerEvents: Int = numEvents.get - def getAsyncProducerDroppedEvents: Int = droppedEvents.get def recordDroppedEvents = droppedEvents.getAndAdd(1) - - def recordEvent = numEvents.getAndAdd(1) } -class AsyncProducerQueueSizeStats[T](private val queue: BlockingQueue[QueueItem[T]]) extends AsyncProducerQueueSizeStatsMBean { - def getAsyncProducerQueueSize: Int = queue.size -} - object AsyncProducerStats { - private val logger = Logger.getLogger(getClass()) private val stats = new AsyncProducerStats - Utils.registerMBean(stats, AsyncProducer.ProducerMBeanName) + val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats" - def recordDroppedEvents = stats.recordDroppedEvents + Utils.registerMBean(stats, ProducerMBeanName) - def recordEvent = stats.recordEvent + def recordDroppedEvents = stats.recordDroppedEvents } Index: core/src/main/scala/kafka/producer/async/AsyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducer.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/AsyncProducer.scala (working copy) @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.producer.async - -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} -import kafka.utils.{Utils, Logging} -import java.util.concurrent.atomic.AtomicBoolean -import kafka.api.ProducerRequest -import kafka.serializer.Encoder -import java.util.{Random, Properties} -import kafka.producer.{ProducerConfig, SyncProducer} - -object AsyncProducer { - val Shutdown = new Object - val Random = new Random - val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats" - val ProducerQueueSizeMBeanName = "kafka.producer.Producer:type=AsyncProducerQueueSizeStats" -} - -private[kafka] class AsyncProducer[T](config: AsyncProducerConfig, - producer: SyncProducer, - serializer: Encoder[T], - eventHandler: EventHandler[T] = null, - eventHandlerProps: Properties = null, - cbkHandler: CallbackHandler[T] = null, - cbkHandlerProps: Properties = null) extends Logging { - private val closed = new AtomicBoolean(false) - private val queue = new LinkedBlockingQueue[QueueItem[T]](config.queueSize) - // initialize the callback handlers - if(eventHandler != null) - eventHandler.init(eventHandlerProps) - if(cbkHandler != null) - cbkHandler.init(cbkHandlerProps) - private val asyncProducerID = AsyncProducer.Random.nextInt - private val sendThread = new ProducerSendThread("ProducerSendThread-" + asyncProducerID, queue, - serializer, producer, - if(eventHandler != null) eventHandler else new DefaultEventHandler[T](new ProducerConfig(config.props), cbkHandler), - cbkHandler, config.queueTime, config.batchSize, AsyncProducer.Shutdown) - 
sendThread.setDaemon(false) - Utils.swallow(logger.warn, Utils.registerMBean( - new AsyncProducerQueueSizeStats[T](queue), AsyncProducer.ProducerQueueSizeMBeanName + "-" + asyncProducerID)) - - def this(config: AsyncProducerConfig) { - this(config, - new SyncProducer(config), - Utils.getObject(config.serializerClass), - Utils.getObject(config.eventHandler), - config.eventHandlerProps, - Utils.getObject(config.cbkHandler), - config.cbkHandlerProps) - } - - def start = sendThread.start - - def send(topic: String, event: T) { send(topic, event, ProducerRequest.RandomPartition) } - - def send(topic: String, event: T, partition:Int) { - AsyncProducerStats.recordEvent - - if(closed.get) - throw new QueueClosedException("Attempt to add event to a closed queue.") - - var data = new QueueItem(event, topic, partition) - if(cbkHandler != null) - data = cbkHandler.beforeEnqueue(data) - - val added = config.enqueueTimeoutMs match { - case 0 => - queue.offer(data) - case _ => - try { - config.enqueueTimeoutMs < 0 match { - case true => - queue.put(data) - true - case _ => - queue.offer(data, config.enqueueTimeoutMs, TimeUnit.MILLISECONDS) - } - } - catch { - case e: InterruptedException => - val msg = "%s interrupted during enqueue of event %s.".format( - getClass.getSimpleName, event.toString) - error(msg) - throw new AsyncProducerInterruptedException(msg) - } - } - - if(cbkHandler != null) - cbkHandler.afterEnqueue(data, added) - - if(!added) { - AsyncProducerStats.recordDroppedEvents - logger.error("Event queue is full of unsent messages, could not send event: " + event.toString) - throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString) - }else { - if(logger.isTraceEnabled) { - logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString) - logger.trace("Remaining queue size: " + queue.remainingCapacity) - } - } - } - - def close = { - if(cbkHandler != null) { - 
cbkHandler.close - logger.info("Closed the callback handler") - } - closed.set(true) - queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1)) - if(logger.isDebugEnabled) - logger.debug("Added shutdown command to the queue") - sendThread.shutdown - sendThread.awaitShutdown - producer.close - logger.info("Closed AsyncProducer") - } - - // for testing only - import org.apache.log4j.Level - def setLoggerLevel(level: Level) = logger.setLevel(level) -} - -class QueueItem[T](data: T, topic: String, partition: Int) { - def getData: T = data - def getPartition: Int = partition - def getTopic:String = topic - override def toString = "topic: " + topic + ", partition: " + partition + ", data: " + data.toString -} Index: core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala (working copy) @@ -18,7 +18,6 @@ package kafka.producer.async trait AsyncProducerStatsMBean { - def getAsyncProducerEvents: Int def getAsyncProducerDroppedEvents: Int } Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -17,45 +17,176 @@ package kafka.producer.async -import collection.mutable.HashMap -import collection.mutable.Map import kafka.api.ProducerRequest import kafka.serializer.Encoder import java.util.Properties -import kafka.utils.Logging -import kafka.producer.{ProducerConfig, SyncProducer} -import kafka.message.{NoCompressionCodec, ByteBufferMessageSet} +import kafka.producer._ +import kafka.utils.{ZKConfig, Utils, Logging} +import kafka.cluster.{Partition, 
Broker} +import collection.mutable.{ListBuffer, HashMap} +import scala.collection.Map +import kafka.common.{FailedToSendMessageException, InvalidPartitionException, NoBrokersForPartitionException} +import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} +class DefaultEventHandler[K,V](config: ProducerConfig, // this api is for testing + private val partitioner: Partitioner[K], // use the other constructor + private val encoder: Encoder[V], + private val producerPool: ProducerPool, + private val populateProducerPool: Boolean, + private var brokerPartitionInfo: BrokerPartitionInfo) + extends EventHandler[K,V] with Logging { -private[kafka] class DefaultEventHandler[T](val config: ProducerConfig, - val cbkHandler: CallbackHandler[T]) extends EventHandler[T] with Logging { + private val lock = new Object() + private val zkEnabled = Utils.propertyExists(config.zkConnect) + if(brokerPartitionInfo == null) { + zkEnabled match { + case true => + val zkProps = new Properties() + zkProps.put("zk.connect", config.zkConnect) + zkProps.put("zk.sessiontimeout.ms", config.zkSessionTimeoutMs.toString) + zkProps.put("zk.connectiontimeout.ms", config.zkConnectionTimeoutMs.toString) + zkProps.put("zk.synctime.ms", config.zkSyncTimeMs.toString) + brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk) + case false => + brokerPartitionInfo = new ConfigBrokerPartitionInfo(config) + } + } - override def init(props: Properties) { } + // pool of producers, one per broker + if(populateProducerPool) { + val allBrokers = brokerPartitionInfo.getAllBrokerInfo + allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1, b._2.host, b._2.host, b._2.port))) + } - override def handle(events: Seq[QueueItem[T]], syncProducer: SyncProducer, serializer: Encoder[T]) { - var processedEvents = events - if(cbkHandler != null) - processedEvents = cbkHandler.beforeSendingData(events) + /** + * Callback to add a new producer to the producer pool. 
Used by ZKBrokerPartitionInfo + * on registration of new broker in zookeeper + * @param bid the id of the broker + * @param host the hostname of the broker + * @param port the port of the broker + */ + private def producerCbk(bid: Int, host: String, port: Int) = { + if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port)) + else debug("Skipping the callback since populateProducerPool = false") + } - if(logger.isTraceEnabled) - processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d" - .format(event.getTopic, event.getPartition))) + def handle(events: Seq[ProducerData[K,V]]) { + lock synchronized { + val serializedData = serialize(events) + handleSerializedData(serializedData) + } + } - send(serialize(collate(processedEvents), serializer), syncProducer) + def handleSerializedData(messages: Seq[ProducerData[K,Message]]) { + val partitionedData = partitionAndCollate(messages) + for ( (brokerid, eventsPerBrokerMap) <- partitionedData) { + if (logger.isTraceEnabled) + eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partition: %d" + .format(partitionAndEvent._1, brokerid, partitionAndEvent._2))) + val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) + + try { + send(brokerid, messageSetPerBroker) + } + catch { + case t => + var sendComplete = false + warn("error sending data to broker " + brokerid, t) + var numRetries = 1 + val eventsPerBroker = new ListBuffer[ProducerData[K,Message]] + eventsPerBrokerMap.map(e => eventsPerBroker.appendAll(e._2)) + while (numRetries <= config.producerRetries && !sendComplete) { + if (numRetries == config.producerRetries) + throw new FailedToSendMessageException("can't send data after " + config.producerRetries + " retries", t) + brokerPartitionInfo.updateInfo + try { + handleSerializedData(eventsPerBroker) + sendComplete = true + } + catch { + case t => warn("error sending data to broker " + brokerid + " in " + numRetries 
+ " retry", t) + } + numRetries +=1 + } + } + } } - private def send(messagesPerTopic: Map[(String, Int), ByteBufferMessageSet], syncProducer: SyncProducer) { + def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = { + events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m)))) + } + + def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = { + val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] + for (event <- events) { + val topicPartitionsList = getPartitionListForTopic(event) + val totalNumPartitions = topicPartitionsList.length + + val partitionIndex = getPartition(event.getKey, totalNumPartitions) + val brokerPartition = topicPartitionsList(partitionIndex) + + var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null + ret.get(brokerPartition.brokerId) match { + case Some(element) => + dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] + case None => + dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] + ret.put(brokerPartition.brokerId, dataPerBroker) + } + + val topicAndPartition = (event.getTopic, brokerPartition.partId) + var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null + dataPerBroker.get(topicAndPartition) match { + case Some(element) => + dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]] + case None => + dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]] + dataPerBroker.put(topicAndPartition, dataPerTopicPartition) + } + dataPerTopicPartition.append(event) + } + ret + } + + private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = { + debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) + val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq + 
debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList) + val totalNumPartitions = topicPartitionsList.length + if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) + topicPartitionsList + } + + /** + * Retrieves the partition id and throws an InvalidPartitionException if + * the value of partition is not between 0 and numPartitions-1 + * @param key the partition key + * @param numPartitions the total number of available partitions + * @returns the partition id + */ + private def getPartition(key: K, numPartitions: Int): Int = { + if(numPartitions <= 0) + throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions + + "\n Valid values are > 0") + val partition = if(key == null) Utils.getNextRandomInt(numPartitions) + else partitioner.partition(key , numPartitions) + if(partition < 0 || partition >= numPartitions) + throw new InvalidPartitionException("Invalid partition id : " + partition + + "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") + partition + } + + private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) { if(messagesPerTopic.size > 0) { val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray + val syncProducer = producerPool.getProducer(brokerId) syncProducer.multiSend(requests) trace("kafka producer sent messages for topics %s to broker %s:%d" .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) } } - private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]], - serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = { - val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l)))) + def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = { /** enforce the 
compressed.topics config here. * If the compression codec is anything other than NoCompressionCodec, * Enable compression only for specified topics if any @@ -63,55 +194,49 @@ * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents => - ((topicAndEvents._1._1, topicAndEvents._1._2), - config.compressionCodec match { - case NoCompressionCodec => - trace("Sending %d messages with no compression to topic %s on partition %d" - .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2)) - new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) - case _ => - config.compressedTopics.size match { - case 0 => - trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) - new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) - case _ => - if(config.compressedTopics.contains(topicAndEvents._1._1)) { + val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e => + { + val topicAndPartition = e._1 + val produceData = e._2 + val messages = new ListBuffer[Message] + produceData.map(p => messages.appendAll(p.getData)) + + ( topicAndPartition, + config.compressionCodec match { + case NoCompressionCodec => + trace("Sending %d messages with no compression to topic %s on partition %d" + .format(messages.size, topicAndPartition._1, topicAndPartition._2)) + new ByteBufferMessageSet(NoCompressionCodec, messages: _*) + case _ => + config.compressedTopics.size match { + case 0 => trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(topicAndEvents._2.size, config.compressionCodec.codec, topicAndEvents._1._1, topicAndEvents._1._2)) - new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*) - } - else { - trace("Sending %d messages to topic %s 
and partition %d with no compression as %s is not in compressed.topics - %s" - .format(topicAndEvents._2.size, topicAndEvents._1._1, topicAndEvents._1._2, topicAndEvents._1._1, - config.compressedTopics.toString)) - new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*) - } - } - }) + .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + new ByteBufferMessageSet(config.compressionCodec, messages: _*) + case _ => + if(config.compressedTopics.contains(topicAndPartition._1)) { + trace("Sending %d messages with compression codec %d to topic %s on partition %d" + .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + new ByteBufferMessageSet(config.compressionCodec, messages: _*) + } + else { + trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" + .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1, + config.compressedTopics.toString)) + new ByteBufferMessageSet(NoCompressionCodec, messages: _*) + } + } + } + ) + } } messagesPerTopicPartition } - private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = { - val collatedEvents = new HashMap[(String, Int), Seq[T]] - val distinctTopics = events.map(e => e.getTopic).toSeq.distinct - val distinctPartitions = events.map(e => e.getPartition).distinct - - var remainingEvents = events - distinctTopics foreach { topic => - val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic)) - remainingEvents = topicEvents._2 - distinctPartitions.foreach { p => - val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1 - if(topicPartitionEvents.size > 0) - collatedEvents += ((topic, p) -> topicPartitionEvents.map(q => q.getData)) - } - } - collatedEvents + def close() { + if (producerPool != null) + producerPool.close + if (brokerPartitionInfo != null) + 
brokerPartitionInfo.close } - - override def close = { - } } Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -20,32 +20,21 @@ import kafka.utils.{SystemTime, Logging} import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue} import collection.mutable.ListBuffer -import kafka.serializer.Encoder -import kafka.producer.SyncProducer +import kafka.producer.ProducerData -private[async] class ProducerSendThread[T](val threadName: String, - val queue: BlockingQueue[QueueItem[T]], - val serializer: Encoder[T], - val underlyingProducer: SyncProducer, - val handler: EventHandler[T], - val cbkHandler: CallbackHandler[T], - val queueTime: Long, - val batchSize: Int, - val shutdownCommand: Any) extends Thread(threadName) with Logging { +class ProducerSendThread[K,V](val threadName: String, + val queue: BlockingQueue[ProducerData[K,V]], + val handler: EventHandler[K,V], + val queueTime: Long, + val batchSize: Int) extends Thread(threadName) with Logging { private val shutdownLatch = new CountDownLatch(1) + private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]]) override def run { try { - val remainingEvents = processEvents - debug("Remaining events = " + remainingEvents.size) - - // handle remaining events - if(remainingEvents.size > 0) { - debug("Dispatching last batch of %d events to the event handler".format(remainingEvents.size)) - tryToHandle(remainingEvents) - } + processEvents }catch { case e => error("Error in sending events: ", e) }finally { @@ -53,34 +42,30 @@ } } - def awaitShutdown = shutdownLatch.await - def shutdown = { - handler.close - info("Shutdown thread complete") + info("Begin shutting down ProducerSendThread") + 
queue.put(shutdownCommand) + shutdownLatch.await + info("Shutdown ProducerSendThread complete") } - private def processEvents(): Seq[QueueItem[T]] = { + private def processEvents() { var lastSend = SystemTime.milliseconds - var events = new ListBuffer[QueueItem[T]] + var events = new ListBuffer[ProducerData[K,V]] var full: Boolean = false // drain the queue until you get a shutdown command Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)) - .takeWhile(item => if(item != null) item.getData != shutdownCommand else true).foreach { + .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach { currentQueueItem => val elapsed = (SystemTime.milliseconds - lastSend) // check if the queue time is reached. This happens when the poll method above returns after a timeout and // returns a null object val expired = currentQueueItem == null if(currentQueueItem != null) { - trace("Dequeued item for topic %s and partition %d" - .format(currentQueueItem.getTopic, currentQueueItem.getPartition)) - // handle the dequeued current item - if(cbkHandler != null) - events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem) - else - events += currentQueueItem + trace("Dequeued item for topic %s, partition key: %s, data: %s" + .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString)) + events += currentQueueItem // check if the batch size is reached full = events.size >= batchSize @@ -91,36 +76,22 @@ // if either queue time has reached or batch size has reached, dispatch to event handler tryToHandle(events) lastSend = SystemTime.milliseconds - events = new ListBuffer[QueueItem[T]] + events = new ListBuffer[ProducerData[K,V]] } } if(queue.size > 0) throw new IllegalQueueStateException("Invalid queue state! 
After queue shutdown, %d remaining items in the queue" .format(queue.size)) - if(cbkHandler != null) { - info("Invoking the callback handler before handling the last batch of %d events".format(events.size)) - val addedEvents = cbkHandler.lastBatchBeforeClose - logEvents("last batch before close", addedEvents) - events = events ++ addedEvents - } - events } - def tryToHandle(events: Seq[QueueItem[T]]) { + def tryToHandle(events: Seq[ProducerData[K,V]]) { try { debug("Handling " + events.size + " events") if(events.size > 0) - handler.handle(events, underlyingProducer, serializer) + handler.handle(events) }catch { - case e: Exception => error("Error in handling batch of " + events.size + " events", e) + case e => error("Error in handling batch of " + events.size + " events", e) } } - private def logEvents(tag: String, events: Iterable[QueueItem[T]]) { - if(logger.isTraceEnabled) { - trace("events for " + tag + ":") - for (event <- events) - trace(event.getData.toString) - } - } } Index: core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/AsyncProducerInterruptedException.scala (working copy) @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.producer.async - -class AsyncProducerInterruptedException(message: String) extends RuntimeException(message) { - def this() = this(null) -} - Index: core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala (working copy) @@ -46,16 +46,4 @@ /** the serializer class for events */ val serializerClass = Utils.getString(props, "serializer.class", "kafka.serializer.DefaultEncoder") - - /** the callback handler for one or multiple events */ - val cbkHandler = Utils.getString(props, "callback.handler", null) - - /** properties required to initialize the callback handler */ - val cbkHandlerProps = Utils.getProps(props, "callback.handler.props", null) - - /** the handler for events */ - val eventHandler = Utils.getString(props, "event.handler", null) - - /** properties required to initialize the callback handler */ - val eventHandlerProps = Utils.getProps(props, "event.handler.props", null) } Index: core/src/main/scala/kafka/producer/async/CallbackHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/CallbackHandler.scala (revision 1239972) +++ core/src/main/scala/kafka/producer/async/CallbackHandler.scala (working copy) @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor 
license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.producer.async - -import java.util.Properties - -/** - * Callback handler APIs for use in the async producer. The purpose is to - * give the user some callback handles to insert custom functionality at - * various stages as the data flows through the pipeline of the async producer - */ -trait CallbackHandler[T] { - /** - * Initializes the callback handler using a Properties object - * @param props properties used to initialize the callback handler - */ - def init(props: Properties) - - /** - * Callback to process the data before it enters the batching queue - * of the asynchronous producer - * @param data the data sent to the producer - * @return the processed data that enters the queue - */ - def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] - - /** - * Callback to process the data right after it enters the batching queue - * of the asynchronous producer - * @param data the data sent to the producer - * @param added flag that indicates if the data was successfully added to the queue - */ - def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) - - /** - * Callback to process the data item right after it has been dequeued by the - * background sender thread of the asynchronous 
producer - * @param data the data item dequeued from the async producer queue - * @return the processed list of data items that gets added to the data handled by the event handler - */ - def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] - - /** - * Callback to process the batched data right before it is being sent by the - * handle API of the event handler - * @param data the batched data received by the event handler - * @return the processed batched data that gets sent by the handle() API of the event handler - */ - def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] - - /** - * Callback to process the last batch of data right before the producer send thread is shutdown - * @return the last batch of data that is sent to the EventHandler - */ - def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] - - /** - * Cleans up and shuts down the callback handler - */ - def close -} Index: core/src/main/scala/kafka/producer/ProducerClosedException.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerClosedException.scala (revision 0) +++ core/src/main/scala/kafka/producer/ProducerClosedException.scala (revision 0) @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.producer + +class ProducerClosedException() extends RuntimeException("producer already closed") { +} Index: core/src/main/scala/kafka/common/FailedToSendMessageException.scala =================================================================== --- core/src/main/scala/kafka/common/FailedToSendMessageException.scala (revision 0) +++ core/src/main/scala/kafka/common/FailedToSendMessageException.scala (revision 0) @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package kafka.common + +/** + * Indicates that the producer failed to send a message +*/ +class FailedToSendMessageException(message: String, t: Throwable) extends RuntimeException(message, t) { +} Index: core/src/main/scala/kafka/javaapi/Implicits.scala =================================================================== --- core/src/main/scala/kafka/javaapi/Implicits.scala (revision 1239972) +++ core/src/main/scala/kafka/javaapi/Implicits.scala (working copy) @@ -16,8 +16,6 @@ */ package kafka.javaapi -import kafka.serializer.Encoder -import kafka.producer.async.QueueItem import kafka.utils.Logging private[javaapi] object Implicits extends Logging { @@ -30,91 +28,6 @@ messageSet.getErrorCode) } - implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = { - debug("Implicit instantiation of Java Sync Producer") - new kafka.javaapi.producer.SyncProducer(producer) - } - - implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = { - debug("Implicit instantiation of Sync Producer") - producer.underlying - } - - implicit def toScalaEventHandler[T](eventHandler: kafka.javaapi.producer.async.EventHandler[T]) - : kafka.producer.async.EventHandler[T] = { - new kafka.producer.async.EventHandler[T] { - override def init(props: java.util.Properties) { eventHandler.init(props) } - override def handle(events: Seq[QueueItem[T]], producer: kafka.producer.SyncProducer, encoder: Encoder[T]) { - import collection.JavaConversions._ - eventHandler.handle(asList(events), producer, encoder) - } - override def close { eventHandler.close } - } - } - - implicit def toJavaEventHandler[T](eventHandler: kafka.producer.async.EventHandler[T]) - : kafka.javaapi.producer.async.EventHandler[T] = { - new kafka.javaapi.producer.async.EventHandler[T] { - override def init(props: java.util.Properties) { eventHandler.init(props) } - override def handle(events: java.util.List[QueueItem[T]], producer: 
kafka.javaapi.producer.SyncProducer, - encoder: Encoder[T]) { - import collection.JavaConversions._ - eventHandler.handle(asBuffer(events), producer, encoder) - } - override def close { eventHandler.close } - } - } - - implicit def toScalaCbkHandler[T](cbkHandler: kafka.javaapi.producer.async.CallbackHandler[T]) - : kafka.producer.async.CallbackHandler[T] = { - new kafka.producer.async.CallbackHandler[T] { - import collection.JavaConversions._ - override def init(props: java.util.Properties) { cbkHandler.init(props)} - override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = { - cbkHandler.beforeEnqueue(data) - } - override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) { - cbkHandler.afterEnqueue(data, added) - } - override def afterDequeuingExistingData(data: QueueItem[T] = null): scala.collection.mutable.Seq[QueueItem[T]] = { - cbkHandler.afterDequeuingExistingData(data) - } - override def beforeSendingData(data: Seq[QueueItem[T]] = null): scala.collection.mutable.Seq[QueueItem[T]] = { - asList(cbkHandler.beforeSendingData(asList(data))) - } - override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[T]] = { - asBuffer(cbkHandler.lastBatchBeforeClose) - } - override def close { cbkHandler.close } - } - } - - implicit def toJavaCbkHandler[T](cbkHandler: kafka.producer.async.CallbackHandler[T]) - : kafka.javaapi.producer.async.CallbackHandler[T] = { - new kafka.javaapi.producer.async.CallbackHandler[T] { - import collection.JavaConversions._ - override def init(props: java.util.Properties) { cbkHandler.init(props)} - override def beforeEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]]): QueueItem[T] = { - cbkHandler.beforeEnqueue(data) - } - override def afterEnqueue(data: QueueItem[T] = null.asInstanceOf[QueueItem[T]], added: Boolean) { - cbkHandler.afterEnqueue(data, added) - } - override def afterDequeuingExistingData(data: QueueItem[T] = null) - : 
java.util.List[QueueItem[T]] = { - asList(cbkHandler.afterDequeuingExistingData(data)) - } - override def beforeSendingData(data: java.util.List[QueueItem[T]] = null) - : java.util.List[QueueItem[T]] = { - asBuffer(cbkHandler.beforeSendingData(asBuffer(data))) - } - override def lastBatchBeforeClose: java.util.List[QueueItem[T]] = { - asList(cbkHandler.lastBatchBeforeClose) - } - override def close { cbkHandler.close } - } - } - implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse = response.underlying Index: core/src/main/scala/kafka/javaapi/producer/ProducerData.scala =================================================================== --- core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (revision 1239972) +++ core/src/main/scala/kafka/javaapi/producer/ProducerData.scala (working copy) @@ -18,14 +18,16 @@ import scala.collection.JavaConversions._ -class ProducerData[K, V](private val topic: String, - private val key: K, - private val data: java.util.List[V]) { +case class ProducerData[K, V](topic: String, + key: K, + data: java.util.List[V]) { def this(t: String, d: java.util.List[V]) = this(topic = t, key = null.asInstanceOf[K], data = d) def this(t: String, d: V) = this(topic = t, key = null.asInstanceOf[K], data = asList(List(d))) + def this(t: String, k: K, d: V) = this(topic = t, key = k, data = asList(List(d))) + def getTopic: String = topic def getKey: K = key Index: core/src/main/scala/kafka/javaapi/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/javaapi/producer/Producer.scala (revision 1239972) +++ core/src/main/scala/kafka/javaapi/producer/Producer.scala (working copy) @@ -17,83 +17,12 @@ package kafka.javaapi.producer -import kafka.utils.Utils -import kafka.producer.async.QueueItem -import java.util.Properties -import kafka.producer.{ProducerPool, ProducerConfig, Partitioner} -import kafka.serializer.Encoder 
+import kafka.producer.ProducerConfig -class Producer[K,V](config: ProducerConfig, - partitioner: Partitioner[K], - producerPool: ProducerPool[V], - populateProducerPool: Boolean = true) /* for testing purpose only. Applications should ideally */ - /* use the other constructor*/ +class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only { - - private val underlying = new kafka.producer.Producer[K,V](config, partitioner, producerPool, populateProducerPool, null) - + def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config)) /** - * This constructor can be used when all config parameters will be specified through the - * ProducerConfig object - * @param config Producer Configuration object - */ - def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass), - new ProducerPool[V](config, Utils.getObject(config.serializerClass))) - - /** - * This constructor can be used to provide pre-instantiated objects for all config parameters - * that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and - * callback handler - * @param config Producer Configuration object - * @param encoder Encoder used to convert an object of type V to a kafka.message.Message - * @param eventHandler the class that implements kafka.javaapi.producer.async.IEventHandler[T] used to - * dispatch a batch of produce requests, using an instance of kafka.javaapi.producer.SyncProducer - * @param cbkHandler the class that implements kafka.javaapi.producer.async.CallbackHandler[T] used to inject - * callbacks at various stages of the kafka.javaapi.producer.AsyncProducer pipeline. 
- * @param partitioner class that implements the kafka.javaapi.producer.Partitioner[K], used to supply a custom - * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T] - * object in the send API - */ - def this(config: ProducerConfig, - encoder: Encoder[V], - eventHandler: kafka.javaapi.producer.async.EventHandler[V], - cbkHandler: kafka.javaapi.producer.async.CallbackHandler[V], - partitioner: Partitioner[K]) = { - this(config, partitioner, - new ProducerPool[V](config, encoder, - new kafka.producer.async.EventHandler[V] { - override def init(props: Properties) { eventHandler.init(props) } - override def handle(events: Seq[QueueItem[V]], producer: kafka.producer.SyncProducer, - encoder: Encoder[V]) { - import collection.JavaConversions._ - import kafka.javaapi.Implicits._ - eventHandler.handle(asList(events), producer, encoder) - } - override def close { eventHandler.close } - }, - new kafka.producer.async.CallbackHandler[V] { - import collection.JavaConversions._ - override def init(props: Properties) { cbkHandler.init(props)} - override def beforeEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]]): QueueItem[V] = { - cbkHandler.beforeEnqueue(data) - } - override def afterEnqueue(data: QueueItem[V] = null.asInstanceOf[QueueItem[V]], added: Boolean) { - cbkHandler.afterEnqueue(data, added) - } - override def afterDequeuingExistingData(data: QueueItem[V] = null): scala.collection.mutable.Seq[QueueItem[V]] = { - cbkHandler.afterDequeuingExistingData(data) - } - override def beforeSendingData(data: Seq[QueueItem[V]] = null): scala.collection.mutable.Seq[QueueItem[V]] = { - asList(cbkHandler.beforeSendingData(asList(data))) - } - override def lastBatchBeforeClose: scala.collection.mutable.Seq[QueueItem[V]] = { - asBuffer(cbkHandler.lastBatchBeforeClose) - } - override def close { cbkHandler.close } - })) - } - - /** * Sends the data to a single topic, partitioned by key, using either the * synchronous or the 
asynchronous producer * @param producerData the producer data object that encapsulates the topic, key and message data Index: core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java =================================================================== --- core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (revision 1239972) +++ core/src/main/scala/kafka/javaapi/producer/async/EventHandler.java (working copy) @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.javaapi.producer.async; - -import kafka.javaapi.producer.SyncProducer; -import kafka.producer.async.QueueItem; -import kafka.serializer.Encoder; - -import java.util.List; -import java.util.Properties; - -/** - * Handler that dispatches the batched data from the queue of the - * asynchronous producer. 
- */ -public interface EventHandler { - /** - * Initializes the event handler using a Properties object - * @param props the properties used to initialize the event handler - */ - public void init(Properties props); - - /** - * Callback to dispatch the batched data and send it to a Kafka server - * @param events the data sent to the producer - * @param producer the low-level producer used to send the data - */ - public void handle(List> events, SyncProducer producer, Encoder encoder); - - /** - * Cleans up and shuts down the event handler - */ - public void close(); -} Index: core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java =================================================================== --- core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (revision 1239972) +++ core/src/main/scala/kafka/javaapi/producer/async/CallbackHandler.java (working copy) @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.javaapi.producer.async; - -import kafka.producer.async.QueueItem; - -import java.util.Properties; - -/** - * Callback handler APIs for use in the async producer. 
The purpose is to - * give the user some callback handles to insert custom functionality at - * various stages as the data flows through the pipeline of the async producer - */ -public interface CallbackHandler { - /** - * Initializes the callback handler using a Properties object - * @param props the properties used to initialize the callback handler - */ - public void init(Properties props); - - /** - * Callback to process the data before it enters the batching queue - * of the asynchronous producer - * @param data the data sent to the producer - * @return the processed data that enters the queue - */ - public QueueItem beforeEnqueue(QueueItem data); - - /** - * Callback to process the data just after it enters the batching queue - * of the asynchronous producer - * @param data the data sent to the producer - * @param added flag that indicates if the data was successfully added to the queue - */ - public void afterEnqueue(QueueItem data, boolean added); - - /** - * Callback to process the data item right after it has been dequeued by the - * background sender thread of the asynchronous producer - * @param data the data item dequeued from the async producer queue - * @return the processed list of data items that gets added to the data handled by the event handler - */ - public java.util.List> afterDequeuingExistingData(QueueItem data); - - /** - * Callback to process the batched data right before it is being processed by the - * handle API of the event handler - * @param data the batched data received by the event handler - * @return the processed batched data that gets processed by the handle() API of the event handler - */ - public java.util.List> beforeSendingData(java.util.List> data); - - /** - * Callback to process the last batch of data right before the producer send thread is shutdown - * @return the last batch of data that is sent to the EventHandler - */ - public java.util.List> lastBatchBeforeClose(); - - /** - * Cleans up and shuts down the callback 
handler - */ - public void close(); -}