From de91cb08ae629822036cf3e29be7eba90042cffc Mon Sep 17 00:00:00 2001 From: Christopher Freeman Date: Sat, 7 Sep 2013 15:58:05 -0700 Subject: [PATCH] added support for Scala 2.10 --- core/build.sbt | 1 + core/src/main/scala/kafka/Kafka.scala | 2 +- .../scala/kafka/admin/AddPartitionsCommand.scala | 2 +- core/src/main/scala/kafka/admin/AdminUtils.scala | 14 +++++--- .../scala/kafka/admin/DeleteTopicCommand.scala | 2 +- .../PreferredReplicaLeaderElectionCommand.scala | 6 ++-- .../kafka/admin/ReassignPartitionsCommand.scala | 4 +-- core/src/main/scala/kafka/client/ClientUtils.scala | 2 +- core/src/main/scala/kafka/cluster/Broker.scala | 2 +- .../scala/kafka/consumer/ConsoleConsumer.scala | 6 ++-- .../kafka/consumer/ConsumerFetcherManager.scala | 4 +-- .../main/scala/kafka/consumer/SimpleConsumer.scala | 2 +- .../src/main/scala/kafka/consumer/TopicCount.scala | 2 +- .../consumer/ZookeeperConsumerConnector.scala | 10 +++--- .../consumer/ZookeeperTopicEventWatcher.scala | 2 +- .../controller/ControllerChannelManager.scala | 4 +-- .../scala/kafka/controller/KafkaController.scala | 16 ++++----- .../kafka/controller/PartitionStateMachine.scala | 20 ++++++----- .../kafka/controller/ReplicaStateMachine.scala | 4 +-- .../main/scala/kafka/javaapi/FetchRequest.scala | 7 ++-- core/src/main/scala/kafka/javaapi/Implicits.scala | 6 ++++ .../scala/kafka/javaapi/OffsetCommitRequest.scala | 5 ++- .../scala/kafka/javaapi/OffsetCommitResponse.scala | 3 +- .../scala/kafka/javaapi/OffsetFetchRequest.scala | 6 +++- .../scala/kafka/javaapi/OffsetFetchResponse.scala | 3 +- .../main/scala/kafka/javaapi/OffsetRequest.scala | 7 ++-- .../main/scala/kafka/javaapi/TopicMetadata.scala | 24 +++++++++---- .../scala/kafka/javaapi/TopicMetadataRequest.scala | 8 +++-- .../consumer/ZookeeperConsumerConnector.scala | 15 +++++--- .../javaapi/message/ByteBufferMessageSet.scala | 4 ++- .../scala/kafka/javaapi/producer/Producer.scala | 3 +- core/src/main/scala/kafka/log/Log.scala | 14 +++++--- core/src/main/scala/kafka/log/LogConfig.scala | 3 +- core/src/main/scala/kafka/log/LogManager.scala | 4 +-- .../kafka/network/BoundedByteBufferReceive.scala | 2 +- .../main/scala/kafka/producer/SyncProducer.scala | 2 +- .../kafka/producer/async/DefaultEventHandler.scala | 4 +-- .../kafka/producer/async/ProducerSendThread.scala | 4 +-- .../scala/kafka/server/AbstractFetcherThread.scala | 6 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 12 +++---- .../scala/kafka/server/KafkaServerStartable.scala | 4 +-- .../main/scala/kafka/server/ReplicaManager.scala | 2 +- .../scala/kafka/server/TopicConfigManager.scala | 6 ++-- .../kafka/server/ZookeeperLeaderElector.scala | 2 +- .../main/scala/kafka/tools/ImportZkOffsets.scala | 2 +- core/src/main/scala/kafka/tools/JmxTool.scala | 2 +- core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +-- .../scala/kafka/tools/SimpleConsumerShell.scala | 2 +- core/src/main/scala/kafka/utils/Annotations.scala | 36 ------------------- .../main/scala/kafka/utils/Annotations_2.8.scala | 36 +++++++++++++++++++ .../main/scala/kafka/utils/Annotations_2.9+.scala | 38 ++++++++++++++++++++ core/src/main/scala/kafka/utils/Json.scala | 2 +- .../main/scala/kafka/utils/KafkaScheduler.scala | 2 +- core/src/main/scala/kafka/utils/Mx4jLoader.scala | 2 +- core/src/main/scala/kafka/utils/Pool.scala | 12 +++++-- .../scala/kafka/utils/VerifiableProperties.scala | 5 ++- core/src/main/scala/kafka/utils/ZkUtils.scala | 24 ++++++------- .../scala/unit/kafka/admin/AddPartitionsTest.scala | 4 +-- .../consumer/ZookeeperConsumerConnectorTest.scala | 8 +++-- .../consumer/ZookeeperConsumerConnectorTest.scala | 9 ++--- .../javaapi/message/BaseMessageSetTestCases.scala | 7 ++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +- .../scala/unit/kafka/metrics/KafkaTimerTest.scala | 5 +-- .../unit/kafka/producer/AsyncProducerTest.scala | 7 ++-- .../scala/unit/kafka/producer/ProducerTest.scala | 14 ++++---- .../unit/kafka/producer/SyncProducerTest.scala | 4 +-- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- .../scala/kafka/perf/ConsumerPerformance.scala | 2 +- project/Build.scala | 3 +- 69 files changed, 304 insertions(+), 191 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/Annotations.scala create mode 100644 core/src/main/scala/kafka/utils/Annotations_2.8.scala create mode 100644 core/src/main/scala/kafka/utils/Annotations_2.9+.scala diff --git a/core/build.sbt b/core/build.sbt index c54cf44..b5bcb44 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -23,6 +23,7 @@ libraryDependencies ++= Seq( libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => deps :+ (sv match { case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test" case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" }) } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index dafb1ee..988014a 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -47,7 +47,7 @@ object Kafka extends Logging { kafkaServerStartble.awaitShutdown } catch { - case e => fatal(e) + case e: Throwable => fatal(e) } System.exit(0) } diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala index fd41661..c74d9c2 100644 --- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala @@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging { addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) println("adding partitions succeeded!") } catch { - case e => + case e: Throwable => println("adding partitions failed because of " + e.getMessage) println(Utils.stackTrace(e)) } finally { diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 83ba729..6560fc6 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -138,7 +138,7 @@ object AdminUtils extends Logging { debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) - case e2 => throw new AdminOperationException(e2.toString) + case e2: Throwable => throw new AdminOperationException(e2.toString) } } @@ -162,7 +162,11 @@ object AdminUtils extends Logging { */ private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { if(config.size > 0) { - val map = Map("version" -> 1, "config" -> JavaConversions.asMap(config)) + val configMap: mutable.Map[String, String] = { + import JavaConversions._ + config + } + val map = Map("version" -> 1, "config" -> configMap) ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) } } @@ -222,7 +226,7 @@ object AdminUtils extends Logging { try { Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) } catch { - case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) + case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } @@ -230,7 +234,7 @@ object AdminUtils extends Logging { replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) } catch { - case e => throw new ReplicaNotAvailableException(e) + case e: Throwable => throw new ReplicaNotAvailableException(e) } if(replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + @@ -240,7 +244,7 @@ object AdminUtils extends Logging { inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { - case e => + case e: Throwable => debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala index 3da4518..804b331 100644 --- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala @@ -54,7 +54,7 @@ object DeleteTopicCommand { println("deletion succeeded!") } catch { - case e => + case e: Throwable => println("delection failed because of " + e.getMessage) println(Utils.stackTrace(e)) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 53fc433..26beb96 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { preferredReplicaElectionCommand.moveLeaderToPreferredReplica() println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection)) } catch { - case e => + case e: Throwable => println("Failed to start preferred replica election") println(Utils.stackTrace(e)) } finally { @@ -105,7 +105,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) throw new AdminOperationException("Preferred replica leader election currently in progress for " + "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection)) - case e2 => throw new AdminOperationException(e2.toString) + case e2: Throwable => throw new AdminOperationException(e2.toString) } } } @@ -117,7 +117,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition)) PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions) } catch { - case e => throw new AdminCommandFailedException("Admin command failed", e) + case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e) } } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index aa61fa1..f333d29 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging { "The replica assignment is \n" + partitionsToBeReassigned.toString()) } } catch { - case e => + case e: Throwable => println("Partitions reassignment failed due to " + e.getMessage) println(Utils.stackTrace(e)) } finally { @@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) throw new AdminCommandFailedException("Partition reassignment currently in " + "progress for %s. Aborting operation".format(partitionsBeingReassigned)) - case e => error("Admin command failed", e); false + case e: Throwable => error("Admin command failed", e); false } } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index cc526ec..1d2f81b 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -54,7 +54,7 @@ object ClientUtils extends Logging{ fetchMetaDataSucceeded = true } catch { - case e => + case e: Throwable => warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" .format(correlationId, topics, shuffledBrokers(i).toString), e) t = e diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index b03dea2..9407ed2 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -42,7 +42,7 @@ private[kafka] object Broker { throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) } } catch { - case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t) + case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t) } } diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 140f2e3..dc066c2 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging { formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) numMessages += 1 } catch { - case e => + case e: Throwable => if (skipMessageOnError) error("Error processing message, skipping this message: ", e) else @@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging { } } } catch { - case e => error("Error processing message, stopping consumer: ", e) + case e: Throwable => error("Error processing message, stopping consumer: ", e) } System.err.println("Consumed %d messages".format(numMessages)) System.out.flush() @@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging { zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case _: Throwable => // swallow } } } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index fa6b213..8c03308 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } } catch { - case t => { + case t: Throwable => { if (!isRunning.get()) throw t /* If this thread is stopped, propagate this exception to kill the thread. */ else @@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, try { addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) } catch { - case t => { + case t: Throwable => { if (!isRunning.get()) throw t /* If this thread is stopped, propagate this exception to kill the thread. */ else { diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 77e1ce2..24f7fb5 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -84,7 +84,7 @@ class SimpleConsumer(val host: String, disconnect() throw ioe } - case e => throw e + case e: Throwable => throw e } response } diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index c8e8406..a3eb53e 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging { case None => throw new KafkaException("error constructing TopicCount : " + topicCountString) } } catch { - case e => + case e: Throwable => error("error parsing consumer json string " + topicCountString, e) throw e } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 08c9e4f..857fd4d 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient = null } } catch { - case e => + case e: Throwable => fatal("error during consumer connector shutdown", e) } info("ZKConsumerConnector shut down completed") @@ -335,7 +335,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (doRebalance) syncedRebalance } catch { - case t => error("error during syncedRebalance", t) + case t: Throwable => error("error during syncedRebalance", t) } } info("stopping watcher executor thread for consumer " + consumerIdString) @@ -387,7 +387,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, cluster = getCluster(zkClient) done = rebalance(cluster) } catch { - case e => + case e: Throwable => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. * For example, a ZK node can disappear between the time we get all children and the time we try to get * the value of a child. Just let this go since another rebalance will be triggered. @@ -464,7 +464,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, " for topic " + topic + " with consumers: " + curConsumers) for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId) + val myConsumerPosition = curConsumers.indexOf(consumerThreadId) assert(myConsumerPosition >= 0) val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) @@ -584,7 +584,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // The node hasn't been deleted by the original owner. So wait a bit and retry. info("waiting for the partition ownership to be deleted: " + partition) false - case e2 => throw e2 + case e2: Throwable => throw e2 } } val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index df83baa..a67c193 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, } } catch { - case e => + case e: Throwable => error("error in handling child changes", e) } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ed1ce0b..beca460 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext brokerStateInfo(brokerId).requestSendThread.shutdown() brokerStateInfo.remove(brokerId) }catch { - case e => error("Error while removing broker by the controller", e) + case e: Throwable => error("Error while removing broker by the controller", e) } } @@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int, } } } catch { - case e => + case e: Throwable => warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. channel.disconnect() diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ab18b7a..aef41ad 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -89,14 +89,14 @@ object KafkaController extends Logging { case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) } } catch { - case t => + case t: Throwable => // It may be due to an incompatible controller register version warn("Failed to parse the controller info as json. " + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) try { return controllerInfoString.toInt } catch { - case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t) + case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t) } } } @@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg .format(topicAndPartition)) } } catch { - case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) + case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e) // remove the partition from the admin path to unblock the admin client removePartitionFromReassignedPartitions(topicAndPartition) } @@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector) } catch { - case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) + case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { removePartitionsFromPreferredReplicaElection(partitions) } @@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " + "Aborting controller startup procedure") - case oe => error("Error while incrementing controller epoch", oe) + case oe: Throwable => error("Error while incrementing controller epoch", oe) } - case oe => error("Error while incrementing controller epoch", oe) + case oe: Throwable => error("Error while incrementing controller epoch", oe) } info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch)) @@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic)) - case e2 => throw new KafkaException(e2.toString) + case e2: Throwable => throw new KafkaException(e2.toString) } } @@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } } }catch { - case e => error("Error while handling partition reassignment", e) + case e: Throwable => error("Error while handling partition reassignment", e) } } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index a084830..829163a 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,7 +17,8 @@ package kafka.controller import collection._ -import collection.JavaConversions._ +import collection.JavaConversions +import collection.mutable.Buffer import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} @@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) } catch { - case e => error("Error while moving some partitions to the online state", e) + case e: Throwable => error("Error while moving some partitions to the online state", e) // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions } } @@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { - case e => error("Error while moving some partitions to %s state".format(targetState), e) + case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions } } @@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } catch { case lenne: LeaderElectionNotNeededException => // swallow case nroe: NoReplicaOnlineException => throw nroe - case sce => + case sce: Throwable => val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage) stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) throw new StateChangeFailedException(failMsg, sce) @@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.controllerLock synchronized { if (hasStarted.get) { try { - debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) - val currentChildren = JavaConversions.asBuffer(children).toSet + val currentChildren = { + import JavaConversions._ + debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) + (children: Buffer[String]).toSet + } val newTopics = currentChildren -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- currentChildren // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1)) @@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { if(newTopics.size > 0) controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet) } catch { - case e => error("Error while handling new topic", e ) + case e: Throwable => error("Error while handling new topic", e ) } // TODO: kafka-330 Handle deleted topics } @@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) } catch { - case e => error("Error while handling add partitions for data path " + dataPath, e ) + case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e ) } } } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index c964857..212c05d 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState)) brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { - case e => error("Error while moving some replicas to %s state".format(targetState), e) + case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e) } } @@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(deadBrokerIds.size > 0) controller.onBrokerFailure(deadBrokerIds.toSeq) } catch { - case e => error("Error while handling broker changes", e) + case e: Throwable => error("Error while handling broker changes", e) } } } diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index 83d8cbc..4060077 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -17,10 +17,10 @@ package kafka.javaapi -import scala.collection.JavaConversions import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.api.{Request, PartitionFetchInfo} +import scala.collection.mutable class FetchRequest(correlationId: Int, clientId: String, @@ -29,7 +29,10 @@ class FetchRequest(correlationId: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap: Map[TopicAndPartition, PartitionFetchInfo] = { + import scala.collection.JavaConversions._ + (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap + } kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala index 9a63914..8baf4d4 100644 --- a/core/src/main/scala/kafka/javaapi/Implicits.scala +++ b/core/src/main/scala/kafka/javaapi/Implicits.scala @@ -46,4 +46,10 @@ private[javaapi] object Implicits extends Logging { case None => null.asInstanceOf[T] } } + + // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors + implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = { + import scala.collection.JavaConversions._ + l: collection.mutable.Buffer[A] + } } diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala index 32033d6..57b9d2a 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala @@ -27,7 +27,10 @@ class OffsetCommitRequest(groupId: String, correlationId: Int, clientId: String) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap: Map[TopicAndPartition, OffsetMetadataAndError] = { + import JavaConversions._ + requestInfo.toMap + } kafka.api.OffsetCommitRequest( groupId = groupId, requestInfo = scalaMap, diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala index d1c50c4..570bf31 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala @@ -23,7 +23,8 @@ import collection.JavaConversions class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) { def errors: java.util.Map[TopicAndPartition, Short] = { - JavaConversions.asMap(underlying.requestInfo) + import JavaConversions._ + underlying.requestInfo } } diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala index 64d134b..5b4f4bb 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala @@ -18,6 +18,7 @@ package kafka.javaapi import kafka.common.TopicAndPartition +import scala.collection.mutable import collection.JavaConversions import java.nio.ByteBuffer @@ -28,7 +29,10 @@ class OffsetFetchRequest(groupId: String, clientId: String) { val underlying = { - val scalaSeq = JavaConversions.asBuffer(requestInfo) + val scalaSeq = { + import JavaConversions._ + requestInfo: mutable.Buffer[TopicAndPartition] + } kafka.api.OffsetFetchRequest( groupId = groupId, requestInfo = scalaSeq, diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala index 9f83c1b..60924d2 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala @@ -23,7 +23,8 @@ import collection.JavaConversions class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) { def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = { - JavaConversions.asMap(underlying.requestInfo) + import JavaConversions._ + underlying.requestInfo } } diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala index 3565a15..c8a0ded 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala @@ -19,7 +19,7 @@ package kafka.javaapi import kafka.common.TopicAndPartition import kafka.api.{Request, PartitionOffsetRequestInfo} -import collection.JavaConversions +import scala.collection.mutable import java.nio.ByteBuffer @@ -28,7 +28,10 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse clientId: String) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap = { + import collection.JavaConversions._ + (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap + } kafka.api.OffsetRequest( requestInfo = scalaMap, versionId = versionId, diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index 97b6dcd..d08c3f4 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -17,16 +17,20 @@ package kafka.javaapi import kafka.cluster.Broker -import scala.collection.JavaConversions.asList +import scala.collection.JavaConversions private[javaapi] object MetadataListImplicits { implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]): - java.util.List[kafka.javaapi.TopicMetadata] = - asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))) + java.util.List[kafka.javaapi.TopicMetadata] = { + import JavaConversions._ + topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)) + } implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]): - java.util.List[kafka.javaapi.PartitionMetadata] = - asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))) + java.util.List[kafka.javaapi.PartitionMetadata] = { + import JavaConversions._ + partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)) + } } class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { @@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { underlying.leader } - def replicas: java.util.List[Broker] = asList(underlying.replicas) + def replicas: java.util.List[Broker] = { + import JavaConversions._ + underlying.replicas + } - def isr: java.util.List[Broker] = asList(underlying.isr) + def isr: java.util.List[Broker] = { + import JavaConversions._ + underlying.isr + } def errorCode: Short = underlying.errorCode diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index 5f80df7..05757a1 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -18,7 +18,7 @@ package kafka.javaapi import kafka.api._ import java.nio.ByteBuffer -import scala.collection.JavaConversions +import scala.collection.mutable class TopicMetadataRequest(val versionId: Short, override val correlationId: Int, @@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short, val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) { - val underlying: kafka.api.TopicMetadataRequest = - new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics)) + val underlying: kafka.api.TopicMetadataRequest = { + import scala.collection.JavaConversions._ + new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String]) + } def this(topics: java.util.List[String]) = this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics) diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 14c4c8a..58e83f6 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -18,7 +18,8 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ -import scala.collection.JavaConversions.asList +import scala.collection.mutable +import scala.collection.JavaConversions /** @@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = { - import scala.collection.JavaConversions._ - val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]]) + val scalaTopicCountMap: Map[String, Int] = { + import JavaConversions._ + Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int]) + } val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder) val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]] for ((topic, streams) <- scalaReturn) { @@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] = createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) - def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = - asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)) + def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = { + import JavaConversions._ + underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder) + } def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder()) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index 0a95248..fecee8d 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong import scala.reflect.BeanProperty import java.nio.ByteBuffer import kafka.message._ +import kafka.javaapi.Implicits.javaListToScalaBuffer class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet { private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer) + // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly + this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer) } def this(messages: java.util.List[Message]) { diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index 7265328..c465da5 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -19,6 +19,7 @@ package kafka.javaapi.producer import kafka.producer.ProducerConfig import kafka.producer.KeyedMessage +import scala.collection.mutable class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only { @@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for */ def send(messages: java.util.List[KeyedMessage[K,V]]) { import collection.JavaConversions._ - underlying.send(asBuffer(messages):_*) + underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*) } /** diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 626eb8f..9fe61ff 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -21,7 +21,7 @@ import java.io.{IOException, File} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import kafka.utils._ -import scala.collection.JavaConversions.asIterable; +import scala.collection.JavaConversions import java.text.NumberFormat import kafka.message._ import kafka.common._ @@ -162,7 +162,7 @@ class Log(val dir: File, } private def recoverLog() { - val lastOffset = try {activeSegment.nextOffset} catch {case _ => -1L} + val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L} if(lastOffset <= this.recoveryPoint) { info("Log '%s' is fully intact, skipping recovery".format(name)) this.recoveryPoint = lastOffset @@ -581,13 +581,19 @@ class Log(val dir: File, /** * All the log segments in this log ordered from oldest to newest */ - def logSegments: Iterable[LogSegment] = asIterable(segments.values) + def logSegments: Iterable[LogSegment] = { + import JavaConversions._ + segments.values + } /** * Get all segments beginning with the segment that includes "from" and ending with the segment * that includes up to "to-1" or the end of the log (if to > logEndOffset) */ - def logSegments(from: Long, to: Long) = asIterable(segments.subMap(from, true, to, false).values) + def logSegments(from: Long, to: Long): Iterable[LogSegment] = { + import JavaConversions._ + segments.subMap(from, true, to, false).values + } override def toString() = "Log(" + dir + ")" diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 48660bc..51ec796 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -133,7 +133,8 @@ object LogConfig { * Check that property names are valid */ private def validateNames(props: Properties) { - for(name <- JavaConversions.asMap(props).keys) + import JavaConversions._ + for(name <- props.keys) require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index d039f9d..4719715 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -228,7 +228,7 @@ class LogManager(val logDirs: Array[File], .format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath, - JavaConversions.asMap(config.toProps).mkString(", "))) + {import JavaConversions._; config.toProps.mkString(", ")})) log } } @@ -320,7 +320,7 @@ class LogManager(val logDirs: Array[File], if(timeSinceLastFlush >= log.config.flushMs) log.flush } catch { - case e => + case e: Throwable => error("Error flushing topic " + topicAndPartition.topic, e) e match { case _: IOException => diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala index cab1864..a442545 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala @@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive case e: OutOfMemoryError => error("OOME with size " + size, e) throw e - case e2 => + case e2: Throwable => throw e2 } buffer diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 306f200..419156e 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry disconnect() throw e - case e => throw e + case e: Throwable => throw e } response } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 65613ce..c8326a8 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, else serializedMessages += new KeyedMessage[K,Message](topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message))) } catch { - case t => + case t: Throwable => producerStats.serializationErrorRate.mark() if (isSync) { throw t @@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, }catch { // Swallow recoverable exceptions and return None so that they can be retried. case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None - case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None + case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None } } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 2b41a49..42e9c74 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String, try { processEvents }catch { - case e => error("Error in sending events: ", e) + case e: Throwable => error("Error in sending events: ", e) }finally { shutdownLatch.countDown } @@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String, if(size > 0) handler.handle(events) }catch { - case e => error("Error in handling batch of " + size + " events", e) + case e: Throwable => error("Error in handling batch of " + size + " events", e) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index d5addb3..a5fc96d 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) response = simpleConsumer.fetch(fetchRequest) } catch { - case t => + case t: Throwable => if (isRunning.get) { warn("Error in fetch %s".format(fetchRequest), t) partitionMapLock synchronized { @@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and // should get fixed in the subsequent fetches logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) - case e => + case e: Throwable => throw new KafkaException("error processing data for partition [%s,%d] offset %d" .format(topic, partitionId, currentOffset.get), e) } @@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) } catch { - case e => + case e: Throwable => warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0287f87..338d1cc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -277,7 +277,7 @@ class KafkaApis(val requestChannel: RequestChannel, warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format( producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) new ProduceResult(topicAndPartition, nle) - case e => + case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() error("Error processing ProducerRequest with correlation id %d from client %s on partition %s" @@ -366,7 +366,7 @@ class KafkaApis(val requestChannel: RequestChannel, warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format( fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage)) new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) - case t => + case t: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d" @@ -446,7 +446,7 @@ class KafkaApis(val requestChannel: RequestChannel, warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage)) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) - case e => + case e: Throwable => warn("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } @@ -550,7 +550,7 @@ class KafkaApis(val requestChannel: RequestChannel, isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { - case e => + case e: Throwable => error("Error while fetching metadata for partition %s".format(topicAndPartition), e) new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) @@ -609,7 +609,7 @@ class KafkaApis(val requestChannel: RequestChannel, (topicAndPartition, ErrorMapping.NoError) } } catch { - case e => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } } } @@ -635,7 +635,7 @@ class KafkaApis(val requestChannel: RequestChannel, ErrorMapping.UnknownTopicOrPartitionCode)) } } catch { - case e => + case e: Throwable => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) } diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index 5be65e9..acda52b 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { server.startup() } catch { - case e => + case e: Throwable => fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e) shutdown() System.exit(1) @@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { server.shutdown() } catch { - case e => + case e: Throwable => fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e) System.exit(1) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3ca4419..ee1cc0c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -225,7 +225,7 @@ class ReplicaManager(val config: KafkaConfig, makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId) } catch { - case e => + case e: Throwable => val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicAndPartition) diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 5814cb7..56cae58 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -77,7 +77,8 @@ class TopicConfigManager(private val zkClient: ZkClient, */ private def processAllConfigChanges() { val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) - processConfigChanges(JavaConversions.asBuffer(configChanges).sorted) + import JavaConversions._ + processConfigChanges((configChanges: mutable.Buffer[String]).sorted) } /** @@ -123,7 +124,8 @@ class TopicConfigManager(private val zkClient: ZkClient, object ConfigChangeListener extends IZkChildListener { override def handleChildChange(path: String, chillins: java.util.List[String]) { try { - processConfigChanges(JavaConversions.asBuffer(chillins)) + import JavaConversions._ + processConfigChanges(chillins: mutable.Buffer[String]) } catch { case e: Exception => error("Error processing config change:", e) } diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index f1f0625..33b7360 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: } if (leaderId != -1) debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) - case e2 => + case e2: Throwable => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) leaderId = -1 } diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 55709b5..c8023ee 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging { try { ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) } catch { - case e => e.printStackTrace() + case e: Throwable => e.printStackTrace() } } } diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 7e424e7..747a675 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -86,7 +86,7 @@ object JmxTool extends Logging { else List(null) - val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten + val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten val allAttributes: Iterable[(ObjectName, Array[String])] = names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName))) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 6fb545a..f0f871c 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -129,7 +129,7 @@ object MirrorMaker extends Logging { try { streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten } catch { - case t => + case t: Throwable => fatal("Unable to create stream - shutting down mirror maker.") connectors.foreach(_.shutdown) } @@ -204,7 +204,7 @@ object MirrorMaker extends Logging { } } } catch { - case e => + case e: Throwable => fatal("Stream unexpectedly exited.", e) } finally { shutdownLatch.countDown() diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 7629329..747e072 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging { formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out) numMessagesConsumed += 1 } catch { - case e => + case e: Throwable => if (skipMessageOnError) error("Error processing message, skipping this message: ", e) else diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala deleted file mode 100644 index 28269eb..0000000 --- a/core/src/main/scala/kafka/utils/Annotations.scala +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -/* Some helpful annotations */ - -/** - * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation - * must respect - */ -class threadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is not threadsafe - */ -class nonthreadsafe extends StaticAnnotation - -/** - * Indicates that the annotated class is immutable - */ -class immutable extends StaticAnnotation diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala new file mode 100644 index 0000000..28269eb --- /dev/null +++ b/core/src/main/scala/kafka/utils/Annotations_2.8.scala @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +/* Some helpful annotations */ + +/** + * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation + * must respect + */ +class threadsafe extends StaticAnnotation + +/** + * Indicates that the annotated class is not threadsafe + */ +class nonthreadsafe extends StaticAnnotation + +/** + * Indicates that the annotated class is immutable + */ +class immutable extends StaticAnnotation diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala new file mode 100644 index 0000000..ab95ce1 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import scala.annotation.StaticAnnotation + +/* Some helpful annotations */ + +/** + * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation + * must respect + */ +class threadsafe extends StaticAnnotation + +/** + * Indicates that the annotated class is not threadsafe + */ +class nonthreadsafe extends StaticAnnotation + +/** + * Indicates that the annotated class is immutable + */ +class immutable extends StaticAnnotation diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index 3f1252c..d110284 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -36,7 +36,7 @@ object Json extends Logging { try { JSON.parseFull(input) } catch { - case t => + case t: Throwable => throw new KafkaException("Can't parse json string: %s".format(input), t) } } diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala index 73457e1..8e37505 100644 --- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala +++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala @@ -99,7 +99,7 @@ class KafkaScheduler(val threads: Int, trace("Begining execution of scheduled task '%s'.".format(name)) fun() } catch { - case t => error("Uncaught exception in scheduled task '" + name +"'", t) + case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t) } finally { trace("Completed execution of scheduled task '%s'.".format(name)) } diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index 64d84cc..db9f20b 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -64,7 +64,7 @@ object Mx4jLoader extends Logging { case e: ClassNotFoundException => { info("Will not load MX4J, mx4j-tools.jar is not in the classpath"); } - case e => { + case e: Throwable => { warn("Could not start register mbean in JMX", e); } } diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 9a86eab..9ddcde7 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -19,6 +19,7 @@ package kafka.utils import java.util.ArrayList import java.util.concurrent._ +import collection.mutable import collection.JavaConversions import kafka.common.KafkaException import java.lang.Object @@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] def remove(key: K): V = pool.remove(key) - def keys = JavaConversions.asSet(pool.keySet()) + def keys: mutable.Set[K] = { + import JavaConversions._ + pool.keySet() + } - def values: Iterable[V] = - JavaConversions.asIterable(new ArrayList[V](pool.values())) + def values: Iterable[V] = { + import JavaConversions._ + new ArrayList[V](pool.values()) + } def clear() { pool.clear() } diff --git a/core/src/main/scala/kafka/utils/VerifiableProperties.scala b/core/src/main/scala/kafka/utils/VerifiableProperties.scala index 9009a9d..a288ad5 100644 --- a/core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ b/core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -195,7 +195,10 @@ class VerifiableProperties(val props: Properties) extends Logging { def verify() { info("Verifying properties") - val propNames = JavaConversions.asBuffer(Collections.list(props.propertyNames)).map(_.toString).sorted + val propNames = { + import JavaConversions._ + Collections.list(props.propertyNames).map(_.toString).sorted + } for(key <- propNames) { if (!referenceSet.contains(key)) warn("Property %s is not valid".format(key)) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4094dcb..d1c4b3d 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -278,7 +278,7 @@ object ZkUtils extends Logging { storedData = readData(client, path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - case e2 => throw e2 + case e2: Throwable => throw e2 } if (storedData == null || storedData != data) { info("conflict in " + path + " data: " + data + " stored data: " + storedData) @@ -288,7 +288,7 @@ object ZkUtils extends Logging { info(path + " exists with value " + data + " during connection loss; this is ok") } } - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -328,7 +328,7 @@ object ZkUtils extends Logging { case None => // the node disappeared; retry creating the ephemeral node immediately } } - case e2 => throw e2 + case e2: Throwable => throw e2 } } } @@ -367,10 +367,10 @@ object ZkUtils extends Logging { } catch { case e: ZkNodeExistsException => client.writeData(path, data) - case e2 => throw e2 + case e2: Throwable => throw e2 } } - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -423,7 +423,7 @@ object ZkUtils extends Logging { createParentPath(client, path) client.createEphemeral(path, data) } - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -435,7 +435,7 @@ object ZkUtils extends Logging { // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") false - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -446,7 +446,7 @@ object ZkUtils extends Logging { case e: ZkNoNodeException => // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -456,7 +456,7 @@ object ZkUtils extends Logging { zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case _: Throwable => // swallow } } @@ -473,7 +473,7 @@ object ZkUtils extends Logging { } catch { case e: ZkNoNodeException => (None, stat) - case e2 => throw e2 + case e2: Throwable => throw e2 } dataAndStat } @@ -491,7 +491,7 @@ object ZkUtils extends Logging { client.getChildren(path) } catch { case e: ZkNoNodeException => return Nil - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -682,7 +682,7 @@ object ZkUtils extends Logging { case nne: ZkNoNodeException => ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) - case e2 => throw new AdminOperationException(e2.toString) + case e2: Throwable => throw new AdminOperationException(e2.toString) } } } diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index abcbed8..09254cc 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { fail("Topic should not exist") } catch { case e: AdminOperationException => //this is good - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { fail("Add partitions should fail") } catch { case e: AdminOperationException => //this is good - case e2 => throw e2 + case e2: Throwable => throw e2 } } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 121b6c5..8fe7259 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar fail("should get an exception") } catch { case e: ConsumerTimeoutException => // this is ok - case e => throw e + case e: Throwable => throw e } } @@ -406,10 +406,12 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { - import scala.collection.JavaConversions val children = zkClient.getChildren(path) Collections.sort(children) - val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children) + val childrenAsSeq : Seq[java.lang.String] = { + import JavaConversions._ + children.toSeq + } childrenAsSeq.map(partition => (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 9f243f0..43af649 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -21,7 +21,7 @@ import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.server._ import org.scalatest.junit.JUnit3Suite -import scala.collection.JavaConversions._ +import scala.collection.JavaConversions import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer._ @@ -84,8 +84,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar for (partition <- 0 until numParts) { val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) messages ++= ms - import scala.collection.JavaConversions._ - javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _)))) + import JavaConversions._ + javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]]) } javaProducer.close messages @@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { var messages: List[String] = Nil - val topicMessageStreams = asMap(jTopicMessageStreams) + import scala.collection.JavaConversions._ + val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index abee11b..726399e 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet def toMessageIterator(messageSet: MessageSet): Iterator[Message] = { import scala.collection.JavaConversions._ - val messages = asIterable(messageSet) - messages.map(m => m.message).iterator + messageSet.map(m => m.message).iterator } @Test @@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { import scala.collection.JavaConversions._ val m = createMessageSet(messages) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test @@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { import scala.collection.JavaConversions._ val m = createMessageSet(messages, DefaultCompressionCodec) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index b7f43e2..18d2e7c 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -201,7 +201,7 @@ class LogTest extends JUnitSuite { log.read(1025, 1000) fail("Expected exception on invalid read.") } catch { - case e: OffsetOutOfRangeException => "This is good." + case e: OffsetOutOfRangeException => // This is good. } } diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala index fe5bc09..7df7405 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala @@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite { val clock = new ManualClock val testRegistry = new MetricsRegistry(clock) val metric = testRegistry.newTimer(this.getClass, "TestTimer") + val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L) val timer = new KafkaTimer(metric) timer.time { clock.addMillis(1000) } assertEquals(1, metric.count()) - assertTrue((metric.max() - 1000).abs <= Double.Epsilon) - assertTrue((metric.min() - 1000).abs <= Double.Epsilon) + assertTrue((metric.max() - 1000).abs <= Epsilon) + assertTrue((metric.min() - 1000).abs <= Epsilon) } private class ManualClock extends Clock { diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 74a2743..18e3555 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -264,7 +264,7 @@ class AsyncProducerTest extends JUnit3Suite { } catch { // should not throw any exception - case e => fail("Should not throw any exception") + case e: Throwable => fail("Should not throw any exception") } } @@ -452,7 +452,10 @@ class AsyncProducerTest extends JUnit3Suite { val topic = "topic1" val msgs = TestUtils.getMsgStrings(5) val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m)) - val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData) + val javaProducerData: java.util.List[KeyedMessage[String, String]] = { + import scala.collection.JavaConversions._ + scalaProducerData + } val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]]) mockScalaProducer.send(scalaProducerData.head) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index f546c15..2fb059b 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -105,7 +105,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Test should fail because the broker list provided are not valid") } catch { case e: FailedToSendMessageException => - case oe => fail("fails with exception", oe) + case oe: Throwable => fail("fails with exception", oe) } finally { producer1.close() } @@ -118,7 +118,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try{ producer2.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => fail("Should succeed sending the message", e) + case e: Throwable => fail("Should succeed sending the message", e) } finally { producer2.close() } @@ -131,7 +131,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try{ producer3.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => fail("Should succeed sending the message", e) + case e: Throwable => fail("Should succeed sending the message", e) } finally { producer3.close() } @@ -188,7 +188,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } catch { case se: FailedToSendMessageException => true - case e => fail("Not expected", e) + case e: Throwable => fail("Not expected", e) } finally { producer2.close() @@ -222,7 +222,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // on broker 0 producer.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => fail("Unexpected exception: " + e) + case e: Throwable => fail("Unexpected exception: " + e) } // kill the broker @@ -235,7 +235,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Should fail since no leader exists for the partition.") } catch { case e : TestFailedException => throw e // catch and re-throw the failure message - case e2 => // otherwise success + case e2: Throwable => // otherwise success } // restart server 1 @@ -284,7 +284,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message("test".getBytes), messageSet1.next.message) } catch { - case e => case e: Exception => producer.close; fail("Not expected", e) + case e: Throwable => case e: Exception => producer.close; fail("Not expected", e) } // stop IO threads and request handling, but leave networking operational diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 6fa1abc..8d63e31 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) } catch { case e : java.io.IOException => // success - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.fail("Should have received timeout exception since request handling is stopped.") } catch { case e: SocketTimeoutException => /* success */ - case e => Assert.fail("Unexpected exception when expecting timeout: " + e) + case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds // make sure we don't wait fewer than timeoutMs for a response diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2375758..777b315 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -411,7 +411,7 @@ object TestUtils extends Logging { ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)) } catch { - case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) + case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) } } } diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 3158a22..ec3cd29 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -175,7 +175,7 @@ object ConsumerPerformance { case _: InterruptedException => case _: ClosedByInterruptException => case _: ConsumerTimeoutException => - case e => throw e + case e: Throwable => throw e } totalMessagesRead.addAndGet(messagesRead) totalBytesRead.addAndGet(bytesRead) diff --git a/project/Build.scala b/project/Build.scala index b3858f3..2cdbc9e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -41,7 +41,8 @@ object KafkaBuild extends Build { , scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), - crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"), + crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"), + excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"), scalaVersion := "2.8.0", version := "0.8.0-beta1", publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"), -- 1.7.10.2 (Apple Git-33)