From 6d7c3af9f38dfd057342c1bb12f5811b3676f6bd Mon Sep 17 00:00:00 2001 From: jholoman Date: Tue, 9 Dec 2014 09:34:44 -0500 Subject: [PATCH 1/2] KAFKA-1812 initial --- core/src/main/scala/kafka/utils/Utils.scala | 103 +++++++++++---------- .../test/scala/unit/kafka/utils/UtilsTest.scala | 17 ++++ 2 files changed, 71 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 58685cc..cb95d0e 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -33,10 +33,10 @@ import kafka.common.KafkaStorageException /** * General helper functions! - * + * * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in - * the standard library etc. - * + * the standard library etc. + * * If you are making a new helper function and want to add it to this class please ensure the following: * 1. It has documentation * 2. It is the most general possible utility, not just the thing you needed in one particular place @@ -68,18 +68,18 @@ object Utils extends Logging { * @param runnable The runnable to execute in the background * @return The unstarted thread */ - def daemonThread(name: String, runnable: Runnable): Thread = + def daemonThread(name: String, runnable: Runnable): Thread = newThread(name, runnable, true) - + /** * Create a daemon thread * @param name The name of the thread * @param fun The runction to execute in the thread * @return The unstarted thread */ - def daemonThread(name: String, fun: () => Unit): Thread = + def daemonThread(name: String, fun: () => Unit): Thread = daemonThread(name, runnable(fun)) - + /** * Create a new thread * @param name The name of the thread @@ -88,16 +88,16 @@ object Utils extends Logging { * @return The unstarted thread */ def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = { - val thread = new Thread(runnable, name) + val thread = new Thread(runnable, name) thread.setDaemon(daemon) thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { def uncaughtException(t: Thread, e: Throwable) { error("Uncaught exception in thread '" + t.getName + "':", e) - } + } }) thread } - + /** * Create a new thread * @param runnable The work for the thread to do @@ -114,7 +114,7 @@ object Utils extends Logging { }) thread } - + /** * Read the given byte buffer into a byte array */ @@ -161,7 +161,7 @@ object Utils extends Logging { else new FileInputStream(file).getChannel() } - + /** * Do the given action and log any exceptions thrown without rethrowing them * @param log The log method to use for logging. E.g. logger.warn @@ -174,7 +174,7 @@ object Utils extends Logging { case e: Throwable => log(e.getMessage(), e) } } - + /** * Test if two byte buffers are equal. In this case equality means having * the same bytes from the current position to the limit @@ -191,7 +191,7 @@ object Utils extends Logging { return false return true } - + /** * Translate the given buffer into a string * @param buffer The buffer to translate @@ -202,7 +202,7 @@ object Utils extends Logging { buffer.get(bytes) new String(bytes, encoding) } - + /** * Print an error message and shutdown the JVM * @param message The error message @@ -211,19 +211,19 @@ object Utils extends Logging { System.err.println(message) System.exit(1) } - + /** * Recursively delete the given file/directory and any subfiles (if any exist) * @param file The root file at which to begin deleting */ def rm(file: String): Unit = rm(new File(file)) - + /** * Recursively delete the list of files/directories and any subfiles (if any exist) * @param a sequence of files to be deleted */ def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f))) - + /** * Recursively delete the given file/directory and any subfiles (if any exist) * @param file The root file at which to begin deleting @@ -242,7 +242,7 @@ object Utils extends Logging { file.delete() } } - + /** * Register the given mbean with the platform mbean server, * unregistering any mbean that was there before. Note, @@ -270,7 +270,7 @@ object Utils extends Logging { } } } - + /** * Unregister the mbean with the given name, if there is one registered * @param name The mbean name to unregister @@ -283,16 +283,16 @@ object Utils extends Logging { mbs.unregisterMBean(objName) } } - + /** - * Read an unsigned integer from the current position in the buffer, + * Read an unsigned integer from the current position in the buffer, * incrementing the position by 4 bytes * @param buffer The buffer to read from * @return The integer read, as a long to avoid signedness */ - def readUnsignedInt(buffer: ByteBuffer): Long = + def readUnsignedInt(buffer: ByteBuffer): Long = buffer.getInt() & 0xffffffffL - + /** * Read an unsigned integer from the given position without modifying the buffers * position @@ -300,33 +300,33 @@ object Utils extends Logging { * @param index the index from which to read the integer * @return The integer read, as a long to avoid signedness */ - def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = + def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = buffer.getInt(index) & 0xffffffffL - + /** * Write the given long value as a 4 byte unsigned integer. Overflow is ignored. * @param buffer The buffer to write to * @param value The value to write */ - def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = + def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = buffer.putInt((value & 0xffffffffL).asInstanceOf[Int]) - + /** * Write the given long value as a 4 byte unsigned integer. Overflow is ignored. * @param buffer The buffer to write to * @param index The position in the buffer at which to begin writing * @param value The value to write */ - def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = + def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int]) - + /** * Compute the CRC32 of the byte array * @param bytes The array to compute the checksum for * @return The CRC32 */ def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length) - + /** * Compute the CRC32 of the segment of the byte array given by the specificed size and offset * @param bytes The bytes to checksum @@ -339,7 +339,7 @@ object Utils extends Logging { crc.update(bytes, offset, size) crc.getValue() } - + /** * Compute the hash code for the given items */ @@ -356,7 +356,7 @@ object Utils extends Logging { } return h } - + /** * Group the given values by keys extracted with the given function */ @@ -368,12 +368,12 @@ object Utils extends Logging { case Some(l: List[V]) => m.put(k, v :: l) case None => m.put(k, List(v)) } - } + } m } - + /** - * Read some bytes into the provided buffer, and return the number of bytes read. If the + * Read some bytes into the provided buffer, and return the number of bytes read. If the * channel has been closed or we get -1 on the read for any reason, throw an EOFException */ def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = { @@ -381,8 +381,8 @@ object Utils extends Logging { case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.") case n: Int => n } - } - + } + /** * Throw an exception if the given value is null, else return it. You can use this like: * val myValue = Utils.notNull(expressionThatShouldntBeNull) @@ -407,15 +407,20 @@ object Utils extends Logging { /** * This method gets comma separated values which contains key,value pairs and returns a map of * key value pairs. the format of allCSVal is key1:val1, key2:val2 .... + * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence + * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2 */ def parseCsvMap(str: String): Map[String, String] = { val map = new mutable.HashMap[String, String] - if("".equals(str)) - return map - val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*")) + if ("".equals(str)) + return map + val keyVals: Array[Array[String]] = str.split("\\s*,\\s*").map(s => { + val lio = s.lastIndexOf(":") + Array(s.substring(0,lio).trim, s.substring(lio + 1).trim) + }) keyVals.map(pair => (pair(0), pair(1))).toMap } - + /** * Parse a comma separated string into a sequence of strings. * Whitespace surrounding the comma will be removed. @@ -467,7 +472,7 @@ object Utils extends Logging { stream.close() } } - + /** * Get the absolute value of the given number. If the number is Int.MinValue return 0. * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!). @@ -496,7 +501,7 @@ object Utils extends Logging { throw new KafkaStorageException("Failed to create file %s.".format(path)) f } - + /** * Turn a properties map into a string */ @@ -531,7 +536,7 @@ object Utils extends Logging { } evaluated } - + /** * Read some properties with the given default values */ @@ -541,7 +546,7 @@ object Utils extends Logging { props.load(reader) props } - + /** * Read a big-endian integer from a byte array */ @@ -551,7 +556,7 @@ object Utils extends Logging { ((bytes(offset + 2) & 0xFF) << 8) | (bytes(offset + 3) & 0xFF) } - + /** * Execute the given function inside the lock */ @@ -590,7 +595,7 @@ object Utils extends Logging { */ case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) case c => c - }.mkString + }.mkString } /** diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index 0d0f0e2..abb26df 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -105,6 +105,23 @@ class UtilsTest extends JUnitSuite { } @Test + def testCsvMap() { + val emptyString: String = "" + val emptyMap = Utils.parseCsvMap(emptyString) + val emptyStringMap = Map.empty[String, String] + + assertTrue(emptyMap != null) + assertTrue(emptyStringMap.equals(emptyStringMap)) + + val kvPairsIpV6: String = "a:b:c:v,a:b:c:v" + val ipv6Map = Utils.parseCsvMap(kvPairsIpV6) + for (m <- ipv6Map) { + assertTrue(m._1.equals("a:b:c")) + assertTrue(m._2.equals("v")) + } + } + + @Test def testInLock() { val lock = new ReentrantLock() val result = inLock(lock) { -- 1.8.4 From 83822522aef89b3e86f7b54e7aab4ebace8825a6 Mon Sep 17 00:00:00 2001 From: jholoman Date: Wed, 10 Dec 2014 19:24:41 -0500 Subject: [PATCH 2/2] KAFKA-1812 Updated based on rb comments --- core/src/main/scala/kafka/utils/Utils.scala | 6 +++--- .../src/test/scala/unit/kafka/utils/UtilsTest.scala | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index cb95d0e..738c1af 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -414,11 +414,11 @@ object Utils extends Logging { val map = new mutable.HashMap[String, String] if ("".equals(str)) return map - val keyVals: Array[Array[String]] = str.split("\\s*,\\s*").map(s => { + val keyVals = str.split("\\s*,\\s*").map(s => { val lio = s.lastIndexOf(":") - Array(s.substring(0,lio).trim, s.substring(lio + 1).trim) + Pair(s.substring(0,lio).trim, s.substring(lio + 1).trim) }) - keyVals.map(pair => (pair(0), pair(1))).toMap + keyVals.toMap } /** diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala index abb26df..066553c 100644 --- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala @@ -109,7 +109,6 @@ class UtilsTest extends JUnitSuite { val emptyString: String = "" val emptyMap = Utils.parseCsvMap(emptyString) val emptyStringMap = Map.empty[String, String] - assertTrue(emptyMap != null) assertTrue(emptyStringMap.equals(emptyStringMap)) @@ -119,8 +118,28 @@ class UtilsTest extends JUnitSuite { assertTrue(m._1.equals("a:b:c")) assertTrue(m._2.equals("v")) } + + val singleEntry:String = "key:value" + val singleMap = Utils.parseCsvMap(singleEntry) + val value = singleMap.getOrElse("key", 0) + assertTrue(value.equals("value")) + + val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow" + val ipv4Map = Utils.parseCsvMap(kvPairsIpV4) + for (m <- ipv4Map) { + assertTrue(m._1.equals("192.168.2.1/30")) + assertTrue(m._2.equals("allow")) + } + + val kvPairsSpaces: String = "key:value , key: value" + val spaceMap = Utils.parseCsvMap(kvPairsSpaces) + for (m <- spaceMap) { + assertTrue(m._1.equals("key")) + assertTrue(m._2.equals("value")) + } } + @Test def testInLock() { val lock = new ReentrantLock() -- 1.8.4