diff --git core/src/main/scala/kafka/Kafka.scala core/src/main/scala/kafka/Kafka.scala index 0fe6471..84a4a05 100644 --- core/src/main/scala/kafka/Kafka.scala +++ core/src/main/scala/kafka/Kafka.scala @@ -35,7 +35,7 @@ object Kafka extends Logging { val verifiableProps = serverConfig.props val metricsConfig = new KafkaMetricsConfig(verifiableProps) metricsConfig.reporters.foreach(reporterType => { - val reporter = Utils.getObject[KafkaMetricsReporter](reporterType) + val reporter = Utils.createObject[KafkaMetricsReporter](reporterType) reporter.init(verifiableProps) if (reporter.isInstanceOf[KafkaMetricsReporterMBean]) Utils.registerMBean(reporter, reporter.asInstanceOf[KafkaMetricsReporterMBean].getMBeanName) diff --git core/src/main/scala/kafka/admin/AdminUtils.scala core/src/main/scala/kafka/admin/AdminUtils.scala index 40976c5..9339224 100644 --- core/src/main/scala/kafka/admin/AdminUtils.scala +++ core/src/main/scala/kafka/admin/AdminUtils.scala @@ -18,7 +18,8 @@ package kafka.admin import java.util.Random -import kafka.api.{TopicMetadata, PartitionMetadata} +import kafka.api.{TopicMetadata, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse} +import kafka.common._ import kafka.cluster.Broker import kafka.utils.{Logging, Utils, ZkUtils} import org.I0Itec.zkclient.ZkClient diff --git core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala index 981d510..0d32392 100644 --- core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -33,12 +33,12 @@ object CheckReassignmentStatus extends Logging { val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileIntoString(jsonFile) + val jsonString = Utils.readFileAsString(jsonFile) val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { // read the json file into a string - val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + val partitionsToBeReassigned = SyncJson.parseFull(jsonString) match { case Some(reassignedPartitions) => val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] partitions.map { m => diff --git core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 1a1a900..7b28632 100644 --- core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -50,12 +50,12 @@ object ReassignPartitionsCommand extends Logging { val jsonFile = options.valueOf(jsonFileOpt) val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileIntoString(jsonFile) + val jsonString = Utils.readFileAsString(jsonFile) var zkClient: ZkClient = null try { // read the json file into a string - val partitionsToBeReassigned = SyncJSON.parseFull(jsonString) match { + val partitionsToBeReassigned = SyncJson.parseFull(jsonString) match { case Some(reassignedPartitions) => val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]] partitions.map { m => diff --git core/src/main/scala/kafka/api/ApiUtils.scala core/src/main/scala/kafka/api/ApiUtils.scala new file mode 100644 index 0000000..ba1d199 --- /dev/null +++ core/src/main/scala/kafka/api/ApiUtils.scala @@ -0,0 +1,92 @@ +package kafka.api + +import java.nio._ +import kafka.common._ + +/** + * Helper functions specific to parsing or serializing requests and 
responses + */ +object ApiUtils { + + val ProtocolEncoding = "UTF-8" + + /** + * Read size prefixed string where the size is stored as a 2 byte short. + * @param buffer The buffer to read from + */ + def readShortString(buffer: ByteBuffer): String = { + val size: Int = buffer.getShort() + if(size < 0) + return null + val bytes = new Array[Byte](size) + buffer.get(bytes) + new String(bytes, ProtocolEncoding) + } + + /** + * Write a size prefixed string where the size is stored as a 2 byte short + * @param buffer The buffer to write to + * @param string The string to write + */ + def writeShortString(buffer: ByteBuffer, string: String) { + if(string == null) { + buffer.putShort(-1) + } else if(string.length > Short.MaxValue) { + throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") + } else { + buffer.putShort(string.length.asInstanceOf[Short]) + buffer.put(string.getBytes(ProtocolEncoding)) + } + } + + /** + * Return size of a size prefixed string where the size is stored as a 2 byte short + * @param string The string to write + */ + def shortStringLength(string: String): Int = { + if(string == null) { + 2 + } else { + val encodedString = string.getBytes(ProtocolEncoding) + if(encodedString.length > Short.MaxValue) { + throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") + } else { + 2 + encodedString.length + } + } + } + + /** + * Read an integer out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. + */ + def readIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { + val value = buffer.getInt + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + + /** + * Read a short out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. + */ + def readShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = { + val value = buffer.getShort + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + + /** + * Read a long out of the bytebuffer from the current position and check that it falls within the given + * range. If not, throw KafkaException. 
+ */ + def readLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { + val value = buffer.getLong + if(value < range._1 || value > range._2) + throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + +} \ No newline at end of file diff --git core/src/main/scala/kafka/api/FetchRequest.scala core/src/main/scala/kafka/api/FetchRequest.scala index 41ac09c..3faa904 100644 --- core/src/main/scala/kafka/api/FetchRequest.scala +++ core/src/main/scala/kafka/api/FetchRequest.scala @@ -18,7 +18,8 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.{nonthreadsafe, Utils} +import kafka.utils.nonthreadsafe +import kafka.api.ApiUtils._ import scala.collection.immutable.Map import kafka.common.TopicAndPartition @@ -36,13 +37,13 @@ object FetchRequest { def readFrom(buffer: ByteBuffer): FetchRequest = { val versionId = buffer.getShort val correlationId = buffer.getInt - val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val clientId = readShortString(buffer) val replicaId = buffer.getInt val maxWait = buffer.getInt val minBytes = buffer.getInt val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partitionId = buffer.getInt @@ -72,14 +73,14 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) + writeShortString(buffer, clientId) buffer.putInt(replicaId) buffer.putInt(maxWait) buffer.putInt(minBytes) buffer.putInt(requestInfoGroupedByTopic.size) // topic count requestInfoGroupedByTopic.foreach { case (topic, partitionFetchInfos) => - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + writeShortString(buffer, topic) buffer.putInt(partitionFetchInfos.size) // partition count partitionFetchInfos.foreach { case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) => @@ -93,7 +94,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, def sizeInBytes: Int = { 2 + /* versionId */ 4 + /* correlationId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + + shortStringLength(clientId) + 4 + /* replicaId */ 4 + /* maxWait */ 4 + /* minBytes */ @@ -101,7 +102,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, partitionFetchInfos) = currTopic foldedTopics + - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 + /* partition count */ partitionFetchInfos.size * ( 4 + /* partition id */ diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala index 8ccc5a7..5234479 100644 --- core/src/main/scala/kafka/api/FetchResponse.scala +++ core/src/main/scala/kafka/api/FetchResponse.scala @@ -22,7 +22,7 @@ import java.nio.channels.GatheringByteChannel import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.network.{MultiSend, Send} -import kafka.utils.Utils +import kafka.api.ApiUtils._ object FetchResponsePartitionData { def readFrom(buffer: ByteBuffer): 
FetchResponsePartitionData = { @@ -84,7 +84,7 @@ class PartitionDataSend(val partitionData: FetchResponsePartitionData) extends S object TopicData { def readFrom(buffer: ByteBuffer): TopicData = { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt val topicPartitionDataPairs = (1 to partitionCount).map(_ => { val partitionData = FetchResponsePartitionData.readFrom(buffer) @@ -94,7 +94,7 @@ object TopicData { } def headerSize(topic: String) = - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 /* partition count */ } @@ -113,7 +113,7 @@ class TopicDataSend(val topicData: TopicData) extends Send { override def complete = sent >= size private val buffer = ByteBuffer.allocate(topicData.headerSize) - Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset) + writeShortString(buffer, topicData.topic) buffer.putInt(topicData.partitionData.size) buffer.rewind() diff --git core/src/main/scala/kafka/api/LeaderAndISRResponse.scala core/src/main/scala/kafka/api/LeaderAndISRResponse.scala index 41ec8fe..1afbde5 100644 --- core/src/main/scala/kafka/api/LeaderAndISRResponse.scala +++ core/src/main/scala/kafka/api/LeaderAndISRResponse.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import kafka.utils.Utils import collection.mutable.HashMap import collection.Map +import kafka.api.ApiUtils._ object LeaderAndISRResponse { @@ -31,7 +32,7 @@ object LeaderAndISRResponse { val numEntries = buffer.getInt val responseMap = new HashMap[(String, Int), Short]() for (i<- 0 until numEntries){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val partitionErrorCode = buffer.getShort responseMap.put((topic, partition), partitionErrorCode) @@ -58,7 +59,7 @@ case class LeaderAndISRResponse(versionId: Short, buffer.putShort(errorCode) buffer.putInt(responseMap.size) for ((key:(String, Int), value) <- responseMap){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) buffer.putShort(value) } diff --git core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index 26f2bd8..6c99a7a 100644 --- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -20,6 +20,7 @@ package kafka.api import java.nio._ import kafka.utils._ +import kafka.api.ApiUtils._ import collection.mutable.Map import collection.mutable.HashMap @@ -30,7 +31,7 @@ object LeaderAndIsr { def readFrom(buffer: ByteBuffer): LeaderAndIsr = { val leader = buffer.getInt val leaderGenId = buffer.getInt - val ISRString = Utils.readShortString(buffer, "UTF-8") + val ISRString = readShortString(buffer) val ISR = ISRString.split(",").map(_.toInt).toList val zkVersion = buffer.getInt new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion) @@ -43,7 +44,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int def writeTo(buffer: ByteBuffer) { buffer.putInt(leader) buffer.putInt(leaderEpoch) - Utils.writeShortString(buffer, isr.mkString(","), "UTF-8") + writeShortString(buffer, isr.mkString(",")) buffer.putInt(zkVersion) } @@ -57,7 +58,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int jsonDataMap.put("leader", leader.toString) jsonDataMap.put("leaderEpoch", leaderEpoch.toString) jsonDataMap.put("ISR", 
isr.mkString(",")) - Utils.stringMapToJsonString(jsonDataMap) + Utils.stringMapToJson(jsonDataMap) } } @@ -71,13 +72,13 @@ object LeaderAndIsrRequest { def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val leaderAndISRRequestCount = buffer.getInt val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr] for(i <- 0 until leaderAndISRRequestCount){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer) @@ -100,11 +101,11 @@ case class LeaderAndIsrRequest (versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(leaderAndISRInfos.size) for((key, value) <- leaderAndISRInfos){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) value.writeTo(buffer) } diff --git core/src/main/scala/kafka/api/OffsetRequest.scala core/src/main/scala/kafka/api/OffsetRequest.scala index cf90a7a..03b66e8 100644 --- core/src/main/scala/kafka/api/OffsetRequest.scala +++ core/src/main/scala/kafka/api/OffsetRequest.scala @@ -20,6 +20,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.utils.Utils import kafka.common.TopicAndPartition +import kafka.api.ApiUtils._ object OffsetRequest { @@ -33,11 +34,11 @@ object OffsetRequest { def readFrom(buffer: ByteBuffer): OffsetRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val replicaId = buffer.getInt val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partitionId = buffer.getInt @@ -62,13 +63,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(replicaId) buffer.putInt(requestInfoGroupedByTopic.size) // topic count requestInfoGroupedByTopic.foreach { case((topic, partitionInfos)) => - Utils.writeShortString(buffer, topic) + writeShortString(buffer, topic) buffer.putInt(partitionInfos.size) // partition count partitionInfos.foreach { case (TopicAndPartition(_, partition), partitionInfo) => @@ -81,13 +82,13 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ def sizeInBytes = 2 + /* versionId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + + shortStringLength(clientId) + 4 + /* replicaId */ 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, partitionInfos) = currTopic foldedTopics + - Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + shortStringLength(topic) + 4 + /* partition count */ partitionInfos.size * ( 4 + /* partition */ diff --git core/src/main/scala/kafka/api/OffsetResponse.scala core/src/main/scala/kafka/api/OffsetResponse.scala index 242b496..a803867 100644 --- core/src/main/scala/kafka/api/OffsetResponse.scala +++ core/src/main/scala/kafka/api/OffsetResponse.scala @@ -20,6 +20,7 @@ package kafka.api import 
java.nio.ByteBuffer import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.utils.Utils +import kafka.api.ApiUtils._ object OffsetResponse { @@ -28,7 +29,7 @@ object OffsetResponse { val versionId = buffer.getShort val numTopics = buffer.getInt val pairs = (1 to numTopics).flatMap(_ => { - val topic = Utils.readShortString(buffer) + val topic = readShortString(buffer) val numPartitions = buffer.getInt (1 to numPartitions).map(_ => { val partition = buffer.getInt @@ -61,7 +62,7 @@ case class OffsetResponse(versionId: Short, offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { val (topic, errorAndOffsetsMap) = currTopic foldedTopics + - Utils.shortStringLength(topic) + + shortStringLength(topic) + 4 + /* partition count */ errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => { foldedPartitions + @@ -78,7 +79,7 @@ case class OffsetResponse(versionId: Short, buffer.putInt(offsetsGroupedByTopic.size) // topic count offsetsGroupedByTopic.foreach { case((topic, errorAndOffsetsMap)) => - Utils.writeShortString(buffer, topic) + writeShortString(buffer, topic) buffer.putInt(errorAndOffsetsMap.size) // partition count errorAndOffsetsMap.foreach { case((TopicAndPartition(_, partition), errorAndOffsets)) => diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala index 3b3332d..3eb8e91 100644 --- core/src/main/scala/kafka/api/ProducerRequest.scala +++ core/src/main/scala/kafka/api/ProducerRequest.scala @@ -22,6 +22,7 @@ import kafka.message._ import kafka.utils._ import scala.collection.Map import kafka.common.TopicAndPartition +import kafka.api.ApiUtils._ object ProducerRequestPartitionData { def readFrom(buffer: ByteBuffer): ProducerRequestPartitionData = { @@ -49,14 +50,14 @@ object ProducerRequest { def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort val correlationId: Int = buffer.getInt - val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val clientId: String = readShortString(buffer) val requiredAcks: Short = buffer.getShort val ackTimeoutMs: Int = buffer.getInt //build the topic structure val topicCount = buffer.getInt val partitionDataPairs = (1 to topicCount).flatMap(_ => { // process topic - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partition = buffer.getInt @@ -95,7 +96,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) + writeShortString(buffer, clientId) buffer.putShort(requiredAcks) buffer.putInt(ackTimeoutMs) @@ -103,7 +104,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, buffer.putInt(dataGroupedByTopic.size) //the number of topics dataGroupedByTopic.foreach { case (topic, topicAndPartitionData) => - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic + writeShortString(buffer, topic) //write the topic buffer.putInt(topicAndPartitionData.size) //the number of partitions topicAndPartitionData.foreach(partitionAndData => { val partitionData = partitionAndData._2 @@ -119,13 +120,13 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, def sizeInBytes: Int = { 2 + /* versionId */ 4 + 
/* correlationId */ - Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */ + shortStringLength(clientId) + /* client id */ 2 + /* requiredAcks */ 4 + /* ackTimeoutMs */ 4 + /* number of topics */ dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { foldedTopics + - Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + shortStringLength(currTopic._1) + 4 + /* the number of partitions */ { currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => { diff --git core/src/main/scala/kafka/api/ProducerResponse.scala core/src/main/scala/kafka/api/ProducerResponse.scala index 6de9c93..f56a93a 100644 --- core/src/main/scala/kafka/api/ProducerResponse.scala +++ core/src/main/scala/kafka/api/ProducerResponse.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import kafka.utils.Utils import scala.collection.Map import kafka.common.{TopicAndPartition, ErrorMapping} +import kafka.api.ApiUtils._ object ProducerResponse { @@ -29,7 +30,7 @@ object ProducerResponse { val correlationId = buffer.getInt val topicCount = buffer.getInt val statusPairs = (1 to topicCount).flatMap(_ => { - val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val topic = readShortString(buffer) val partitionCount = buffer.getInt (1 to partitionCount).map(_ => { val partition = buffer.getInt @@ -64,7 +65,7 @@ case class ProducerResponse(versionId: Short, 4 + /* topic count */ groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => { foldedTopics + - Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + shortStringLength(currTopic._1) + 4 + /* partition count for this topic */ currTopic._2.size * { 4 + /* partition id */ @@ -83,7 +84,7 @@ case class ProducerResponse(versionId: Short, groupedStatus.foreach(topicStatus => { val (topic, errorsAndOffsets) = topicStatus - Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + writeShortString(buffer, topic) buffer.putInt(errorsAndOffsets.size) // partition count errorsAndOffsets.foreach { case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) => diff --git core/src/main/scala/kafka/api/RequestKeys.scala core/src/main/scala/kafka/api/RequestKeys.scala index 94e13f0..b000eb7 100644 --- core/src/main/scala/kafka/api/RequestKeys.scala +++ core/src/main/scala/kafka/api/RequestKeys.scala @@ -29,12 +29,12 @@ object RequestKeys { val StopReplicaKey: Short = 5 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= - Map( ProduceKey -> ("Produce", ProducerRequest.readFrom), - FetchKey -> ("Fetch", FetchRequest.readFrom), - OffsetsKey -> ("Offsets", OffsetRequest.readFrom), - MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), - LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), - StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) ) + Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), + FetchKey -> ("Fetch", FetchRequest.readFrom), + OffsetsKey -> ("Offsets", OffsetRequest.readFrom), + MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), + LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), + StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { diff --git core/src/main/scala/kafka/api/RequestOrResponse.scala core/src/main/scala/kafka/api/RequestOrResponse.scala index 9b1b478..6861146 100644 --- core/src/main/scala/kafka/api/RequestOrResponse.scala 
+++ core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -19,12 +19,6 @@ package kafka.api import java.nio._ - -object RequestOrResponse { - val DefaultCharset = "UTF-8" -} - - object Request { val DefaultReplicaId = -1 val NonFollowerId = DefaultReplicaId diff --git core/src/main/scala/kafka/api/StopReplicaRequest.scala core/src/main/scala/kafka/api/StopReplicaRequest.scala index 99a5f95..c3db6f9 100644 --- core/src/main/scala/kafka/api/StopReplicaRequest.scala +++ core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -19,7 +19,7 @@ package kafka.api import java.nio._ -import kafka.utils._ +import kafka.api.ApiUtils._ object StopReplicaRequest { val CurrentVersion = 1.shortValue() @@ -28,13 +28,12 @@ object StopReplicaRequest { def readFrom(buffer: ByteBuffer): StopReplicaRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) + val clientId = readShortString(buffer) val ackTimeoutMs = buffer.getInt val topicPartitionPairCount = buffer.getInt val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]() - for (i <- 0 until topicPartitionPairCount) { - topicPartitionPairSet.add(Utils.readShortString(buffer, "UTF-8"), buffer.getInt) - } + for (i <- 0 until topicPartitionPairCount) + topicPartitionPairSet.add(readShortString(buffer), buffer.getInt) new StopReplicaRequest(versionId, clientId, ackTimeoutMs, topicPartitionPairSet.toSet) } } @@ -51,11 +50,11 @@ case class StopReplicaRequest(versionId: Short, def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(ackTimeoutMs) buffer.putInt(partitions.size) for ((topic, partitionId) <- partitions){ - Utils.writeShortString(buffer, topic, "UTF-8") + writeShortString(buffer, topic) buffer.putInt(partitionId) } } diff --git core/src/main/scala/kafka/api/StopReplicaResponse.scala core/src/main/scala/kafka/api/StopReplicaResponse.scala index 29e5209..d849570 100644 --- core/src/main/scala/kafka/api/StopReplicaResponse.scala +++ core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -22,6 +22,7 @@ import kafka.utils.Utils import collection.mutable.HashMap import collection.mutable.Map import kafka.common.ErrorMapping +import kafka.api.ApiUtils._ object StopReplicaResponse { @@ -32,7 +33,7 @@ object StopReplicaResponse { val responseMap = new HashMap[(String, Int), Short]() for (i<- 0 until numEntries){ - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = readShortString(buffer) val partition = buffer.getInt val partitionErrorCode = buffer.getShort() responseMap.put((topic, partition), partitionErrorCode) @@ -58,7 +59,7 @@ case class StopReplicaResponse(val versionId: Short, buffer.putShort(errorCode) buffer.putInt(responseMap.size) for ((key:(String, Int), value) <- responseMap){ - Utils.writeShortString(buffer, key._1, "UTF-8") + writeShortString(buffer, key._1) buffer.putInt(key._2) buffer.putShort(value) } diff --git core/src/main/scala/kafka/api/TopicMetadata.scala core/src/main/scala/kafka/api/TopicMetadata.scala index f538db3..e2d03e8 100644 --- core/src/main/scala/kafka/api/TopicMetadata.scala +++ core/src/main/scala/kafka/api/TopicMetadata.scala @@ -19,7 +19,8 @@ package kafka.api import kafka.cluster.Broker import java.nio.ByteBuffer -import kafka.utils.Utils._ +import kafka.api.ApiUtils._ +import kafka.utils.Logging import collection.mutable.ListBuffer import kafka.common.{KafkaException, ErrorMapping} @@ -54,9 +55,9 @@ case object LeaderDoesNotExist extends 
LeaderRequest { val requestId: Byte = 0 } object TopicMetadata { def readFrom(buffer: ByteBuffer): TopicMetadata = { - val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) val topic = readShortString(buffer) - val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) + val numPartitions = readIntInRange(buffer, "number of partitions", (0, Int.MaxValue)) val partitionsMetadata = new ListBuffer[PartitionMetadata]() for(i <- 0 until numPartitions) partitionsMetadata += PartitionMetadata.readFrom(buffer) @@ -64,7 +65,7 @@ object TopicMetadata { } } -case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) { +case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { var size: Int = 2 /* error code */ size += shortStringLength(topic) @@ -87,8 +88,8 @@ case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadat object PartitionMetadata { def readFrom(buffer: ByteBuffer): PartitionMetadata = { - val errorCode = getShortInRange(buffer, "error code", (-1, Short.MaxValue)) - val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ + val errorCode = readShortInRange(buffer, "error code", (-1, Short.MaxValue)) + val partitionId = readIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */ val doesLeaderExist = getLeaderRequest(buffer.get) val leader = doesLeaderExist match { case LeaderExists => /* leader exists */ @@ -97,14 +98,14 @@ object PartitionMetadata { } /* list of all replicas */ - val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue)) + val numReplicas = readShortInRange(buffer, "number of all replicas", (0, Short.MaxValue)) val replicas = new Array[Broker](numReplicas) for(i <- 0 until numReplicas) { replicas(i) = Broker.readFrom(buffer) } /* list of in-sync replicas */ - val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue)) + val numIsr = readShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue)) val isr = new Array[Broker](numIsr) for(i <- 0 until numIsr) { isr(i) = Broker.readFrom(buffer) @@ -122,8 +123,11 @@ object PartitionMetadata { } } -case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty, - errorCode: Short = ErrorMapping.NoError) { +case class PartitionMetadata(partitionId: Int, + val leader: Option[Broker], + replicas: Seq[Broker], + isr: Seq[Broker] = Seq.empty, + errorCode: Short = ErrorMapping.NoError) extends Logging { def sizeInBytes: Int = { var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/ diff --git core/src/main/scala/kafka/api/TopicMetadataRequest.scala core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 7ce953a..70c42e3 100644 --- core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -18,11 +18,11 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.Utils._ +import kafka.api.ApiUtils._ import collection.mutable.ListBuffer -import kafka.utils._ +import kafka.utils.Logging -object TopicMetadataRequest { +object TopicMetadataRequest extends Logging { val CurrentVersion = 1.shortValue() val DefaultClientId = "" @@ 
-33,11 +33,11 @@ object TopicMetadataRequest { def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { val versionId = buffer.getShort - val clientId = Utils.readShortString(buffer) - val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue)) + val clientId = readShortString(buffer) + val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue)) val topics = new ListBuffer[String]() for(i <- 0 until numTopics) - topics += readShortString(buffer, "UTF-8") + topics += readShortString(buffer) val topicsList = topics.toList debug("topic = %s".format(topicsList.head)) new TopicMetadataRequest(versionId, clientId, topics.toList) @@ -54,7 +54,7 @@ def this(topics: Seq[String]) = def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) - Utils.writeShortString(buffer, clientId) + writeShortString(buffer, clientId) buffer.putInt(topics.size) topics.foreach(topic => writeShortString(buffer, topic)) } diff --git core/src/main/scala/kafka/client/ClientUtils.scala core/src/main/scala/kafka/client/ClientUtils.scala new file mode 100644 index 0000000..5bbc3ef --- /dev/null +++ core/src/main/scala/kafka/client/ClientUtils.scala @@ -0,0 +1,40 @@ +package kafka.client + +import scala.collection._ +import kafka.cluster._ +import kafka.api._ +import kafka.producer._ +import kafka.common.KafkaException +import kafka.utils.Logging + +object ClientUtils extends Logging{ + + def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { + var fetchMetaDataSucceeded: Boolean = false + var i: Int = 0 + val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) + var topicMetadataResponse: TopicMetadataResponse = null + var t: Throwable = null + while(i < brokers.size && !fetchMetaDataSucceeded) { + val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i)) + info("Fetching metadata for topic %s".format(topics)) + try { + topicMetadataResponse = producer.send(topicMetadataRequest) + fetchMetaDataSucceeded = true + } + catch { + case e => + warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e) + t = e + } finally { + i = i + 1 + producer.close() + } + } + if(!fetchMetaDataSucceeded){ + throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t) + } + return topicMetadataResponse + } + +} \ No newline at end of file diff --git core/src/main/scala/kafka/cluster/Broker.scala core/src/main/scala/kafka/cluster/Broker.scala index 9b57b42..03a75f0 100644 --- core/src/main/scala/kafka/cluster/Broker.scala +++ core/src/main/scala/kafka/cluster/Broker.scala @@ -18,6 +18,7 @@ package kafka.cluster import kafka.utils.Utils._ +import kafka.api.ApiUtils._ import java.nio.ByteBuffer import kafka.common.BrokerNotAvailableException diff --git core/src/main/scala/kafka/consumer/ConsoleConsumer.scala core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 3c190cf..038fdf0 100644 --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -25,7 +25,7 @@ import java.util.Properties import java.util.Random import java.io.PrintStream import kafka.message._ -import kafka.utils.{Utils, Logging} +import kafka.utils.{Utils, Logging, ZkUtils, CommandLineUtils} import kafka.utils.ZKStringSerializer import kafka.serializer.StringDecoder @@ -109,7 +109,7 @@ object ConsoleConsumer extends Logging { "skip it instead of halt.") val options: OptionSet = tryParse(parser, 
args) - Utils.checkRequiredArgs(parser, options, zkConnectOpt) + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) if (topicOrFilterOpt.size != 1) { @@ -145,14 +145,14 @@ object ConsoleConsumer extends Logging { val connector = Consumer.create(config) if(options.has(resetBeginningOpt)) - tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { connector.shutdown() // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack if(!options.has(groupIdOpt)) - tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) } }) @@ -265,17 +265,5 @@ object ConsoleConsumer extends Logging { } } } - - def tryCleanupZookeeper(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ => // swallow - } - } } diff --git core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 29e29f3..de3d42a 100644 --- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -28,6 +28,7 @@ import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} import kafka.utils.Utils._ import kafka.common.TopicAndPartition +import kafka.client.ClientUtils /** * Usage: @@ -52,7 +53,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, cond.await() val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, brokers).topicsMetadata val leaderForPartitionsMap = new HashMap[(String, Int), Broker] topicsMetadata.foreach( tmd => { diff --git core/src/main/scala/kafka/consumer/TopicCount.scala core/src/main/scala/kafka/consumer/TopicCount.scala index 06bce24..19d065d 100644 --- core/src/main/scala/kafka/consumer/TopicCount.scala +++ core/src/main/scala/kafka/consumer/TopicCount.scala @@ -20,7 +20,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient import java.util.regex.Pattern -import kafka.utils.{SyncJSON, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{SyncJson, ZKGroupDirs, ZkUtils, Logging} private[kafka] trait TopicCount { def getConsumerThreadIdsPerTopic: Map[String, Set[String]] @@ -88,7 +88,7 @@ private[kafka] object TopicCount extends Logging { else { var topMap : Map[String,Int] = null try { - SyncJSON.parseFull(topicCountString) match { + SyncJson.parseFull(topicCountString) match { case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString) } diff --git core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 1cc2802..4cbc3d9 100644 --- 
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -32,6 +32,7 @@ import java.util.UUID import kafka.serializer.Decoder import kafka.utils.ZkUtils._ import kafka.common._ +import kafka.client.ClientUtils import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup import kafka.utils.Utils._ @@ -415,7 +416,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = getTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers).topicsMetadata val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]] val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int] topicsMetadata.foreach(m =>{ diff --git core/src/main/scala/kafka/log/LogManager.scala core/src/main/scala/kafka/log/LogManager.scala index 23f1c2d..7f5e9aa 100644 --- core/src/main/scala/kafka/log/LogManager.scala +++ core/src/main/scala/kafka/log/LogManager.scala @@ -65,9 +65,9 @@ private[kafka] class LogManager(val config: KafkaConfig, warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { info("Loading log '" + dir.getName() + "'") - val topic = Utils.getTopicPartition(dir.getName)._1 - val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) - val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) + val topicPartition = parseTopicPartitionName(dir.getName) + val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) + val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) val log = new Log(dir, maxLogFileSize, config.maxMessageSize, @@ -78,10 +78,9 @@ private[kafka] class LogManager(val config: KafkaConfig, config.logIndexIntervalBytes, time, config.brokerId) - val topicPartition = Utils.getTopicPartition(dir.getName) - logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]()) - val parts = logs.get(topicPartition._1) - parts.put(topicPartition._2, log) + logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]()) + val parts = logs.get(topicPartition.topic) + parts.put(topicPartition.partition, log) } } } @@ -168,7 +167,7 @@ private[kafka] class LogManager(val config: KafkaConfig, /* Runs through the log removing segments older than a certain age */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds - val topic = Utils.getTopicPartition(log.name)._1 + val topic = parseTopicPartitionName(log.name).topic val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs) val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs) val total = log.deleteSegments(toBeDeleted) @@ -180,7 +179,7 @@ private[kafka] class LogManager(val config: KafkaConfig, * is at least logRetentionSize bytes in size */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { - val topic = Utils.getTopicPartition(log.dir.getName)._1 + val topic = parseTopicPartitionName(log.dir.getName).topic val maxLogRetentionSize = 
logRetentionSizeMap.get(topic).getOrElse(config.logRetentionSize) if(maxLogRetentionSize < 0 || log.size < maxLogRetentionSize) return 0 var diff = log.size - maxLogRetentionSize @@ -256,5 +255,10 @@ private[kafka] class LogManager(val config: KafkaConfig, def topics(): Iterable[String] = logs.keys + + private def parseTopicPartitionName(name: String): TopicAndPartition = { + val index = name.lastIndexOf('-') + TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + } } diff --git core/src/main/scala/kafka/message/Message.scala core/src/main/scala/kafka/message/Message.scala index aff46e4..aedab42 100644 --- core/src/main/scala/kafka/message/Message.scala +++ core/src/main/scala/kafka/message/Message.scala @@ -116,7 +116,7 @@ class Message(val buffer: ByteBuffer) { buffer.rewind() // now compute the checksum and fill it in - Utils.putUnsignedInt(buffer, CrcOffset, computeChecksum) + Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum) } def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) = @@ -140,7 +140,7 @@ class Message(val buffer: ByteBuffer) { /** * Retrieve the previously computed CRC for this message */ - def checksum: Long = Utils.getUnsignedInt(buffer, CrcOffset) + def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset) /** * Returns true if the crc stored with the message matches the crc computed off the message contents diff --git core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala index 35c4f22..84f6208 100644 --- core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala +++ core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala @@ -28,7 +28,7 @@ class KafkaMetricsConfig(props: VerifiableProperties) { * Comma-separated list of reporter types. These classes should be on the * classpath and will be instantiated at run-time. */ - val reporters = Utils.getCSVList(props.getString("kafka.metrics.reporters", "")) + val reporters = Utils.parseCsvList(props.getString("kafka.metrics.reporters", "")) /** * The metrics polling interval (in seconds). diff --git core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index f881b29..13e7ab3 100644 --- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -21,6 +21,8 @@ import kafka.api.{TopicMetadataRequest, TopicMetadata} import kafka.common.KafkaException import kafka.utils.{Logging, Utils} import kafka.common.ErrorMapping +import kafka.cluster.Broker +import kafka.client.ClientUtils class BrokerPartitionInfo(producerConfig: ProducerConfig, @@ -28,7 +30,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, topicPartitionInfo: HashMap[String, TopicMetadata]) extends Logging { val brokerList = producerConfig.brokerList - val brokers = Utils.getAllBrokersFromBrokerList(brokerList) + val brokers = getAllBrokersFromBrokerList(brokerList) /** * Return a sequence of (brokerId, numPartitions). 
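The LogManager hunks above replace Utils.getTopicPartition with a private parseTopicPartitionName helper. The following standalone sketch shows the parsing rule it applies, using a stand-in case class for kafka.common.TopicAndPartition so it can run on its own:

```scala
// Stand-in for kafka.common.TopicAndPartition, used only for this illustration.
case class TopicAndPartition(topic: String, partition: Int)

object ParseTopicPartitionSketch extends App {
  // Same rule as LogManager.parseTopicPartitionName: split on the *last* dash,
  // so topic names that themselves contain dashes still parse correctly.
  def parseTopicPartitionName(name: String): TopicAndPartition = {
    val index = name.lastIndexOf('-')
    TopicAndPartition(name.substring(0, index), name.substring(index + 1).toInt)
  }

  assert(parseTopicPartitionName("my-topic-3") == TopicAndPartition("my-topic", 3))
}
```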
@@ -71,7 +73,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, */ def updateInfo(topics: Set[String]) = { var topicsMetadata: Seq[TopicMetadata] = Nil - val topicMetadataResponse = Utils.getTopicMetadata(topics, brokers) + val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers) topicsMetadata = topicMetadataResponse.topicsMetadata // throw partition specific exception topicsMetadata.foreach(tmd =>{ @@ -88,6 +90,20 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, }) producerPool.updateProducer(topicsMetadata) } + + def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = { + val brokersStr = Utils.parseCsvList(brokerListStr) + + brokersStr.zipWithIndex.map(b =>{ + val brokerStr = b._1 + val brokerId = b._2 + val brokerInfos = brokerStr.split(":") + val hostName = brokerInfos(0) + val port = brokerInfos(1).toInt + val creatorId = hostName + "-" + System.currentTimeMillis() + new Broker(brokerId, creatorId, hostName, port) + }) + } } case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int]) diff --git core/src/main/scala/kafka/producer/Producer.scala core/src/main/scala/kafka/producer/Producer.scala index 0baa5a5..84c5278 100644 --- core/src/main/scala/kafka/producer/Producer.scala +++ core/src/main/scala/kafka/producer/Producer.scala @@ -18,6 +18,7 @@ package kafka.producer import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, EventHandler} import kafka.utils._ +import java.util.Random import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import kafka.serializer.Encoder import java.util.concurrent.atomic.AtomicBoolean @@ -33,13 +34,14 @@ extends Logging { private val queue = new LinkedBlockingQueue[ProducerData[K,V]](config.queueSize) + private val random = new Random private var sync: Boolean = true private var producerSendThread: ProducerSendThread[K,V] = null config.producerType match { case "sync" => case "async" => sync = false - val asyncProducerID = Utils.getNextRandomInt + val asyncProducerID = random.nextInt(Int.MaxValue) producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + asyncProducerID, queue, eventHandler, config.queueTime, config.batchSize) producerSendThread.start @@ -49,8 +51,8 @@ extends Logging { def this(config: ProducerConfig) = this(config, new DefaultEventHandler[K,V](config, - Utils.getObject[Partitioner[K]](config.partitionerClass), - Utils.getObject[Encoder[V]](config.serializerClass), + Utils.createObject[Partitioner[K]](config.partitionerClass), + Utils.createObject[Encoder[V]](config.serializerClass), new ProducerPool(config))) /** diff --git core/src/main/scala/kafka/producer/ProducerConfig.scala core/src/main/scala/kafka/producer/ProducerConfig.scala index c14061f..2977095 100644 --- core/src/main/scala/kafka/producer/ProducerConfig.scala +++ core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -78,7 +78,7 @@ class ProducerConfig private (val props: VerifiableProperties) * * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ - val compressedTopics = Utils.getCSVList(props.getString("compressed.topics", null)) + val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null)) /** * The producer using the zookeeper software load balancer maintains a ZK cache that gets diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index c2388b4..7023619 100644 --- 
core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -24,6 +24,7 @@ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging} import scala.collection.{Seq, Map} import scala.collection.mutable.{ListBuffer, HashMap} +import java.util.concurrent.atomic._ import kafka.api.{TopicMetadata, ProducerRequest, ProducerRequestPartitionData} @@ -35,6 +36,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) + val counter = new AtomicInteger(0) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) private val lock = new Object() @@ -185,8 +187,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig, if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions + "\n Valid values are > 0") - val partition = if(key == null) Utils.getNextRandomInt(numPartitions) - else partitioner.partition(key, numPartitions) + val partition = + if(key == null) + counter.getAndIncrement() % numPartitions + else + partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition + "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]") diff --git core/src/main/scala/kafka/server/KafkaConfig.scala core/src/main/scala/kafka/server/KafkaConfig.scala index 4252c89..43cf3b1 100644 --- core/src/main/scala/kafka/server/KafkaConfig.scala +++ core/src/main/scala/kafka/server/KafkaConfig.scala @@ -22,6 +22,7 @@ import kafka.message.Message import kafka.consumer.ConsumerConfig import java.net.InetAddress import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig} +import scala.collection._ /** * Configuration settings for the kafka server @@ -73,32 +74,32 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the default number of log partitions per topic */ val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue)) - /* the directory in which the log data is kept */ + /* the directories in which the log data is kept */ val logDir = props.getString("log.dir") /* the maximum size of a single log file */ val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue)) /* the maximum size of a single log file for some specific topic */ - val logFileSizeMap = Utils.getTopicFileSize(props.getString("topic.log.file.size", "")) + val logFileSizeMap = props.getMap("topic.log.file.size", _.toInt > 0).mapValues(_.toInt) /* the maximum time before a new log segment is rolled out */ val logRollHours = props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) /* the number of hours before rolling out a new log segment for some specific topic */ - val logRollHoursMap = Utils.getTopicRollHours(props.getString("topic.log.roll.hours", "")) + val logRollHoursMap = props.getMap("topic.log.roll.hours", _.toInt > 0).mapValues(_.toInt) /* the number of hours to keep a log file before deleting it */ val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) /* the number of hours to keep a log file before deleting it for some specific topic*/ - val logRetentionHoursMap = Utils.getTopicRetentionHours(props.getString("topic.log.retention.hours", "")) + val logRetentionHoursMap = 
props.getMap("topic.log.retention.hours", _.toInt > 0).mapValues(_.toInt) /* the maximum size of the log before deleting it */ val logRetentionSize = props.getLong("log.retention.size", -1) /* the maximum size of the log for some specific topic before deleting it */ - val logRetentionSizeMap = Utils.getTopicRetentionSize(props.getString("topic.log.retention.size", "")) + val logRetentionSizeMap = props.getMap("topic.log.retention.size", _.toLong > 0).mapValues(_.toLong) /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */ val logCleanupIntervalMinutes = props.getIntInRange("log.cleanup.interval.mins", 10, (1, Int.MaxValue)) @@ -113,7 +114,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ - val flushIntervalMap = Utils.getTopicFlushIntervals(props.getString("topic.flush.intervals.ms", "")) + val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt) /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */ val flushSchedulerThreadRate = props.getInt("log.default.flush.scheduler.interval.ms", 3000) @@ -161,4 +162,5 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* number of fetcher threads used to replicate messages from a source broker. * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ val numReplicaFetchers = props.getInt("replica.fetchers", 1) + } diff --git core/src/main/scala/kafka/tools/DumpLogSegments.scala core/src/main/scala/kafka/tools/DumpLogSegments.scala index 947aff3..31a4f86 100644 --- core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -69,8 +69,8 @@ object DumpLogSegments { print(" keysize: " + msg.keySize) if(printContents) { if(msg.hasKey) - print(" key: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - print(" payload: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + print(" key: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) + print(" payload: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) } println() } diff --git core/src/main/scala/kafka/tools/MirrorMaker.scala core/src/main/scala/kafka/tools/MirrorMaker.scala index 3438f2c..8a2588d 100644 --- core/src/main/scala/kafka/tools/MirrorMaker.scala +++ core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -19,7 +19,7 @@ package kafka.tools import kafka.message.Message import joptsimple.OptionParser -import kafka.utils.{Utils, Logging} +import kafka.utils.{Utils, CommandLineUtils, Logging} import kafka.producer.{ProducerData, ProducerConfig, Producer} import scala.collection.JavaConversions._ import java.util.concurrent.CountDownLatch @@ -81,8 +81,7 @@ object MirrorMaker extends Logging { System.exit(0) } - Utils.checkRequiredArgs( - parser, options, consumerConfigOpt, producerConfigOpt) + CommandLineUtils.checkRequiredArgs(parser, options, consumerConfigOpt, producerConfigOpt) if (List(whitelistOpt, blacklistOpt).count(options.has) != 1) { println("Exactly one of whitelist or blacklist is required.") System.exit(1) diff --git core/src/main/scala/kafka/tools/ReplayLogProducer.scala core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 
83bbc94..952b034 100644 --- core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -22,14 +22,14 @@ import java.util.concurrent.{Executors, CountDownLatch} import java.util.Properties import kafka.producer.{ProducerData, ProducerConfig, Producer} import kafka.consumer._ -import kafka.utils.{ZKStringSerializer, Logging} +import kafka.utils.{ZKStringSerializer, Logging, ZkUtils} import kafka.api.OffsetRequest import org.I0Itec.zkclient._ import kafka.message.{CompressionCodec, Message} object ReplayLogProducer extends Logging { - private val GROUPID: String = "replay-log-producer" + private val GroupId: String = "replay-log-producer" def main(args: Array[String]) { val config = new Config(args) @@ -38,12 +38,12 @@ object ReplayLogProducer extends Logging { val allDone = new CountDownLatch(config.numThreads) // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - tryCleanupZookeeper(config.zkConnect, GROUPID) + ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId) Thread.sleep(500) // consumer properties val consumerProps = new Properties - consumerProps.put("groupid", GROUPID) + consumerProps.put("groupid", GroupId) consumerProps.put("zk.connect", config.zkConnect) consumerProps.put("consumer.timeout.ms", "10000") consumerProps.put("autooffset.reset", OffsetRequest.SmallestTimeString) @@ -137,18 +137,6 @@ object ReplayLogProducer extends Logging { val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue) } - def tryCleanupZookeeper(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ => // swallow - } - } - class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() diff --git core/src/main/scala/kafka/tools/SimpleConsumerShell.scala core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index f5f9dde..ee399f4 100644 --- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -107,7 +107,7 @@ object SimpleConsumerShell extends Logging { var consumed = 0 for(messageAndOffset <- messageSet) { if(printMessages) - info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + info("consumed: " + Utils.readString(messageAndOffset.message.payload, "UTF-8")) offset = messageAndOffset.nextOffset if(printOffsets) info("next offset = " + offset) diff --git core/src/main/scala/kafka/utils/CommandLineUtils.scala core/src/main/scala/kafka/utils/CommandLineUtils.scala new file mode 100644 index 0000000..5516afa --- /dev/null +++ core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -0,0 +1,20 @@ +package kafka.utils + +import joptsimple.{OptionSpec, OptionSet, OptionParser} + +/** + * Helper functions for dealing with command line utilities + */ +object CommandLineUtils { + + def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { + for(arg <- required) { + if(!options.has(arg)) { + error("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + } + +} \ No newline at end of file diff --git 
core/src/main/scala/kafka/utils/SyncJson.scala core/src/main/scala/kafka/utils/SyncJson.scala new file mode 100644 index 0000000..f58b6c7 --- /dev/null +++ core/src/main/scala/kafka/utils/SyncJson.scala @@ -0,0 +1,24 @@ +package kafka.utils + +import kafka.common._ +import util.parsing.json.JSON + +/** + * A wrapper that synchronizes JSON in scala, which is not threadsafe. + */ +object SyncJson extends Logging { + val myConversionFunc = {input : String => input.toInt} + JSON.globalNumberParser = myConversionFunc + val lock = new Object + + def parseFull(input: String): Option[Any] = { + lock synchronized { + try { + JSON.parseFull(input) + } catch { + case t => + throw new KafkaException("Can't parse json string: %s".format(input), t) + } + } + } +} \ No newline at end of file diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala index 27c73d2..3da1130 100644 --- core/src/main/scala/kafka/utils/Utils.scala +++ core/src/main/scala/kafka/utils/Utils.scala @@ -26,26 +26,22 @@ import java.util.zip.CRC32 import javax.management._ import scala.collection._ import scala.collection.mutable -import org.I0Itec.zkclient.ZkClient -import java.util.{Random, Properties} -import joptsimple.{OptionSpec, OptionSet, OptionParser} +import java.util.Properties import kafka.common.KafkaException -import kafka.cluster.Broker -import util.parsing.json.JSON -import kafka.api.RequestOrResponse -import kafka.api.{TopicMetadataRequest, TopicMetadataResponse} -import kafka.producer.{ProducerPool, SyncProducer} /** - * Helper functions! + * General helper functions! + * + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way */ object Utils extends Logging { - val random = new Random - - def getNextRandomInt(): Int = random.nextInt - - def getNextRandomInt(upper: Int): Int = random.nextInt(upper) /** * Wrap the given function in a java.lang.Runnable @@ -151,55 +147,6 @@ object Utils extends Logging { } bytes } - - /** - * Read size prefixed string where the size is stored as a 2 byte short. 
- * @param buffer The buffer to read from - * @param encoding The encoding in which to read the string - */ - def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = { - val size: Int = buffer.getShort() - if(size < 0) - return null - val bytes = new Array[Byte](size) - buffer.get(bytes) - new String(bytes, encoding) - } - - /** - * Write a size prefixed string where the size is stored as a 2 byte short - * @param buffer The buffer to write to - * @param string The string to write - * @param encoding The encoding in which to write the string - */ - def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) { - if(string == null) { - buffer.putShort(-1) - } else if(string.length > Short.MaxValue) { - throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") - } else { - buffer.putShort(string.length.asInstanceOf[Short]) - buffer.put(string.getBytes(encoding)) - } - } - - /** - * Return size of a size prefixed string where the size is stored as a 2 byte short - * @param string The string to write - * @param encoding The encoding in which to write the string - */ - def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = { - if(string == null) { - 2 - } else { - val encodedString = string.getBytes(encoding) - if(encodedString.length > Short.MaxValue) { - throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".") - } else { - 2 + encodedString.length - } - } - } /** * Read a properties file from the given path @@ -212,27 +159,6 @@ object Utils extends Logging { props } - def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { - val value = buffer.getInt - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - - def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = { - val value = buffer.getShort - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - - def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { - val value = buffer.getLong - if(value < range._1 || value > range._2) - throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - /** * Open a channel for the given file */ @@ -278,7 +204,7 @@ object Utils extends Logging { * @param buffer The buffer to translate * @param encoding The encoding to use in translating bytes to characters */ - def toString(buffer: ByteBuffer, encoding: String): String = { + def readString(buffer: ByteBuffer, encoding: String): String = { val bytes = new Array[Byte](buffer.remaining) buffer.get(bytes) new String(bytes, encoding) @@ -365,7 +291,7 @@ object Utils extends Logging { * @param buffer The buffer to read from * @return The integer read, as a long to avoid signedness */ - def getUnsignedInt(buffer: ByteBuffer): Long = + def readUnsignedInt(buffer: ByteBuffer): Long = buffer.getInt() & 0xffffffffL /** @@ -375,7 +301,7 @@ object Utils extends Logging { * @param index the index from which to read the integer * @return The integer read, as a long to avoid signedness */ - def getUnsignedInt(buffer: ByteBuffer, index: Int): Long = + def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
buffer.getInt(index) & 0xffffffffL /** @@ -383,7 +309,7 @@ object Utils extends Logging { * @param buffer The buffer to write to * @param value The value to write */ - def putUnsignedInt(buffer: ByteBuffer, value: Long): Unit = + def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = buffer.putInt((value & 0xffffffffL).asInstanceOf[Int]) /** @@ -392,7 +318,7 @@ object Utils extends Logging { * @param index The position in the buffer at which to begin writing * @param value The value to write */ - def putUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = + def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int]) /** @@ -458,6 +384,10 @@ object Utils extends Logging { } } + /** + * Throw an exception if the given value is null, else return it. You can use this like: + * val myValue = Utils.notNull(expressionThatShouldntBeNull) + */ def notNull[V](v: V) = { if(v == null) throw new KafkaException("Value cannot be null.") @@ -465,16 +395,17 @@ object Utils extends Logging { v } - def getHostPort(hostport: String) : (String, Int) = { + /** + * Parse a host and port out of a string + */ + def parseHostPort(hostport: String) : (String, Int) = { val splits = hostport.split(":") (splits(0), splits(1).toInt) } - def getTopicPartition(topicPartition: String) : (String, Int) = { - val index = topicPartition.lastIndexOf('-') - (topicPartition.substring(0,index), topicPartition.substring(index+1).toInt) - } - + /** + * Get the stack trace from an exception as a string + */ def stackTrace(e: Throwable): String = { val sw = new StringWriter; val pw = new PrintWriter(sw); @@ -486,113 +417,30 @@ object Utils extends Logging { * This method gets comma seperated values which contains key,value pairs and returns a map of * key value pairs. the format of allCSVal is key1:val1, key2:val2 .... */ - private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = { - val map = new mutable.HashMap[K, V] - if("".equals(allCSVals)) - return map - val csVals = allCSVals.split(",") - for(i <- 0 until csVals.length) - { - try{ - val tempSplit = csVals(i).split(":") - info(successMsg + tempSplit(0) + " : " + Integer.parseInt(tempSplit(1).trim)) - map += tempSplit(0).asInstanceOf[K] -> Integer.parseInt(tempSplit(1).trim).asInstanceOf[V] - } catch { - case _ => error(exceptionMsg + ": " + csVals(i)) - } - } - map + def parseCsvMap(str: String): Map[String, String] = { + val map = new mutable.HashMap[String, String] + if("".equals(str)) + return map + val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*")) + keyVals.map(pair => (pair(0), pair(1))).toMap } - - def getCSVList(csvList: String): Seq[String] = { + + /** + * Parse a comma separated string into a sequence of strings. + * Whitespace surrounding the comma will be removed. 
+ */ + def parseCsvList(csvList: String): Seq[String] = { if(csvList == null) Seq.empty[String] else { - csvList.split(",").filter(v => !v.equals("")) - } - } - - def seqToCSV(seq: Seq[String]): String = { - var csvString = "" - for (i <- 0 until seq.size) { - if (i > 0) - csvString = csvString + ',' - csvString = csvString + seq(i) + csvList.split("\\s*,\\s*").filter(v => !v.equals("")) } - csvString - } - - def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: " - val successMsg = "The retention hours for " - val map: Map[String, Int] = getCSVMap(retentionHours, exceptionMsg, successMsg) - map.foreach{case(topic, hrs) => - require(hrs > 0, "Log retention hours value for topic " + topic + " is " + hrs + - " which is not greater than 0.")} - map } - def getTopicRollHours(rollHours: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: " - val successMsg = "The roll hours for " - val map: Map[String, Int] = getCSVMap(rollHours, exceptionMsg, successMsg) - map.foreach{case(topic, hrs) => - require(hrs > 0, "Log roll hours value for topic " + topic + " is " + hrs + - " which is not greater than 0.")} - map - } - - def getTopicFileSize(fileSizes: String): Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.log.file.size in server.properties: " - val successMsg = "The log file size for " - val map: Map[String, Int] = getCSVMap(fileSizes, exceptionMsg, successMsg) - map.foreach{case(topic, size) => - require(size > 0, "Log file size value for topic " + topic + " is " + size + - " which is not greater than 0.")} - map - } - - def getTopicRetentionSize(retentionSizes: String): Map[String, Long] = { - val exceptionMsg = "Malformed token for topic.log.retention.size in server.properties: " - val successMsg = "The log retention size for " - val map: Map[String, Long] = getCSVMap(retentionSizes, exceptionMsg, successMsg) - map.foreach{case(topic, size) => - require(size > 0, "Log retention size value for topic " + topic + " is " + size + - " which is not greater than 0.")} - map - } - - def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: " - val successMsg = "The flush interval for " - val map: Map[String, Int] = getCSVMap(allIntervals, exceptionMsg, successMsg) - map.foreach{case(topic, interval) => - require(interval > 0, "Flush interval value for topic " + topic + " is " + interval + - " ms which is not greater than 0.")} - map - } - - def getTopicPartitions(allPartitions: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for topic.partition.counts in server.properties: " - val successMsg = "The number of partitions for topic " - val map: Map[String, Int] = getCSVMap(allPartitions, exceptionMsg, successMsg) - map.foreach{case(topic, count) => - require(count > 0, "The number of partitions for topic " + topic + " is " + count + - " which is not greater than 0.")} - map - } - - def getConsumerTopicMap(consumerTopicString: String) : Map[String, Int] = { - val exceptionMsg = "Malformed token for embeddedconsumer.topics in consumer.properties: " - val successMsg = "The number of consumer threads for topic " - val map: Map[String, Int] = getCSVMap(consumerTopicString, exceptionMsg, successMsg) - map.foreach{case(topic, count) => - require(count > 0, "The number of consumer threads for topic " 
+ topic + " is " + count + - " which is not greater than 0.")} - map - } - - def getObject[T<:AnyRef](className: String): T = { + /** + * Create an instance of the class with the given class name + */ + def createObject[T<:AnyRef](className: String): T = { className match { case null => null.asInstanceOf[T] case _ => @@ -604,27 +452,15 @@ object Utils extends Logging { } } - def propertyExists(prop: String): Boolean = { - if(prop == null) - false - else if(prop.compareTo("") == 0) - false - else true - } - - def tryCleanupZookeeper(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - info("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ => // swallow - } - } + /** + * Is the given string null or empty ("")? + */ + def nullOrEmpty(s: String): Boolean = s == null || s.equals("") - def stringMapToJsonString(jsonDataMap: Map[String, String]): String = { + /** + * Format a Map[String, String] as JSON + */ + def stringMapToJson(jsonDataMap: Map[String, String]): String = { val builder = new StringBuilder builder.append("{ ") var numElements = 0 @@ -639,6 +475,9 @@ object Utils extends Logging { builder.toString } + /** + * Format an arbitrary map as JSON + */ def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = { val builder = new StringBuilder builder.append("{ ") @@ -654,58 +493,6 @@ object Utils extends Logging { builder.toString } - def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = { - val brokersStr = Utils.getCSVList(brokerListStr) - - brokersStr.zipWithIndex.map(b =>{ - val brokerStr = b._1 - val brokerId = b._2 - val brokerInfos = brokerStr.split(":") - val hostName = brokerInfos(0) - val port = brokerInfos(1).toInt - val creatorId = hostName + "-" + System.currentTimeMillis() - new Broker(brokerId, creatorId, hostName, port) - }) - } - - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { - for(arg <- required) { - if(!options.has(arg)) { - error("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - } - - def getTopicMetadata(topics: Set[String], brokers: Seq[Broker]): TopicMetadataResponse = { - var fetchMetaDataSucceeded: Boolean = false - var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(topics.toSeq) - var topicMetadataResponse: TopicMetadataResponse = null - var t: Throwable = null - while(i < brokers.size && !fetchMetaDataSucceeded) { - val producer: SyncProducer = ProducerPool.createSyncProducer(None, brokers(i)) - info("Fetching metadata for topic %s".format(topics)) - try { - topicMetadataResponse = producer.send(topicMetadataRequest) - fetchMetaDataSucceeded = true - } - catch { - case e => - warn("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e) - t = e - } finally { - i = i + 1 - producer.close() - } - } - if(!fetchMetaDataSucceeded){ - throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, brokers), t) - } - return topicMetadataResponse - } - /** * Create a circular (looping) iterator over a collection. * @param coll An iterable over the underlying collection. 
@@ -717,35 +504,18 @@ object Utils extends Logging { stream.iterator } - def readFileIntoString(path: String): String = { + /** + * Attempt to read a file as a string + */ + def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = { val stream = new FileInputStream(new File(path)) try { val fc = stream.getChannel() val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size()) - Charset.defaultCharset().decode(bb).toString() + charset.decode(bb).toString() } finally { stream.close() } } -} - -/** - * A wrapper that synchronizes JSON in scala, which is not threadsafe. - */ -object SyncJSON extends Logging { - val myConversionFunc = {input : String => input.toInt} - JSON.globalNumberParser = myConversionFunc - val lock = new Object - - def parseFull(input: String): Option[Any] = { - lock synchronized { - try { - JSON.parseFull(input) - } catch { - case t => - throw new KafkaException("Can't parse json string: %s".format(input), t) - } - } - } } \ No newline at end of file diff --git core/src/main/scala/kafka/utils/VerifiableProperties.scala core/src/main/scala/kafka/utils/VerifiableProperties.scala index 22aaba8..30c2758 100644 --- core/src/main/scala/kafka/utils/VerifiableProperties.scala +++ core/src/main/scala/kafka/utils/VerifiableProperties.scala @@ -18,7 +18,7 @@ package kafka.utils import java.util.Properties -import collection.mutable +import scala.collection._ class VerifiableProperties(val props: Properties) extends Logging { private val referenceSet = mutable.HashSet[String]() @@ -156,6 +156,23 @@ class VerifiableProperties(val props: Properties) extends Logging { require(containsKey(name), "Missing required property '" + name + "'") getProperty(name) } + + /** + * Get a Map[String, String] from a property list in the form k1:v2, k2:v2, ... 
+ */ + def getMap(name: String, valid: String => Boolean): Map[String, String] = { + try { + val m = Utils.parseCsvMap(getString(name, "")) + m.foreach { + case(key, value) => + if(!valid(value)) + throw new IllegalArgumentException("Invalid entry '%s' = '%s' for property '%s'".format(key, value, name)) + } + m + } catch { + case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(name, e.getMessage)) + } + } def verify() { info("Verifying properties") diff --git core/src/main/scala/kafka/utils/ZkUtils.scala core/src/main/scala/kafka/utils/ZkUtils.scala index dcc0cda..e6b0303 100644 --- core/src/main/scala/kafka/utils/ZkUtils.scala +++ core/src/main/scala/kafka/utils/ZkUtils.scala @@ -85,12 +85,12 @@ object ZkUtils extends Logging { } def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = { - SyncJSON.parseFull(leaderAndIsrStr) match { + SyncJson.parseFull(leaderAndIsrStr) match { case Some(m) => val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get - val isr = Utils.getCSVList(isrString).map(r => r.toInt) + val isr = Utils.parseCsvList(isrString).map(r => r.toInt) val zkPathVersion = stat.getVersion debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch, isr.toString(), zkPathVersion, topic, partition)) @@ -103,7 +103,7 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + SyncJson.parseFull(leaderAndIsr) match { case Some(m) => Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt) case None => None @@ -121,7 +121,7 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + SyncJson.parseFull(leaderAndIsr) match { case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition)) case Some(m) => m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt } @@ -137,10 +137,10 @@ object ZkUtils extends Logging { val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1 leaderAndIsrOpt match { case Some(leaderAndIsr) => - SyncJSON.parseFull(leaderAndIsr) match { + SyncJson.parseFull(leaderAndIsr) match { case Some(m) => val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get - Utils.getCSVList(ISRString).map(r => r.toInt) + Utils.parseCsvList(ISRString).map(r => r.toInt) case None => Seq.empty[Int] } case None => Seq.empty[Int] @@ -154,7 +154,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + SyncJson.parseFull(jsonPartitionMap) match { case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match { case None => Seq.empty[Int] case Some(seq) => seq.map(_.toInt) @@ -327,7 +327,7 @@ object ZkUtils extends Logging { case e2 => throw e2 } } - + def deletePath(client: 
ZkClient, path: String): Boolean = { try { client.delete(path) @@ -350,6 +350,16 @@ object ZkUtils extends Logging { case e2 => throw e2 } } + + def maybeDeletePath(zkUrl: String, dir: String) { + try { + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ => // swallow + } + } def readData(client: ZkClient, path: String): (String, Stat) = { val stat: Stat = new Stat() @@ -412,7 +422,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + SyncJson.parseFull(jsonPartitionMap) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] for((partition, replicas) <- replicaMap){ @@ -448,7 +458,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + SyncJson.parseFull(jsonPartitionMap) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] for((partition, replicas) <- replicaMap){ @@ -470,7 +480,7 @@ object ZkUtils extends Logging { val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 val partitionMap = jsonPartitionMapOpt match { case Some(jsonPartitionMap) => - SyncJSON.parseFull(jsonPartitionMap) match { + SyncJson.parseFull(jsonPartitionMap) match { case Some(m) => val m1 = m.asInstanceOf[Map[String, Seq[String]]] m1.map(p => (p._1.toInt, p._2.map(_.toInt))) @@ -534,7 +544,7 @@ object ZkUtils extends Logging { } def parsePartitionReassignmentData(jsonData: String):Map[(String, Int), Seq[Int]] = { - SyncJSON.parseFull(jsonData) match { + SyncJson.parseFull(jsonData) match { case Some(m) => val replicaMap = m.asInstanceOf[Map[String, Seq[String]]] replicaMap.map { reassignedPartitions => @@ -629,6 +639,7 @@ object ZkUtils extends Logging { if(topics == null) Seq.empty[String] else topics } + } class LeaderExistsOrChangedListener(topic: String, diff --git core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala index fa709de..2260111 100644 --- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala +++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala @@ -62,7 +62,7 @@ private class ConsumerThread(stream: KafkaStream[Message]) extends Thread { override def run() { println("Starting consumer thread..") for (messageAndMetadata <- stream) { - println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8")) + println("consumed: " + Utils.readString(messageAndMetadata.message.payload, "UTF-8")) } shutdownLatch.countDown println("thread shutdown !" 
) diff --git core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index f08b156..18f3f80 100644 --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -286,7 +286,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar // send some messages to each broker val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) val sentMessages2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) - val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.toString(m.payload, "UTF-8")). + val sentMessages = (sentMessages1 ++ sentMessages2).map(m => Utils.readString(m.payload, "UTF-8")). sortWith((s, t) => s.compare(t) == -1) val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -401,7 +401,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.readString(message.payload, "UTF-8")) } } } diff --git core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index d489872..e5cc792 100644 --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -105,7 +105,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar assertTrue(iterator.hasNext) val message = iterator.next.message messages ::= message - debug("received message: " + Utils.toString(message.payload, "UTF-8")) + debug("received message: " + Utils.readString(message.payload, "UTF-8")) } } } diff --git core/src/test/scala/unit/kafka/log/LogManagerTest.scala core/src/test/scala/unit/kafka/log/LogManagerTest.scala index cf304b5..536101f 100644 --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -159,7 +159,7 @@ class LogManagerTest extends JUnit3Suite { override val logFileSize = 1024 *1024 *1024 override val flushSchedulerThreadRate = 50 override val flushInterval = Int.MaxValue - override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100") + override val flushIntervalMap = Map("timebasedflush" -> 100) } logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup diff --git core/src/test/scala/unit/kafka/message/MessageTest.scala core/src/test/scala/unit/kafka/message/MessageTest.scala index fc75dd6..f905612 100644 --- core/src/test/scala/unit/kafka/message/MessageTest.scala +++ core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -66,7 +66,7 @@ class MessageTest extends JUnitSuite { assertTrue("Auto-computed checksum should be valid", v.message.isValid) // garble checksum val badChecksum: Int = (v.message.checksum + 1 % Int.MaxValue).toInt - Utils.putUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum) + Utils.writeUnsignedInt(v.message.buffer, Message.CrcOffset, badChecksum) assertFalse("Message with invalid 
checksum should be invalid", v.message.isValid)
     }
   }
diff --git perf/src/main/scala/kafka/perf/ConsumerPerformance.scala perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
index 96ad307..fb9106b 100644
--- perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
+++ perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.nio.channels.ClosedByInterruptException
 import org.apache.log4j.Logger
 import kafka.message.Message
-import kafka.utils.Utils
+import kafka.utils.ZkUtils
 import java.util.{Random, Properties}
 import kafka.consumer._
 import java.text.SimpleDateFormat
@@ -48,7 +48,7 @@ object ConsumerPerformance {
     }
     // clean up zookeeper state for this group id for every perf run
-    Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)
+    ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId)
     val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
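For reference, a minimal sketch of how the new map-valued properties used by KafkaConfig (for example topic.flush.intervals.ms) are parsed through VerifiableProperties.getMap; the topic names and values are invented for illustration and the snippet assumes the patched kafka.utils classes are on the classpath.

import java.util.Properties
import kafka.utils.VerifiableProperties

object PerTopicConfigSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    // hypothetical per-topic flush intervals in the k1:v1,k2:v2 form
    props.put("topic.flush.intervals.ms", "topic1:3000, topic2:6000")

    val verifiableProps = new VerifiableProperties(props)
    // getMap validates every value with the supplied predicate and returns the raw String map
    val flushIntervalMap = verifiableProps.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
    println(flushIntervalMap) // e.g. Map(topic1 -> 3000, topic2 -> 6000)
  }
}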
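A rough sketch of wiring the new CommandLineUtils.checkRequiredArgs into a joptsimple-based tool; the option names here are placeholders, and the snippet assumes the patched kafka.utils.CommandLineUtils compiles as part of the build.

import joptsimple.OptionParser
import kafka.utils.CommandLineUtils

object RequiredArgsSketch {
  def main(args: Array[String]) {
    val parser = new OptionParser
    // hypothetical options, just to have something required
    val zkConnectOpt = parser.accepts("zookeeper", "The ZooKeeper connect string.")
                             .withRequiredArg
                             .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "The topic to operate on.")
                         .withRequiredArg
                         .ofType(classOf[String])

    val options = parser.parse(args : _*)
    // prints usage to System.err and exits if either option is missing
    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, topicOpt)

    println("zookeeper=" + options.valueOf(zkConnectOpt) + ", topic=" + options.valueOf(topicOpt))
  }
}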
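A small sketch of the relocated SyncJson wrapper, which serializes access to scala's JSON parser because it is not thread safe; the JSON payload below is invented but mirrors the leader-and-ISR strings that ZkUtils parses.

import kafka.utils.SyncJson

object SyncJsonSketch {
  def main(args: Array[String]) {
    val json = """{ "leader":"1", "leaderEpoch":"0", "ISR":"1,2,3" }"""

    // parseFull holds a lock around JSON.parseFull and wraps parse errors in KafkaException
    SyncJson.parseFull(json) match {
      case Some(m) =>
        val fields = m.asInstanceOf[Map[String, String]]
        println("leader = " + fields("leader") + ", isr = " + fields("ISR"))
      case None =>
        println("not parseable as json")
    }
  }
}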
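A quick sketch of the renamed ByteBuffer helpers (readString, readUnsignedInt and the indexed writeUnsignedInt); the values are arbitrary.

import java.nio.ByteBuffer
import kafka.utils.Utils

object BufferHelpersSketch {
  def main(args: Array[String]) {
    val buffer = ByteBuffer.allocate(8)
    // store a value larger than Int.MaxValue as an unsigned 32-bit int at index 0
    Utils.writeUnsignedInt(buffer, 0, 3000000000L)
    println(Utils.readUnsignedInt(buffer, 0)) // 3000000000

    // readString consumes the remaining bytes of a buffer as a string
    val payload = ByteBuffer.wrap("hello".getBytes("UTF-8"))
    println(Utils.readString(payload, "UTF-8")) // hello
  }
}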
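The old per-property parsers (getTopicFlushIntervals, getTopicRetentionSize, getCSVList and friends) collapse into two generic helpers; a short sketch with made-up input follows.

import kafka.utils.Utils

object CsvHelpersSketch {
  def main(args: Array[String]) {
    // "k1:v1, k2:v2" style input; whitespace around the separators is ignored
    println(Utils.parseCsvMap("topic1:1000, topic2:2000")) // e.g. Map(topic1 -> 1000, topic2 -> 2000)

    // empty entries are dropped
    println(Utils.parseCsvList("a, b, ,c").mkString("[", ", ", "]")) // [a, b, c]
  }
}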
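createObject is the renamed getObject and instantiates a class reflectively by name; a toy sketch with a JDK class, assuming the named class has a public no-argument constructor.

import kafka.utils.Utils

object CreateObjectSketch {
  def main(args: Array[String]) {
    // instantiate by fully qualified class name; the concrete class here is arbitrary
    val builder = Utils.createObject[java.lang.StringBuilder]("java.lang.StringBuilder")
    builder.append("constructed reflectively")
    println(builder.toString)
  }
}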
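The two map-to-JSON formatters picked up doc comments in this patch; a minimal sketch of calling them (keys and values invented), without assuming anything about the exact string they produce.

import kafka.utils.Utils

object JsonFormattingSketch {
  def main(args: Array[String]) {
    // takes a Map[String, String]
    println(Utils.stringMapToJson(Map("leader" -> "1", "leaderEpoch" -> "0")))
    // takes a Map[String, Seq[String]]
    println(Utils.mapToJson(Map("0" -> Seq("1", "2"), "1" -> Seq("2", "3"))))
  }
}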
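readFileAsString (formerly readFileIntoString) now takes an optional Charset; a self-contained sketch using a temporary file whose contents are an invented reassignment-style snippet.

import java.io.{File, PrintWriter}
import java.nio.charset.Charset
import kafka.utils.Utils

object ReadFileSketch {
  def main(args: Array[String]) {
    val file = File.createTempFile("partitions-to-move", ".json")
    val writer = new PrintWriter(file, "UTF-8")
    writer.write("""[{"topic": "topic1", "partition": "0", "replicas": "1,2"}]""")
    writer.close()

    // defaults to Charset.defaultCharset() when no charset is given
    println(Utils.readFileAsString(file.getPath, Charset.forName("UTF-8")))
    file.delete()
  }
}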
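The ad hoc tryCleanupZookeeper helpers are replaced by ZkUtils.maybeDeletePath, which recursively deletes a path and swallows any failure; the connect string and group id below are placeholders.

import kafka.utils.ZkUtils

object ZkCleanupSketch {
  def main(args: Array[String]) {
    val zkConnect = "localhost:2181"
    val groupId = "replay-log-producer"

    // equivalent to the old tryCleanupZookeeper(zkConnect, groupId)
    ZkUtils.maybeDeletePath(zkConnect, "/consumers/" + groupId)
  }
}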