diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index a3eb53e..7698260 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -19,7 +19,7 @@ package kafka.consumer import scala.collection._ import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} +import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging, Utils} import kafka.common.KafkaException private[kafka] trait TopicCount { @@ -142,7 +142,7 @@ private[kafka] class WildcardTopicCount(zkClient: ZkClient, makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*)) } - def dbString = "{ \"%s\" : %d }".format(topicFilter.regex, numStreams) + def dbString = "{ \"%s\" : %d }".format(Utils.quoteJsonLiteral(topicFilter.regex), numStreams) def pattern: String = { topicFilter match { diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 60a019c..5eae721 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -476,6 +476,28 @@ object Utils extends Logging { builder.toString } + def quoteJsonLiteral (s : String) : String = + s.map { + case '"' => "\\\"" + case '\\' => "\\\\" + case '/' => "\\/" + case '\b' => "\\b" + case '\f' => "\\f" + case '\n' => "\\n" + case '\r' => "\\r" + case '\t' => "\\t" + /* We'll unicode escape any control characters. These include: + * 0x0 -> 0x1f : ASCII Control (C0 Control Codes) + * 0x7f : ASCII DELETE + * 0x80 -> 0x9f : C1 Control Codes + * + * Per RFC4627, section 2.5, we're not technically required to + * encode the C1 codes, but we do to be safe. + */ + case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int) + case c => c + }.mkString + /** * Format a Map[String, String] as JSON object. */