Index: trunk/config/mirror-producer.properties
===================================================================
--- trunk/config/mirror-producer.properties	(revision 0)
+++ trunk/config/mirror-producer.properties	(revision 0)
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+producer.type=async
+
+# since this producer is asynchronous, we need to configure it to block on a
+# full send queue, or we can lose messages
+
+queue.enqueueTimeout.ms=-1
+
+broker.list=0:localhost:9095
+
Index: trunk/config/mirror-zookeeper.properties
===================================================================
--- trunk/config/mirror-zookeeper.properties	(revision 0)
+++ trunk/config/mirror-zookeeper.properties	(revision 0)
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/mirror-zookeeper
+# the port at which the clients will connect
+clientPort=2171
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
Index: trunk/config/mirror-server.properties
===================================================================
--- trunk/config/mirror-server.properties	(revision 0)
+++ trunk/config/mirror-server.properties	(revision 0)
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#hostname=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9095
+
+# The number of processor threads the socket server uses for receiving and answering requests. 
+# Defaults to the number of cores on the machine
+num.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/mirror-kafka-logs
+
+# The number of logical partitions per topic per server. More partitions allow greater parallelism
+# for consumption, but also mean more files.
+num.partitions=1
+
+# Per-topic overrides for the default given by num.partitions
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=localhost:2171
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
Index: trunk/config/mirror-consumer.properties
===================================================================
--- trunk/config/mirror-consumer.properties	(revision 0)
+++ trunk/config/mirror-consumer.properties	(revision 0)
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# zk connection string (should point to the source cluster that you want to
+# mirror)
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# mirror cluster's consumer group id
+groupid=KafkaMirror
+
+# consumer timeout should be -1 (default)
+consumer.timeout.ms=-1
+
+# By default, all topics on the source Kafka cluster will be discovered and
+# mirrored. If you wish to include or exclude specific topics, you can use
+# (only one) of the following options:
+
+# mirror.topics.whitelist=GoodTopic1,GoodTopic2
+# mirror.topics.blacklist=UnwantedTopic1
+
Index: site/images/mirroring.png
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/octet-stream

Property changes on: site/images/mirroring.png
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream

Index: site/mirroring.html
===================================================================
--- site/mirroring.html	(revision 0)
+++ site/mirroring.html	(revision 0)
@@ -0,0 +1,159 @@
+
+<!--#include virtual="includes/header.html" -->
+
+<h2>Kafka mirroring</h2>
+
+<p>
+Kafka's mirroring feature makes it possible to maintain a replica of an
+existing Kafka cluster. This is generally a good practice in production
+environments, as it increases the durability and availability of data in case
+the primary Kafka cluster fails. It can also improve scalability in asymmetric
+producer-consumer configurations - i.e., if you have a very large number of
+consumers, it is preferable to have these clients consume from a mirror as
+opposed to the primary Kafka cluster.
+</p>
+
+<img src="images/mirroring.png" alt="Kafka mirroring: embedded consumer reads from the source cluster, embedded producer re-publishes to the mirror cluster">
+
+<p>
+The Kafka mirror cluster uses an embedded Kafka consumer to consume messages
+from a source cluster, and re-publishes those messages to the local cluster
+using an embedded Kafka producer.
+</p>
+
+<h2>How to set up a mirror</h2>
+
+<p>
+Setting up a mirror cluster is easy - simply provide the embedded consumer's
+configuration and the embedded producer's configuration, in addition to the
+server configuration, when you start up the Kafka brokers in the
+mirror cluster.  You need to point the consumer to the source cluster's
+ZooKeeper, and the producer to the mirror cluster's ZooKeeper. By default, the
+mirror cluster will mirror all topics present on the source cluster. You can
+instead configure a whitelist or blacklist as described in the next section.
+</p>
+
+<p>
+The following demo uses the sample configurations provided for the Kafka
+<a href="quickstart.html">quick-start</a>:
+</p>
+
+<ul>
+    <li>Start the primary cluster:
+    <pre>
+    <b>bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+    <b>bin/kafka-server-start.sh config/server.properties</b>
+    </pre>
+    </li>
+    <li>Start the mirror cluster:
+    <pre>
+    <b>bin/zookeeper-server-start.sh config/mirror-zookeeper.properties</b>
+    <b>JMX_PORT=8888 bin/kafka-server-start.sh config/mirror-server.properties config/mirror-consumer.properties config/mirror-producer.properties</b>
+    </pre>
+    </li>
+    <li>If you now send messages to topics on the source cluster, the mirror
+    cluster will eventually mirror those topics.</li>
+</ul>
+
+
+<h2>Important configuration parameters for a mirror</h2>
+
+<h3>Embedded producer timeout</h3>
+
+<p>
+To sustain higher throughput, you would typically use an asynchronous embedded
+producer, and configure it to run in blocking mode (i.e.,
+<i>queue.enqueueTimeout.ms=-1</i>). This ensures that messages are not lost:
+the asynchronous producer's default enqueue timeout is zero, which means that
+if the producer's internal queue is full, messages are dropped with a <a
+    href="http://incubator.apache.org/kafka/api-docs/0.6/kafka/producer/async/QueueFullException.html">QueueFullException</a>.
+A blocking producer, however, waits if the queue is full, and effectively
+throttles back the embedded consumer's consumption rate. You can enable trace
+logging in the producer to observe the remaining queue size over time. If the
+producer's queue is consistently full, it indicates that the mirror cluster is
+bottlenecked on re-publishing messages to the local (mirror) cluster and/or
+flushing messages to disk.
+</p>
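+
+<p>
+As a concrete reference, the relevant embedded producer settings (as used in
+this patch's <i>config/mirror-producer.properties</i>) are:
+</p>
+
+<pre>
+producer.type=async
+# block (rather than drop messages) when the async send queue is full
+queue.enqueueTimeout.ms=-1
+</pre>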
+
+<h3>Embedded consumer whitelist or blacklist</h3>
+
+<p>
+If you do not wish to have full mirroring, you may use either the whitelist
+(<i>mirror.topics.whitelist</i>) configuration option to specify which
+topics to include, or the blacklist (<i>mirror.topics.blacklist</i>)
+configuration option to specify which topics to exclude. (It is invalid to
+specify both a whitelist and a blacklist.) These options accept a
+comma-separated list of topics.
+</p>
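+
+<p>
+For example, to mirror only two topics (the topic names here are
+illustrative), add the following to the embedded consumer's configuration:
+</p>
+
+<pre>
+mirror.topics.whitelist=GoodTopic1,GoodTopic2
+</pre>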
+
+<h3>Embedded consumer and source cluster socket buffer sizes</h3>
+
+<p>
+Mirroring is often used in cross-DC scenarios, and there are a few
+configuration options that you may want to tune to help deal with inter-DC
+communication latencies and performance bottlenecks on your specific hardware.
+In general, you should set a high value for the socket buffer size on the
+mirror cluster's consumer configuration (<i>socket.buffersize</i>) and the
+source cluster's broker configuration (<i>socket.send.buffer</i>). Also, the
+embedded consumer's fetch size (<i>fetch.size</i>) should be higher than the
+consumer's socket buffer size. Note that the socket buffer size configurations
+are a hint to the underlying platform's networking code. If you enable trace
+logging, you can check the actual receive buffer size and determine whether the
+setting in the OS networking layer also needs to be adjusted.
+</p>
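+
+<p>
+A sketch of the relevant settings is below. The values are illustrative
+starting points, not recommendations; tune them for your network. Note that
+the fetch size is set higher than the consumer's socket buffer size, per the
+guideline above:
+</p>
+
+<pre>
+# embedded consumer configuration (mirror cluster side)
+socket.buffersize=2097152
+fetch.size=4194304
+
+# broker configuration (source cluster side)
+socket.send.buffer=2097152
+</pre>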
+
+<h2>How to check whether a mirror is keeping up</h2>
+
+<p>
+The consumer offset checker tool is useful to gauge how well your mirror is
+keeping up with the source cluster. For example, the following was executed during the above
+demo:
+</p>
+
+
+<pre>
+<b>bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic</b>
+KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
+            Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0
+  Consumer offset = 561154288
+                  = 561,154,288 (0.52G)
+         Log size = 2231392259
+                  = 2,231,392,259 (2.08G)
+     Consumer lag = 1670237971
+                  = 1,670,237,971 (1.56G)
+
+BROKER INFO
+0 -> 127.0.0.1:9092
+</pre>
+
+<p>
+Note that the <i>--zkconnect</i> argument should point to the source cluster's
+ZooKeeper. Also, if the topic is not specified, then the tool prints
+information for all topics under the given consumer group.
+</p>
+
+<h2>Current limitations</h2>
+
+<p>
+There are a few limitations with the current implementation of mirroring:
+</p>
+
+<ul>
+
+    <li>The embedded producer uses the default (random) partitioner. So if you
+    use a <a
+        href="http://incubator.apache.org/kafka/api-docs/0.6/kafka/producer/Partitioner.html">custom
+        partitioner</a> in your source cluster, the mirror cluster is not
+    capable of using the same partitioning scheme.</li>
+
+    <li> The embedded consumer instantiates the same number of message streams
+    for each topic according to the <i>mirror.consumer.numthreads</i>
+    configuration option.  There is no per-topic override at the moment.</li>
+
+    <li> The whitelist and blacklist configuration options do not accept
+    regular expressions and only one of these options can be used in a given
+    mirror setup.  </li>
+
+</ul>
+
+<!--#include virtual="includes/footer.html" -->
+
