diff --git a/README.md b/README.md index e3fea22..2e76720 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,9 @@ See our [web site](http://kafka.apache.org) for details on the project. ## Building a jar and running it ## -1. ./gradlew jar -2. Follow instuctions in http://kafka.apache.org/documentation.html#quickstart +1. ./gradlew copyDependantLibs +2. ./gradlew jar +3. Follow instuctions in http://kafka.apache.org/documentation.html#quickstart ## Running unit tests ## ./gradlew test @@ -66,6 +67,15 @@ The release file can be found inside ./core/build/distributions/. ## Publishing the jar for all version of Scala and for all projects to maven (To test locally, change mavenUrl in gradle.properties to a local dir.) ## ./gradlew uploadArchivesAll +Please note for this to work you need to create/update `~/.gradle/gradle.properties` and assign the following variables + + mavenUrl= + mavenUsername= + mavenPassword= + signing.keyId= + signing.password= + signing.secretKeyRingFile= + ## Building the test jar ## ./gradlew testJar diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index e332633..0be03c0 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -125,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def getTopicCountMap = Map(topicFilter.regex -> numStreams) + def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index a89b046..480d5f0 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -540,5 +540,29 @@ object Utils extends Logging { lock.unlock() } } - + + + //JSON strings need to be escaped based on ECMA-404 standard http://json.org + def JSONEscapeString (s : String) : String = { + s.map { + case '"' => "\\\"" + case '\\' => "\\\\" + case '/' => "\\/" + case '\b' => "\\b" + case '\f' => "\\f" + case '\n' => "\\n" + case '\r' => "\\r" + case '\t' => "\\t" + /* We'll unicode escape any control characters. These include: + * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) + * 0x7f : ASCII DELETE + * 0x80 -> 0x9f : C1 Control Codes + * + * Per RFC4627, section 2.5, we're not technically required to + * encode the C1 codes, but we do to be safe. + */ + case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) + case c => c + }.mkString + } } diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index cf2724b..6c4a8d3 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -42,6 +42,29 @@ class TopicFilterTest extends JUnitSuite { @Test def testBlacklists() { - val topicFilter1 = new Blacklist("black1") + val topicFilter1 = new Blacklist("notthis.+") + assertTrue(topicFilter1.isTopicAllowed("something")) + assertFalse(topicFilter1.isTopicAllowed("notthis_anything")) } + + @Test + def testWildcardTopicCountGetTopicCountMapEscapeJson() { + def getTopicCountMapKey(regex: String): String = { + val topicCount = new WildcardTopicCount(null, "consumerId", new Whitelist(regex), 1) + topicCount.getTopicCountMap.head._1 + } + //lets make sure that the JSON strings are escaping as we expect + assertEquals("-\\\"-", getTopicCountMapKey("-\"-")) + assertEquals("-\\\\-", getTopicCountMapKey("-\\-")) + assertEquals("-\\/-", getTopicCountMapKey("-/-")) + assertEquals("-\\\\b-", getTopicCountMapKey("-\\b-")) + assertEquals("-\\\\f-", getTopicCountMapKey("-\\f-")) + assertEquals("-\\\\n-", getTopicCountMapKey("-\\n-")) + assertEquals("-\\\\r-", getTopicCountMapKey("-\\r-")) + assertEquals("-\\\\t-", getTopicCountMapKey("-\\t-")) + assertEquals("-\\\\u0000-", getTopicCountMapKey("-\\u0000-")) + assertEquals("-\\\\u001f-", getTopicCountMapKey("-\\u001f-")) + assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-")) + assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-")) + } } \ No newline at end of file