From 781977148bcf535d3229cd22589ba5768413b448 Mon Sep 17 00:00:00 2001 From: Jon Riehl Date: Fri, 5 Sep 2014 18:25:32 -0500 Subject: [PATCH 1/4] KAFKA-1054; Suppressed compiler warnings about type erasure in matching via unboxing. --- core/src/main/scala/kafka/admin/AdminUtils.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index eee80f9..f06edf4 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -317,12 +317,17 @@ object AdminUtils extends Logging { if(str != null) { Json.parseFull(str) match { case None => // there are no config overrides - case Some(map: Map[String, _]) => + case Some(mapAnon: Map[_, _]) => + val map = mapAnon collect { case (k: String, v: Any) => k -> v } require(map("version") == 1) map.get("config") match { - case Some(config: Map[String, String]) => - for((k,v) <- config) - props.setProperty(k, v) + case Some(config: Map[_, _]) => + for(configTup <- config) + configTup match { + case (k: String, v: String) => + props.setProperty(k, v) + case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + } case _ => throw new IllegalArgumentException("Invalid topic config: " + str) } -- 1.9.3 (Apple Git-50) From d331ba5f2b9a1fae57f01bb5859a242833dbb332 Mon Sep 17 00:00:00 2001 From: Jon Riehl Date: Fri, 5 Sep 2014 18:39:21 -0500 Subject: [PATCH 2/4] KAFKA-1054; Suppressed warning caused by slight difference in input function type. Added parameter name to scaladoc in comment. --- core/src/main/scala/kafka/utils/CoreUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 98abc45..2546629 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -66,7 +66,7 @@ object CoreUtils extends Logging { * @param fun The runction to execute in the thread * @return The unstarted thread */ - def daemonThread(name: String, fun: () => Unit): Thread = + def daemonThread(name: String, fun: => Unit): Thread = Utils.daemonThread(name, runnable(fun)) /** -- 1.9.3 (Apple Git-50) From 2377a1562919a8a498ac61959ba3f4aed78c2f01 Mon Sep 17 00:00:00 2001 From: Blake Smith Date: Tue, 10 Mar 2015 23:17:52 -0500 Subject: [PATCH 3/4] KAFKA-1054; Fix compiler warnings: ServerShutdownTest, DelayedJoinGroup function signature --- core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala index df60cbc..f5bd5dc 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala @@ -28,7 +28,7 @@ import kafka.server.DelayedOperation */ class DelayedJoinGroup(sessionTimeout: Long, consumerRegistry: ConsumerRegistry, - responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) { + responseCallback: => Unit) extends DelayedOperation(sessionTimeout) { /* always successfully complete the operation once called */ override def tryComplete(): Boolean = { @@ -45,4 +45,4 @@ class DelayedJoinGroup(sessionTimeout: Long, // TODO responseCallback } -} \ No newline at end of file +} -- 1.9.3 (Apple Git-50) From fc4878643496ddc9ee642ca7ccfec118c569b810 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sun, 26 Apr 2015 22:37:01 +0100 Subject: [PATCH 4/4] KAFKA-1054; Fix Scala 2.11 warnings `Pair` has been deprecated, `try` without `catch` and `finally` is useless and `Pair` is deprecated. --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala | 4 ++-- core/src/main/scala/kafka/consumer/ConsumerIterator.scala | 6 ++---- core/src/main/scala/kafka/utils/CoreUtils.scala | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 9ebbee6..0199317 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -104,8 +104,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */ val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout) - require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" + - " to prevent unnecessary socket timeouts") /** the socket receive buffer for network requests */ val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize) @@ -133,6 +131,8 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */ val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs) + require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" + + " to prevent unnecessary socket timeouts") /** backoff time between retries during rebalance */ val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 0c5c451..0c6c810 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -104,10 +104,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk } def clearCurrentChunk() { - try { - debug("Clearing the current data chunk for this consumer iterator") - current.set(null) - } + debug("Clearing the current data chunk for this consumer iterator") + current.set(null) } } diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 2546629..d0a8fa7 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -207,7 +207,7 @@ object CoreUtils extends Logging { return map val keyVals = str.split("\\s*,\\s*").map(s => { val lio = s.lastIndexOf(":") - Pair(s.substring(0,lio).trim, s.substring(lio + 1).trim) + (s.substring(0,lio).trim, s.substring(lio + 1).trim) }) keyVals.toMap } -- 1.9.3 (Apple Git-50)