From 19a095814281f09c344a994897d80d61d988e7cc Mon Sep 17 00:00:00 2001
From: Scott Carey
Date: Sat, 6 Apr 2013 20:30:44 -0700
Subject: [PATCH 2/4] Replace deprecated JavaConversions usage with
 JavaConverters and non-deprecated conversion names

---
 .../main/scala/kafka/controller/PartitionStateMachine.scala  |  6 +++---
 core/src/main/scala/kafka/javaapi/FetchRequest.scala         |  2 +-
 core/src/main/scala/kafka/javaapi/OffsetRequest.scala        |  2 +-
 core/src/main/scala/kafka/javaapi/TopicMetadata.scala        | 10 +++++-----
 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala |  4 ++--
 .../kafka/javaapi/consumer/ZookeeperConsumerConnector.scala  |  7 +++----
 .../scala/kafka/javaapi/message/ByteBufferMessageSet.scala   |  3 ++-
 core/src/main/scala/kafka/javaapi/producer/Producer.scala    |  4 ++--
 core/src/main/scala/kafka/tools/JmxTool.scala                |  2 +-
 core/src/main/scala/kafka/utils/Pool.scala                   | 11 +++++------
 .../unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala |  4 ++--
 .../javaapi/consumer/ZookeeperConsumerConnectorTest.scala    |  7 ++++---
 .../unit/kafka/javaapi/message/BaseMessageSetTestCases.scala | 11 +++++------
 .../test/scala/unit/kafka/producer/AsyncProducerTest.scala   |  2 +-
 14 files changed, 37 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index deebed0..3722b99 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,7 +17,7 @@
 package kafka.controller
 
 import collection._
-import collection.JavaConversions._
+import collection.JavaConverters._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
@@ -359,8 +359,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.controllerLock synchronized {
         if (hasStarted.get) {
           try {
-            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
-            val currentChildren = JavaConversions.asBuffer(children).toSet
+            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(",")))
+            val currentChildren = children.asScala.toSet
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
             // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index b475240..0ecadb3 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -29,7 +29,7 @@ class FetchRequest(correlationId: Int,
                    requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
 
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap = JavaConversions.asScalaMap(requestInfo).toMap
     kafka.api.FetchRequest(
       correlationId = correlationId,
       clientId = clientId,
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..79d952f 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -28,7 +28,7 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
                     clientId: String) {
 
   val underlying = {
-    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    val scalaMap = JavaConversions.asScalaMap(requestInfo).toMap
     kafka.api.OffsetRequest(
       requestInfo = scalaMap,
       versionId = versionId,
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..2a780b5 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,16 @@
 package kafka.javaapi
 
 import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions.asJavaList
 
 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
   java.util.List[kafka.javaapi.TopicMetadata] =
-    asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+    asJavaList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
 
   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
   java.util.List[kafka.javaapi.PartitionMetadata] =
-    asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+    asJavaList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
 }
 
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +51,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
     underlying.leader
   }
 
-  def replicas: java.util.List[Broker] = asList(underlying.replicas)
+  def replicas: java.util.List[Broker] = asJavaList(underlying.replicas)
 
-  def isr: java.util.List[Broker] = asList(underlying.isr)
+  def isr: java.util.List[Broker] = asJavaList(underlying.isr)
 
   def errorCode: Short = underlying.errorCode
 
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..f7c4fa1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi
 
 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
@@ -27,7 +27,7 @@ class TopicMetadataRequest(val versionId: Short,
   extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
 
   val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics.asScala)
 
   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..cb93f80 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,7 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import collection.JavaConverters._
 
 
 /**
@@ -71,9 +71,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                   keyDecoder: Decoder[K],
                                   valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
-    import scala.collection.JavaConversions._
 
-    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+    val scalaTopicCountMap: Map[String, Int] = Map.empty ++ topicCountMap.asInstanceOf[java.util.Map[String, Int]].asScala
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
@@ -89,7 +88,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
 
   def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
-    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder).asJava
 
   def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..6c78fd3 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,13 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import kafka.message._
+import collection.JavaConverters._
 
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..33c234f 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer
 
 import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
+import collection.JavaConverters._
 
 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -37,8 +38,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
    * @param messages list of producer data objects that encapsulate the topic, key and message data
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
-    import collection.JavaConversions._
-    underlying.send(asBuffer(messages):_*)
+    underlying.send(messages.asScala: _*)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..26badb1 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
       else
         List(null)
 
-    val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+    val names = queries.map((name: ObjectName) => asJavaSet(mbsc.queryNames(name, null))).flatten
 
     val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..738cd0b 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,7 +19,7 @@ package kafka.utils
 
 import java.util.ArrayList
 import java.util.concurrent._
-import collection.JavaConversions
+import collection.JavaConverters._
 import kafka.common.KafkaException
 import java.lang.Object
 
@@ -70,11 +70,10 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
   def get(key: K): V = pool.get(key)
 
   def remove(key: K): V = pool.remove(key)
-  
-  def keys = JavaConversions.asSet(pool.keySet())
-  
-  def values: Iterable[V] = 
-    JavaConversions.asIterable(new ArrayList[V](pool.values()))
+
+  def keys = pool.keySet.asScala.toSet
+
+  def values: Iterable[V] = new ArrayList[V](pool.values).asScala
 
   def clear() { pool.clear() }
 
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..b966300 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -31,6 +31,7 @@ import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
 import org.apache.log4j.{Logger, Level}
+import collection.JavaConverters._
 import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@@ -406,10 +407,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
-    import scala.collection.JavaConversions
     val children = zkClient.getChildren(path)
     Collections.sort(children)
-    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    val childrenAsSeq : Seq[java.lang.String] = children.asScala
     childrenAsSeq.map(partition =>
       (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..a86e3ce 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -21,7 +21,6 @@ import junit.framework.Assert._
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
-import scala.collection.JavaConversions._
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
@@ -32,6 +31,7 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
+import scala.collection.JavaConversions
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
@@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
         val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
         messages ++= ms
         import scala.collection.JavaConversions._
-        javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+        javaProducer.send(asJavaList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
       }
       javaProducer.close
       messages
@@ -102,8 +102,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def getMessages(nMessagesPerThread: Int,
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
+    import scala.collection.JavaConversions._
     var messages: List[String] = Nil
-    val topicMessageStreams = asMap(jTopicMessageStreams)
+    val topicMessageStreams = JavaConversions.asScalaMap(jTopicMessageStreams).toMap
     for ((topic, messageStreams) <- topicMessageStreams) {
       for (messageStream <- messageStreams) {
         val iterator = messageStream.iterator
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index abee11b..a889630 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -22,14 +22,15 @@ import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message}
+import scala.collection.JavaConversions._
+
 
 trait BaseMessageSetTestCases extends JUnitSuite {
   val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
 
   def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
 
   def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
-    import scala.collection.JavaConversions._
-    val messages = asIterable(messageSet)
+    val messages = asJavaIterable(messageSet)
     messages.map(m => m.message).iterator
   }
@@ -41,18 +42,16 @@ trait BaseMessageSetTestCases extends JUnitSuite {
 
   @Test
   def testIteratorIsConsistent() {
-    import scala.collection.JavaConversions._
     val m = createMessageSet(messages)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+    TestUtils.checkEquals(m.iterator, m.iterator)
   }
 
   @Test
   def testIteratorIsConsistentWithCompression() {
-    import scala.collection.JavaConversions._
     val m = createMessageSet(messages, DefaultCompressionCodec)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
+    TestUtils.checkEquals(m.iterator, m.iterator)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1781bc0..ccf24b2 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -450,7 +450,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
    val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m))
-    val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData)
+    val javaProducerData = scala.collection.JavaConversions.asJavaList(scalaProducerData)
 
     val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]])
     mockScalaProducer.send(scalaProducerData.head)
-- 
1.8.3.1
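
Reviewer note, not part of the patch: the series swaps the implicit
scala.collection.JavaConversions wrappers for explicit conversions, either via
scala.collection.JavaConverters (.asScala/.asJava) or via the non-deprecated
asJavaList/asScalaMap-style names where an implicit import is kept. For
reviewers unfamiliar with the target idiom, a minimal, self-contained sketch
follows. It assumes Scala 2.9+ (where scala.collection.JavaConverters exists);
ConvertersExample and its collections are illustrative inventions, not code
from this patch.

import java.util.{ArrayList, HashMap}
import scala.collection.JavaConverters._

object ConvertersExample {
  def main(args: Array[String]) {
    // Java -> Scala: explicit .asScala, as in `children.asScala.toSet` above.
    val jmap = new HashMap[String, Int]()
    jmap.put("topic-a", 2)
    jmap.put("topic-b", 4)
    // Mirrors the createMessageStreams change: Map.empty ++ javaMap.asScala.
    val scalaMap: Map[String, Int] = Map.empty ++ jmap.asScala
    println(scalaMap)

    // A java.util.List viewed as a Scala Buffer (a Seq), usable with varargs
    // as in `underlying.send(messages.asScala: _*)`.
    val jlist = new ArrayList[String]()
    jlist.add("broker-1")
    jlist.add("broker-2")
    val asSeq: Seq[String] = jlist.asScala
    println(asSeq.mkString(","))

    // Scala -> Java: explicit .asJava, as in
    // `createMessageStreamsByFilter(...).asJava` above.
    val jview: java.util.List[String] = Seq("s1", "s2").asJava
    println(jview)
  }
}

Both directions return constant-time wrapper views rather than copies, and
converting a wrapper back unwraps the original collection. Making each
conversion explicit at the call site is what allows files such as Pool.scala
to drop the blanket JavaConversions import without changing behavior.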