diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 2d21a2d..8f8222e 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -24,7 +24,7 @@ import java.util.Properties import java.util.Random import java.io.PrintStream import kafka.message._ -import kafka.serializer.StringDecoder +import kafka.serializer._ import kafka.utils._ import kafka.metrics.KafkaCSVMetricsReporter @@ -179,7 +179,7 @@ object ConsoleConsumer extends Logging { val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) try { - val stream = connector.createMessageStreamsByFilter(filterSpec).get(0) + val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) val iter = if(maxMessages >= 0) stream.slice(0, maxMessages) else @@ -187,7 +187,7 @@ object ConsoleConsumer extends Logging { for(messageAndTopic <- iter) { try { - formatter.writeTo(messageAndTopic.message, System.out) + formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) } catch { case e => if (skipMessageOnError) @@ -251,53 +251,14 @@ object MessageFormatter { } trait MessageFormatter { - def writeTo(message: Message, output: PrintStream) + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) def init(props: Properties) {} def close() {} } -class DecodedMessageFormatter extends MessageFormatter { - var topicStr: String = _ - val decoder = new StringDecoder() - - override def init(props: Properties) { - topicStr = props.getProperty("topic") - if (topicStr != null) - topicStr = topicStr + ":" - else - topicStr = "" - } - - def writeTo(message: Message, output: PrintStream) { - try { - output.println(topicStr + decoder.toEvent(message) + ":payloadsize:" + message.payloadSize) - } catch { - case e => e.printStackTrace() - } - } -} - class NewlineMessageFormatter extends MessageFormatter { - def writeTo(message: Message, output: PrintStream) { - val payload = message.payload - output.write(payload.array, payload.arrayOffset, payload.limit) + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + output.write(value) output.write('\n') } } - -class ChecksumMessageFormatter extends MessageFormatter { - private var topicStr: String = _ - - override def init(props: Properties) { - topicStr = props.getProperty("topic") - if (topicStr != null) - topicStr = topicStr + ":" - else - topicStr = "" - } - - def writeTo(message: Message, output: PrintStream) { - val chksum = message.checksum - output.println(topicStr + "checksum:" + chksum) - } -} diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala index 74b4128..457b642 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala @@ -19,38 +19,53 @@ package kafka.consumer import scala.collection._ import kafka.utils.Logging -import kafka.serializer.{DefaultDecoder, Decoder} +import kafka.serializer.{KeylessMessageDecoder, Decoder} /** * Main interface for consumer */ trait ConsumerConnector { + /** * Create a list of MessageStreams for each topic. * * @param topicCountMap a map of (topic, #streams) pair - * @param decoder Decoder to decode each Message to type T * @return a map of (topic, list of KafkaStream) pairs. 
* The number of items in the list is #streams. Each stream supports * an iterator over message/metadata pairs. */ - def createMessageStreams[T](topicCountMap: Map[String,Int], - decoder: Decoder[T] = new DefaultDecoder) - : Map[String,List[KafkaStream[T]]] - + def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] + + /** + * Create a list of MessageStreams for each topic. + * + * @param topicCountMap a map of (topic, #streams) pair + * @param keyDecoder Decoder to decode the key portion of the message + * @param valueDecoder Decoder to decode the value portion of the message + * @return a map of (topic, list of KafkaStream) pairs. + * The number of items in the list is #streams. Each stream supports + * an iterator over message/metadata pairs. + */ + def createMessageStreams[K,V](topicCountMap: Map[String,Int], + keyDecoder: Decoder[K], + valueDecoder: Decoder[V]) + : Map[String,List[KafkaStream[K,V]]] + /** * Create a list of message streams for all topics that match a given filter. * * @param topicFilter Either a Whitelist or Blacklist TopicFilter object. * @param numStreams Number of streams to return - * @param decoder Decoder to decode each Message to type T + * @param keyDecoder Decoder to decode the key portion of the message + * @param valueDecoder Decoder to decode the value portion of the message * @return a list of KafkaStream each of which provides an * iterator over message/metadata pairs over allowed topics. */ - def createMessageStreamsByFilter[T](topicFilter: TopicFilter, + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int = 1, - decoder: Decoder[T] = new DefaultDecoder) - : Seq[KafkaStream[T]] + keyDecoder: Decoder[K], + valueDecoder: Decoder[V]) + : Seq[KafkaStream[K,V]] /** * Commit the offsets of all broker partitions connected by this connector. 
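The reworked consumer API above replaces the single Decoder[T] with separate key and value decoders, and createMessageStreams now returns KafkaStream[K,V] instances whose iterators yield MessageAndMetadata carrying both the decoded key and the decoded value. A rough usage sketch (not part of this patch) is shown below; the topic, group and ZooKeeper values are placeholders, and the property names ("zk.connect", "groupid") are assumed to match the ConsumerConfig of this code base.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.serializer.StringDecoder

    object ConsumerSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("zk.connect", "localhost:2181") // placeholder ZooKeeper connect string (assumed property name)
        props.put("groupid", "example-group")     // placeholder consumer group (assumed property name)
        val connector = Consumer.create(new ConsumerConfig(props))
        // One stream for the topic; key and value are both decoded as UTF8 strings
        val streams = connector.createMessageStreams(Map("example-topic" -> 1), new StringDecoder(), new StringDecoder())
        for (messageAndMetadata <- streams("example-topic").head) {
          // MessageAndMetadata now exposes the key, the decoded message, the topic and the partition
          println("key=%s value=%s partition=%d".format(
            messageAndMetadata.key, messageAndMetadata.message, messageAndMetadata.partition))
        }
        connector.shutdown()
      }
    }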
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 1e29da5..aaf8b25 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.common.TopicAndPartition import kafka.client.ClientUtils +import kafka.producer.ProducerConfig /** * Usage: diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index e88d07e..ecc8080 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -17,7 +17,7 @@ package kafka.consumer -import kafka.utils.{IteratorTemplate, Logging} +import kafka.utils.{IteratorTemplate, Logging, Utils} import java.util.concurrent.{TimeUnit, BlockingQueue} import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference @@ -30,17 +30,18 @@ import kafka.common.{KafkaException, MessageSizeTooLargeException} * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown * */ -class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], - consumerTimeoutMs: Int, - private val decoder: Decoder[T], - val enableShallowIterator: Boolean) - extends IteratorTemplate[MessageAndMetadata[T]] with Logging { +class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk], + consumerTimeoutMs: Int, + private val keyDecoder: Decoder[K], + private val valueDecoder: Decoder[V], + val enableShallowIterator: Boolean) + extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) - private var currentTopicInfo:PartitionTopicInfo = null + private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L - override def next(): MessageAndMetadata[T] = { + override def next(): MessageAndMetadata[K, V] = { val item = super.next() if(consumedOffset < 0) throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) @@ -52,7 +53,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], item } - protected def makeNext(): MessageAndMetadata[T] = { + protected def makeNext(): MessageAndMetadata[K, V] = { var currentDataChunk: FetchedDataChunk = null // if we don't have an iterator, get one var localCurrent = current.get() @@ -103,7 +104,10 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], item.message.ensureValid() // validate checksum of message to ensure it is valid - new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic) + val keyBuffer = item.message.key + val key = if(keyBuffer == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(keyBuffer)) + val value = valueDecoder.fromBytes(Utils.readBytes(item.message.payload)) + new MessageAndMetadata(key, value, currentTopicInfo.topic, currentTopicInfo.partitionId) } def clearCurrentChunk() { diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index 3ef0978..115d41a 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -22,19 +22,20 @@ 
import java.util.concurrent.BlockingQueue import kafka.serializer.Decoder import kafka.message.MessageAndMetadata -class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk], - consumerTimeoutMs: Int, - private val decoder: Decoder[T], - val enableShallowIterator: Boolean) - extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] { +class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], + consumerTimeoutMs: Int, + private val keyDecoder: Decoder[K], + private val valueDecoder: Decoder[V], + val enableShallowIterator: Boolean) + extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { - private val iter: ConsumerIterator[T] = - new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator) + private val iter: ConsumerIterator[K,V] = + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator) /** * Create an iterator over messages in the stream. */ - def iterator(): ConsumerIterator[T] = iter + def iterator(): ConsumerIterator[K,V] = iter /** * This method clears the queue being iterated during the consumer rebalancing. This is mainly diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 8e6ce17..4263a5e 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -28,13 +28,14 @@ import java.net.InetAddress import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID -import kafka.serializer.Decoder +import kafka.serializer._ import kafka.utils.ZkUtils._ import kafka.common._ import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.api.OffsetRequest import kafka.metrics._ +import kafka.producer.ProducerConfig /** @@ -120,17 +121,20 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, KafkaCSVMetricsReporter.startCSVMetricReporter(config.props) def this(config: ConsumerConfig) = this(config, true) + + def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]] = + createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) - def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T]) - : Map[String,List[KafkaStream[T]]] = { + def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) + : Map[String, List[KafkaStream[K,V]]] = { if (messageStreamCreated.getAndSet(true)) throw new RuntimeException(this.getClass.getSimpleName + " can create message streams at most once") - consume(topicCountMap, decoder) + consume(topicCountMap, keyDecoder, valueDecoder) } - def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = { - val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder) + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = { + val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder) wildcardStreamsHandler.streams } @@ -173,8 +177,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def 
consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T]) - : Map[String,List[KafkaStream[T]]] = { + def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) + : Map[String,List[KafkaStream[K,V]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") @@ -187,8 +191,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) - val stream = new KafkaStream[T]( - queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator) + val stream = new KafkaStream[K,V]( + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.enableShallowIterator) (queue, stream) }) ).flatten.toList @@ -197,7 +201,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, registerConsumerInZK(dirs, consumerIdString, topicCount) reinitializeConsumer(topicCount, queuesAndStreams) - loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]] + loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] } // this API is used by unit tests only @@ -293,7 +297,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } class ZKRebalancerListener(val group: String, val consumerIdString: String, - val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]]) + val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]]) extends IZkChildListener { private var isWatcherTriggered = false private val lock = new ReentrantLock @@ -473,7 +477,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } private def closeFetchersForQueues(cluster: Cluster, - messageStreams: Map[String,List[KafkaStream[_]]], + messageStreams: Map[String,List[KafkaStream[_,_]]], queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) { val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten fetcher match { @@ -496,7 +500,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]], - messageStreams: Map[String,List[KafkaStream[_]]]) { + messageStreams: Map[String,List[KafkaStream[_,_]]]) { // Clear all but the currently iterated upon chunk in the consumer thread's queue queuesTobeCleared.foreach(_.clear) @@ -510,7 +514,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } - private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]], + private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_,_]]], relevantTopicThreadIdsMap: Map[String, Set[String]]) { // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer // after this rebalancing attempt @@ -610,17 +614,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - private def reinitializeConsumer[T]( + private def reinitializeConsumer[K,V]( topicCount: TopicCount, - queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) { + queuesAndStreams: 
List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) { val dirs = new ZKGroupDirs(config.groupId) // listener to consumer and partition changes if (loadBalancerListener == null) { - val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]] + val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]] loadBalancerListener = new ZKRebalancerListener( - config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]]) + config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) } // register listener for session expired event @@ -690,9 +694,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, loadBalancerListener.syncedRebalance() } - class WildcardStreamsHandler[T](topicFilter: TopicFilter, + class WildcardStreamsHandler[K,V](topicFilter: TopicFilter, numStreams: Int, - decoder: Decoder[T]) + keyDecoder: Decoder[K], + valueDecoder: Decoder[V]) extends TopicEventHandler[String] { if (messageStreamCreated.getAndSet(true)) @@ -702,8 +707,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val wildcardQueuesAndStreams = (1 to numStreams) .map(e => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) - val stream = new KafkaStream[T]( - queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator) + val stream = new KafkaStream[K,V](queue, + config.consumerTimeoutMs, + keyDecoder, + valueDecoder, + config.enableShallowIterator) (queue, stream) }).toList @@ -760,7 +768,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) } - def streams: Seq[KafkaStream[T]] = + def streams: Seq[KafkaStream[K,V]] = wildcardQueuesAndStreams.map(_._2) } } diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java index 9ac8860..c45c803 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java +++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java @@ -20,7 +20,6 @@ package kafka.javaapi.consumer; import kafka.consumer.KafkaStream; import kafka.consumer.TopicFilter; -import kafka.message.Message; import kafka.serializer.Decoder; import java.util.List; @@ -36,10 +35,10 @@ public interface ConsumerConnector { * The number of items in the list is #streams. Each stream supports * an iterator over message/metadata pairs. */ - public <T> Map<String, List<KafkaStream<T>>> createMessageStreams( - Map<String, Integer> topicCountMap, Decoder<T> decoder); - public Map<String, List<KafkaStream<Message>>> createMessageStreams( - Map<String, Integer> topicCountMap); + public <K,V> Map<String, List<KafkaStream<K,V>>> + createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder); + + public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap); /** * Create a list of MessageAndTopicStreams containing messages of type T. @@ -47,16 +46,17 @@ public interface ConsumerConnector { * @param topicFilter a TopicFilter that specifies which topics to * subscribe to (encapsulates a whitelist or a blacklist). * @param numStreams the number of message streams to return. - * @param decoder a decoder that converts from Message to T + * @param keyDecoder a decoder that decodes the message key + * @param valueDecoder a decoder that decodes the message itself * @return a list of KafkaStream. Each stream supports an * iterator over its MessageAndMetadata elements. 
*/ - public <T> List<KafkaStream<T>> createMessageStreamsByFilter( - TopicFilter topicFilter, int numStreams, Decoder<T> decoder); - public List<KafkaStream<Message>> createMessageStreamsByFilter( - TopicFilter topicFilter, int numStreams); - public List<KafkaStream<Message>> createMessageStreamsByFilter( - TopicFilter topicFilter); + public <K,V> List<KafkaStream<K,V>> + createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder); + + public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams); + + public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter); /** * Commit the offsets of all broker partitions connected by this connector. diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 9f54e30..2b20aa4 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -17,7 +17,7 @@ package kafka.javaapi.consumer import kafka.message.Message -import kafka.serializer.{DefaultDecoder, Decoder} +import kafka.serializer._ import kafka.consumer._ import scala.collection.JavaConversions.asList @@ -59,7 +59,7 @@ import scala.collection.JavaConversions.asList */ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, - val enableFetcher: Boolean) // for testing only + val enableFetcher: Boolean) // for testing only extends ConsumerConnector { private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher) @@ -67,38 +67,37 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def this(config: ConsumerConfig) = this(config, true) // for java client - def createMessageStreams[T]( + def createMessageStreams[K,V]( topicCountMap: java.util.Map[String,java.lang.Integer], - decoder: Decoder[T]) - : java.util.Map[String,java.util.List[KafkaStream[T]]] = { + keyDecoder: Decoder[K], + valueDecoder: Decoder[V]) + : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = { import scala.collection.JavaConversions._ val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]]) - val scalaReturn = underlying.consume(scalaTopicCountMap, decoder) - val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]] + val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder) + val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]] for ((topic, streams) <- scalaReturn) { - var javaStreamList = new java.util.ArrayList[KafkaStream[T]] + var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]] for (stream <- streams) javaStreamList.add(stream) ret.put(topic, javaStreamList) } ret } + + def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] = + createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) + + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = + asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)) - def createMessageStreams( - topicCountMap: java.util.Map[String,java.lang.Integer]) - : java.util.Map[String,java.util.List[KafkaStream[Message]]] = - createMessageStreams(topicCountMap, new DefaultDecoder) - - def 
createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = - asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder)) - - def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = - createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder) - - def createMessageStreamsByFilter(topicFilter: TopicFilter) = - createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder) - + def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = + createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder()) + + def createMessageStreamsByFilter(topicFilter: TopicFilter) = + createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()) + def commitOffsets() { underlying.commitOffsets } diff --git a/core/src/main/scala/kafka/message/MessageAndMetadata.scala b/core/src/main/scala/kafka/message/MessageAndMetadata.scala index 710308e..3b294f8 100644 --- a/core/src/main/scala/kafka/message/MessageAndMetadata.scala +++ b/core/src/main/scala/kafka/message/MessageAndMetadata.scala @@ -17,5 +17,5 @@ package kafka.message -case class MessageAndMetadata[T](message: T, topic: String = "") +case class MessageAndMetadata[K, V](key: K, message: V, topic: String, partition: Int) diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index 15ce45a..5e7a466 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -59,6 +59,14 @@ class BlockingChannel( val host: String, writeChannel = channel readChannel = Channels.newChannel(channel.socket().getInputStream) connected = true + // settings may not match what we requested above + val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d)." 
+ logger.debug(msg.format(channel.socket.getSoTimeout, + readTimeoutMs, + channel.socket.getReceiveBufferSize, + readBufferSize, + channel.socket.getSendBufferSize, + writeBufferSize)) } } diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index ccbb76c..d0a89eb 100644 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -19,7 +19,9 @@ package kafka.producer import kafka.utils.Utils -private[kafka] class DefaultPartitioner[T] extends Partitioner[T] { +import kafka.utils._ + +private class DefaultPartitioner[T](props: VerifiableProperties = null) extends Partitioner[T] { private val random = new java.util.Random def partition(key: T, numPartitions: Int): Int = { diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 172e108..71ef749 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -54,8 +54,9 @@ extends Logging { def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config, - Utils.createObject[Partitioner[K]](config.partitionerClass), - Utils.createObject[Encoder[V]](config.serializerClass), + Utils.createObject[Partitioner[K]](config.partitionerClass, config.props), + Utils.createObject[Encoder[V]](config.serializerClass, config.props), + Utils.createObject[Encoder[K]](config.keySerializerClass, config.props), new ProducerPool(config))) /** diff --git a/core/src/main/scala/kafka/producer/ProducerData.scala b/core/src/main/scala/kafka/producer/ProducerData.scala index 344f8e0..0c7598b 100644 --- a/core/src/main/scala/kafka/producer/ProducerData.scala +++ b/core/src/main/scala/kafka/producer/ProducerData.scala @@ -35,6 +35,8 @@ case class ProducerData[K,V](topic: String, def getTopic: String = topic + def hasKey = key != null + def getKey: K = key def getData: Seq[V] = data diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index 55a9fab..07935d7 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -38,6 +38,10 @@ trait AsyncProducerConfig { /** the number of messages batched at the producer */ val batchSize = props.getInt("batch.size", 200) - /** the serializer class for events */ + /** the serializer class for values */ val serializerClass = props.getString("serializer.class", "kafka.serializer.DefaultEncoder") + + /** the serializer class for keys (defaults to the same as for values) */ + val keySerializerClass = props.getString("key.serializer.class", serializerClass) + } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 0e520bc..d232cc8 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -31,6 +31,7 @@ import kafka.api.{TopicMetadata, ProducerRequest} class DefaultEventHandler[K,V](config: ProducerConfig, private val partitioner: Partitioner[K], private val encoder: Encoder[V], + private val keyEncoder: Encoder[K], private val producerPool: ProducerPool, private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata]) extends 
EventHandler[K,V] with Logging { @@ -106,7 +107,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val serializedMessages = new ListBuffer[Message] for (d <- e.getData) { try { - serializedMessages += encoder.toMessage(d) + if(e.hasKey) + serializedMessages += new Message(key = keyEncoder.toBytes(e.getKey), bytes = encoder.toBytes(d)) + else + serializedMessages += new Message(bytes = encoder.toBytes(d)) } catch { case t => ProducerStats.serializationErrorRate.mark() diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala index 7d1c138..3e0e4d7 100644 --- a/core/src/main/scala/kafka/serializer/Decoder.scala +++ b/core/src/main/scala/kafka/serializer/Decoder.scala @@ -17,21 +17,42 @@ package kafka.serializer -import kafka.message.Message +import kafka.message._ +import kafka.utils.VerifiableProperties +/** + * A decoder is a method of turning byte arrays into objects + */ trait Decoder[T] { - def toEvent(message: Message):T + def fromBytes(bytes: Array[Byte]): T +} + +/** + * The default implementation does nothing, just returns the same byte array it takes in. + */ +class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] { + def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes } -class DefaultDecoder extends Decoder[Message] { - def toEvent(message: Message):Message = message +/** + * Decode messages without any key + */ +class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] { + def fromBytes(bytes: Array[Byte]) = new Message(bytes) } -class StringDecoder extends Decoder[String] { - def toEvent(message: Message):String = { - val buf = message.payload - val arr = new Array[Byte](buf.remaining) - buf.get(arr) - new String(arr) +/** + * The string decoder translates bytes into strings. It uses UTF8 by default but takes + * an optional property serializer.encoding to control this. 
+ */ +class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] { + val encoding = + if(props == null) + "UTF8" + else + props.getString("serializer.encoding", "UTF8") + + def fromBytes(bytes: Array[Byte]): String = { + new String(bytes, encoding) } } diff --git a/core/src/main/scala/kafka/serializer/Encoder.scala b/core/src/main/scala/kafka/serializer/Encoder.scala index 222e51b..fc5c9c6 100644 --- a/core/src/main/scala/kafka/serializer/Encoder.scala +++ b/core/src/main/scala/kafka/serializer/Encoder.scala @@ -17,16 +17,42 @@ package kafka.serializer -import kafka.message.Message +import kafka.utils.VerifiableProperties +import kafka.message._ +import kafka.utils.Utils +/** + * An encoder is a method of turning objects into byte arrays + */ trait Encoder[T] { - def toMessage(event: T):Message + def toBytes(t: T): Array[Byte] +} + +/** + * The default implementation is a no-op, it just returns the same array it takes in + */ +class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] { + override def toBytes(value: Array[Byte]): Array[Byte] = value } -class DefaultEncoder extends Encoder[Message] { - override def toMessage(event: Message):Message = event +class KeylessMessageEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] { + override def toBytes(value: Array[Byte]): Array[Byte] = Utils.readBytes(new Message(value).buffer) } -class StringEncoder extends Encoder[String] { - override def toMessage(event: String):Message = new Message(event.getBytes) +/** + * The string encoder takes an optional parameter serializer.encoding which controls + * the character set used in encoding the string into bytes. + */ +class StringEncoder(props: VerifiableProperties = null) extends Encoder[String] { + val encoding = + if(props == null) + "UTF8" + else + props.getString("serializer.encoding", "UTF8") + + override def toBytes(s: String): Array[Byte] = + if(s == null) + null + else + s.getBytes(encoding) } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 8a2588d..d988539 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -24,6 +24,7 @@ import kafka.producer.{ProducerData, ProducerConfig, Producer} import scala.collection.JavaConversions._ import java.util.concurrent.CountDownLatch import kafka.consumer._ +import kafka.serializer._ object MirrorMaker extends Logging { @@ -92,7 +93,7 @@ object MirrorMaker extends Logging { val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { val config = new ProducerConfig( Utils.loadProps(options.valueOf(producerConfigOpt))) - new Producer[Null, Message](config) + new Producer[Array[Byte], Array[Byte]](config) }) val threads = { @@ -113,11 +114,9 @@ object MirrorMaker extends Logging { new Blacklist(options.valueOf(blacklistOpt)) val streams = - connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue())) + connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())) - streams.flatten.zipWithIndex.map(streamAndIndex => { - new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2) - }) + streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producers, streamAndIndex._2)) } threads.foreach(_.start()) @@ -125,8 +124,8 @@ object MirrorMaker extends Logging { threads.foreach(_.awaitShutdown()) } - class 
MirrorMakerThread(stream: KafkaStream[Message], - producers: Seq[Producer[Null, Message]], + class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], + producers: Seq[Producer[Array[Byte], Array[Byte]]], threadId: Int) extends Thread with Logging { @@ -140,16 +139,14 @@ object MirrorMaker extends Logging { try { for (msgAndMetadata <- stream) { val producer = producerSelector.next() - val pd = new ProducerData[Null, Message]( + val pd = new ProducerData[Array[Byte], Array[Byte]]( msgAndMetadata.topic, msgAndMetadata.message) producer.send(pd) } - } - catch { + } catch { case e => fatal("%s stream unexpectedly exited.", e) - } - finally { + } finally { shutdownLatch.countDown() info("Stopped thread %s.".format(threadName)) } @@ -158,8 +155,7 @@ object MirrorMaker extends Logging { def awaitShutdown() { try { shutdownLatch.await() - } - catch { + } catch { case e: InterruptedException => fatal( "Shutdown of thread %s interrupted. This might leak data!" .format(threadName)) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 5f9e313..fbbd9b2 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -136,7 +136,7 @@ object ReplayLogProducer extends Logging { val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) } - class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging { + class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() props.put("broker.list", config.brokerList) @@ -150,7 +150,7 @@ object ReplayLogProducer extends Logging { props.put("producer.type", "async") val producerConfig = new ProducerConfig(props) - val producer = new Producer[Message, Message](producerConfig) + val producer = new Producer[Array[Byte], Array[Byte]](producerConfig) override def run() { info("Starting consumer thread..") @@ -163,7 +163,7 @@ object ReplayLogProducer extends Logging { stream for (messageAndMetadata <- iter) { try { - producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message)) + producer.send(new ProducerData[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message)) if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0) Thread.sleep(config.delayedMSBtwSend) messageCount += 1 diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 09e40e6..dac7056 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -19,10 +19,12 @@ package kafka.tools import joptsimple._ import kafka.utils._ +import kafka.producer.ProducerConfig import kafka.consumer._ import kafka.client.ClientUtils import kafka.api.{OffsetRequest, FetchRequestBuilder, Request} import kafka.cluster.Broker +import java.util.Properties import scala.collection.JavaConversions._ /** @@ -194,7 +196,9 @@ object SimpleConsumerShell extends Logging { offset = messageAndOffset.nextOffset if(printOffsets) System.out.println("next offset = " + offset) - formatter.writeTo(messageAndOffset.message, System.out) + val message = messageAndOffset.message + val key = if(message.hasKey) Utils.readBytes(message.key) else null + 
formatter.writeTo(key, Utils.readBytes(message.payload), System.out) } catch { case e => if (skipMessageOnError) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 898a5b2..42e3e18 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -133,19 +133,25 @@ object Utils extends Logging { }) thread } + + /** + * Read the given byte buffer into a byte array + */ + def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit) /** * Read a byte array from the given offset and size in the buffer - * TODO: Should use System.arraycopy */ def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = { - val bytes = new Array[Byte](size) - var i = 0 - while(i < size) { - bytes(i) = buffer.get(offset + i) - i += 1 + val dest = new Array[Byte](size) + if(buffer.hasArray) { + System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size) + } else { + buffer.mark() + buffer.get(dest) + buffer.reset() } - bytes + dest } /** @@ -204,7 +210,7 @@ object Utils extends Logging { * @param buffer The buffer to translate * @param encoding The encoding to use in translating bytes to characters */ - def readString(buffer: ByteBuffer, encoding: String): String = { + def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = { val bytes = new Array[Byte](buffer.remaining) buffer.get(bytes) new String(bytes, encoding) @@ -446,16 +452,10 @@ object Utils extends Logging { /** * Create an instance of the class with the given class name */ - def createObject[T<:AnyRef](className: String): T = { - className match { - case null => null.asInstanceOf[T] - case _ => - val clazz = Class.forName(className) - val clazzT = clazz.asInstanceOf[Class[T]] - val constructors = clazzT.getConstructors - require(constructors.length == 1) - constructors.head.newInstance().asInstanceOf[T] - } + def createObject[T<:AnyRef](className: String, args: AnyRef*): T = { + val klass = Class.forName(className).asInstanceOf[Class[T]] + val constructor = klass.getConstructor(args.map(_.getClass): _*) + constructor.newInstance(args: _*).asInstanceOf[T] } /** diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala index 8328e99..bd09d78 100644 --- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala +++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala @@ -44,7 +44,7 @@ object TestKafkaAppender extends Logging { } } -class AppenderStringSerializer extends Encoder[AnyRef] { - def toMessage(event: AnyRef):Message = new Message(event.asInstanceOf[String].getBytes) +class AppenderStringSerializer(encoding: String = "UTF-8") extends Encoder[AnyRef] { + def toBytes(event: AnyRef): Array[Byte] = event.toString.getBytes(encoding) } diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index 2260111..7d48458 100644 --- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -56,13 +56,13 @@ object TestZKConsumerOffsets { } } -private class ConsumerThread(stream: KafkaStream[Message]) extends Thread { +private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread { val shutdownLatch = new CountDownLatch(1) override def run() { println("Starting consumer thread..") for (messageAndMetadata <- stream) { - 
println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8")) + println("consumed: " + new String(messageAndMetadata.message, "UTF-8")) } shutdownLatch.countDown println("thread shutdown !" ) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 8060615..962d5f9 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -26,10 +26,10 @@ import junit.framework.Assert._ import kafka.message._ import kafka.server._ import kafka.utils.TestUtils._ -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.utils._ import kafka.admin.CreateTopicCommand import org.junit.Test -import kafka.serializer.DefaultDecoder +import kafka.serializer._ import kafka.cluster.{Broker, Cluster} import org.scalatest.junit.JUnit3Suite import kafka.integration.KafkaServerTestHarness @@ -46,13 +46,14 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val topic = "topic" val group = "group1" val consumer0 = "consumer0" + val consumedOffset = 5 val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, c.brokerId, 0, queue, - new AtomicLong(5), + new AtomicLong(consumedOffset), new AtomicLong(0), new AtomicInteger(0))) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) @@ -65,24 +66,25 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testConsumerIteratorDeduplicationDeepIterator() { - val messages = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList + val messageStrings = (0 until 10).map(_.toString).toList + val messages = messageStrings.map(s => new Message(s.getBytes)) val messageSet = new ByteBufferMessageSet(DefaultCompressionCodec, new AtomicLong(0), messages:_*) topicInfos(0).enqueue(messageSet) assertEquals(1, queue.size) queue.put(ZookeeperConsumerConnector.shutdownCommand) - val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs, - new DefaultDecoder, false) - var receivedMessages: List[Message] = Nil - for (i <- 0 until 5) { - assertTrue(iter.hasNext) - receivedMessages ::= iter.next.message - } + val iter = new ConsumerIterator[String, String](queue, + consumerConfig.consumerTimeoutMs, + new StringDecoder(), + new StringDecoder(), + enableShallowIterator = false) + var receivedMessages = (0 until 5).map(i => iter.next.message).toList - assertTrue(!iter.hasNext) + assertFalse(iter.hasNext) assertEquals(1, queue.size) // This is only the shutdown command. 
assertEquals(5, receivedMessages.size) - assertEquals(receivedMessages.sortWith((s,t) => s.checksum < t.checksum), messages.takeRight(5).sortWith((s,t) => s.checksum < t.checksum)) + val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload)) + assertEquals(unconsumed, receivedMessages) } } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 18f3f80..eab6e36 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -25,7 +25,7 @@ import scala.collection._ import org.scalatest.junit.JUnit3Suite import org.apache.log4j.{Level, Logger} import kafka.message._ -import kafka.serializer.StringDecoder +import kafka.serializer._ import kafka.admin.CreateTopicCommand import org.I0Itec.zkclient.ZkClient import kafka.utils._ @@ -73,7 +73,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar override val consumerTimeoutMs = 200 } val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true) - val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // no messages to consume, we should hit timeout; // also the iterator should support re-entrant, so loop it twice @@ -90,9 +90,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) - val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) - val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) @@ -101,11 +100,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) - assertEquals(sentMessages1.size, receivedMessages1.size) - assertEquals(sentMessages1, receivedMessages1) + assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -121,19 +119,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar override val rebalanceBackoffMs = RebalanceBackoffMs } val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) - val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams2 = 
zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) - val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) - val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) - val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) - val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum) - assertEquals(sentMessages2, receivedMessages2) + val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -147,18 +142,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) - val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) - val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) - val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) - val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) - assertEquals(sentMessages3.size, receivedMessages3.size) - assertEquals(sentMessages3, receivedMessages3) + val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -176,9 +167,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) - val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) - val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, 
topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) @@ -187,10 +177,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig1 = new ConsumerConfig( TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) - assertEquals(sentMessages1.size, receivedMessages1.size) - assertEquals(sentMessages1, receivedMessages1) + assertEquals(sentMessages1.sorted, receivedMessages1.sorted) // also check partition ownership val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -206,19 +195,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar override val rebalanceBackoffMs = RebalanceBackoffMs } val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) - val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) - val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) - val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) - val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2) - val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum) - assertEquals(sentMessages2, receivedMessages2) + val receivedMessages2 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + assertEquals(sentMessages2.sorted, receivedMessages2.sorted) // also check partition ownership val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -230,20 +216,16 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val consumerConfig3 = new ConsumerConfig( TestUtils.createConsumerProperties(zkConnect, group, consumer3)) val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) - val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) + val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) - val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) - val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < 
t.checksum) + val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500) - val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1) - val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2) - val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum) - assertEquals(sentMessages3.size, receivedMessages3.size) - assertEquals(sentMessages3, receivedMessages3) + val receivedMessages3 = getMessages(nMessages, topicMessageStreams1) ++ getMessages(nMessages, topicMessageStreams2) + assertEquals(sentMessages3.sorted, receivedMessages3.sorted) // also check partition ownership val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -258,17 +240,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) - val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) - val sentMessages = (sentMessages1 ++ sentMessages2).sortWith((s,t) => s.checksum < t.checksum) + val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val receivedMessages = getMessages(400, topicMessageStreams1) - val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum) - val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum) - assertEquals(sortedSentMessages, sortedReceivedMessages) + assertEquals(sentMessages.sorted, receivedMessages.sorted) // also check partition ownership val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir) @@ -284,10 +263,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) - val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) - val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")). 
- sortWith((s, t) => s.compare(t) == -1) + val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ + sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -297,8 +274,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector = new ZookeeperConsumerConnector(consumerConfig, true) val topicMessageStreams = - zkConsumerConnector.createMessageStreams(Predef.Map(topic -> 1), new StringDecoder) - + zkConsumerConnector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) var receivedMessages: List[String] = Nil for ((topic, messageStreams) <- topicMessageStreams) { @@ -312,8 +288,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } } } - receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1) - assertEquals(sentMessages, receivedMessages) + assertEquals(sentMessages.sorted, receivedMessages.sorted) zkConsumerConnector.shutdown() requestHandlerLogger.setLevel(Level.ERROR) @@ -331,7 +306,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1)) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val topicRegistry = zkConsumerConnector1.getTopicRegistry assertEquals(1, topicRegistry.map(r => r._1).size) assertEquals(topic, topicRegistry.map(r => r._1).head) @@ -346,54 +321,59 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertEquals(expected_1, actual_1) val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) - assertEquals(nMessages, receivedMessages1.size) - assertEquals(sentMessages1.sortWith((s,t) => s.checksum < t.checksum), receivedMessages1) + assertEquals(sentMessages1, receivedMessages1) } - def sendMessagesToBrokerPartition(config: KafkaConfig, topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[Message] = { + def sendMessagesToBrokerPartition(config: KafkaConfig, + topic: String, + partition: Int, + numMessages: Int, + compression: CompressionCodec = NoCompressionCodec): List[String] = { val header = "test-%d-%d".format(config.brokerId, partition) val props = new Properties() props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props)) - val ms = 0.until(numMessages).map(x => - new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray - producer.send(new ProducerData[Int, Message](topic, partition, ms)) + props.put("serializer.class", classOf[StringEncoder].getName) + val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props)) + val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x) + producer.send(new ProducerData[Int, String](topic, 
partition, ms)) debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition)) - producer + producer.close() ms.toList } - def sendMessages(config: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec, numParts: Int): List[Message]= { - var messages: List[Message] = Nil + def sendMessages(config: KafkaConfig, + messagesPerNode: Int, + header: String, + compression: CompressionCodec, + numParts: Int): List[String]= { + var messages: List[String] = Nil val props = new Properties() props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - val producer: Producer[Int, Message] = new Producer[Int, Message](new ProducerConfig(props)) - + props.put("serializer.class", classOf[StringEncoder].getName) + val producer: Producer[Int, String] = new Producer[Int, String](new ProducerConfig(props)) for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => - new Message((header + config.brokerId + "-" + partition + "-" + x).getBytes)).toArray - for (message <- ms) - messages ::= message - producer.send(new ProducerData[Int, Message](topic, partition, ms)) + val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x) + producer.send(new ProducerData[Int, String](topic, partition, ms)) + messages ++= ms debug("Sent %d messages to broker %d for topic %s and partition %d".format(ms.size, config.brokerId, topic, partition)) } producer.close() - messages.reverse + messages } - def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= { - var messages: List[Message] = Nil - for(conf <- configs) { + def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[String]= { + var messages: List[String] = Nil + for(conf <- configs) messages ++= sendMessages(conf, messagesPerNode, header, compression, numParts) - } - messages.sortWith((s,t) => s.checksum < t.checksum) + messages } - def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= { - var messages: List[Message] = Nil + def getMessages(nMessagesPerThread: Int, + topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= { + var messages: List[String] = Nil for((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator @@ -401,11 +381,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.readString(message.payload, "UTF-8")) + debug("received message: " + message) } } } - messages.sortWith((s,t) => s.checksum < t.checksum) + messages.reverse } def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index 57fbab4..4503704 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -25,6 +25,7 @@ import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite import kafka.utils.TestUtils import kafka.message.Message +import 
kafka.serializer._ import kafka.producer.{Producer, ProducerData} class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -69,10 +70,11 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L * Returns the count of messages received. */ def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = { - val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs)) + val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + new DefaultEncoder(), new StringEncoder()) for(i <- 0 until numMessages) - producer.send(new ProducerData[String, Message](topic, topic, new Message("test".getBytes()))) + producer.send(new ProducerData[String, Array[Byte]](topic, topic, "test".getBytes)) // update offset in zookeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 4af7a52..bf5c852 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -27,6 +27,7 @@ import kafka.message._ import kafka.server._ import org.scalatest.junit.JUnit3Suite import kafka.consumer._ +import kafka.serializer._ import kafka.producer.{ProducerData, Producer} import kafka.utils.TestUtils._ import kafka.utils.TestUtils @@ -38,7 +39,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { val configs = for(props <- TestUtils.createBrokerConfigs(numNodes)) yield new KafkaConfig(props) - val messages = new mutable.HashMap[Int, Seq[Message]] + val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port))) val shutdown = ZookeeperConsumerConnector.shutdownCommand @@ -83,10 +84,10 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { def sendMessages(messagesPerNode: Int): Int = { var count = 0 for(conf <- configs) { - val producer: Producer[String, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs)) - val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray + val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new DefaultEncoder(), new StringEncoder()) + val ms = 0.until(messagesPerNode).map(x => (conf.brokerId * 5 + x).toString.getBytes).toArray messages += conf.brokerId -> ms - producer.send(new ProducerData[String, Message](topic, topic, ms)) + producer.send(new ProducerData[String, Array[Byte]](topic, topic, ms)) producer.close() count += ms.size } diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index e460d63..5007aa8 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -21,10 +21,11 @@ import kafka.api.FetchRequestBuilder import kafka.message.{Message, ByteBufferMessageSet} import kafka.server.{KafkaRequestHandler, KafkaConfig} import org.apache.log4j.{Level, Logger} +import org.junit.Assert._ import org.scalatest.junit.JUnit3Suite import scala.collection._ import 
kafka.producer.ProducerData -import kafka.utils.TestUtils +import kafka.utils._ import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException} /** @@ -57,8 +58,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness def testProduceAndFetch() { // send some messages val topic = "test" - val sentMessages = List(new Message("hello".getBytes()), new Message("there".getBytes())) - val producerData = new ProducerData[String, Message](topic, topic, sentMessages) + val sentMessages = List("hello", "there") + val producerData = new ProducerData[String, String](topic, topic, sentMessages) producer.send(producerData) @@ -67,7 +68,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) fetchedMessage = fetched.messageSet(topic, 0) } - TestUtils.checkEquals(sentMessages.iterator, fetchedMessage.map(m => m.message).iterator) + assertEquals(sentMessages, fetchedMessage.map(m => Utils.readString(m.message.payload)).toList) // send an invalid offset try { @@ -83,12 +84,12 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness // send some messages, with non-ordered topics val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() for( (topic, offset) <- topicOffsets) { - val producedData = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) + val producedData = List("a_" + topic, "b_" + topic) messages += topic -> producedData - producer.send(new ProducerData[String, Message](topic, topic, producedData)) + producer.send(new ProducerData[String, String](topic, topic, producedData)) builder.addFetch(topic, offset, 0, 10000) } @@ -97,7 +98,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness val response = consumer.fetch(request) for( (topic, offset) <- topicOffsets) { val fetched = response.messageSet(topic, offset) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator) + assertEquals(messages(topic), fetched.map(m => Utils.readString(m.message.payload))) } } @@ -121,13 +122,13 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness def testMultiProduce() { // send some messages val topics = List("test1", "test2", "test3"); - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() - var produceList: List[ProducerData[String, Message]] = Nil + var produceList: List[ProducerData[String, String]] = Nil for(topic <- topics) { - val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) + val set = List("a_" + topic, "b_" + topic) messages += topic -> set - produceList ::= new ProducerData[String, Message](topic, topic, set) + produceList ::= new ProducerData[String, String](topic, topic, set) builder.addFetch(topic, 0, 0, 10000) } producer.send(produceList: _*) @@ -137,20 +138,20 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness val response = consumer.fetch(request) for(topic <- topics) { val fetched = response.messageSet(topic, 0) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(m => m.message).iterator) + assertEquals(messages(topic), fetched.map(m 
=> Utils.readString(m.message.payload))) } } def testMultiProduceResend() { // send some messages val topics = List("test1", "test2", "test3"); - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() - var produceList: List[ProducerData[String, Message]] = Nil + var produceList: List[ProducerData[String, String]] = Nil for(topic <- topics) { - val set = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) + val set = List("a_" + topic, "b_" + topic) messages += topic -> set - produceList ::= new ProducerData[String, Message](topic, topic, set) + produceList ::= new ProducerData[String, String](topic, topic, set) builder.addFetch(topic, 0, 0, 10000) } producer.send(produceList: _*) @@ -161,9 +162,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness val response = consumer.fetch(request) for(topic <- topics) { val topicMessages = response.messageSet(topic, 0) - TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, - messages(topic).iterator), - topicMessages.iterator.map(_.message)) + assertEquals(messages(topic) ++ messages(topic), topicMessages.map(m => Utils.readString(m.message.payload))) } } } diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 7c41310..62fe6ad 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -22,8 +22,9 @@ import junit.framework.Assert._ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} import java.util.Properties +import kafka.utils.Utils import kafka.producer.{ProducerData, Producer, ProducerConfig} -import kafka.serializer.StringDecoder +import kafka.serializer._ import kafka.message.Message import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} @@ -108,15 +109,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertTrue(messageSet.iterator.hasNext) val fetchedMessageAndOffset = messageSet.head - val stringDecoder = new StringDecoder - val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) - assertEquals("test-message", fetchedStringMessage) + assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) } def testDefaultEncoderProducerAndFetchWithCompression() { val topic = "test-topic" val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) props.put("compression", "true") val config = new ProducerConfig(props) @@ -129,9 +128,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertTrue(messageSet.iterator.hasNext) val fetchedMessageAndOffset = messageSet.head - val stringDecoder = new StringDecoder - val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) - assertEquals("test-message", fetchedStringMessage) + assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8")) } def testProduceAndMultiFetch() { @@ -140,22 +137,21 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // send some 
messages val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() for( (topic, partition) <- topics) { - val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - val producerData = new ProducerData[String, Message](topic, topic, messageList) + val messageList = List("a_" + topic, "b_" + topic) + val producerData = new ProducerData[String, String](topic, topic, messageList) messages += topic -> messageList producer.send(producerData) builder.addFetch(topic, partition, 0, 10000) - } + } - // wait a bit for produced message to be available val request = builder.build() val response = consumer.fetch(request) - for( (topic, partition) <- topics) { + for((topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } @@ -204,11 +200,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // send some messages val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() for( (topic, partition) <- topics) { - val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - val producerData = new ProducerData[String, Message](topic, topic, messageList) + val messageList = List("a_" + topic, "b_" + topic) + val producerData = new ProducerData[String, String](topic, topic, messageList) messages += topic -> messageList producer.send(producerData) builder.addFetch(topic, partition, 0, 10000) @@ -219,7 +215,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = consumer.fetch(request) for( (topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } @@ -267,12 +263,12 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // send some messages val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() - var produceList: List[ProducerData[String, Message]] = Nil + var produceList: List[ProducerData[String, String]] = Nil for( (topic, partition) <- topics) { - val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - val producerData = new ProducerData[String, Message](topic, topic, messageList) + val messageList = List("a_" + topic, "b_" + topic) + val producerData = new ProducerData[String, String](topic, topic, messageList) messages += topic -> messageList producer.send(producerData) builder.addFetch(topic, partition, 0, 10000) @@ -284,19 +280,19 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = 
consumer.fetch(request) for( (topic, partition) <- topics) { val fetched = response.messageSet(topic, partition) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } def testMultiProduceWithCompression() { // send some messages val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); - val messages = new mutable.HashMap[String, Seq[Message]] + val messages = new mutable.HashMap[String, Seq[String]] val builder = new FetchRequestBuilder() - var produceList: List[ProducerData[String, Message]] = Nil + var produceList: List[ProducerData[String, String]] = Nil for( (topic, partition) <- topics) { - val messageList = List(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - val producerData = new ProducerData[String, Message](topic, topic, messageList) + val messageList = List("a_" + topic, "b_" + topic) + val producerData = new ProducerData[String, String](topic, topic, messageList) messages += topic -> messageList producer.send(producerData) builder.addFetch(topic, partition, 0, 10000) @@ -308,7 +304,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with val response = consumer.fetch(request) for( (topic, partition) <- topics) { val fetched = response.messageSet(topic, 0) - TestUtils.checkEquals(messages(topic).iterator, fetched.map(messageAndOffset => messageAndOffset.message).iterator) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index c1ccee7..fd9fae5 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -23,11 +23,12 @@ import java.util.Properties import kafka.producer.{ProducerConfig, Producer} import kafka.message.Message import kafka.utils.TestUtils +import kafka.serializer._ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness { val port: Int val host = "localhost" - var producer: Producer[String, Message] = null + var producer: Producer[String, String] = null var consumer: SimpleConsumer = null override def setUp() { @@ -41,6 +42,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes props.put("producer.retry.backoff.ms", "1000") props.put("producer.num.retries", "3") props.put("producer.request.required.acks", "-1") + props.put("serializer.class", classOf[StringEncoder].getName.toString) producer = new Producer(new ProducerConfig(props)) consumer = new SimpleConsumer(host, port, diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index e5cc792..69476fc 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -24,7 +24,9 @@ import org.scalatest.junit.JUnit3Suite import scala.collection.JavaConversions._ import org.apache.log4j.{Level, Logger} import kafka.message._ +import kafka.serializer._ import kafka.javaapi.producer.{ProducerData, Producer} 
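// ---------------------------------------------------------------------------
// Illustrative sketch only -- not part of the patch. It condenses the pattern
// the reworked tests follow after this serializer change: producers are typed
// on key and value and configured with Encoder classes via "serializer.class"
// and "key.serializer.class", and consumers pass an explicit key decoder and
// value decoder to createMessageStreams, so streams yield decoded Strings
// rather than raw Message objects. The broker/ZooKeeper addresses, topic,
// group and consumer id below are placeholders; TestUtils.createConsumerProperties
// is the same test helper the surrounding tests use, and the tests' IntEncoder
// plays the same role as StringEncoder here, just for Int keys.
// ---------------------------------------------------------------------------
import java.util.Properties
import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
import kafka.producer.{Producer, ProducerConfig, ProducerData}
import kafka.serializer.{StringDecoder, StringEncoder}
import kafka.utils.TestUtils

object KeyedStringRoundTripSketch {
  val zkConnect  = "localhost:2181"   // placeholder ZooKeeper address
  val brokerList = "localhost:9092"   // placeholder broker list
  val topic      = "sketch-topic"
  val group      = "sketch-group"

  def main(args: Array[String]) {
    // Consumer side: the key and value decoders are passed explicitly, so the
    // resulting KafkaStream is typed as KafkaStream[String, String].
    val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, "consumer-0"))
    val connector = new ZookeeperConsumerConnector(consumerConfig, true)
    val streams = connector.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())

    // Producer side: key/value serializer classes are named in the config and
    // the producer is typed accordingly, so it accepts Strings directly.
    val props = new Properties()
    props.put("broker.list", brokerList)
    props.put("serializer.class", classOf[StringEncoder].getName)
    props.put("key.serializer.class", classOf[StringEncoder].getName)
    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new ProducerData[String, String](topic, topic, List("hello", "there")))
    producer.close()

    // The stream iterator now hands back decoded values, not Message payloads.
    val iter = streams(topic).head.iterator
    println("first message: " + iter.next().message)
    connector.shutdown()
  }
}
// --------------------------- end of sketch; patch continues below ----------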
+import kafka.utils.IntEncoder import kafka.utils.TestUtils._ import kafka.utils.{Utils, Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} @@ -60,43 +62,46 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) - val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2))) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic -> numNodes*numParts/2)), new StringDecoder(), new StringDecoder()) val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1) - assertEquals(sentMessages1, receivedMessages1) + assertEquals(sentMessages1.sorted, receivedMessages1.sorted) zkConsumerConnector1.shutdown info("all consumer connectors stopped") requestHandlerLogger.setLevel(Level.ERROR) } - def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= { - var messages: List[Message] = Nil - val producer: kafka.producer.Producer[Int, Message] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs)) - val javaProducer: Producer[Int, Message] = new kafka.javaapi.producer.Producer(producer) + def sendMessages(conf: KafkaConfig, + messagesPerNode: Int, + header: String, + compressed: CompressionCodec): List[String] = { + var messages: List[String] = Nil + val producer: kafka.producer.Producer[Int, String] = + TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder(), new IntEncoder()) + val javaProducer: Producer[Int, String] = new kafka.javaapi.producer.Producer(producer) for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => - new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray - for (message <- ms) - messages ::= message + val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) + messages ++= ms import scala.collection.JavaConversions._ - javaProducer.send(new ProducerData[Int, Message](topic, partition, asList(ms))) + javaProducer.send(new ProducerData[Int, String](topic, partition, asList(ms))) } javaProducer.close messages } - def sendMessages(messagesPerNode: Int, header: String, compressed: CompressionCodec = NoCompressionCodec): List[Message]= { - var messages: List[Message] = Nil - for(conf <- configs) { + def sendMessages(messagesPerNode: Int, + header: String, + compressed: CompressionCodec = NoCompressionCodec): List[String] = { + var messages: List[String] = Nil + for(conf <- configs) messages ++= sendMessages(conf, messagesPerNode, header, compressed) - } - messages.sortWith((s,t) => s.checksum < t.checksum) + messages } - def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]]) - : List[Message]= { - var messages: List[Message] = Nil + def getMessages(nMessagesPerThread: Int, + jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { + var messages: List[String] = Nil val topicMessageStreams = asMap(jTopicMessageStreams) for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { @@ -105,11 +110,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with 
KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.readString(message.payload, "UTF-8")) + debug("received message: " + message) } } } - messages.sortWith((s,t) => s.checksum < t.checksum) + messages } private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = { diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index 39bf577..da3c704 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -169,10 +169,9 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with } } -class AppenderStringEncoder extends Encoder[LoggingEvent] { - def toMessage(event: LoggingEvent):Message = { - val logMessage = event.getMessage - new Message(logMessage.asInstanceOf[String].getBytes) +class AppenderStringEncoder(encoding: String = "UTF-8") extends Encoder[LoggingEvent] { + def toBytes(event: LoggingEvent): Array[Byte] = { + event.getMessage.toString.getBytes(encoding) } } diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 6ac59ce..9245158 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -185,20 +185,18 @@ class AsyncProducerTest extends JUnit3Suite { val producerPool = new ProducerPool(config) val handler = new DefaultEventHandler[Int,String](config, - partitioner = intPartitioner, - encoder = null.asInstanceOf[Encoder[String]], - producerPool = producerPool, - topicPartitionInfos) + partitioner = intPartitioner, + encoder = null.asInstanceOf[Encoder[String]], + keyEncoder = new IntEncoder(), + producerPool = producerPool, + topicPartitionInfos) - val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]] - topic1Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)), - new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes)))) - val topic1Broker2Data = new ListBuffer[ProducerData[Int,Message]] - topic1Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes)))) - val topic2Broker1Data = new ListBuffer[ProducerData[Int,Message]] - topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes)))) - val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]] - topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))) + val topic1Broker1Data = + ListBuffer[ProducerData[Int,Message]](new ProducerData[Int,Message]("topic1", 0, new Message("msg1".getBytes)), + new ProducerData[Int,Message]("topic1", 2, new Message("msg3".getBytes))) + val topic1Broker2Data = ListBuffer[ProducerData[Int,Message]](new ProducerData[Int,Message]("topic1", 3, new Message("msg4".getBytes))) + val topic2Broker1Data = ListBuffer[ProducerData[Int,Message]](new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))) + val topic2Broker2Data = ListBuffer[ProducerData[Int,Message]](new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))) val expectedResult = Some(Map( 0 -> Map( TopicAndPartition("topic1", 0) -> topic1Broker1Data, @@ -228,13 +226,13 @@ class AsyncProducerTest 
extends JUnit3Suite { val handler = new DefaultEventHandler[String,String](config, partitioner = null.asInstanceOf[Partitioner[String]], encoder = new StringEncoder, + keyEncoder = new StringEncoder, producerPool = producerPool, topicPartitionInfos ) val serializedData = handler.serialize(produceData) - val decoder = new StringDecoder - val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => decoder.toEvent(m)))) + val deserializedData = serializedData.map(d => new ProducerData[String,String](d.getTopic, d.getData.map(m => Utils.readString(m.payload)))) TestUtils.checkEquals(produceData.iterator, deserializedData.iterator) } @@ -257,6 +255,7 @@ class AsyncProducerTest extends JUnit3Suite { val handler = new DefaultEventHandler[String,String](config, partitioner = new NegativePartitioner, encoder = null.asInstanceOf[Encoder[String]], + keyEncoder = null.asInstanceOf[Encoder[String]], producerPool = producerPool, topicPartitionInfos) try { @@ -287,6 +286,7 @@ class AsyncProducerTest extends JUnit3Suite { val handler = new DefaultEventHandler[String,String](config, partitioner = null.asInstanceOf[Partitioner[String]], encoder = new StringEncoder, + keyEncoder = new StringEncoder, producerPool = producerPool, topicPartitionInfos) try { @@ -333,6 +333,7 @@ class AsyncProducerTest extends JUnit3Suite { val handler = new DefaultEventHandler[String,String](config, partitioner = null.asInstanceOf[Partitioner[String]], encoder = null.asInstanceOf[Encoder[String]], + keyEncoder = null.asInstanceOf[Encoder[String]], producerPool = producerPool, topicPartitionInfos) val producerDataList = new ListBuffer[ProducerData[String,Message]] @@ -375,6 +376,7 @@ class AsyncProducerTest extends JUnit3Suite { val handler = new DefaultEventHandler[String,String]( config, partitioner = null.asInstanceOf[Partitioner[String]], encoder = new StringEncoder, + keyEncoder = new StringEncoder, producerPool = producerPool, topicPartitionInfos) @@ -429,7 +431,8 @@ class AsyncProducerTest extends JUnit3Suite { val handler = new DefaultEventHandler[Int,String](config, partitioner = new FixedValuePartitioner(), - encoder = new StringEncoder, + encoder = new StringEncoder(), + keyEncoder = new IntEncoder(), producerPool = producerPool, topicPartitionInfos) val data = List(new ProducerData[Int,String](topic1, 0, msgs), @@ -496,3 +499,7 @@ class AsyncProducerTest extends JUnit3Suite { new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } } + +class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner[String] { + def partition(data: String, numPartitions: Int): Int = -1 +} diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cce858f..ac5eb2a 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -22,6 +22,7 @@ import kafka.admin.CreateTopicCommand import kafka.utils.TestUtils._ import kafka.utils.{Utils, TestUtils} import kafka.zk.ZooKeeperTestHarness +import kafka.serializer._ import kafka.message.Message import kafka.producer.{ProducerConfig, ProducerData, Producer} @@ -42,12 +43,16 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { val configProps1 = configs.head val configProps2 = configs.last - val message = new Message("hello".getBytes()) + val message = "hello" - var producer: Producer[Int, Message] = null + var 
producer: Producer[Int, String] = null var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0)) var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + + val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) + producerProps.put("producer.request.required.acks", "-1") + def testHWCheckpointNoFailuresSingleLogSegment { // start both servers @@ -55,10 +60,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configProps2) servers ++= List(server1, server2) - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) - producerProps.put("producer.request.timeout.ms", "1000") - producerProps.put("producer.request.required.acks", "-1") - producer = new Producer[Int, Message](new ProducerConfig(producerProps)) + producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) @@ -92,10 +94,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { server2 = TestUtils.createServer(configProps2) servers ++= List(server1, server2) - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) - producerProps.put("producer.request.timeout.ms", "1000") - producerProps.put("producer.request.required.acks", "-1") - producer = new Producer[Int, Message](new ProducerConfig(producerProps)) + producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) @@ -152,14 +151,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { } def testHWCheckpointNoFailuresMultipleLogSegments { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val replicaMaxLagTimeMs = 5000L - override val replicaMaxLagBytes = 10L - override val flushInterval = 10 - override val replicaMinBytes = 20 - override val logFileSize = 30 - }) - // start both servers server1 = TestUtils.createServer(configs.head) server2 = TestUtils.createServer(configs.last) @@ -168,10 +159,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) - producerProps.put("producer.request.timeout.ms", "1000") - producerProps.put("producer.request.required.acks", "-1") - producer = new Producer[Int, Message](new ProducerConfig(producerProps)) + producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) @@ -197,14 +185,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { } def testHWCheckpointWithFailuresMultipleLogSegments { - val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { - override val replicaMaxLagTimeMs = 5000L - override val replicaMaxLagBytes = 10L - override val 
flushInterval = 1000 - override val replicaMinBytes = 20 - override val logFileSize = 30 - }) - // start both servers server1 = TestUtils.createServer(configs.head) server2 = TestUtils.createServer(configs.last) @@ -213,10 +193,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0)) hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0)) - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) - producerProps.put("producer.request.timeout.ms", "1000") - producerProps.put("producer.request.required.acks", "-1") - producer = new Producer[Int, Message](new ProducerConfig(producerProps)) + producer = new Producer[Int, String](new ProducerConfig(producerProps)) // create topic with 1 partition, 2 replicas, one on each broker CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":")) @@ -268,6 +245,6 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { private def sendMessages(n: Int = 1) { for(i <- 0 until n) - producer.send(new ProducerData[Int, Message](topic, 0, message)) + producer.send(new ProducerData[Int, String](topic, 0, message)) } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 749c0ae..b356dad 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -55,7 +55,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } // send test messages to leader - val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder) + val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), + new StringEncoder(), + new StringEncoder()) producer.send(new ProducerData[String, String](topic1, testMessageList1), new ProducerData[String, String](topic2, testMessageList2)) producer.close() diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index b518a33..c322100 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -36,19 +36,19 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val host = "localhost" val topic = "test" - val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes())) - val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes())) + val sent1 = List("hello", "there") + val sent2 = List("more", "messages") @Test def testCleanShutdown() { var server = new KafkaServer(config) server.startup() - var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) + var producer = new Producer[Int, String](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) // create topic CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") // send some messages - producer.send(new ProducerData[Int, Message](topic, 0, sent1)) + producer.send(new ProducerData[Int, String](topic, 0, sent1)) // do a clean shutdown and check that the clean shudown file is written out server.shutdown() @@ -62,7 +62,7 @@ class 
ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server = new KafkaServer(config) server.startup() - producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) + producer = new Producer[Int, String](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000))) val consumer = new SimpleConsumer(host, port, 1000000, @@ -75,18 +75,18 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) fetchedMessage = fetched.messageSet(topic, 0) } - TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator) + assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload))) val newOffset = fetchedMessage.last.nextOffset // send some more messages - producer.send(new ProducerData[Int, Message](topic, 0, sent2)) + producer.send(new ProducerData[Int, String](topic, 0, sent2)) fetchedMessage = null while(fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) fetchedMessage = fetched.messageSet(topic, 0) } - TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator) + assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload))) consumer.close() producer.close() diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 8dbd85e..a902a94 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -288,13 +288,16 @@ object TestUtils extends Logging { /** * Create a producer for the given host and port */ - def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = { + def createProducer[K, V](brokerList: String, + encoder: Encoder[V] = new DefaultEncoder(), + keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = { val props = new Properties() props.put("broker.list", brokerList) props.put("buffer.size", "65536") props.put("connect.timeout.ms", "100000") props.put("reconnect.interval", "10000") props.put("serializer.class", encoder.getClass.getCanonicalName) + props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName) new Producer[K, V](new ProducerConfig(props)) } @@ -307,6 +310,8 @@ object TestUtils extends Logging { props.put("buffer.size", bufferSize.toString) props.put("connect.timeout.ms", connectTimeout.toString) props.put("reconnect.interval", reconnectInterval.toString) + props.put("producer.request.timeout.ms", 30000.toString) + props.put("serializer.class", classOf[StringEncoder].getName.toString) props } @@ -354,8 +359,7 @@ object TestUtils extends Logging { } def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = { - val encoder = new StringEncoder - new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*) + new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => new Message(m.getBytes)): _*) } def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { @@ -490,30 +494,22 @@ object TestZKUtils { val zookeeperConnect = "127.0.0.1:2182" } -class StringSerializer extends Encoder[String] { - def toEvent(message: 
Message):String = message.toString - def toMessage(event: String):Message = new Message(event.getBytes) - def getTopic(event: String): String = event.concat("-topic") +class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] { + override def toBytes(n: Int) = n.toString.getBytes } -class NegativePartitioner extends Partitioner[String] { - def partition(data: String, numPartitions: Int): Int = { - -1 - } -} - -class StaticPartitioner extends Partitioner[String] { +class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] { def partition(data: String, numPartitions: Int): Int = { (data.length % numPartitions) } } -class HashPartitioner extends Partitioner[String] { +class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] { def partition(data: String, numPartitions: Int): Int = { (data.hashCode % numPartitions) } } -class FixedValuePartitioner extends Partitioner[Int] { +class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] { def partition(data: Int, numPartitions: Int): Int = data } diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index 9d38d16..a4d3a27 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -17,6 +17,8 @@ package kafka.utils +import java.util.Arrays +import java.nio.ByteBuffer import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -51,5 +53,13 @@ class UtilsTest extends JUnitSuite { assertEquals(2, its.next()) assertEquals(1, its.next()) } + + @Test + def testReadBytes() { + for(testCase <- List("", "a", "abcd")) { + val bytes = testCase.getBytes + assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes)))) + } + } } diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index cb01577..2b87560 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -56,10 +56,10 @@ public class Consumer extends Thread public void run() { Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(1)); - Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaStream stream = consumerMap.get(topic).get(0); - ConsumerIterator it = stream.iterator(); + Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + KafkaStream stream = consumerMap.get(topic).get(0); + ConsumerIterator it = stream.iterator(); while(it.hasNext()) - System.out.println(ExampleUtils.getMessage(it.next().message())); + System.out.println(new String(it.next().message())); } } diff --git a/examples/src/main/java/kafka/examples/ExampleUtils.java b/examples/src/main/java/kafka/examples/ExampleUtils.java deleted file mode 100644 index 34fd1c0..0000000 --- a/examples/src/main/java/kafka/examples/ExampleUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.examples; - - -import java.nio.ByteBuffer; -import kafka.message.Message; - -public class ExampleUtils -{ - public static String getMessage(Message message) - { - ByteBuffer buffer = message.payload(); - byte [] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - return new String(bytes); - } -} diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index f995120..b87d50c 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -19,6 +19,9 @@ package kafka.examples; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.javaapi.FetchResponse; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import kafka.javaapi.consumer.SimpleConsumer; @@ -29,9 +32,12 @@ import java.util.Map; public class SimpleConsumerDemo { - private static void printMessages(ByteBufferMessageSet messageSet) { - for (MessageAndOffset messageAndOffset : messageSet) { - System.out.println(ExampleUtils.getMessage(messageAndOffset.message())); + private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException { + for(MessageAndOffset messageAndOffset: messageSet) { + ByteBuffer payload = messageAndOffset.message().payload(); + byte[] bytes = new byte[payload.limit()]; + payload.get(bytes); + System.out.println(new String(bytes, "UTF-8")); } } @@ -47,7 +53,7 @@ public class SimpleConsumerDemo { } } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { generateData(); SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL, diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index d616fa8..a720ced 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -52,7 +52,7 @@ object ConsumerPerformance { val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) - val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads)) + val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) var threadList = List[ConsumerPerfThread]() for ((topic, streamList) <- topicMessageStreams) for (i <- 0 until streamList.length) @@ -140,7 +140,7 @@ object ConsumerPerformance { val hideHeader = options.has(hideHeaderOpt) } - class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message], + class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) extends Thread(name) { private val shutdownLatch = new CountDownLatch(1) @@ -160,7 +160,7 @@ object ConsumerPerformance { try { for (messageAndMetadata <- stream if 
messagesRead < config.numMessages) { messagesRead += 1 - bytesRead += messageAndMetadata.message.payloadSize + bytesRead += messageAndMetadata.message.length if (messagesRead % config.reportingInterval == 0) { if(config.showDetailedStats)