diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index e332633..7a8a125 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -125,7 +125,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def getTopicCountMap = Map(topicFilter.regex -> numStreams) + def getTopicCountMap = Map(Utils.quoteJsonLiteral(topicFilter.regex) -> numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index a89b046..5f50f55 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -445,6 +445,28 @@ object Utils extends Logging { */ def nullOrEmpty(s: String): Boolean = s == null || s.equals("") + def quoteJsonLiteral (s : String) : String = + s.map { + case '"' => "\\\"" + case '\\' => "\\\\" + case '/' => "\\/" + case '\b' => "\\b" + case '\f' => "\\f" + case '\n' => "\\n" + case '\r' => "\\r" + case '\t' => "\\t" + /* We'll unicode escape any control characters. These include: + * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) + * 0x7f : ASCII DELETE + * 0x80 -> 0x9f : C1 Control Codes + * + * Per RFC4627, section 2.5, we're not technically required to + * encode the C1 codes, but we do to be safe. + */ + case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) + case c => c + }.mkString + /** * Create a circular (looping) iterator over a collection. * @param coll An iterable over the underlying collection.