From ba1a05af80c577fd08b2f23fadc07e7c7bd20f8e Mon Sep 17 00:00:00 2001
From: Matt Christiansen
Date: Thu, 4 Apr 2013 23:59:28 -0700
Subject: [PATCH 1/1] KAFKA-717 Convert to scala 2.10

---
 .gitignore                                         |  1 +
 bin/kafka-run-class.sh                             |  8 ++++----
 core/build.sbt                                     |  1 +
 core/src/main/scala/kafka/Kafka.scala              |  2 +-
 core/src/main/scala/kafka/admin/AdminUtils.scala   |  8 ++++----
 .../scala/kafka/admin/CreateTopicCommand.scala     |  2 +-
 .../scala/kafka/admin/DeleteTopicCommand.scala     |  2 +-
 .../main/scala/kafka/admin/ListTopicCommand.scala  |  2 +-
 .../PreferredReplicaLeaderElectionCommand.scala    |  6 +++---
 .../kafka/admin/ReassignPartitionsCommand.scala    |  4 ++--
 core/src/main/scala/kafka/client/ClientUtils.scala |  4 ++--
 core/src/main/scala/kafka/cluster/Broker.scala     |  2 +-
 .../scala/kafka/consumer/ConsoleConsumer.scala     |  6 +++---
 .../kafka/consumer/ConsumerFetcherManager.scala    |  4 ++--
 .../main/scala/kafka/consumer/SimpleConsumer.scala |  2 +-
 .../src/main/scala/kafka/consumer/TopicCount.scala |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala      | 10 +++++-----
 .../consumer/ZookeeperTopicEventWatcher.scala      |  2 +-
 .../controller/ControllerChannelManager.scala      |  4 ++--
 .../scala/kafka/controller/KafkaController.scala   | 12 ++++++------
 .../kafka/controller/PartitionStateMachine.scala   | 16 ++++++++--------
 .../kafka/controller/ReplicaStateMachine.scala     |  4 ++--
 .../main/scala/kafka/javaapi/FetchRequest.scala    |  2 +-
 .../main/scala/kafka/javaapi/OffsetRequest.scala   |  2 +-
 .../main/scala/kafka/javaapi/TopicMetadata.scala   | 10 +++++-----
 .../scala/kafka/javaapi/TopicMetadataRequest.scala |  4 ++--
 .../consumer/ZookeeperConsumerConnector.scala      |  7 +++----
 .../javaapi/message/ByteBufferMessageSet.scala     |  3 ++-
 .../scala/kafka/javaapi/producer/Producer.scala    |  4 ++--
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../kafka/network/BoundedByteBufferReceive.scala   |  2 +-
 .../main/scala/kafka/producer/SyncProducer.scala   |  2 +-
 .../kafka/producer/async/DefaultEventHandler.scala |  4 ++--
 .../kafka/producer/async/ProducerSendThread.scala  |  4 ++--
 .../scala/kafka/server/AbstractFetcherThread.scala |  6 +++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  8 ++++----
 .../scala/kafka/server/KafkaServerStartable.scala  |  4 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../kafka/server/ZookeeperLeaderElector.scala      |  2 +-
 .../main/scala/kafka/tools/ImportZkOffsets.scala   |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala      |  2 +-
 .../main/scala/kafka/tools/KafkaMigrationTool.java |  1 +
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |  2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala    |  2 +-
 core/src/main/scala/kafka/utils/Annotations.scala  |  2 ++
 core/src/main/scala/kafka/utils/Json.scala         |  2 +-
 core/src/main/scala/kafka/utils/Mx4jLoader.scala   |  2 +-
 core/src/main/scala/kafka/utils/Pool.scala         | 11 +++++------
 core/src/main/scala/kafka/utils/Utils.scala        |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 22 +++++++++++-----------
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  4 ++--
 .../consumer/ZookeeperConsumerConnectorTest.scala  |  7 ++++---
 .../javaapi/message/BaseMessageSetTestCases.scala  | 11 +++++------
 .../scala/unit/kafka/metrics/KafkaTimerTest.scala  |  5 +++--
 .../unit/kafka/producer/AsyncProducerTest.scala    |  2 +-
 .../scala/kafka/perf/ConsumerPerformance.scala     |  2 +-
 project/Build.scala                                |  4 ++--
 57 files changed, 131 insertions(+), 126 deletions(-)

diff --git a/.gitignore b/.gitignore
index 553a077..492ed93 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@
project/sbt_project_definition.iml .#* rat.out TAGS +.idea_modules/ diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index e055d67..d8cb85c 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -29,10 +29,10 @@ ivyPath=$(echo "$USER_HOME/.ivy2/cache") snappy=$(echo "$ivyPath/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.4.1.jar") CLASSPATH=$CLASSPATH:$snappy -library=$(echo "$ivyPath/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar") +library=$(echo "$ivyPath/org.scala-lang/scala-library/jars/scala-library-2.10.1.jar") CLASSPATH=$CLASSPATH:$library -compiler=~$(echo "$ivyPath/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar") +compiler=~$(echo "$ivyPath/org.scala-lang/scala-compiler/jars/scala-compiler-2.10.1.jar") CLASSPATH=$CLASSPATH:$compiler log4j=$(echo "$ivyPath/log4j/log4j/jars/log4j-1.2.15.jar") @@ -47,7 +47,7 @@ CLASSPATH=$CLASSPATH:$zookeeper jopt=$(echo "$ivyPath/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar") CLASSPATH=$CLASSPATH:$jopt -for file in $base_dir/core/target/scala-2.8.0/*.jar; +for file in $base_dir/core/target/scala-2.10.1/*.jar; do CLASSPATH=$CLASSPATH:$file done @@ -57,7 +57,7 @@ do CLASSPATH=$CLASSPATH:$file done -for file in $base_dir/perf/target/scala-2.8.0/kafka*.jar; +for file in $base_dir/perf/target/scala-2.10.1/kafka*.jar; do CLASSPATH=$CLASSPATH:$file done diff --git a/core/build.sbt b/core/build.sbt index 211aaf9..68755f1 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -20,6 +20,7 @@ libraryDependencies ++= Seq( libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) => deps :+ (sv match { case "2.8.0" => "org.scalatest" % "scalatest" % "1.2" % "test" + case "2.10.1" => "org.scalatest" % "scalatest_2.10.0" % "1.8" % "test" case _ => "org.scalatest" %% "scalatest" % "1.8" % "test" }) } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index dafb1ee..5d2a1d0 100644 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -47,7 +47,7 @@ object Kafka extends Logging { kafkaServerStartble.awaitShutdown } catch { - case e => fatal(e) + case e: Exception => fatal(e) } System.exit(0) } diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 63f5bc8..7d9063a 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -85,7 +85,7 @@ object AdminUtils extends Logging { debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData)) } catch { case e: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic)) - case e2 => throw new AdministrationException(e2.toString) + case e2: Exception => throw new AdministrationException(e2.toString) } } @@ -117,7 +117,7 @@ object AdminUtils extends Logging { try { Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head) } catch { - case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e) + case e: Exception => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e) } case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition) } @@ -125,7 +125,7 @@ object AdminUtils extends Logging { replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)) isrInfo = 
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas) } catch { - case e => throw new ReplicaNotAvailableException(e) + case e: Exception => throw new ReplicaNotAvailableException(e) } if(replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException("Replica information not available for following brokers: " + @@ -135,7 +135,7 @@ object AdminUtils extends Logging { inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(",")) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError) } catch { - case e => + case e: Exception => debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e) new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index e762115..eb00b00 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -74,7 +74,7 @@ object CreateTopicCommand extends Logging { createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr) println("creation succeeded!") } catch { - case e => + case e: Exception => println("creation failed because of " + e.getMessage) println(Utils.stackTrace(e)) } finally { diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala index 3da4518..1b67168 100644 --- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala @@ -54,7 +54,7 @@ object DeleteTopicCommand { println("deletion succeeded!") } catch { - case e => + case e: Exception => println("delection failed because of " + e.getMessage) println(Utils.stackTrace(e)) } diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index 095469b..08060f8 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -66,7 +66,7 @@ object ListTopicCommand { showTopic(t, zkClient) } catch { - case e => + case e: Exception => println("list topic failed because of " + e.getMessage) println(Utils.stackTrace(e)) } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index d5de5f3..bc6aea3 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -60,7 +60,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { preferredReplicaElectionCommand.moveLeaderToPreferredReplica() println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection)) } catch { - case e => + case e: Exception => println("Failed to start preferred replica election") println(Utils.stackTrace(e)) } finally { @@ -104,7 +104,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val partitionsUndergoingPreferredReplicaElection = parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1) throw new AdministrationException("Preferred replica leader election currently in progress for " + "%s. 
Aborting operation".format(partitionsUndergoingPreferredReplicaElection)) - case e2 => throw new AdministrationException(e2.toString) + case e2: Exception => throw new AdministrationException(e2.toString) } } } @@ -116,7 +116,7 @@ class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scal val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition)) PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions) } catch { - case e => throw new AdminCommandFailedException("Admin command failed", e) + case e: Exception => throw new AdminCommandFailedException("Admin command failed", e) } } diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8d287f4..d2bcf74 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -67,7 +67,7 @@ object ReassignPartitionsCommand extends Logging { else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } catch { - case e => + case e: Exception => println("Partitions reassignment failed due to " + e.getMessage) println(Utils.stackTrace(e)) } finally { @@ -90,7 +90,7 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) throw new AdminCommandFailedException("Partition reassignment currently in " + "progress for %s. Aborting operation".format(partitionsBeingReassigned)) - case e => error("Admin command failed", e); false + case e: Exception => error("Admin command failed", e); false } } diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index a3d88ea..4b22bc8 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -25,7 +25,7 @@ object ClientUtils extends Logging{ var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null - var t: Throwable = null + var t: Exception = null while(i < brokers.size && !fetchMetaDataSucceeded) { val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i)) info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId, topics.size, topics)) @@ -34,7 +34,7 @@ object ClientUtils extends Logging{ fetchMetaDataSucceeded = true } catch { - case e => + case e: Exception => warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed" .format(correlationId, topics, brokers(i).toString), e) t = e diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 435c473..314bbee 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -42,7 +42,7 @@ private[kafka] object Broker { throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) } } catch { - case t => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, t) + case e: Exception => throw new KafkaException("Failed to parse the broker info from zookeeper: " + brokerInfoString, e) } } diff --git 
a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index d6c4a51..b391df8 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -197,7 +197,7 @@ object ConsoleConsumer extends Logging { formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) numMessages += 1 } catch { - case e => + case e: Exception => if (skipMessageOnError) error("Error processing message, skipping this message: ", e) else @@ -213,7 +213,7 @@ object ConsoleConsumer extends Logging { } } } catch { - case e => error("Error processing message, stopping consumer: ", e) + case e: Exception => error("Error processing message, stopping consumer: ", e) } System.err.println("Consumed %d messages".format(numMessages)) System.out.flush() @@ -240,7 +240,7 @@ object ConsoleConsumer extends Logging { zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case e: Exception => // swallow } } } diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index d1373c9..467b73a 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -83,13 +83,13 @@ class ConsumerFetcherManager(private val consumerIdString: String, addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) noLeaderPartitionSet -= topicAndPartition } catch { - case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t) + case e: Exception => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), e) } } shutdownIdleFetcherThreads() } catch { - case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) + case e: Exception => warn("Failed to find leader for %s".format(noLeaderPartitionSet), e) } } finally { lock.unlock() diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 1fbdfc3..08f3f48 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -86,7 +86,7 @@ class SimpleConsumer(val host: String, disconnect() throw ioe } - case e => throw e + case e: Exception => throw e } response } diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index c8e8406..c6e9a98 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -67,7 +67,7 @@ private[kafka] object TopicCount extends Logging { case None => throw new KafkaException("error constructing TopicCount : " + topicCountString) } } catch { - case e => + case e: Exception => error("error parsing consumer json string " + topicCountString, e) throw e } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 9a5fbfe..6bcf753 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -173,7 +173,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient = null } } catch { - case e => + case e: Exception => fatal("error during 
consumer connector shutdown", e) } info("ZKConsumerConnector shut down completed") @@ -324,7 +324,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (doRebalance) syncedRebalance } catch { - case t => error("error during syncedRebalance", t) + case t: Exception => error("error during syncedRebalance", t) } } info("stopping watcher executor thread for consumer " + consumerIdString) @@ -372,7 +372,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { done = rebalance(cluster) } catch { - case e => + case e: Exception => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. * For example, a ZK node can disappear between the time we get all children and the time we try to get * the value of a child. Just let this go since another rebalance will be triggered. @@ -438,7 +438,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, " for topic " + topic + " with consumers: " + curConsumers) for (consumerThreadId <- consumerThreadIdSet) { - val myConsumerPosition = curConsumers.findIndexOf(_ == consumerThreadId) + val myConsumerPosition = curConsumers.indexWhere(_ == consumerThreadId) assert(myConsumerPosition >= 0) val startPart = nPartsPerConsumer*myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart) val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1) @@ -556,7 +556,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // The node hasn't been deleted by the original owner. So wait a bit and retry. info("waiting for the partition ownership to be deleted: " + partition) false - case e2 => throw e2 + case e2: Exception => throw e2 } } val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index df83baa..45df693 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -75,7 +75,7 @@ class ZookeeperTopicEventWatcher(val config:ConsumerConfig, } } catch { - case e => + case e: Exception => error("error in handling child changes", e) } } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3164f78..d7f019e 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -91,7 +91,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext brokerStateInfo(brokerId).requestSendThread.shutdown() brokerStateInfo.remove(brokerId) }catch { - case e => error("Error while removing broker by the controller", e) + case e: Exception => error("Error while removing broker by the controller", e) } } @@ -137,7 +137,7 @@ class RequestSendThread(val controllerId: Int, } } } catch { - case e => + case e: Exception => // log it and let it go. Let controller shut it down. 
debug("Exception occurs", e) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 65def03..d69e50c 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -395,7 +395,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector) } catch { - case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) + case e: Exception => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e) } finally { removePartitionsFromPreferredReplicaElection(partitions) } @@ -461,9 +461,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } catch { case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " + "Aborting controller startup procedure") - case oe => error("Error while incrementing controller epoch", oe) + case oe: Exception => error("Error while incrementing controller epoch", oe) } - case oe => error("Error while incrementing controller epoch", oe) + case oe: Exception => error("Error while incrementing controller epoch", oe) } info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch)) @@ -636,7 +636,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic)) - case e2 => throw new KafkaException(e2.toString) + case e2: Exception => throw new KafkaException(e2.toString) } } @@ -802,7 +802,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL .format(topicAndPartition)) } } catch { - case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) + case e: Exception => error("Error completing reassignment of partition %s".format(topicAndPartition), e) // remove the partition from the admin path to unblock the admin client controller.removePartitionFromReassignedPartitions(topicAndPartition) } @@ -883,7 +883,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: } } }catch { - case e => error("Error while handling partition reassignment", e) + case e: Exception => error("Error while handling partition reassignment", e) } } diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index c017727..aa75654 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -17,7 +17,7 @@ package kafka.controller import collection._ -import collection.JavaConversions._ +import collection.JavaConverters._ import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} @@ -91,7 +91,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } 
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) } catch { - case e => error("Error while moving some partitions to the online state", e) + case e: Exception => error("Error while moving some partitions to the online state", e) // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions } } @@ -111,7 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) }catch { - case e => error("Error while moving some partitions to %s state".format(targetState), e) + case e: Exception => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions } } @@ -176,7 +176,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { // post: partition state is deleted from all brokers and zookeeper } } catch { - case t: Throwable => + case t: Exception => stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed" .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t) } @@ -322,7 +322,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } catch { case lenne: LeaderElectionNotNeededException => // swallow case nroe: NoReplicaOnlineException => throw nroe - case sce => + case sce: Exception => val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage) stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) throw new StateChangeFailedException(failMsg, sce) @@ -360,8 +360,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.controllerLock synchronized { if (!hasShutdown.get) { try { - debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) - val currentChildren = JavaConversions.asBuffer(children).toSet + debug("Topic change listener fired for path %s with children %s".format(parentPath, children.asScala.mkString(","))) + val currentChildren = children.asScala.toSet val newTopics = currentChildren -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- currentChildren // val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1)) @@ -376,7 +376,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { if(newTopics.size > 0) controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet) } catch { - case e => error("Error while handling new topic", e ) + case e: Exception => error("Error while handling new topic", e ) } // TODO: kafka-330 Handle deleted topics } diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index bea1644..98f3fba 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -89,7 +89,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicas.foreach(r => handleStateChange(r.topic, r.partition, 
r.replica, targetState)) brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) }catch { - case e => error("Error while moving some replicas to %s state".format(targetState), e) + case e: Exception => error("Error while moving some replicas to %s state".format(targetState), e) } } @@ -257,7 +257,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { if(deadBrokerIds.size > 0) controller.onBrokerFailure(deadBrokerIds.toSeq) } catch { - case e => error("Error while handling broker changes", e) + case e: Exception => error("Error while handling broker changes", e) } } } diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala index b475240..0ecadb3 100644 --- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala +++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -29,7 +29,7 @@ class FetchRequest(correlationId: Int, requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap = JavaConversions.asScalaMap(requestInfo).toMap kafka.api.FetchRequest( correlationId = correlationId, clientId = clientId, diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala index 1c77ff8..79d952f 100644 --- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala +++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala @@ -28,7 +28,7 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse clientId: String) { val underlying = { - val scalaMap = JavaConversions.asMap(requestInfo).toMap + val scalaMap = JavaConversions.asScalaMap(requestInfo).toMap kafka.api.OffsetRequest( requestInfo = scalaMap, versionId = versionId, diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index 97b6dcd..2a780b5 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -17,16 +17,16 @@ package kafka.javaapi import kafka.cluster.Broker -import scala.collection.JavaConversions.asList +import scala.collection.JavaConversions.asJavaList private[javaapi] object MetadataListImplicits { implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]): java.util.List[kafka.javaapi.TopicMetadata] = - asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))) + asJavaList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))) implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]): java.util.List[kafka.javaapi.PartitionMetadata] = - asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))) + asJavaList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))) } class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { @@ -51,9 +51,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { underlying.leader } - def replicas: java.util.List[Broker] = asList(underlying.replicas) + def replicas: java.util.List[Broker] = asJavaList(underlying.replicas) - def isr: java.util.List[Broker] = asList(underlying.isr) + def isr: java.util.List[Broker] = asJavaList(underlying.isr) def errorCode: Short = underlying.errorCode diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index 5f80df7..f7c4fa1 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -18,7 +18,7 @@ package kafka.javaapi import kafka.api._ import java.nio.ByteBuffer -import scala.collection.JavaConversions +import scala.collection.JavaConverters._ class TopicMetadataRequest(val versionId: Short, override val correlationId: Int, @@ -27,7 +27,7 @@ class TopicMetadataRequest(val versionId: Short, extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) { val underlying: kafka.api.TopicMetadataRequest = - new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, JavaConversions.asBuffer(topics)) + new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics.asScala) def this(topics: java.util.List[String]) = this(kafka.api.TopicMetadataRequest.CurrentVersion, 0, kafka.api.TopicMetadataRequest.DefaultClientId, topics) diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 14c4c8a..cb93f80 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -18,7 +18,7 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ -import scala.collection.JavaConversions.asList +import collection.JavaConverters._ /** @@ -71,9 +71,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = { - import scala.collection.JavaConversions._ - val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]]) + val scalaTopicCountMap: Map[String, Int] = Map.empty ++ topicCountMap.asInstanceOf[java.util.Map[String, Int]].asScala val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder) val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]] for ((topic, streams) <- scalaReturn) { @@ -89,7 +88,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder()) def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder: Decoder[K], valueDecoder: Decoder[V]) = - asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)) + underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder).asJava def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder()) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index 0a95248..6c78fd3 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -20,12 +20,13 @@ import java.util.concurrent.atomic.AtomicLong import scala.reflect.BeanProperty import java.nio.ByteBuffer import kafka.message._ +import collection.JavaConverters._ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends 
MessageSet { private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer) def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) { - this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer) + this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer) } def this(messages: java.util.List[Message]) { diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index 7265328..33c234f 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -19,6 +19,7 @@ package kafka.javaapi.producer import kafka.producer.ProducerConfig import kafka.producer.KeyedMessage +import collection.JavaConverters._ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only { @@ -37,8 +38,7 @@ class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for * @param messages list of producer data objects that encapsulate the topic, key and message data */ def send(messages: java.util.List[KeyedMessage[K,V]]) { - import collection.JavaConversions._ - underlying.send(asBuffer(messages):_*) + underlying.send(messages.asScala: _*) } /** diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 497cfdd..48a66d2 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -318,7 +318,7 @@ private[kafka] class LogManager(val config: KafkaConfig, if(timeSinceLastFlush >= logFlushInterval) log.flush } catch { - case e => + case e: Exception => error("Error flushing topic " + log.topicName, e) e match { case _: IOException => diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala index cab1864..f666224 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala @@ -82,7 +82,7 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive case e: OutOfMemoryError => error("OOME with size " + size, e) throw e - case e2 => + case e2: Exception => throw e2 } buffer diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 306f200..8f9a59f 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -79,7 +79,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { // no way to tell if write succeeded. 
Disconnect and re-throw exception to let client handle retry disconnect() throw e - case e => throw e + case e: Exception => throw e } response } diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 27b16e3..eda9405 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -128,7 +128,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, else serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message))) } catch { - case t => + case t: Exception => producerStats.serializationErrorRate.mark() if (isSync) { throw t @@ -177,7 +177,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, }catch { // Swallow recoverable exceptions and return None so that they can be retried. case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None - case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe + case oe: Exception => error("Failed to collate messages by topic, partition due to", oe); throw oe } } diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 6691147..94c5dac 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -43,7 +43,7 @@ class ProducerSendThread[K,V](val threadName: String, try { processEvents }catch { - case e => error("Error in sending events: ", e) + case e: Exception => error("Error in sending events: ", e) }finally { shutdownLatch.countDown } @@ -103,7 +103,7 @@ class ProducerSendThread[K,V](val threadName: String, if(size > 0) handler.handle(events) }catch { - case e => error("Error in handling batch of " + size + " events", e) + case e: Exception => error("Error in handling batch of " + size + " events", e) } } diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index cfa7747..ef9ed63 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -96,7 +96,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) response = simpleConsumer.fetch(fetchRequest) } catch { - case t => + case t: Exception => debug("error in fetch %s".format(fetchRequest), t) if (isRunning.get) { partitionMapLock synchronized { @@ -131,7 +131,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread processPartitionData(topicAndPartition, currentOffset.get, partitionData) } catch { - case e => + case e: Exception => throw new KafkaException("error processing data for topic %s partititon %d offset %d" .format(topic, partitionId, currentOffset.get), e) } @@ -142,7 +142,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke warn("current offset %d for topic %s partition %d out of range; reset 
offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) } catch { - case e => + case e: Exception => warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 87ca6b0..e8ce73e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -210,7 +210,7 @@ class KafkaApis(val requestChannel: RequestChannel, case nle: NotLeaderForPartitionException => warn(nle.getMessage) new ProduceResult(topicAndPartition, nle) - case e => + case e: Exception => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d" @@ -296,7 +296,7 @@ class KafkaApis(val requestChannel: RequestChannel, case nle: NotLeaderForPartitionException => warn(nle.getMessage) new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty) - case t => + case t: Exception => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark() error("error when processing request " + (topic, partition, offset, fetchSize), t) @@ -370,7 +370,7 @@ class KafkaApis(val requestChannel: RequestChannel, case nle: NotLeaderForPartitionException => warn(nle.getMessage) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) ) - case e => + case e: Exception => warn("Error while responding to offset request", e) (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) ) } @@ -416,7 +416,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } } catch { - case e => error("Error while retrieving topic metadata", e) + case e: Exception => error("Error while retrieving topic metadata", e) } case _ => error("Error while fetching topic metadata for topic " + topicAndMetadata.topic, diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index 5be65e9..9879a11 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -34,7 +34,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { server.startup() } catch { - case e => + case e: Exception => fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e) shutdown() System.exit(1) @@ -46,7 +46,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { server.shutdown() } catch { - case e => + case e: Exception => fatal("Fatal error during KafkaServerStable shutdown. 
Prepare to halt", e) System.exit(1) } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6d849ac..d0be385 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -222,7 +222,7 @@ class ReplicaManager(val config: KafkaConfig, makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders, leaderAndISRRequest.correlationId) } catch { - case e => + case e: Exception => val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d " + "epoch %d for partition %s").format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicAndPartition) diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala index 574922b..bf1a89d 100644 --- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala +++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala @@ -59,7 +59,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: if (data != null) { leaderId = data.toInt } - case e2 => + case e2: Exception => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) resign() } diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 63519e1..2ecb0da 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -105,7 +105,7 @@ object ImportZkOffsets extends Logging { try { ZkUtils.updatePersistentPath(zkClient, partition, offset.toString) } catch { - case e => e.printStackTrace() + case e: Exception => e.printStackTrace() } } } diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 7e424e7..26badb1 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -86,7 +86,7 @@ object JmxTool extends Logging { else List(null) - val names = queries.map((name: ObjectName) => asSet(mbsc.queryNames(name, null))).flatten + val names = queries.map((name: ObjectName) => asJavaSet(mbsc.queryNames(name, null))).flatten val allAttributes: Iterable[(ObjectName, Array[String])] = names.map((name: ObjectName) => (name, mbsc.getMBeanInfo(name).getAttributes().map(_.getName))) diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index 3c18286..3bd674f 100644 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code. 
*/ +@SuppressWarnings({"unchecked", "rawtypes"}) public class KafkaMigrationTool { private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer"; diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 3d22dc7..46d10df 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -164,7 +164,7 @@ object MirrorMaker extends Logging { producerDataChannel.sendRequest(pd) } } catch { - case e => + case e: Exception => fatal("%s stream unexpectedly exited.", e) } finally { shutdownLatch.countDown() diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 3cfa384..bd9c556 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -217,7 +217,7 @@ object SimpleConsumerShell extends Logging { formatter.writeTo(key, Utils.readBytes(message.payload), System.out) numMessagesConsumed += 1 } catch { - case e => + case e: Exception => if (skipMessageOnError) error("Error processing message, skipping this message: ", e) else diff --git a/core/src/main/scala/kafka/utils/Annotations.scala b/core/src/main/scala/kafka/utils/Annotations.scala index 28269eb..1a18226 100644 --- a/core/src/main/scala/kafka/utils/Annotations.scala +++ b/core/src/main/scala/kafka/utils/Annotations.scala @@ -17,6 +17,8 @@ package kafka.utils +import scala.annotation.StaticAnnotation + /* Some helpful annotations */ /** diff --git a/core/src/main/scala/kafka/utils/Json.scala b/core/src/main/scala/kafka/utils/Json.scala index a114769..2f116ca 100644 --- a/core/src/main/scala/kafka/utils/Json.scala +++ b/core/src/main/scala/kafka/utils/Json.scala @@ -16,7 +16,7 @@ object Json extends Logging { try { JSON.parseFull(input) } catch { - case t => + case t: Exception => throw new KafkaException("Can't parse json string: %s".format(input), t) } } diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index 64d84cc..69fe2ad 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -64,7 +64,7 @@ object Mx4jLoader extends Logging { case e: ClassNotFoundException => { info("Will not load MX4J, mx4j-tools.jar is not in the classpath"); } - case e => { + case e: Exception => { warn("Could not start register mbean in JMX", e); } } diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 9a86eab..738cd0b 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -19,7 +19,7 @@ package kafka.utils import java.util.ArrayList import java.util.concurrent._ -import collection.JavaConversions +import collection.JavaConverters._ import kafka.common.KafkaException import java.lang.Object @@ -70,11 +70,10 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] def get(key: K): V = pool.get(key) def remove(key: K): V = pool.remove(key) - - def keys = JavaConversions.asSet(pool.keySet()) - - def values: Iterable[V] = - JavaConversions.asIterable(new ArrayList[V](pool.values())) + + def keys = pool.keySet.asScala.toSet + + def values: Iterable[V] = new ArrayList[V](pool.values).asScala 
def clear() { pool.clear() } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index c639efb..b13fb21 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -67,7 +67,7 @@ object Utils extends Logging { fun() } catch { - case t => + case t: Exception => // log any error and the stack trace error("error in loggedRunnable", t) } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index ce1904b..5b93387 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -261,7 +261,7 @@ object ZkUtils extends Logging { storedData = readData(client, path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - case e2 => throw e2 + case e2: Exception => throw e2 } if (storedData == null || storedData != data) { info("conflict in " + path + " data: " + data + " stored data: " + storedData) @@ -271,7 +271,7 @@ object ZkUtils extends Logging { info(path + " exists with value " + data + " during connection loss; this is ok") } } - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -314,10 +314,10 @@ object ZkUtils extends Logging { case e: ZkNodeExistsException => stat = client.writeData(path, data) return stat.getVersion - case e2 => throw e2 + case e2: Exception => throw e2 } } - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -370,7 +370,7 @@ object ZkUtils extends Logging { createParentPath(client, path) client.createEphemeral(path, data) } - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -382,7 +382,7 @@ object ZkUtils extends Logging { // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") false - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -393,7 +393,7 @@ object ZkUtils extends Logging { case e: ZkNoNodeException => // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -403,7 +403,7 @@ object ZkUtils extends Logging { zk.deleteRecursive(dir) zk.close() } catch { - case _ => // swallow + case e: Exception => // swallow } } @@ -420,7 +420,7 @@ object ZkUtils extends Logging { } catch { case e: ZkNoNodeException => (None, stat) - case e2 => throw e2 + case e2: Exception => throw e2 } dataAndStat } @@ -438,7 +438,7 @@ object ZkUtils extends Logging { client.getChildren(path) } catch { case e: ZkNoNodeException => return Nil - case e2 => throw e2 + case e2: Exception => throw e2 } } @@ -613,7 +613,7 @@ object ZkUtils extends Logging { case nne: ZkNoNodeException => ZkUtils.createPersistentPath(zkClient, zkPath, jsonData) debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData)) - case e2 => throw new AdministrationException(e2.toString) + case e2: Exception => throw new AdministrationException(e2.toString) } } } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f7ee914..c1a82ff 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.utils._ 
import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} +import collection.JavaConverters._ import kafka.utils.TestUtils._ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { @@ -390,10 +391,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { - import scala.collection.JavaConversions val children = zkClient.getChildren(path) Collections.sort(children) - val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children) + val childrenAsSeq : Seq[java.lang.String] = children.asScala childrenAsSeq.map(partition => (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String])) } diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 9f243f0..a86e3ce 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -21,7 +21,6 @@ import junit.framework.Assert._ import kafka.integration.KafkaServerTestHarness import kafka.server._ import org.scalatest.junit.JUnit3Suite -import scala.collection.JavaConversions._ import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.serializer._ @@ -32,6 +31,7 @@ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.zk.ZooKeeperTestHarness +import scala.collection.JavaConversions class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { @@ -85,7 +85,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val ms = 0.until(messagesPerNode).map(x => header + conf.brokerId + "-" + partition + "-" + x) messages ++= ms import scala.collection.JavaConversions._ - javaProducer.send(asList(ms.map(new KeyedMessage[Int, String](topic, partition, _)))) + javaProducer.send(asJavaList(ms.map(new KeyedMessage[Int, String](topic, partition, _)))) } javaProducer.close messages @@ -102,8 +102,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String, String]]]): List[String] = { + import scala.collection.JavaConversions._ var messages: List[String] = Nil - val topicMessageStreams = asMap(jTopicMessageStreams) + val topicMessageStreams = JavaConversions.asScalaMap(jTopicMessageStreams).toMap for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index abee11b..a889630 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -22,14 +22,15 @@ import org.scalatest.junit.JUnitSuite import org.junit.Test import kafka.utils.TestUtils import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, CompressionCodec, Message} +import scala.collection.JavaConversions._ + trait BaseMessageSetTestCases 
extends JUnitSuite { val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes())) def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet def toMessageIterator(messageSet: MessageSet): Iterator[Message] = { - import scala.collection.JavaConversions._ - val messages = asIterable(messageSet) + val messages = asJavaIterable(messageSet) messages.map(m => m.message).iterator } @@ -41,18 +42,16 @@ trait BaseMessageSetTestCases extends JUnitSuite { @Test def testIteratorIsConsistent() { - import scala.collection.JavaConversions._ val m = createMessageSet(messages) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test def testIteratorIsConsistentWithCompression() { - import scala.collection.JavaConversions._ val m = createMessageSet(messages, DefaultCompressionCodec) // two iterators over the same set should give the same results - TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator)) + TestUtils.checkEquals(m.iterator, m.iterator) } @Test diff --git a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala index a3f85cf..c43e69c 100644 --- a/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala @@ -36,8 +36,9 @@ class KafkaTimerTest extends JUnit3Suite { clock.addMillis(1000) } assertEquals(1, metric.getCount()) - assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon) - assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon) + //They got rid of Episolon see https://github.com/scala/scala/pull/1076 + assertTrue((metric.getMax() - 1000).abs <= java.lang.Double.longBitsToDouble(0x3ca0000000000000L)) + assertTrue((metric.getMin() - 1000).abs <= java.lang.Double.longBitsToDouble(0x3ca0000000000000L)) } private class ManualClock extends Clock { diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 922a200..49d89d6 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -447,7 +447,7 @@ class AsyncProducerTest extends JUnit3Suite { val topic = "topic1" val msgs = TestUtils.getMsgStrings(5) val scalaProducerData = msgs.map(m => new KeyedMessage[String, String](topic, m)) - val javaProducerData = scala.collection.JavaConversions.asList(scalaProducerData) + val javaProducerData = scala.collection.JavaConversions.asJavaList(scalaProducerData) val mockScalaProducer = EasyMock.createMock(classOf[kafka.producer.Producer[String, String]]) mockScalaProducer.send(scalaProducerData.head) diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala index ee2ce95..1605c98 100644 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala @@ -175,7 +175,7 @@ object ConsumerPerformance { case _: InterruptedException => case _: ClosedByInterruptException => case _: ConsumerTimeoutException => - case e => throw e + case e: Exception => throw e } totalMessagesRead.addAndGet(messagesRead) totalBytesRead.addAndGet(bytesRead) diff --git a/project/Build.scala b/project/Build.scala index 4bbdfee..a1cd1ae 100644 --- a/project/Build.scala +++ 
b/project/Build.scala @@ -27,8 +27,8 @@ object KafkaBuild extends Build { version := "0.8-SNAPSHOT", organization := "org.apache", scalacOptions ++= Seq("-deprecation", "-unchecked", "-g:none"), - crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2"), - scalaVersion := "2.8.0", + crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"), + scalaVersion := "2.10.1", javacOptions ++= Seq("-Xlint:unchecked", "-source", "1.5"), parallelExecution in Test := false, // Prevent tests from overrunning each other libraryDependencies ++= Seq( -- 1.8.2
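
Note for reviewers, not part of the patch itself: nearly every hunk above is one of a handful of mechanical Scala 2.10 migrations. A minimal standalone sketch of those patterns, with made-up names and not taken from the Kafka sources, is below for reference.

    import scala.collection.JavaConverters._

    object Scala210MigrationSketch {

      // 1. A bare `case e =>` in a catch block binds Throwable and draws a
      //    "catches all Throwables" warning from the 2.10 compiler, so the
      //    patch narrows each one to `case e: Exception =>` (or keeps a more
      //    specific type where the original code already named one).
      def parseOrZero(s: String): Int =
        try s.toInt
        catch {
          case e: Exception => 0
        }

      // 2. The unqualified JavaConversions helpers this code base used
      //    (asBuffer, asMap, asList, asSet, asIterator) no longer resolve under
      //    those names in 2.10, so the patch moves to either the renamed
      //    JavaConversions methods or JavaConverters' explicit conversions.
      def toScala(xs: java.util.List[String]): Seq[String] = xs.asScala
      def toJava(xs: Seq[String]): java.util.List[String] = xs.asJava

      // 3. scala.Double.Epsilon was removed (scala/scala#1076); KafkaTimerTest
      //    now spells the comparison tolerance out as a raw bit pattern.
      val Tolerance = java.lang.Double.longBitsToDouble(0x3ca0000000000000L) // 2^-53
    }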