diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index e332633..0be03c0 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -125,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def getTopicCountMap = Map(topicFilter.regex -> numStreams) + def getTopicCountMap = Map(Utils.JSONEscapeString(topicFilter.regex) -> numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index a89b046..480d5f0 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -540,5 +540,29 @@ object Utils extends Logging { lock.unlock() } } - + + + //JSON strings need to be escaped based on ECMA-404 standard http://json.org + def JSONEscapeString (s : String) : String = { + s.map { + case '"' => "\\\"" + case '\\' => "\\\\" + case '/' => "\\/" + case '\b' => "\\b" + case '\f' => "\\f" + case '\n' => "\\n" + case '\r' => "\\r" + case '\t' => "\\t" + /* We'll unicode escape any control characters. These include: + * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) + * 0x7f : ASCII DELETE + * 0x80 -> 0x9f : C1 Control Codes + * + * Per RFC4627, section 2.5, we're not technically required to + * encode the C1 codes, but we do to be safe. + */ + case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) + case c => c + }.mkString + } } diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index cf2724b..4313756 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -44,4 +44,25 @@ class TopicFilterTest extends JUnitSuite { def testBlacklists() { val topicFilter1 = new Blacklist("black1") } + + @Test + def testWildcardTopicCountGetTopicCountMapEscapeJson() { + def getTopicCountMapKey(regex: String): String = { + val topicCount = new WildcardTopicCount(null, "consumerId", new Whitelist(regex), 1) + topicCount.getTopicCountMap.head._1 + } + //lets make sure that the JSON strings are escaping as we expect + assertEquals("-\\\"-", getTopicCountMapKey("-\"-")) + assertEquals("-\\\\-", getTopicCountMapKey("-\\-")) + assertEquals("-\\/-", getTopicCountMapKey("-/-")) + assertEquals("-\\\\b-", getTopicCountMapKey("-\\b-")) + assertEquals("-\\\\f-", getTopicCountMapKey("-\\f-")) + assertEquals("-\\\\n-", getTopicCountMapKey("-\\n-")) + assertEquals("-\\\\r-", getTopicCountMapKey("-\\r-")) + assertEquals("-\\\\t-", getTopicCountMapKey("-\\t-")) + assertEquals("-\\\\u0000-", getTopicCountMapKey("-\\u0000-")) + assertEquals("-\\\\u001f-", getTopicCountMapKey("-\\u001f-")) + assertEquals("-\\\\u007f-", getTopicCountMapKey("-\\u007f-")) + assertEquals("-\\\\u009f-", getTopicCountMapKey("-\\u009f-")) + } } \ No newline at end of file