From 86f0aa9c58aa43e5f9303ff00bd93743d56873ed Mon Sep 17 00:00:00 2001
From: Christopher Freeman
Date: Sat, 7 Sep 2013 15:58:05 -0700
Subject: [PATCH] added support for Scala 2.10

---
 core/build.sbt                                      |  1 +
 core/src/main/scala/kafka/Kafka.scala               |  2 +-
 .../scala/kafka/admin/AddPartitionsCommand.scala    |  2 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala    |  2 +-
 .../scala/kafka/admin/CreateTopicCommand.scala      |  2 +-
 .../scala/kafka/admin/DeleteTopicCommand.scala      |  2 +-
 .../main/scala/kafka/admin/ListTopicCommand.scala   |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala     |  6 +-
 .../kafka/admin/ReassignPartitionsCommand.scala     |  4 +-
 core/src/main/scala/kafka/client/ClientUtils.scala  |  2 +-
 core/src/main/scala/kafka/cluster/Broker.scala      |  2 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala      |  6 +-
 .../kafka/consumer/ConsumerFetcherManager.scala     |  4 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala  |  2 +-
 .../src/main/scala/kafka/consumer/TopicCount.scala  |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala       | 10 +--
 .../consumer/ZookeeperTopicEventWatcher.scala       |  2 +-
 .../controller/ControllerChannelManager.scala       |  4 +-
 .../scala/kafka/controller/KafkaController.scala    | 16 ++--
 .../kafka/controller/PartitionStateMachine.scala    | 20 +++--
 .../kafka/controller/ReplicaStateMachine.scala      |  4 +-
 .../main/scala/kafka/javaapi/FetchRequest.scala     |  6 +-
 core/src/main/scala/kafka/javaapi/Implicits.scala   |  6 ++
 .../main/scala/kafka/javaapi/OffsetRequest.scala    |  5 +-
 .../main/scala/kafka/javaapi/TopicMetadata.scala    | 24 ++++--
 .../scala/kafka/javaapi/TopicMetadataRequest.scala  |  8 +-
 .../consumer/ZookeeperConsumerConnector.scala       | 15 ++--
 .../javaapi/message/ByteBufferMessageSet.scala      |  4 +-
 .../scala/kafka/javaapi/producer/Producer.scala     |  3 +-
 core/src/main/scala/kafka/log/LogManager.scala      |  2 +-
 .../kafka/network/BoundedByteBufferReceive.scala    |  2 +-
 .../main/scala/kafka/producer/SyncProducer.scala    |  2 +-
 .../kafka/producer/async/DefaultEventHandler.scala  |  4 +-
 .../kafka/producer/async/ProducerSendThread.scala   |  4 +-
 .../scala/kafka/server/AbstractFetcherThread.scala  |  6 +-
 core/src/main/scala/kafka/server/KafkaApis.scala    |  8 +-
 .../scala/kafka/server/KafkaServerStartable.scala   |  4 +-
 .../main/scala/kafka/server/ReplicaManager.scala    |  2 +-
 .../kafka/server/ZookeeperLeaderElector.scala       |  2 +-
 .../main/scala/kafka/tools/ImportZkOffsets.scala    |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala       |  2 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala   |  4 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala     |  2 +-
 core/src/main/scala/kafka/utils/Annotations.scala   | 36 --------
 .../main/scala/kafka/utils/Annotations_2.8.scala    | 36 ++++++++
 .../main/scala/kafka/utils/Annotations_2.9+.scala   | 38 +++++++++
 core/src/main/scala/kafka/utils/Json.scala          |  2 +-
 core/src/main/scala/kafka/utils/Mx4jLoader.scala    |  2 +-
 core/src/main/scala/kafka/utils/Pool.scala          | 12 ++-
 core/src/main/scala/kafka/utils/Utils.scala         |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala       | 24 +++---
 .../scala/unit/kafka/admin/AddPartitionsTest.scala  |  4 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala     | 12 +--
 .../consumer/ZookeeperConsumerConnectorTest.scala   |  6 +-
 .../consumer/ZookeeperConsumerConnectorTest.scala   |  5 +-
 .../javaapi/message/BaseMessageSetTestCases.scala   |  7 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala    | 88 ++++++++++----------
 .../scala/unit/kafka/metrics/KafkaTimerTest.scala   |  5 +-
 .../unit/kafka/producer/AsyncProducerTest.scala     |  5 +-
 .../scala/unit/kafka/producer/ProducerTest.scala    | 14 ++--
 .../unit/kafka/producer/SyncProducerTest.scala      |  4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala     |  2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala      |  2 +-
 project/Build.scala                                 |  3 +-
 64 files changed, 302 insertions(+), 221 deletions(-)
 delete mode 100644 core/src/main/scala/kafka/utils/Annotations.scala
 create mode 100644 core/src/main/scala/kafka/utils/Annotations_2.8.scala
 create mode 100644 core/src/main/scala/kafka/utils/Annotations_2.9+.scala

diff --git a/core/build.sbt b/core/build.sbt
index c54cf44..b5bcb44 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -23,6 +23,7 @@ libraryDependencies ++= Seq(
 libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
   deps :+ (sv match {
     case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test"
+    case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test"
     case _ => "org.scalatest" %% "scalatest" % "1.8" % "test"
   })
 }
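[Annotation: the hunk above keys the scalatest test dependency off the Scala version being cross-built. As a refresher on the sbt convention it relies on: `%%` appends the Scala binary-version suffix to the artifact name (so `"org.scalatest" %% "scalatest"` resolves to `scalatest_2.10` under a 2.10 build), while a plain `%` pins an exact artifact, which is why the 2.8.0 branch must select its version explicitly. A minimal standalone sketch of the same pattern, in newer sbt syntax and with illustrative version numbers not taken from this patch:

    // build.sbt sketch -- versions and syntax are illustrative only
    crossScalaVersions := Seq("2.8.0", "2.9.2", "2.10.1")

    libraryDependencies += {
      scalaVersion.value match {
        case "2.8.0"                   => "org.scalatest" %  "scalatest" % "1.2"   % "test"  // exact artifact, no suffix
        case v if v.startsWith("2.10") => "org.scalatest" %% "scalatest" % "1.9.1" % "test"  // resolves to scalatest_2.10
        case _                         => "org.scalatest" %% "scalatest" % "1.8"   % "test"
      }
    }
]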
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dafb1ee..988014a 100644
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -47,7 +47,7 @@ object Kafka extends Logging {
       kafkaServerStartble.awaitShutdown
     }
     catch {
-      case e => fatal(e)
+      case e: Throwable => fatal(e)
     }
     System.exit(0)
   }
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 5757c32..7f03708 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -68,7 +68,7 @@ object AddPartitionsCommand extends Logging {
       addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
       println("adding partitions succeeded!")
     } catch {
-      case e =>
+      case e: Throwable =>
         println("adding partitions failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c399bc7..d6ab275 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -90,7 +90,7 @@ object AdminUtils extends Logging {
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
     } catch {
       case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2: Throwable => throw new AdministrationException(e2.toString)
     }
   }
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index 21c1186..84c2095 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging {
       createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     } catch {
-      case e =>
+      case e: Throwable =>
         println("creation failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
index 3da4518..804b331 100644
--- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala
@@ -54,7 +54,7 @@ object DeleteTopicCommand {
       println("deletion succeeded!")
     }
     catch {
-      case e =>
+      case e: Throwable =>
         println("delection failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     }
diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
index c760cc0..eed49e1 100644
--- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala
@@ -72,7 +72,7 @@ object ListTopicCommand {
         showTopic(t, zkClient, reportUnderReplicatedPartitions, reportUnavailablePartitions, liveBrokers)
     }
     catch {
-      case e =>
+      case e: Throwable =>
         println("list topic failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
     }
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index d5de5f3..34ed7aa 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
       println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Failed to start preferred replica election")
         println(Utils.stackTrace(e))
     } finally {
@@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
         val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
         throw new AdministrationException("Preferred replica leader election currently in progress for " +
           "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2: Throwable => throw new AdministrationException(e2.toString)
     }
   }
 }
@@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scala
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
       PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
     } catch {
-      case e => throw new AdminCommandFailedException("Admin command failed", e)
+      case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e)
     }
   }
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index aa61fa1..f333d29 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -119,7 +119,7 @@ object ReassignPartitionsCommand extends Logging {
           "The replica assignment is \n" + partitionsToBeReassigned.toString())
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         println("Partitions reassignment failed due to " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
@@ -142,7 +142,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
       throw new AdminCommandFailedException("Partition reassignment currently in " +
         "progress for %s. Aborting operation".format(partitionsBeingReassigned))
-      case e => error("Admin command failed", e); false
+      case e: Throwable => error("Admin command failed", e); false
     }
   }
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index cc526ec..1d2f81b 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -54,7 +54,7 @@ object ClientUtils extends Logging{
         fetchMetaDataSucceeded = true
       }
       catch {
-        case e =>
+        case e: Throwable =>
           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
             .format(correlationId, topics, shuffledBrokers(i).toString), e)
           t = e
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index b03dea2..9407ed2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -42,7 +42,7 @@ private[kafka] object Broker {
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
     } catch {
-      case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
+      case t: Throwable => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t)
     }
   }
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 719beb5..48fa7a3 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -204,7 +204,7 @@ object ConsoleConsumer extends Logging {
           formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
           numMessages += 1
         } catch {
-          case e =>
+          case e: Throwable =>
             if (skipMessageOnError)
               error("Error processing message, skipping this message: ", e)
             else
@@ -220,7 +220,7 @@ object ConsoleConsumer extends Logging {
         }
       }
     } catch {
-      case e => error("Error processing message, stopping consumer: ", e)
+      case e: Throwable => error("Error processing message, stopping consumer: ", e)
     }
     System.err.println("Consumed %d messages".format(numMessages))
     System.out.flush()
@@ -247,7 +247,7 @@ object ConsoleConsumer extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index fa6b213..8c03308 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -79,7 +79,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           }
         }
       } catch {
-        case t => {
+        case t: Throwable => {
           if (!isRunning.get())
             throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else
@@ -95,7 +95,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         try {
           addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
         } catch {
-          case t => {
+          case t: Throwable => {
            if (!isRunning.get())
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
            else {
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4395fe3..fac64aa 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -84,7 +84,7 @@ class SimpleConsumer(val host: String,
             disconnect()
             throw ioe
           }
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index c8e8406..a3eb53e 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging {
         case None => throw new KafkaException("error constructing TopicCount : " + topicCountString)
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         error("error parsing consumer json string " + topicCountString, e)
         throw e
     }
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..81bf0bd 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -175,7 +175,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         zkClient = null
       }
     } catch {
-      case e =>
+      case e: Throwable =>
         fatal("error during consumer connector shutdown", e)
     }
     info("ZKConsumerConnector shut down completed")
@@ -332,7 +332,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             if (doRebalance)
               syncedRebalance
           } catch {
-            case t => error("error during syncedRebalance", t)
+            case t: Throwable => error("error during syncedRebalance", t)
           }
         }
         info("stopping watcher executor thread for consumer " + consumerIdString)
@@ -384,7 +384,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           cluster = getCluster(zkClient)
           done = rebalance(cluster)
         } catch {
-          case e =>
+          case e: Throwable =>
            /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
             * For example, a ZK node can disappear between the time we get all children and the time we try to get
             * the value of a child. Just let this go since another rebalance will be triggered.
@@ -461,7 +461,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                 " for topic " + topic + " with consumers: " + curConsumers)

       for (consumerThreadId <- consumerThreadIdSet) {
-        val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId)
+        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
         assert(myConsumerPosition >= 0)
         val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
         val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
@@ -581,7 +581,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
          // The node hasn't been deleted by the original owner. So wait a bit and retry.
          info("waiting for the partition ownership to be deleted: " + partition)
          false
-        case e2 => throw e2
+        case e2: Throwable => throw e2
      }
    }
    val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
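[Annotation: the `case e =>` to `case e: Throwable =>` edits that dominate this patch are mechanical. Scala 2.10 warns on an untyped pattern in a `catch` block ("This catches all Throwables..."), because such a pattern silently swallows everything, including control-flow throwables. Pinning the pattern to `Throwable` keeps the old behavior while clearing the warning. Where swallowing fatal errors is undesirable, 2.10 also introduces `scala.util.control.NonFatal` as an alternative idiom; a small self-contained sketch of both styles, with `riskyOp` as an illustrative stand-in:

    import scala.util.control.NonFatal

    def riskyOp(): Unit = throw new RuntimeException("boom") // stand-in for real work

    // 2.10-clean spelling of the old bare `case e =>`: still catches everything
    try riskyOp() catch {
      case e: Throwable => println("failed: " + e.getMessage)
    }

    // Alternative idiom, new in 2.10: fatal errors (OutOfMemoryError etc.) propagate
    try riskyOp() catch {
      case NonFatal(e) => println("failed: " + e.getMessage)
    }
]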
info("waiting for the partition ownership to be deleted: " + partition) false - case e2 => throw e2 + case e2: Throwable => throw e2 } } val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index df83baa..a67c193 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, } } catch { - case e => + case e: Throwable => error("error in handling child changes", e) } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ed1ce0b..beca460 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -93,7 +93,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext brokerStateInfo(brokerId).requestSendThread.shutdown() brokerStateInfo.remove(brokerId) }catch { - case e => error("Error while removing broker by the controller", e) + case e: Throwable => error("Error while removing broker by the controller", e) } } @@ -142,7 +142,7 @@ class RequestSendThread(val controllerId: Int, } } } catch { - case e => + case e: Throwable => warn("Controller %d fails to send a request to broker %d".format(controllerId, toBrokerId), e) // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated. channel.disconnect() diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index ab18b7a..aef41ad 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -89,14 +89,14 @@ object KafkaController extends Logging { case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString)) } } catch { - case t => + case t: Throwable => // It may be due to an incompatible controller register version warn("Failed to parse the controller info as json. " + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString)) try { return controllerInfoString.toInt } catch { - case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t) + case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". 
This is neither the new or the old format.", t) } } } @@ -436,7 +436,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg .format(topicAndPartition)) } } catch { - case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) + case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e) // remove the partition from the admin path to unblock the admin client removePartitionFromReassignedPartitions(topicAndPartition) } @@ -448,7 +448,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector) } catch { - case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) + case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { removePartitionsFromPreferredReplicaElection(partitions) } @@ -514,9 +514,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " + "Aborting controller startup procedure") - case oe => error("Error while incrementing controller epoch", oe) + case oe: Throwable => error("Error while incrementing controller epoch", oe) } - case oe => error("Error while incrementing controller epoch", oe) + case oe: Throwable => error("Error while incrementing controller epoch", oe) } info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch)) @@ -693,7 +693,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic)) - case e2 => throw new KafkaException(e2.toString) + case e2: Throwable => throw new KafkaException(e2.toString) } } @@ -905,7 +905,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } } }catch { - case e => error("Error while handling partition reassignment", e) + case e: Throwable => error("Error while handling partition reassignment", e) } } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index a084830..829163a 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,7 +17,8 @@ package kafka.controller import collection._ -import collection.JavaConversions._ +import collection.JavaConversions +import collection.mutable.Buffer import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} @@ -91,7 +92,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) } catch { - case e => error("Error while moving some partitions to the online state", e) + case e: Throwable => error("Error while moving 
some partitions to the online state", e) // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions } } @@ -111,7 +112,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { - case e => error("Error while moving some partitions to %s state".format(targetState), e) + case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions } } @@ -321,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } catch { case lenne: LeaderElectionNotNeededException => // swallow case nroe: NoReplicaOnlineException => throw nroe - case sce => + case sce: Throwable => val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage) stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) throw new StateChangeFailedException(failMsg, sce) @@ -359,8 +360,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.controllerLock synchronized { if (hasStarted.get) { try { - debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) - val currentChildren = JavaConversions.asBuffer(children).toSet + val currentChildren = { + import JavaConversions._ + debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) + (children: Buffer[String]).toSet + } val newTopics = currentChildren -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- currentChildren // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1)) @@ -375,7 +379,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { if(newTopics.size > 0) controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet) } catch { - case e => error("Error while handling new topic", e ) + case e: Throwable => error("Error while handling new topic", e ) } // TODO: kafka-330 Handle deleted topics } @@ -399,7 +403,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded)) controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet) } catch { - case e => error("Error while handling add partitions for data path " + dataPath, e ) + case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e ) } } } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index c964857..212c05d 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState)) brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { - case e => error("Error while moving some replicas to %s state".format(targetState), e) + case e: 
Throwable => error("Error while moving some replicas to %s state".format(targetState), e) } } @@ -273,7 +273,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(deadBrokerIds.size > 0) controller.onBrokerFailure(deadBrokerIds.toSeq) } catch { - case e => error("Error while handling broker changes", e) + case e: Throwable => error("Error while handling broker changes", e) } } } diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index b475240..6abdc17 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -17,10 +17,10 @@ package kafka.javaapi -import scala.collection.JavaConversions import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.api.{Request, PartitionFetchInfo} +import scala.collection.mutable class FetchRequest(correlationId: Int, clientId: String, @@ -28,8 +28,10 @@ class FetchRequest(correlationId: Int, minBytes: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { + import scala.collection.JavaConversions._ + val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionFetchInfo]).toMap kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala index ee0a71d..0af3a67 100644 --- a/core/src/main/scala/kafka/javaapi/Implicits.scala +++ b/core/src/main/scala/kafka/javaapi/Implicits.scala @@ -40,4 +40,10 @@ private[javaapi] object Implicits extends Logging { case None => null.asInstanceOf[T] } } + + // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors + implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = { + import scala.collection.JavaConversions._ + l: collection.mutable.Buffer[A] + } } diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala index 1c77ff8..d88c7e4 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala @@ -19,7 +19,7 @@ package kafka.javaapi import kafka.common.TopicAndPartition import kafka.api.{Request, PartitionOffsetRequestInfo} -import collection.JavaConversions +import scala.collection.mutable import java.nio.ByteBuffer @@ -28,7 +28,8 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse clientId: String) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + import collection.JavaConversions._ + val scalaMap = (requestInfo: mutable.Map[TopicAndPartition, PartitionOffsetRequestInfo]).toMap kafka.api.OffsetRequest( requestInfo = scalaMap, versionId = versionId, diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index 97b6dcd..d08c3f4 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -17,16 +17,20 @@ package kafka.javaapi import kafka.cluster.Broker -import scala.collection.JavaConversions.asList +import scala.collection.JavaConversions private[javaapi] object MetadataListImplicits { implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]): - 
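[Annotation: the other recurring change begins here. The explicit `asMap`/`asBuffer`/`asList`/`asSet` helpers were dropped from `scala.collection.JavaConversions` in 2.10 (they had been deprecated since 2.9), so the patch triggers the equivalent implicit conversions instead, by importing `JavaConversions._` and ascribing the desired target type. A standalone sketch of the technique, with illustrative values:

    import scala.collection.JavaConversions._
    import scala.collection.mutable

    val javaMap = new java.util.HashMap[String, Int]()
    javaMap.put("partitions", 4)

    // Type ascription selects the java-to-scala implicit view, replacing asMap(...)
    val scalaMap: Map[String, Int] = (javaMap: mutable.Map[String, Int]).toMap

    // Likewise for lists, replacing asBuffer(...)
    val javaList = java.util.Arrays.asList("a", "b", "c")
    val scalaSeq: Seq[String] = (javaList: mutable.Buffer[String]).toSeq

Scoping the import inside a block, as the hunks below do, keeps the conversions from leaking across the whole file.]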
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 97b6dcd..d08c3f4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,16 +17,20 @@
 package kafka.javaapi

 import kafka.cluster.Broker
-import scala.collection.JavaConversions.asList
+import scala.collection.JavaConversions

 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
-  java.util.List[kafka.javaapi.TopicMetadata] =
-    asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
+  java.util.List[kafka.javaapi.TopicMetadata] = {
+    import JavaConversions._
+    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+  }

   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
-  java.util.List[kafka.javaapi.PartitionMetadata] =
-    asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
+  java.util.List[kafka.javaapi.PartitionMetadata] = {
+    import JavaConversions._
+    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+  }
 }

 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +55,15 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
     underlying.leader
   }

-  def replicas: java.util.List[Broker] = asList(underlying.replicas)
+  def replicas: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.replicas
+  }

-  def isr: java.util.List[Broker] = asList(underlying.isr)
+  def isr: java.util.List[Broker] = {
+    import JavaConversions._
+    underlying.isr
+  }

   def errorCode: Short = underlying.errorCode
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 5f80df7..05757a1 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -18,7 +18,7 @@ package kafka.javaapi

 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions
+import scala.collection.mutable

 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
@@ -26,8 +26,10 @@ class TopicMetadataRequest(val versionId: Short,
                            val topics: java.util.List[String])
     extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {

-  val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics))
+  val underlying: kafka.api.TopicMetadataRequest = {
+    import scala.collection.JavaConversions._
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+  }

   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 14c4c8a..58e83f6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,7 +18,8 @@ package kafka.javaapi.consumer

 import kafka.serializer._
 import kafka.consumer._
-import scala.collection.JavaConversions.asList
+import scala.collection.mutable
+import scala.collection.JavaConversions


 /**
@@ -71,9 +72,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         keyDecoder: Decoder[K],
         valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
-    import scala.collection.JavaConversions._

-    val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
+    val scalaTopicCountMap: Map[String, Int] = {
+      import JavaConversions._
+      Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]: mutable.Map[String, Int])
+    }
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
@@ -88,8 +91,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]] =
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())

-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) =
-    asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder))
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = {
+    import JavaConversions._
+    underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
+  }

   def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..fecee8d 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,12 +20,14 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.reflect.BeanProperty
 import java.nio.ByteBuffer
 import kafka.message._
+import kafka.javaapi.Implicits.javaListToScalaBuffer

 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)

   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
   }

   def this(messages: java.util.List[Message]) {
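[Annotation: the explicit `javaListToScalaBuffer(...)` call above is a workaround for SI-4141, which the in-code comment cites for Scala 2.8.1: on affected compilers, implicit views are not applied inside an auxiliary constructor's `this(...)` call, so the conversion must be invoked by name. A self-contained sketch of the shape of the workaround; `MessageSetLike` is a hypothetical stand-in, not a Kafka class:

    import scala.collection.mutable

    object Conversions {
      // Declared implicit, but also callable explicitly -- as the patch does.
      implicit def javaListToScalaBuffer[A](l: java.util.List[A]): mutable.Buffer[A] = {
        import scala.collection.JavaConversions._
        l: mutable.Buffer[A]
      }
    }

    class MessageSetLike(val messages: Seq[String]) {
      // Inside `this(...)` the implicit view may not kick in (SI-4141),
      // so the conversion is applied explicitly by name.
      def this(javaMessages: java.util.List[String]) =
        this(Conversions.javaListToScalaBuffer(javaMessages).toSeq)
    }
]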
diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
index 7265328..c465da5 100644
--- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala
+++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -19,6 +19,7 @@ package kafka.javaapi.producer

 import kafka.producer.ProducerConfig
 import kafka.producer.KeyedMessage
+import scala.collection.mutable

 class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only
 {
@@ -38,7 +39,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for
    */
   def send(messages: java.util.List[KeyedMessage[K,V]]) {
     import collection.JavaConversions._
-    underlying.send(asBuffer(messages):_*)
+    underlying.send((messages: mutable.Buffer[KeyedMessage[K,V]]).toSeq: _*)
   }

   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4771d11..739e22a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       } catch {
-        case e =>
+        case e: Throwable =>
           error("Error flushing topic " + log.topicName, e)
           e match {
             case _: IOException =>
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
index cab1864..a442545 100644
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
@@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive
         case e: OutOfMemoryError =>
           error("OOME with size " + size, e)
           throw e
-        case e2 =>
+        case e2: Throwable =>
           throw e2
       }
     buffer
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 306f200..419156e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
           // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry
           disconnect()
           throw e
-        case e => throw e
+        case e: Throwable => throw e
       }
       response
     }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e36d3b..c151032 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -129,7 +129,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         else
           serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
       } catch {
-        case t =>
+        case t: Throwable =>
           producerStats.serializationErrorRate.mark()
           if (isSync) {
             throw t
@@ -178,7 +178,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
-      case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
+      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
     }
   }
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 2b41a49..42e9c74 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String,
     try {
       processEvents
     }catch {
-      case e => error("Error in sending events: ", e)
+      case e: Throwable => error("Error in sending events: ", e)
     }finally {
       shutdownLatch.countDown
     }
@@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String,
         if(size > 0)
           handler.handle(events)
       }catch {
-        case e => error("Error in handling batch of " + size + " events", e)
+        case e: Throwable => error("Error in handling batch of " + size + " events", e)
       }
   }
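[Annotation: worth noting in hunks like BoundedByteBufferReceive above: catch cases are matched top to bottom, so the specific handler (`case e: OutOfMemoryError`) must stay ahead of the new `case e2: Throwable` fallback, or it would never fire. In miniature, with an illustrative body:

    try {
      val buf = java.nio.ByteBuffer.allocate(1024 * 1024) // stand-in for the real read
      println("allocated " + buf.capacity() + " bytes")
    } catch {
      case e: OutOfMemoryError => // specific case first: report, then rethrow
        throw e
      case e2: Throwable =>       // generic fallback must come last
        throw e2
    }
]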
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index d5addb3..a5fc96d 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -95,7 +95,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
-      case t =>
+      case t: Throwable =>
         if (isRunning.get) {
           warn("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
@@ -136,7 +136,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                   // should get fixed in the subsequent fetches
                   logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                case e =>
+                case e: Throwable =>
                   throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                     .format(topic, partitionId, currentOffset.get), e)
               }
@@ -147,7 +147,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
             warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
               .format(currentOffset.get, topic, partitionId, newOffset))
           } catch {
-            case e =>
+            case e: Throwable =>
               warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
               partitionsWithError += topicAndPartition
           }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cd02aab..4679e18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -264,7 +264,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
           producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
         new ProduceResult(topicAndPartition, nle)
-      case e =>
+      case e: Throwable =>
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
         BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
         error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
@@ -353,7 +353,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
           fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
         new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
-      case t =>
+      case t: Throwable =>
         BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
         BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
         error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d"
@@ -430,7 +430,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         warn("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
           offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
-      case e =>
+      case e: Throwable =>
         warn("Error while responding to offset request", e)
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
     }
@@ -481,7 +481,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                   isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
               new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
             } catch {
-              case e =>
+              case e: Throwable =>
                 error("Error while fetching metadata for partition %s".format(topicAndPartition), e)
                 new PartitionMetadata(topicAndPartition.partition, leaderInfo, replicaInfo, isrInfo,
                   ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 5be65e9..acda52b 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.startup()
     }
     catch {
-      case e =>
+      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
        shutdown()
        System.exit(1)
@@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
       server.shutdown()
     }
     catch {
-      case e =>
+      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        System.exit(1)
     }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f551243..03ba60e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -223,7 +223,7 @@ class ReplicaManager(val config: KafkaConfig,
             makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders,
                          leaderAndISRRequest.correlationId)
         } catch {
-          case e =>
+          case e: Throwable =>
             val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " +
               "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
                                                   leaderAndISRRequest.controllerEpoch, topicAndPartition)
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f1f0625..33b7360 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -72,7 +72,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
         }
         if (leaderId != -1)
           debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-      case e2 =>
+      case e2: Throwable =>
         error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
         leaderId = -1
     }
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 55709b5..c8023ee 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -102,7 +102,7 @@ object ImportZkOffsets extends Logging {
       try {
         ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
       } catch {
-        case e => e.printStackTrace()
+        case e: Throwable => e.printStackTrace()
       }
     }
   }
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 7e424e7..747a675 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -86,7 +86,7 @@ object JmxTool extends Logging {
       else
         List(null)

-    val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten
+    val names = queries.map((name: ObjectName) => mbsc.queryNames(name, null): mutable.Set[ObjectName]).flatten
     val allAttributes: Iterable[(ObjectName, Array[String])] =
       names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName)))
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..f0f871c 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -129,7 +129,7 @@ object MirrorMaker extends Logging {
     try {
       streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
     } catch {
-      case t =>
+      case t: Throwable =>
         fatal("Unable to create stream - shutting down mirror maker.")
         connectors.foreach(_.shutdown)
     }
@@ -204,7 +204,7 @@ object MirrorMaker extends Logging {
           }
         }
       } catch {
-        case e =>
+        case e: Throwable =>
           fatal("Stream unexpectedly exited.", e)
       } finally {
         shutdownLatch.countDown()
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 3cfa384..c889835 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging {
                 formatter.writeTo(key, Utils.readBytes(message.payload), System.out)
                 numMessagesConsumed += 1
               } catch {
-                case e =>
+                case e: Throwable =>
                   if (skipMessageOnError)
                     error("Error processing message, skipping this message: ", e)
                   else
diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala
deleted file mode 100644
index 28269eb..0000000
--- a/core/src/main/scala/kafka/utils/Annotations.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-/* Some helpful annotations */
-
-/**
- * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
- * must respect
- */
-class threadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is not threadsafe
- */
-class nonthreadsafe extends StaticAnnotation
-
-/**
- * Indicates that the annotated class is immutable
- */
-class immutable extends StaticAnnotation
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.8.scala b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
new file mode 100644
index 0000000..28269eb
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.8.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
diff --git a/core/src/main/scala/kafka/utils/Annotations_2.9+.scala b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
new file mode 100644
index 0000000..ab95ce1
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Annotations_2.9+.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import scala.annotation.StaticAnnotation
+
+/* Some helpful annotations */
+
+/**
+ * Indicates that the annotated class is meant to be threadsafe. For an abstract class it is an part of the interface that an implementation
+ * must respect
+ */
+class threadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is not threadsafe
+ */
+class nonthreadsafe extends StaticAnnotation
+
+/**
+ * Indicates that the annotated class is immutable
+ */
+class immutable extends StaticAnnotation
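[Annotation: the single `Annotations.scala` is split into per-version variants because the 2.9+ file needs the explicit `scala.annotation.StaticAnnotation` import (the bare `StaticAnnotation` name resolvable in 2.8 is not available in newer versions). Only one of the two files can be on the compile path at a time; the wiring presumably lives in the `project/Build.scala` change listed in the diffstat but not included in this excerpt. Purely as a sketch of one common sbt pattern for version-specific sources, not the actual Build.scala hunk:

    // Sketch only -- the real change is in project/Build.scala, which this
    // excerpt does not show. Exclude the variant that doesn't match the
    // Scala version being built.
    excludeFilter in unmanagedSources <<= scalaVersion { v =>
      if (v.startsWith("2.8")) "Annotations_2.9+.scala" else "Annotations_2.8.scala"
    }
]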
diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala
index f80b2cc..03fb06f 100644
--- a/core/src/main/scala/kafka/utils/Json.scala
+++ b/core/src/main/scala/kafka/utils/Json.scala
@@ -32,7 +32,7 @@ object Json extends Logging {
     try {
       JSON.parseFull(input)
     } catch {
-      case t =>
+      case t: Throwable =>
        throw new KafkaException("Can't parse json string: %s".format(input), t)
     }
   }
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 64d84cc..db9f20b 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -64,7 +64,7 @@ object Mx4jLoader extends Logging {
     case e: ClassNotFoundException => {
       info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
     }
-    case e => {
+    case e: Throwable => {
       warn("Could not start register mbean in JMX", e);
     }
   }
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 9a86eab..9ddcde7 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -19,6 +19,7 @@ package kafka.utils

 import java.util.ArrayList
 import java.util.concurrent._
+import collection.mutable
 import collection.JavaConversions
 import kafka.common.KafkaException
 import java.lang.Object
@@ -71,10 +72,15 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]

   def remove(key: K): V = pool.remove(key)

-  def keys = JavaConversions.asSet(pool.keySet())
+  def keys: mutable.Set[K] = {
+    import JavaConversions._
+    pool.keySet()
+  }

-  def values: Iterable[V] =
-    JavaConversions.asIterable(new ArrayList[V](pool.values()))
+  def values: Iterable[V] = {
+    import JavaConversions._
+    new ArrayList[V](pool.values())
+  }

   def clear() { pool.clear() }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e83eb5f..e0a5a27 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -67,7 +67,7 @@ object Utils extends Logging {
         fun()
       }
       catch {
-        case t =>
+        case t: Throwable =>
          // log any error and the stack trace
          error("error in loggedRunnable", t)
       }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ca1ce12..6eede1b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -271,7 +271,7 @@ object ZkUtils extends Logging {
           storedData = readData(client, path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
-          case e2 => throw e2
+          case e2: Throwable => throw e2
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
@@ -281,7 +281,7 @@ object ZkUtils extends Logging {
           info(path + " exists with value " + data + " during connection loss; this is ok")
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
@@ -321,7 +321,7 @@ object ZkUtils extends Logging {
           case None => // the node disappeared; retry creating the ephemeral node immediately
         }
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
@@ -360,10 +360,10 @@ object ZkUtils extends Logging {
       } catch {
         case e: ZkNodeExistsException =>
           client.writeData(path, data)
-        case e2 => throw e2
+        case e2: Throwable => throw e2
       }
     }
-    case e2 => throw e2
+    case e2: Throwable => throw e2
   }
 }
@@ -416,7 +416,7 @@ object ZkUtils extends Logging {
         createParentPath(client, path)
         client.createEphemeral(path, data)
       }
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
@@ -428,7 +428,7 @@ object ZkUtils extends Logging {
       // this can happen during a connection loss event, return normally
      info(path + " deleted during connection loss; this is ok")
      false
-    case e2 => throw e2
+    case e2: Throwable => throw e2
   }
 }
@@ -439,7 +439,7 @@ object ZkUtils extends Logging {
     case e: ZkNoNodeException =>
       // this can happen during a connection loss event, return normally
      info(path + " deleted during connection loss; this is ok")
-    case e2 => throw e2
+    case e2: Throwable => throw e2
   }
 }
@@ -449,7 +449,7 @@ object ZkUtils extends Logging {
       zk.deleteRecursive(dir)
       zk.close()
     } catch {
-      case _ => // swallow
+      case _: Throwable => // swallow
     }
   }
@@ -466,7 +466,7 @@ object ZkUtils extends Logging {
     } catch {
       case e: ZkNoNodeException =>
         (None, stat)
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
     dataAndStat
   }
@@ -484,7 +484,7 @@ object ZkUtils extends Logging {
       client.getChildren(path)
     } catch {
       case e: ZkNoNodeException => return Nil
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
@@ -675,7 +675,7 @@ object ZkUtils extends Logging {
       case nne: ZkNoNodeException =>
         ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
         debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-      case e2 => throw new AdministrationException(e2.toString)
+      case e2: Throwable => throw new AdministrationException(e2.toString)
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 06be990..2436289 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -104,7 +104,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Topic should not exist")
     } catch {
       case e: AdministrationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
@@ -114,7 +114,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
       fail("Add partitions should fail")
     } catch {
       case e: AdministrationException => //this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index dc0013f..881e69b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -38,7 +38,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }

     // test wrong replication factor
@@ -48,7 +48,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }

     // correct assignment
@@ -84,7 +84,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }

     // non-exist brokers
@@ -95,7 +95,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }

     // inconsistent replication factor
@@ -106,7 +106,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
     catch {
       case e: AdministrationException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }

     // good assignment
@@ -170,7 +170,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       fail("shouldn't be able to create a topic already exists")
     } catch {
       case e: TopicExistsException => // this is good
-      case e2 => throw e2
+      case e2: Throwable => throw e2
     }
   }
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index fcfc583..268d14e 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -83,7 +83,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       fail("should get an exception")
     } catch {
       case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
+      case e: Throwable => throw e
     }
   }
@@ -406,10 +406,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }

   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
-    import scala.collection.JavaConversions
+    import scala.collection.JavaConversions._
     val children = zkClient.getChildren(path)
     Collections.sort(children)
-    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    val childrenAsSeq : Seq[java.lang.String] = (children: mutable.Buffer[String]).toSeq
     childrenAsSeq.map(partition =>
       (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9f243f0..e8e454f 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
       val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x)
       messages ++= ms
       import scala.collection.JavaConversions._
-      javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _))))
+      javaProducer.send(ms.map(new KeyedMessage[Int, String](topic, partition, _)): java.util.List[KeyedMessage[Int, String]])
     }
     javaProducer.close
     messages
@@ -103,7 +103,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   def getMessages(nMessagesPerThread: Int,
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = {
     var messages: List[String] = Nil
-    val topicMessageStreams = asMap(jTopicMessageStreams)
+    import scala.collection.JavaConversions._
+    val topicMessageStreams: collection.mutable.Map[String, java.util.List[KafkaStream[String, String]]] = jTopicMessageStreams
     for ((topic,
messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index abee11b..726399e 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -29,8 +29,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet def toMessageIterator(messageSet: MessageSet): Iterator[Message] = { import scala.collection.JavaConversions._ - val messages = asIterable(messageSet) - messages.map(m => m.message).iterator + messageSet.map(m => m.message).iterator } @Test @@ -44,7 +43,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { import scala.collection.JavaConversions._ val m = createMessageSet(messages) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test @@ -52,7 +51,7 @@ trait BaseMessageSetTestCases extends JUnitSuite { import scala.collection.JavaConversions._ val m = createMessageSet(messages, DefaultCompressionCodec) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 4ed88e8..df90695 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -30,7 +30,7 @@ import scala.Some import kafka.server.KafkaConfig class LogTest extends JUnitSuite { - + var logDir: File = null val time = new MockTime var config: KafkaConfig = null @@ -46,7 +46,7 @@ class LogTest extends JUnitSuite { def tearDown() { Utils.rm(logDir) } - + def createEmptyLogs(dir: File, offsets: Int*) { for(offset <- offsets) { Log.logFilename(dir, offset).createNewFile() @@ -168,19 +168,19 @@ class LogTest extends JUnitSuite { val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) assertEquals("Should be no more messages", 0, lastRead.size) } - + /** Test the case where we have compressed batches of messages */ @Test def testCompressedMessages() { /* this log should roll after every messageset */ val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - + /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - + def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) - + /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset) @@ -202,7 +202,7 @@ class LogTest extends JUnitSuite { assertContains(makeRanges(5,8), 5) assertContains(makeRanges(5,8), 6) } - + @Test def testEdgeLogRollsStartingAtZero() { // first test a log segment starting at 0 @@ -226,7 +226,7 @@ class LogTest extends JUnitSuite { for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) val curOffset = log.logEndOffset - + // time goes by; the log file is deleted log.markDeletedWhile(_ => true) @@ -262,7 +262,7 @@ class LogTest extends JUnitSuite { case e:MessageSizeTooLargeException => // this is good } } - + @Test def testLogRecoversToCorrectOffset() { val numMessages = 100 @@ -276,15 +276,15 @@ class LogTest extends JUnitSuite { val lastIndexOffset = log.segments.view.last.index.lastOffset val numIndexEntries = log.segments.view.last.index.entries log.close() - + // test non-recovery case log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) log.close() - - // test + + // test log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should 
have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) @@ -305,10 +305,10 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) - + assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments) assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset) - + val lastOffset = log.logEndOffset val size = log.size log.truncateTo(log.logEndOffset) // keep the entire log @@ -326,7 +326,7 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) - + assertEquals("Should be back to original offset", log.logEndOffset, lastOffset) assertEquals("Should be back to original size", log.size, size) log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)) @@ -371,14 +371,14 @@ class LogTest extends JUnitSuite { def testAppendWithoutOffsetAssignment() { for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) { logDir.mkdir() - var log = new Log(logDir, - maxLogFileSize = 64*1024, + var log = new Log(logDir, + maxLogFileSize = 64*1024, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) val messages = List("one", "two", "three", "four", "five", "six") - val ms = new ByteBufferMessageSet(compressionCodec = codec, + val ms = new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = new AtomicLong(0), messages = messages.map(s => new Message(s.getBytes)):_*) val firstOffset = ms.toList.head.offset @@ -391,7 +391,7 @@ class LogTest extends JUnitSuite { log.delete() } } - + /** * When we open a log any index segments without an associated log segment should be deleted. */ @@ -399,22 +399,22 @@ class LogTest extends JUnitSuite { def testBogusIndexSegmentsAreRemoved() { val bogusIndex1 = Log.indexFilename(logDir, 0) val bogusIndex2 = Log.indexFilename(logDir, 5) - + val set = TestUtils.singleMessageSet("test".getBytes()) - val log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + val log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 1, + maxIndexSize = 1000, + indexIntervalBytes = 1, needsRecovery = false) - + assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertFalse("The second index file should have been deleted.", bogusIndex2.exists) - + // check that we can append to the log for(i <- 0 until 10) log.append(set) - + log.delete() } @@ -423,38 +423,38 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) // create a log - var log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + var log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) - + // add enough messages to roll over several segments then close and re-open and attempt to truncate for(i <- 0 until 100) log.append(set) log.close() - log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) log.truncateTo(3) assertEquals("All but one segment should be 
deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } - + def assertContains(ranges: Array[Range], offset: Long) = { Log.findRange(ranges, offset) match { - case Some(range) => + case Some(range) => assertTrue(range + " does not contain " + offset, range.contains(offset)) case None => fail("No range found, but expected to find " + offset) } } - + class SimpleRange(val start: Long, val size: Long) extends Range - + def makeRanges(breaks: Int*): Array[Range] = { val list = new ArrayList[Range] var prior = 0 @@ -464,5 +464,5 @@ class LogTest extends JUnitSuite { } list.toArray(new Array[Range](list.size)) } - + } diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala index fe5bc09..7df7405 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala @@ -30,14 +30,15 @@ class KafkaTimerTest extends JUnit3Suite { val clock = new ManualClock val testRegistry = new MetricsRegistry(clock) val metric = testRegistry.newTimer(this.getClass, "TestTimer") + val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L) val timer = new KafkaTimer(metric) timer.time { clock.addMillis(1000) } assertEquals(1, metric.count()) - assertTrue((metric.max() - 1000).abs <= Double.Epsilon) - assertTrue((metric.min() - 1000).abs <= Double.Epsilon) + assertTrue((metric.max() - 1000).abs <= Epsilon) + assertTrue((metric.min() - 1000).abs <= Epsilon) } private class ManualClock extends Clock { diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 1781bc0..69c88c7 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -263,7 +263,7 @@ class AsyncProducerTest extends JUnit3Suite { } catch { // should not throw any exception - case e => fail("Should not throw any exception") + case e: Throwable => fail("Should not throw any exception") } } @@ -450,7 +450,8 @@ class AsyncProducerTest extends JUnit3Suite { val topic = "topic1" val msgs = TestUtils.getMsgStrings(5) val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m)) - val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData) + import scala.collection.JavaConversions._ + val javaProducerData: java.util.List[KeyedMessage[String, String]] = scalaProducerData val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]]) mockScalaProducer.send(scalaProducerData.head) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 29331db..2cabfbb 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -108,7 +108,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Test should fail because the broker list provided are not valid") } catch { case e: FailedToSendMessageException => - case oe => fail("fails with exception", oe) + case oe: Throwable => fail("fails with exception", oe) } finally { producer1.close() } @@ -121,7 +121,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try{ producer2.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => 
fail("Should succeed sending the message", e) + case e: Throwable => fail("Should succeed sending the message", e) } finally { producer2.close() } @@ -134,7 +134,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ try{ producer3.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => fail("Should succeed sending the message", e) + case e: Throwable => fail("Should succeed sending the message", e) } finally { producer3.close() } @@ -191,7 +191,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ } catch { case se: FailedToSendMessageException => true - case e => fail("Not expected", e) + case e: Throwable => fail("Not expected", e) } finally { producer2.close() @@ -225,7 +225,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ // on broker 0 producer.send(new KeyedMessage[String, String](topic, "test", "test1")) } catch { - case e => fail("Unexpected exception: " + e) + case e: Throwable => fail("Unexpected exception: " + e) } // kill the broker @@ -238,7 +238,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ fail("Should fail since no leader exists for the partition.") } catch { case e : TestFailedException => throw e // catch and re-throw the failure message - case e2 => // otherwise success + case e2: Throwable => // otherwise success } // restart server 1 @@ -287,7 +287,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ assertTrue("Message set should have 1 message", messageSet1.hasNext) assertEquals(new Message("test".getBytes), messageSet1.next.message) } catch { - case e => case e: Exception => producer.close; fail("Not expected", e) + case e: Throwable => case e: Exception => producer.close; fail("Not expected", e) } // stop IO threads and request handling, but leave networking operational diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index b3e89c3..3592bff 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -136,7 +136,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) } catch { case e : java.io.IOException => // success - case e2 => throw e2 + case e2: Throwable => throw e2 } } @@ -205,7 +205,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.fail("Should have received timeout exception since request handling is stopped.") } catch { case e: SocketTimeoutException => /* success */ - case e => Assert.fail("Unexpected exception when expecting timeout: " + e) + case e: Throwable => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds // make sure we don't wait fewer than timeoutMs for a response diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 830608f..ee591d0 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -410,7 +410,7 @@ object TestUtils extends Logging { ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, 
controllerEpoch)) } catch { - case oe => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) + case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) } } } diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index 3158a22..ec3cd29 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -175,7 +175,7 @@ object ConsumerPerformance { case _: InterruptedException => case _: ClosedByInterruptException => case _: ConsumerTimeoutException => - case e => throw e + case e: Throwable => throw e } totalMessagesRead.addAndGet(messagesRead) totalBytesRead.addAndGet(bytesRead) diff --git a/project/Build.scala b/project/Build.scala index b3858f3..2cdbc9e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -41,7 +41,8 @@ object KafkaBuild extends Build { , scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), - crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"), + crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"), + excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"), scalaVersion := "2.8.0", version := "0.8.0-beta1", publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"), -- 1.7.10.2 (Apple Git-33)
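Background on the cross-build wiring: the two Annotations_*.scala variants added by this patch coexist in the same source tree, and the new excludeFilter setting in project/Build.scala keeps the wrong variant out of each compile. A minimal sbt 0.12-style sketch of the mechanism (the project name is hypothetical; the excludeFilter line is the one from the patch, annotated):

import sbt._
import Keys._

object ExampleBuild extends Build {
  lazy val root = Project("root", file(".")).settings(
    crossScalaVersions := Seq("2.8.0", "2.9.2", "2.10.1"),
    // 2.8 builds compile Annotations_2.8.scala and skip the 2.9+ variant;
    // every other Scala version does the reverse.
    excludeFilter in unmanagedSources <<= scalaVersion(
      v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala")
  )
}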
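Background on the recurring catch-clause change: the Scala 2.10 compiler warns on an untyped catch-all pattern, since a bare "case e =>" silently catches every Throwable, including control-flow and fatal errors. That is why each such handler in this patch gains an explicit type annotation. A minimal sketch of the before/after, with hypothetical names not taken from the patch:

object CatchAllExample extends App {
  // Stand-in for any operation the patched code wraps in try/catch.
  def riskyCall(): Unit = throw new RuntimeException("boom")
  try riskyCall()
  catch {
    // was: case e =>           (draws a warning under 2.10)
    case e: Throwable => println("operation failed: " + e)
  }
}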
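Background on the JavaConversions edits: the 2.8-era named helpers the old code called (asSet, asList, asMap, asIterator, asBuffer, ...) are gone by Scala 2.10, so the patch switches to the implicit conversions in scala.collection.JavaConversions, triggering them with an explicit type ascription where the target type is not otherwise inferred. A minimal sketch of the idiom, with hypothetical names:

import scala.collection.JavaConversions._
import scala.collection.mutable

object ConversionsExample extends App {
  val jmap = new java.util.concurrent.ConcurrentHashMap[String, Int]()
  jmap.put("a", 1)

  // was: JavaConversions.asSet(jmap.keySet())
  val keys: mutable.Set[String] = jmap.keySet()

  // was: JavaConversions.asList(Seq("x", "y"))
  val jlist: java.util.List[String] = Seq("x", "y")

  println(keys.mkString(",") + " / " + jlist.size())
}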
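Background on the KafkaTimerTest change: scala.Double.Epsilon is no longer available in 2.10, so the test inlines the constant from its IEEE 754 bit pattern. The literal 0x3ca0000000000000L decodes to 2^-53 (exponent field 0x3ca = 970, bias 1023, zero mantissa), which can be verified with a one-liner (object name hypothetical):

object EpsilonCheck extends App {
  val Epsilon = java.lang.Double.longBitsToDouble(0x3ca0000000000000L)
  println(Epsilon) // prints 1.1102230246251565E-16, i.e. 2^-53
}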