From 20863281b64bd8bfba89e81195beedba7e07f541 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 21 Apr 2015 23:21:31 +0100 Subject: [PATCH 01/20] Rename file to match class name --- .../common/ConsumerRebalanceFailedException.scala | 26 ++++++++++++++++++++++ .../common/ConsumerReblanceFailedException.scala | 26 ---------------------- 2 files changed, 26 insertions(+), 26 deletions(-) create mode 100644 core/src/main/scala/kafka/common/ConsumerRebalanceFailedException.scala delete mode 100644 core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala diff --git a/core/src/main/scala/kafka/common/ConsumerRebalanceFailedException.scala b/core/src/main/scala/kafka/common/ConsumerRebalanceFailedException.scala new file mode 100644 index 0000000..ae5018d --- /dev/null +++ b/core/src/main/scala/kafka/common/ConsumerRebalanceFailedException.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Thrown when a request is made for broker but no brokers with that topic + * exist. + */ +class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) { + def this() = this(null) +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala b/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala deleted file mode 100644 index ae5018d..0000000 --- a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for broker but no brokers with that topic - * exist. 
- */
-class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file
--
1.9.3 (Apple Git-50)

From e2aba43533435712990f6f6335c79f959ac87910 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 21 Apr 2015 23:22:10 +0100
Subject: [PATCH 02/20] Remove redundant `extends Object`

---
 .../src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java | 2 +-
 .../src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java | 2 +-
 .../test/java/org/apache/kafka/common/config/AbstractConfigTest.java | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 42c7219..daf48fe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -304,7 +304,7 @@ public class ConsumerConfig extends AbstractConfig {
         return newProperties;
     }

-    ConsumerConfig(Map<? extends Object, ? extends Object> props) {
+    ConsumerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 5a57555..187d000 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -242,7 +242,7 @@ public class ProducerConfig extends AbstractConfig {
         return newProperties;
     }

-    ProducerConfig(Map<? extends Object, ? extends Object> props) {
+    ProducerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }

diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 66442ed..08d267a 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -73,7 +73,7 @@ public class AbstractConfigTest {
                               METRIC_REPORTER_CLASSES_DOC);
         }

-        public TestConfig(Map<? extends Object, ? extends Object> props) {
+        public TestConfig(Map<?, ?> props) {
             super(CONFIG, props);
         }
     }
--
1.9.3 (Apple Git-50)

From e900d31522f83a6baa936ba0899cc5369b059815 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 21 Apr 2015 23:29:13 +0100
Subject: [PATCH 03/20] Don't pass `char` to `StringBuilder` constructor

It ends up calling the `StringBuilder(int capacity)` constructor.
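
[Editor's note: for illustration only, not part of the patch. In Java the char
literal '{' (code point 123) widens to int, so overload resolution selects
StringBuilder(int capacity) and the brace is silently dropped. A minimal sketch
of the pitfall:

    StringBuilder bad = new StringBuilder('{');   // selects StringBuilder(int): capacity 123, contents ""
    System.out.println(bad.length());             // prints 0 -- no "{" was ever appended
    StringBuilder good = new StringBuilder("{");  // selects StringBuilder(String)
    System.out.println(good.length());            // prints 1
]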
---
 .../src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
index cf91f5f..c571b4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
@@ -58,7 +58,7 @@ public class Histogram {

     @Override
     public String toString() {
-        StringBuilder b = new StringBuilder('{');
+        StringBuilder b = new StringBuilder("{");
         for (int i = 0; i < this.hist.length - 1; i++) {
             b.append(String.format("%.10f", binScheme.fromBin(i)));
             b.append(':');
--
1.9.3 (Apple Git-50)

From b44ba4b2978813216ca1247e1161d75e6ebd4a8a Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 21 Apr 2015 23:31:17 +0100
Subject: [PATCH 04/20] Fix `equals` call to use the correct value

---
 .../main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 305ec8e..5ab05dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -55,7 +55,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
             throw new IllegalArgumentException("Topic must be non-null.");
         List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
-            if (entry.getKey().equals(topic))
+            if (entry.getKey().topic().equals(topic))
                 recs.add(entry.getValue());
         }
         return new ConcatenatedIterable<K, V>(recs);
--
1.9.3 (Apple Git-50)

From 7bbde811d67273a7c7a718901f1d42f56c2db19f Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Tue, 21 Apr 2015 23:34:20 +0100
Subject: [PATCH 05/20] Improve numeric coercions

---
 .../main/java/org/apache/kafka/common/protocol/SecurityProtocol.java | 2 +-
 .../apache/kafka/common/protocol/types/ProtocolSerializationTest.java | 4 ++--
 core/src/main/scala/kafka/cluster/EndPoint.scala | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index d3394ee..dab1a94 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -49,7 +49,7 @@ public enum SecurityProtocol {
     }

     public static String getName(int id) {
-        return CODE_TO_SECURITY_PROTOCOL.get(id).name;
+        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
     }

     public static List<String> getNames() {
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 8b92634..6c335a1 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -43,8 +43,8 @@ public class ProtocolSerializationTest {
                                  new Field("struct", new Schema(new Field("field", Type.INT32))));
         this.struct = new Struct(this.schema).set("int8", (byte) 1)
                                              .set("int16", (short) 1)
-                                             .set("int32",
(int) 1) - .set("int64", (long) 1) + .set("int32", 1) + .set("int64", 1L) .set("string", "1") .set("bytes", "1".getBytes()) .set("array", new Object[] {1}); diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala index 3286f6d..e9008e6 100644 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -68,7 +68,7 @@ case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) { def writeTo(buffer: ByteBuffer): Unit = { buffer.putInt(port) writeShortString(buffer, host) - buffer.putShort(protocolType.id.toShort) + buffer.putShort(protocolType.id) } def sizeInBytes: Int = -- 1.9.3 (Apple Git-50) From c5ba74b3f2d98c183a041228ffc7aab6cf889079 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 21 Apr 2015 23:38:31 +0100 Subject: [PATCH 06/20] Remove redundant `val` modifiers in case classes --- .../kafka/api/ControlledShutdownRequest.scala | 8 ++--- .../kafka/api/ControlledShutdownResponse.scala | 8 ++--- .../main/scala/kafka/api/LeaderAndIsrRequest.scala | 6 ++-- .../main/scala/kafka/api/StopReplicaResponse.scala | 8 ++--- core/src/main/scala/kafka/api/TopicMetadata.scala | 2 +- .../scala/kafka/api/TopicMetadataRequest.scala | 10 +++---- .../scala/kafka/controller/KafkaController.scala | 4 +-- core/src/main/scala/kafka/log/CleanerConfig.scala | 20 ++++++------- core/src/main/scala/kafka/log/LogConfig.scala | 34 +++++++++++----------- core/src/main/scala/kafka/log/OffsetPosition.scala | 2 +- .../main/scala/kafka/producer/KeyedMessage.scala | 4 +-- 11 files changed, 53 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala index 5be393a..fe81635 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -37,9 +37,9 @@ object ControlledShutdownRequest extends Logging { } } -case class ControlledShutdownRequest(val versionId: Short, - val correlationId: Int, - val brokerId: Int) +case class ControlledShutdownRequest(versionId: Short, + correlationId: Int, + brokerId: Int) extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){ def this(correlationId: Int, brokerId: Int) = @@ -74,4 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short, controlledShutdownRequest.append("; BrokerId: " + brokerId) controlledShutdownRequest.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala index 5e0a1cf..9ecdee7 100644 --- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala +++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala @@ -39,9 +39,9 @@ object ControlledShutdownResponse { } -case class ControlledShutdownResponse(val correlationId: Int, - val errorCode: Short = ErrorMapping.NoError, - val partitionsRemaining: Set[TopicAndPartition]) +case class ControlledShutdownResponse(correlationId: Int, + errorCode: Short = ErrorMapping.NoError, + partitionsRemaining: Set[TopicAndPartition]) extends RequestOrResponse() { def sizeInBytes(): Int ={ var size = @@ -68,4 +68,4 @@ case class ControlledShutdownResponse(val correlationId: Int, override def describe(details: Boolean):String = { toString } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 2fad585..431190a 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -59,8 +59,8 @@ object PartitionStateInfo { } } -case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - val allReplicas: Set[Int]) { +case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, + allReplicas: Set[Int]) { def replicationFactor = allReplicas.size def writeTo(buffer: ByteBuffer) { @@ -200,4 +200,4 @@ case class LeaderAndIsrRequest (versionId: Short, leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) leaderAndIsrRequest.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala index 3431f3f..2fc3c95 100644 --- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -42,9 +42,9 @@ object StopReplicaResponse { } -case class StopReplicaResponse(val correlationId: Int, - val responseMap: Map[TopicAndPartition, Short], - val errorCode: Short = ErrorMapping.NoError) +case class StopReplicaResponse(correlationId: Int, + responseMap: Map[TopicAndPartition, Short], + errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse() { def sizeInBytes(): Int ={ var size = @@ -72,4 +72,4 @@ case class StopReplicaResponse(val correlationId: Int, } override def describe(details: Boolean):String = { toString } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index 5e39f45..bd866bc 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -109,7 +109,7 @@ object PartitionMetadata { } case class PartitionMetadata(partitionId: Int, - val leader: Option[BrokerEndPoint], + leader: Option[BrokerEndPoint], replicas: Seq[BrokerEndPoint], isr: Seq[BrokerEndPoint] = Seq.empty, errorCode: Short = ErrorMapping.NoError) extends Logging { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 7dca09c..363bae0 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -46,10 +46,10 @@ object TopicMetadataRequest extends Logging { } } -case class TopicMetadataRequest(val versionId: Short, - val correlationId: Int, - val clientId: String, - val topics: Seq[String]) +case class TopicMetadataRequest(versionId: Short, + correlationId: Int, + clientId: String, + topics: Seq[String]) extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ def this(topics: Seq[String], correlationId: Int) = @@ -93,4 +93,4 @@ case class TopicMetadataRequest(val versionId: Short, topicMetadataRequest.append("; Topics: " + topics.mkString(",")) topicMetadataRequest.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 3a09377..a635116 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -647,7 +647,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt */ def startup() = { 
inLock(controllerContext.controllerLock) { - info("Controller starting up"); + info("Controller starting up") registerSessionExpirationListener() isRunning = true controllerElector.startup @@ -1326,7 +1326,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) { } } -case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) { +case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) { override def toString(): String = { val leaderAndIsrInfo = new StringBuilder leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader) diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala index ade8386..782bc9a 100644 --- a/core/src/main/scala/kafka/log/CleanerConfig.scala +++ b/core/src/main/scala/kafka/log/CleanerConfig.scala @@ -29,13 +29,13 @@ package kafka.log * @param enableCleaner Allows completely disabling the log cleaner * @param hashAlgorithm The hash algorithm to use in key comparison. */ -case class CleanerConfig(val numThreads: Int = 1, - val dedupeBufferSize: Long = 4*1024*1024L, - val dedupeBufferLoadFactor: Double = 0.9d, - val ioBufferSize: Int = 1024*1024, - val maxMessageSize: Int = 32*1024*1024, - val maxIoBytesPerSecond: Double = Double.MaxValue, - val backOffMs: Long = 15 * 1000, - val enableCleaner: Boolean = true, - val hashAlgorithm: String = "MD5") { -} \ No newline at end of file +case class CleanerConfig(numThreads: Int = 1, + dedupeBufferSize: Long = 4*1024*1024L, + dedupeBufferLoadFactor: Double = 0.9d, + ioBufferSize: Int = 1024*1024, + maxMessageSize: Int = 32*1024*1024, + maxIoBytesPerSecond: Double = Double.MaxValue, + backOffMs: Long = 15 * 1000, + enableCleaner: Boolean = true, + hashAlgorithm: String = "MD5") { +} diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index da55a34..a907da0 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -63,23 +63,23 @@ object Defaults { * @param compressionType compressionType for a given topic * */ -case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, - val segmentMs: Long = Defaults.SegmentMs, - val segmentJitterMs: Long = Defaults.SegmentJitterMs, - val flushInterval: Long = Defaults.FlushInterval, - val flushMs: Long = Defaults.FlushMs, - val retentionSize: Long = Defaults.RetentionSize, - val retentionMs: Long = Defaults.RetentionMs, - val maxMessageSize: Int = Defaults.MaxMessageSize, - val maxIndexSize: Int = Defaults.MaxIndexSize, - val indexInterval: Int = Defaults.IndexInterval, - val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs, - val deleteRetentionMs: Long = Defaults.DeleteRetentionMs, - val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, - val compact: Boolean = Defaults.Compact, - val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, - val minInSyncReplicas: Int = Defaults.MinInSyncReplicas, - val compressionType: String = Defaults.CompressionType) { +case class LogConfig(segmentSize: Int = Defaults.SegmentSize, + segmentMs: Long = Defaults.SegmentMs, + segmentJitterMs: Long = Defaults.SegmentJitterMs, + flushInterval: Long = Defaults.FlushInterval, + flushMs: Long = Defaults.FlushMs, + retentionSize: Long = Defaults.RetentionSize, + retentionMs: Long = Defaults.RetentionMs, + maxMessageSize: Int = Defaults.MaxMessageSize, + maxIndexSize: Int = Defaults.MaxIndexSize, + indexInterval: Int = 
Defaults.IndexInterval, + fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs, + deleteRetentionMs: Long = Defaults.DeleteRetentionMs, + minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio, + compact: Boolean = Defaults.Compact, + uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable, + minInSyncReplicas: Int = Defaults.MinInSyncReplicas, + compressionType: String = Defaults.CompressionType) { def toProps: Properties = { val props = new Properties() diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala index 6cefde4..24b6dcf 100644 --- a/core/src/main/scala/kafka/log/OffsetPosition.scala +++ b/core/src/main/scala/kafka/log/OffsetPosition.scala @@ -22,4 +22,4 @@ package kafka.log * in some log file of the beginning of the message set entry with the * given offset. */ -case class OffsetPosition(val offset: Long, val position: Int) \ No newline at end of file +case class OffsetPosition(offset: Long, position: Int) diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala index 388bc9b..dbcf295 100644 --- a/core/src/main/scala/kafka/producer/KeyedMessage.scala +++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala @@ -21,7 +21,7 @@ package kafka.producer * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */ -case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) { +case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) { if(topic == null) throw new IllegalArgumentException("Topic cannot be null.") @@ -39,4 +39,4 @@ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, v } def hasKey = key != null -} \ No newline at end of file +} -- 1.9.3 (Apple Git-50) From 0882175b2cea4ef9fce604b2ae60798904c9d838 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 21 Apr 2015 23:42:12 +0100 Subject: [PATCH 07/20] Remove redundant `return` keywords --- core/src/main/scala/kafka/client/ClientUtils.scala | 2 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/server/LogOffsetMetadata.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 4 ++-- core/src/main/scala/kafka/tools/ExportZkOffsets.scala | 11 +++++------ core/src/main/scala/kafka/tools/ImportZkOffsets.scala | 2 +- core/src/main/scala/kafka/tools/ProducerPerformance.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 10 ++++------ core/src/test/scala/unit/kafka/utils/MockScheduler.scala | 8 ++++---- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 10 +++++----- 12 files changed, 28 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index b66424b..62394c0 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -74,7 +74,7 @@ object ClientUtils extends Logging{ } else { debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics)) } - return topicMetadataResponse + topicMetadataResponse } /** diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 4ab22de..a1082ae 100755 --- 
a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -375,10 +375,10 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi if(Os.isWindows) lock.lock() try { - return fun + fun } finally { if(Os.isWindows) lock.unlock() } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c63f4ba..d401bac 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -458,7 +458,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg checkpoint.write(new BrokerMetadata(brokerId)) } - return brokerId + brokerId } private def generateBrokerId: Int = { diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala index a868334..00b60fe 100644 --- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala +++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala @@ -26,7 +26,7 @@ object LogOffsetMetadata { class OffsetOrdering extends Ordering[LogOffsetMetadata] { override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = { - return x.offsetDiff(y).toInt + x.offsetDiff(y).toInt } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8ddd325..59c9bc3 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -242,7 +242,7 @@ class ReplicaManager(val config: KafkaConfig, def getReplicaOrException(topic: String, partition: Int): Replica = { val replicaOpt = getReplica(topic, partition) if(replicaOpt.isDefined) - return replicaOpt.get + replicaOpt.get else throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition)) } diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 80b2674..bba3990 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -69,7 +69,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("prop") .ofType(classOf[String]) - val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up") val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + "start with the earliest message present in the log rather than the latest message.") val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") @@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging { def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer) zk.exists(path) } catch { case _: Throwable => false diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 4d051bc..ce14bbc 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -114,11 +114,10 @@ object ExportZkOffsets extends Logging { } } - private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = { - return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList - } + private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = + ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList - private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = { - return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList - } + private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = + ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList + } diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index abe0972..598350d 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -89,7 +89,7 @@ object ImportZkOffsets extends Logging { s = br.readLine() } - return partOffsetsMap + partOffsetsMap } private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = { diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index bc25cd2..71b1bd5 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -235,7 +235,7 @@ object ProducerPerformance extends Logging { val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') debug(seqMsgString) - return seqMsgString.getBytes() + seqMsgString.getBytes() } private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 5685a1e..1da8f90 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -498,7 +498,7 @@ object ZkUtils extends Logging { try { client.getChildren(path) } catch { - case e: ZkNoNodeException => return Nil + case e: ZkNoNodeException => Nil case e2: Throwable => throw e2 } } @@ -728,21 +728,19 @@ object ZkUtils extends Logging { def getSequenceId(client: ZkClient, path: String): Int = { try { val stat = client.writeDataReturnStat(path, "", -1) - return stat.getVersion + stat.getVersion } catch { case e: ZkNoNodeException => { createParentPath(client, BrokerSequenceIdPath) try { client.createPersistent(BrokerSequenceIdPath, "") - return 0 + 0 } catch { case e: ZkNodeExistsException => val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1) - 
return stat.getVersion - case e2: Throwable => throw e2 + stat.getVersion } } - case e2: Throwable => throw e2 } } diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index c674078..eeafeda 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -81,10 +81,10 @@ case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Lo def periodic = period >= 0 def compare(t: MockTask): Int = { if(t.nextExecution == nextExecution) - return 0 + 0 else if (t.nextExecution < nextExecution) - return -1 + -1 else - return 1 + 1 } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 8dc99b6..faae0e9 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -396,7 +396,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } /** @@ -417,7 +417,7 @@ object TestUtils extends Logging { consumerProps.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") - return new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) + new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) } /** @@ -457,9 +457,9 @@ object TestUtils extends Logging { new IteratorTemplate[Message] { override def makeNext(): Message = { if (iter.hasNext) - return iter.next.message + iter.next.message else - return allDone() + allDone() } } } @@ -579,7 +579,7 @@ object TestUtils extends Logging { fail("Timing out after %d ms since leader is not elected or changed for partition [%s,%d]" .format(timeoutMs, topic, partition)) - return leader + leader } /** -- 1.9.3 (Apple Git-50) From 1d1970250a5c60d114fb869fb33aea6e4e7db0d2 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 21 Apr 2015 23:49:34 +0100 Subject: [PATCH 08/20] Remove redundant semi-colons --- core/src/main/scala/kafka/admin/TopicCommand.scala | 2 +- .../src/main/scala/kafka/api/GenericRequestAndHeader.scala | 4 ++-- .../main/scala/kafka/api/GenericResponseAndHeader.scala | 4 ++-- core/src/main/scala/kafka/log/LogCleaner.scala | 4 ++-- core/src/main/scala/kafka/network/SocketServer.scala | 6 +++--- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 4 ++-- core/src/main/scala/kafka/utils/Mx4jLoader.scala | 6 +++--- core/src/main/scala/kafka/utils/Throttler.scala | 4 ++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 14 +++++++------- core/src/test/scala/unit/kafka/zk/ZKPathTest.scala | 8 ++++---- 10 files changed, 28 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 60f0228..8e6f186 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ 
b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -143,7 +143,7 @@ object TopicCommand { topics.foreach { topic => try { if (Topic.InternalTopics.contains(topic)) { - throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic)); + throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic)) } else { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) println("Topic %s is marked for deletion.".format(topic)) diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala index f40e19f..b0c6d7a 100644 --- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala +++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala @@ -36,7 +36,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short, 2 /* version id */ + 4 /* correlation id */ + (2 + clientId.length) /* client id */ + - body.sizeOf(); + body.sizeOf() } override def toString(): String = { @@ -52,4 +52,4 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short, strBuffer.append("; Body: " + body.toString) strBuffer.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala index a4879e2..748b5e9 100644 --- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala +++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala @@ -29,7 +29,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int, def sizeInBytes(): Int = { 4 /* correlation id */ + - body.sizeOf(); + body.sizeOf() } override def toString(): String = { @@ -43,4 +43,4 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int, strBuffer.append("; Body: " + body.toString) strBuffer.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 12eacdf..abea8b2 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -72,7 +72,7 @@ class LogCleaner(val config: CleanerConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { /* for managing the state of partitions being cleaned. 
package-private to allow access in tests */ - private[log] val cleanerManager = new LogCleanerManager(logDirs, logs); + private[log] val cleanerManager = new LogCleanerManager(logDirs, logs) /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, @@ -622,4 +622,4 @@ private case class LogToClean(topicPartition: TopicAndPartition, log: Log, first val cleanableRatio = dirtyBytes / totalBytes.toDouble def totalBytes = cleanBytes + dirtyBytes override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c5fec00..b9bedde 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -87,7 +87,7 @@ class SocketServer(val brokerId: Int, quotas, connectionsMaxIdleMs, portToProtocol) - Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start(); + Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start() } } @@ -244,7 +244,7 @@ private[kafka] class Acceptor(val host: String, * Accept loop that checks for new connection attempts */ def run() { - serverChannel.register(selector, SelectionKey.OP_ACCEPT); + serverChannel.register(selector, SelectionKey.OP_ACCEPT) startupComplete() var currentProcessor = 0 while(isRunning) { @@ -480,7 +480,7 @@ private[kafka] class Processor(val id: Int, key.attach(receive) } val read = receive.readFrom(socketChannel) - val address = socketChannel.socket.getRemoteSocketAddress(); + val address = socketChannel.socket.getRemoteSocketAddress() trace(read + " bytes read from " + address) if(read < 0) { close(key) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 00265f9..6971e6e 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -74,7 +74,7 @@ object ConsoleProducer { def getOldProducerProps(config: ProducerConfig): Properties = { - val props = new Properties; + val props = new Properties props.putAll(config.extraProducerProps) @@ -100,7 +100,7 @@ object ConsoleProducer { def getNewProducerProps(config: ProducerConfig): Properties = { - val props = new Properties; + val props = new Properties props.putAll(config.extraProducerProps) diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala index 7417897..aa120ab 100644 --- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala +++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala @@ -39,7 +39,7 @@ object Mx4jLoader extends Logging { val address = props.getString("mx4jaddress", "0.0.0.0") val port = props.getInt("mx4jport", 8082) try { - debug("Will try to load MX4j now, if it's in the classpath"); + debug("Will try to load MX4j now, if it's in the classpath") val mbs = ManagementFactory.getPlatformMBeanServer() val processorName = new ObjectName("Server:name=XSLTProcessor") @@ -62,10 +62,10 @@ object Mx4jLoader extends Logging { } catch { case e: ClassNotFoundException => { - info("Will not load MX4J, mx4j-tools.jar is not in the classpath"); + info("Will not load MX4J, mx4j-tools.jar is not in the classpath") } case e: Throwable => { - warn("Could not 
start register mbean in JMX", e); + warn("Could not start register mbean in JMX", e) } } false diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala index d1a144d..998ade1 100644 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ b/core/src/main/scala/kafka/utils/Throttler.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.utils; +package kafka.utils import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit @@ -95,4 +95,4 @@ object Throttler { } } } -} \ No newline at end of file +} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 069aa02..a237383 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -107,7 +107,7 @@ class LogTest extends JUnitSuite { time.sleep(log.config.segmentMs - maxJitter) log.append(set) - assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments); + assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments) time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) log.append(set) assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments) @@ -752,7 +752,7 @@ class LogTest extends JUnitSuite { val topic: String = "test_topic" val partition:String = "143" val dir: File = new File(logDir + topicPartitionName(topic, partition)) - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) assertEquals(topic, topicAndPartition.asTuple._1) assertEquals(partition.toInt, topicAndPartition.asTuple._2) } @@ -761,7 +761,7 @@ class LogTest extends JUnitSuite { def testParseTopicPartitionNameForEmptyName() { try { val dir: File = new File("") - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { case e: Exception => // its GOOD! @@ -772,7 +772,7 @@ class LogTest extends JUnitSuite { def testParseTopicPartitionNameForNull() { try { val dir: File = null - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir) } catch { case e: Exception => // its GOOD! @@ -785,7 +785,7 @@ class LogTest extends JUnitSuite { val partition:String = "1999" val dir: File = new File(logDir + File.separator + topic + partition) try { - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { case e: Exception => // its GOOD! @@ -798,7 +798,7 @@ class LogTest extends JUnitSuite { val partition:String = "1999" val dir: File = new File(logDir + topicPartitionName(topic, partition)) try { - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { case e: Exception => // its GOOD! 
@@ -811,7 +811,7 @@ class LogTest extends JUnitSuite { val partition:String = "" val dir: File = new File(logDir + topicPartitionName(topic, partition)) try { - val topicAndPartition = Log.parseTopicPartitionName(dir); + val topicAndPartition = Log.parseTopicPartitionName(dir) fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath) } catch { case e: Exception => // its GOOD! diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index d42108e..a2d062f 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -58,7 +58,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { case exception: Throwable => fail("Failed to create persistent path") } - Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)); + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)) } def testMakeSurePersistsPathExistsThrowsException { @@ -88,7 +88,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { case exception: Throwable => fail("Failed to create persistent path") } - Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)); + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path)) } def testCreateEphemeralPathThrowsException { @@ -118,7 +118,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { case exception: Throwable => fail("Failed to create ephemeral path") } - Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path)); + Assert.assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path)) } def testCreatePersistentSequentialThrowsException { @@ -150,6 +150,6 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { case exception: Throwable => fail("Failed to create persistent path") } - Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath)); + Assert.assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath)) } } -- 1.9.3 (Apple Git-50) From 9de4ff48018b8874e384fb18d738c8a4e63b51df Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 21 Apr 2015 23:58:31 +0100 Subject: [PATCH 09/20] Use `forall` instead of `foldLeft` --- .../scala/kafka/controller/ReplicaStateMachine.scala | 2 +- .../main/scala/kafka/coordinator/DelayedRebalance.scala | 3 +-- .../test/scala/unit/kafka/admin/DeleteTopicTest.scala | 17 +++++++---------- .../test/scala/unit/kafka/server/ReplicaFetchTest.scala | 5 +++-- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index e5c56e0..3a44fdc 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -282,7 +282,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { val replicasForTopic = controller.controllerContext.replicasForTopic(topic) val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic)) - replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful) + replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful) } 
def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = { diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala index 8defa2e..60fbdae 100644 --- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala +++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala @@ -41,8 +41,7 @@ class DelayedRebalance(sessionTimeout: Long, /* check if all known consumers have requested to re-join group */ override def tryComplete(): Boolean = { - allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft - (true) ((agg, cur) => agg && cur.joinGroupReceived.get())) + allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get())) if (allConsumersJoinedGroup.get()) forceComplete() diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 61cc602..fa8ce25 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -54,7 +54,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // check if all replicas but the one that is shut down has deleted the log TestUtils.waitUntilTrue(() => servers.filter(s => s.config.brokerId != follower.config.brokerId) - .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.") + .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), "Admin path /admin/delete_topic/test path deleted even when a follower replica is down") @@ -104,8 +104,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0) assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined) @@ -155,7 +154,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either TestUtils.waitUntilTrue(() => - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), + servers.forall(_.getLogManager().getLog(newPartition).isEmpty), "Replica logs not for new partition [test,1] not deleted after delete topic is complete.") servers.foreach(_.shutdown()) } @@ -173,7 +172,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) // verify that new partition doesn't exist on any broker either assertTrue("Replica logs not deleted after delete topic is complete", - servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty)) + servers.forall(_.getLogManager().getLog(newPartition).isEmpty)) 
servers.foreach(_.shutdown()) } @@ -192,7 +191,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined) // check if all replica logs are created - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created.") servers.foreach(_.shutdown()) } @@ -207,8 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // verify delete topic path for test2 is removed from zookeeper TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers) // verify that topic test is untouched - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created") // test the topic path exists assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) @@ -267,8 +265,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => - res && server.getLogManager().getLog(topicAndPartition).isDefined), + TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined), "Replicas for topic test not created") servers } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index a67cc37..a3a03db 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -65,8 +65,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { for (topic <- List(topic1, topic2)) { val topicAndPart = TopicAndPartition(topic, partition) val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset - result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total && - (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) } + result = result && expectedOffset > 0 && brokers.forall { item => + (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) + } } result } -- 1.9.3 (Apple Git-50) From a47e1a0c9f23ebc8542d0f40fe4a8408c83f2d15 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Wed, 22 Apr 2015 00:03:13 +0100 Subject: [PATCH 10/20] Remove `javaListToScalaBuffer` implicit No need to support 2.8.x. 
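
[Editor's note: for context only, not part of the patch; sample values are made
up. A minimal sketch of the explicit `JavaConverters` conversion that replaces
the removed implicit:

    import scala.collection.JavaConverters._

    val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b", "c")
    // asScala wraps the Java list as a mutable.Buffer, so the conversion is explicit at the call site
    val buffer: scala.collection.mutable.Buffer[String] = javaList.asScala
    println(buffer.mkString(","))  // prints a,b,c
]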
---
 core/src/main/scala/kafka/javaapi/Implicits.scala | 5 -----
 .../src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala | 6 +++---
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index 8baf4d4..c69b0a3 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -47,9 +47,4 @@ private[javaapi] object Implicits extends Logging {
     }
   }

-  // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
-  implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
-    import scala.collection.JavaConversions._
-    l: collection.mutable.Buffer[A]
-  }
 }
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0125565..df30279 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -19,14 +19,14 @@ package kafka.javaapi.message
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.ByteBuffer
 import kafka.message._
-import kafka.javaapi.Implicits.javaListToScalaBuffer
+
+import scala.collection.JavaConverters._

 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)

   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
   }

   def this(messages: java.util.List[Message]) {
--
1.9.3 (Apple Git-50)

From 1c2f8fa6f7b75964298001e3f50df9e9c6c63acf Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:22:58 +0100
Subject: [PATCH 11/20] Remove unnecessary casts

---
 contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java | 2 +-
 core/src/main/scala/kafka/message/MessageSet.scala | 2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala | 2 +-
 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java | 4 ++--
 4 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index 1d0e0a9..c9b9018 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -159,7 +159,7 @@ public class KafkaETLContext {
         _response = _consumer.fetch(fetchRequest);
         if(_response != null) {
             _respIterator = new ArrayList<ByteBufferMessageSet>(){{
-                add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
+                add(_response.messageSet(_request.getTopic(), _request.getPartition()));
             }}.iterator();
         }
         _requestTime += (System.currentTimeMillis() - tempTime);
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index f1b8432..28b56e6 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -43,7 +43,7 @@ object MessageSet {
     var size = 0
     val iter = messages.iterator
     while(iter.hasNext) {
-      val message = iter.next.asInstanceOf[Message]
+      val message = iter.next
       size += entrySize(message)
     }
     size
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index c473a03..81863b3 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -230,7 +230,7 @@ object CoreUtils extends Logging {
   def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
     val klass = Class.forName(className).asInstanceOf[Class[T]]
     val constructor = klass.getConstructor(args.map(_.getClass): _*)
-    constructor.newInstance(args: _*).asInstanceOf[T]
+    constructor.newInstance(args: _*)
   }

   /**
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index e5096f0..c43b461 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -68,7 +68,7 @@ public class SimpleConsumerDemo {
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)
             .build();
         FetchResponse fetchResponse = simpleConsumer.fetch(req);
-        printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
+        printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));

         System.out.println("Testing single multi-fetch");
         Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
@@ -85,7 +85,7 @@ public class SimpleConsumerDemo {
             String topic = entry.getKey();
             for ( Integer offset : entry.getValue()) {
                 System.out.println("Response from fetch request no: " + ++fetchReq);
-                printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
+                printMessages(fetchResponse.messageSet(topic, offset));
             }
         }
     }
--
1.9.3 (Apple Git-50)

From 4cdf00b77f7a0571c820126ac10864c18d6f92c8 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:25:40 +0100
Subject: [PATCH 12/20] Remove unnecessary `toSeq`

---
 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +-
 core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index bbe3362..acaa611 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
     }
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index
From 4cdf00b77f7a0571c820126ac10864c18d6f92c8 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:25:40 +0100
Subject: [PATCH 12/20] Remove unnecessary `toSeq`

---
 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala | 2 +-
 core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index bbe3362..acaa611 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
     }
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 46a4e89..fbd245c 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}

 class IteratorTemplateTest extends Assertions {

-  val lst = (0 until 10).toSeq
+  val lst = (0 until 10)
   val iterator = new IteratorTemplate[Int]() {
     var i = 0
     override def makeNext() = {
@@ -54,4 +54,4 @@
     }
   }

-}
\ No newline at end of file
+}
-- 
1.9.3 (Apple Git-50)

From 45d6511199d7d11fc95000495e0b65e2646a6d76 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:26:40 +0100
Subject: [PATCH 13/20] Simplify boolean logic

---
 core/src/main/scala/kafka/controller/ControllerChannelManager.scala | 4 ++--
 core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala        | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 97acdb2..1b22310 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -309,8 +309,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
     }
     updateMetadataRequestMap.clear()
     stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
-      val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
-      val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
+      val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
+      val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
       debug("The stop replica request (delete = true) sent to broker %d is %s"
         .format(broker, stopReplicaWithDelete.mkString(",")))
       debug("The stop replica request (delete = false) sent to broker %d is %s"
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 30fd0ea..0d6da34 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -52,7 +52,7 @@ object KafkaMetricsReporter {

   def startReporters (verifiableProps: VerifiableProperties) {
     ReporterStarted synchronized {
-      if (ReporterStarted.get() == false) {
+      if (!ReporterStarted.get()) {
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
         if(metricsConfig.reporters.size > 0) {
           metricsConfig.reporters.foreach(reporterType => {
-- 
1.9.3 (Apple Git-50)
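Reviewer note: comparing a `Boolean` to a literal (`x == true`, `x == false`) is a no-op that obscures the predicate; passing the predicate itself, or its negation, says the same thing directly. A small sketch with a hypothetical `ReplicaInfo` stand-in (not the Kafka class):

    case class ReplicaInfo(replica: Int, deletePartition: Boolean)
    val infos = Seq(ReplicaInfo(0, true), ReplicaInfo(1, false), ReplicaInfo(2, true))

    val withDelete    = infos.filter(_.deletePartition).map(_.replica).toSet    // Set(0, 2)
    val withoutDelete = infos.filterNot(_.deletePartition).map(_.replica).toSet // Set(1)

    val started = new java.util.concurrent.atomic.AtomicBoolean(false)
    if (!started.get()) { /* first-time initialization */ }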
From 7e3dfc970ee21b10ff21c9045630c2b789fd2801 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:51:17 +0100
Subject: [PATCH 14/20] Use `sum` instead of `foldLeft`

---
 core/src/main/scala/kafka/tools/JmxTool.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 1d1a120..c2b2030 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -103,7 +103,7 @@ object JmxTool extends Logging {

     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+    if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
       println(keys.map("\"" + _ + "\"").mkString(","))

     while(true) {
@@ -113,7 +113,7 @@ object JmxTool extends Logging {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+      if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
         println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
@@ -137,4 +137,4 @@ object JmxTool extends Logging {
     attributes
   }

-}
\ No newline at end of file
+}
-- 
1.9.3 (Apple Git-50)
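Reviewer note: `sum` is both shorter and harder to get wrong than `foldLeft(0)(_ + _)`, since there is no seed or operator to mistype. Equivalent behaviour on a hypothetical count map (illustrative values, not from the tool):

    val numExpectedAttributes = Map("query1" -> 2, "query2" -> 3)
    numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) // 5
    numExpectedAttributes.map(_._2).sum                // 5, identical
    numExpectedAttributes.values.sum                   // 5, and skips the intermediate collection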
From 8dbf2571383d8cdbbe74315a15fa55cbcb82409a Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:52:40 +0100
Subject: [PATCH 15/20] Use `foreach` instead of `map`

---
 core/src/main/scala/kafka/utils/CoreUtils.scala                     | 2 +-
 core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala  | 6 +++---
 .../test/scala/integration/kafka/api/IntegrationTestHarness.scala   | 6 +++---
 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala        | 4 ++--
 .../test/scala/unit/kafka/integration/KafkaServerTestHarness.scala  | 4 ++--
 core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala  | 4 ++--
 .../scala/unit/kafka/integration/UncleanLeaderElectionTest.scala    | 4 ++--
 core/src/test/scala/unit/kafka/log/LogManagerTest.scala             | 2 +-
 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala      | 4 ++--
 9 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 81863b3..98abc45 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -102,7 +102,7 @@ object CoreUtils extends Logging {
    * Recursively delete the list of files/directories and any subfiles (if any exist)
    * @param files sequence of files to be deleted
    */
-  def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
+  def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))

   /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 35f4f46..5c4cca6 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -71,7 +71,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def consumeWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)

     var consumed = 0
     val consumer = this.consumers(0)
@@ -100,7 +100,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def seekAndCommitWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)

     val consumer = this.consumers(0)
     consumer.subscribe(tp)
@@ -151,4 +151,4 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
     futures.map(_.get)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 02d2627..2bbd4c9 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -44,7 +44,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {

   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
-    cfgs.map(_.putAll(serverConfig))
+    cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }

@@ -70,8 +70,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   }

   override def tearDown() {
-    producers.map(_.close())
-    consumers.map(_.close())
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
     super.tearDown()
   }

diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ab5d16c..df5c6ba 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -55,8 +55,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   }

   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }

diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 447e421..87c6315 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -63,8 +63,8 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
   }

   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => server.config.logDirs.map(CoreUtils.rm(_)))
+    servers.foreach(_.shutdown())
+    servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
     super.tearDown
   }

diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 1113619..12d0733 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -40,8 +40,8 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   }

   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }

diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5b7b529..e4bf2df 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -78,8 +78,8 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }

   override def tearDown() {
-    servers.map(server => shutdownServer(server))
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => shutdownServer(server))
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))

     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 0a26f5f..01dfbc4 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -48,7 +48,7 @@ class LogManagerTest extends JUnit3Suite {
     if(logManager != null)
       logManager.shutdown()
     CoreUtils.rm(logDir)
-    logManager.logDirs.map(CoreUtils.rm(_))
+    logManager.logDirs.foreach(CoreUtils.rm(_))
     super.tearDown()
   }

diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 26572f7..f1977d8 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -49,8 +49,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }

   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
-- 
1.9.3 (Apple Git-50)
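Reviewer note: `map` over a side-effecting closure still builds a result collection (of `Unit`) that is immediately discarded; `foreach` performs the same traversal without the allocation and states the intent. A tiny sketch (hypothetical resources):

    val closeables: Seq[java.io.Closeable] = Seq.empty
    closeables.map(_.close())     // works, but allocates a throwaway Seq[Unit]
    closeables.foreach(_.close()) // same effect, no allocation, clearer intent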
From 43e5638bba1687c3882eea89aac8efe452a95499 Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:53:14 +0100
Subject: [PATCH 16/20] Use `reverseMap` instead of `reverse.map`

Also use `forall` instead of `foldLeft` when evaluating a boolean
predicate over a collection, and `foreach` instead of `map` when only
side effects are needed.
---
 core/src/test/scala/unit/kafka/admin/AdminTest.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4b728a1..efb2f8e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -301,7 +301,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
     // trigger preferred replica election
@@ -319,7 +319,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)

@@ -330,7 +330,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
         "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
@@ -346,11 +346,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     } finally {
       servers.foreach(_.shutdown())
@@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       checkConfig(2*maxMessageSize, 2 * retentionMs)
     } finally {
       server.shutdown()
-      server.config.logDirs.map(CoreUtils.rm(_))
+      server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
-- 
1.9.3 (Apple Git-50)
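Reviewer note: `reverseMap` fuses `reverse` and `map` into one pass with no intermediate sequence, and `forall` replaces the `foldLeft(true)(_ && _ ...)` pattern while also short-circuiting on the first `false`. Sketch (illustrative values):

    val xs = Seq(1, 2, 3)
    xs.reverse.map(_ * 10)        // Seq(30, 20, 10), materializes the reversed Seq first
    xs.reverseMap(_ * 10)         // Seq(30, 20, 10), single pass

    xs.foldLeft(true)(_ && _ > 0) // always traverses every element
    xs.forall(_ > 0)              // same answer, stops at the first counterexample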
From 4b21d6a0d3afc521dd6b8aff0f33c642c5f68dba Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:54:36 +0100
Subject: [PATCH 17/20] Use `contains` and fix comparison to use the same type
 for both parameters

Also use pattern matching to extract `topic` and `partitions` more
concisely.
---
 core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index aef8361..4fb519b 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -79,9 +79,7 @@ object VerifyConsumerRebalance extends Logging {
     val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)

-    partitionsPerTopicMap.foreach { partitionsForTopic =>
-      val topic = partitionsForTopic._1
-      val partitions = partitionsForTopic._2
+    partitionsPerTopicMap.foreach { case (topic, partitions) =>
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
       info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
@@ -95,8 +93,8 @@ object VerifyConsumerRebalance extends Logging {

       // for each available partition for topic, check if an owner exists
       partitions.foreach { partition =>
-      // check if there is a node for [partition]
-      if(!partitionsWithOwners.exists(p => p.equals(partition))) {
+        // check if there is a node for [partition]
+        if(!partitionsWithOwners.contains(partition.toString)) {
           error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
-- 
1.9.3 (Apple Git-50)
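Reviewer note: the original check was not merely verbose, it was a type bug: `partitionsWithOwners` holds `String`s (ZooKeeper node names) while `partition` is numeric, so `p.equals(partition)` could never be true and `exists` always returned false. Comparing like with like fixes it, and `case (k, v)` destructuring drops the `._1`/`._2` noise. Sketch (illustrative values):

    val owners = Set("0", "1")              // partition ids as strings
    val partition = 0
    owners.exists(p => p.equals(partition)) // false: a String never equals an Int
    owners.contains(partition.toString)     // true: both sides are Strings

    Map("t1" -> Seq(0, 1)).foreach { case (topic, partitions) =>
      println(topic + " -> " + partitions.mkString(","))
    }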
From a17be4f6d57c3f873293dc70c243532e7a59ba3b Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:54:58 +0100
Subject: [PATCH 18/20] Remove unnecessary `toInt`

---
 core/src/main/scala/kafka/log/OffsetMap.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 2940e47..303aad5 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -69,7 +69,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = (memory / bytesPerEntry).toInt
+  val slots: Int = memory / bytesPerEntry

   /**
    * Associate this offset to the given key.
@@ -177,4 +177,4 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     digest.digest(buffer, 0, hashSize)
   }

-}
\ No newline at end of file
+}
-- 
1.9.3 (Apple Git-50)

From b70f0e7f508e170283fa825226ed96022326355c Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:55:24 +0100
Subject: [PATCH 19/20] Fix inconsistency in calls to `format`

---
 core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala | 2 +-
 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 01e8f72..6e8d68d 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -72,7 +72,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
       }
     } catch {
       case e: FileNotFoundException =>
-        warn("No meta.properties file under dir %s".format(file.getAbsolutePath(), e.getMessage))
+        warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
         None
       case e1: Exception =>
         error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dec9516..1c2023c 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -148,7 +148,7 @@ object SimpleConsumerShell extends Logging {
     if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {
-        System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
+        System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
     }
-- 
1.9.3 (Apple Git-50)
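Reviewer note: `format` fails in different ways on the two mismatches fixed here. Surplus arguments are silently ignored (the `warn` call was dropping `e.getMessage` on the floor), while an argument of the wrong type for its conversion throws at runtime (the old `SimpleConsumerShell` line fed `replicaId` to `%s` and the `String` topic to `%d`). Sketch (illustrative values):

    "No meta.properties file under dir %s".format("/tmp/logs", "ignored") // extra arg silently dropped
    "(%s, %d)".format("test", 0) // "(test, 0)"
    // "(%s, %d)".format(0, "test", 1) would throw IllegalFormatConversionException,
    // because %d cannot format a String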
From 122e78b123c54cd614443799570eb7c7d327ec1e Mon Sep 17 00:00:00 2001
From: Ismael Juma
Date: Wed, 22 Apr 2015 00:55:42 +0100
Subject: [PATCH 20/20] Remove redundant `toString` call

---
 core/src/test/scala/unit/kafka/log/LogTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index a237383..76d3bfd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -302,7 +302,7 @@ class LogTest extends JUnitSuite {
     assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
     assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", currOffset,
-                 log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+                 log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)

     // cleanup the log
     log.delete()
-- 
1.9.3 (Apple Git-50)
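Reviewer note: `String.toString` simply returns `this`, so the removed call was pure noise:

    val s = "hello"
    assert(s.toString eq s) // the very same reference comes back
    s.getBytes              // identical result, one call fewer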