Property changes on: . ___________________________________________________________________ Modified: svn:mergeinfo Merged /incubator/kafka/trunk:r1239903-1342339 Index: system_test/embedded_consumer/config/whitelisttest.consumer.properties =================================================================== --- system_test/embedded_consumer/config/whitelisttest.consumer.properties (revision 1342312) +++ system_test/embedded_consumer/config/whitelisttest.consumer.properties (working copy) @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.consumer.ConsumerConfig for more details - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -#consumer group id -groupid=group1 - -mirror.topics.whitelist=test01 - Index: system_test/embedded_consumer/config/server_source1.properties =================================================================== --- system_test/embedded_consumer/config/server_source1.properties (revision 1342312) +++ system_test/embedded_consumer/config/server_source1.properties (working copy) @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -# the id of the broker -brokerid=1 - -# hostname of broker. If not set, will pick up from the value returned -# from getLocalHost. If there are multiple interfaces getLocalHost -# may not be what you want. -# hostname= - -# number of logical partitions on this broker -num.partitions=1 - -# the port the socket server runs on -port=9092 - -# the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine -num.threads=8 - -# the directory in which to store log files -log.dir=/tmp/kafka-source1-logs - -# the send buffer used by the socket server -socket.send.buffer=1048576 - -# the receive buffer used by the socket server -socket.receive.buffer=1048576 - -# the maximum size of a log segment -log.file.size=10000000 - -# the interval between running cleanup on the logs -log.cleanup.interval.mins=1 - -# the minimum age of a log file to eligible for deletion -log.retention.hours=168 - -#the number of messages to accept without flushing the log to disk -log.flush.interval=600 - -#set the following properties to use zookeeper - -# enable connecting to zookeeper -enable.zookeeper=true - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -# time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 - -# default time based flush interval in ms -log.default.flush.interval.ms=1000 - -# time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 - Index: system_test/embedded_consumer/config/server_source2.properties =================================================================== --- system_test/embedded_consumer/config/server_source2.properties (revision 1342312) +++ system_test/embedded_consumer/config/server_source2.properties (working copy) @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -# the id of the broker -brokerid=2 - -# hostname of broker. If not set, will pick up from the value returned -# from getLocalHost. If there are multiple interfaces getLocalHost -# may not be what you want. -# hostname= - -# number of logical partitions on this broker -num.partitions=1 - -# the port the socket server runs on -port=9091 - -# the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine -num.threads=8 - -# the directory in which to store log files -log.dir=/tmp/kafka-source2-logs - -# the send buffer used by the socket server -socket.send.buffer=1048576 - -# the receive buffer used by the socket server -socket.receive.buffer=1048576 - -# the maximum size of a log segment -log.file.size=536870912 - -# the interval between running cleanup on the logs -log.cleanup.interval.mins=1 - -# the minimum age of a log file to eligible for deletion -log.retention.hours=168 - -#the number of messages to accept without flushing the log to disk -log.flush.interval=600 - -#set the following properties to use zookeeper - -# enable connecting to zookeeper -enable.zookeeper=true - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -# time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 - -# default time based flush interval in ms -log.default.flush.interval.ms=1000 - -# time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 - Index: system_test/embedded_consumer/config/server_source3.properties =================================================================== --- system_test/embedded_consumer/config/server_source3.properties (revision 1342312) +++ system_test/embedded_consumer/config/server_source3.properties (working copy) @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -# the id of the broker -brokerid=3 - -# hostname of broker. If not set, will pick up from the value returned -# from getLocalHost. If there are multiple interfaces getLocalHost -# may not be what you want. -# hostname= - -# number of logical partitions on this broker -num.partitions=1 - -# the port the socket server runs on -port=9090 - -# the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine -num.threads=8 - -# the directory in which to store log files -log.dir=/tmp/kafka-source3-logs - -# the send buffer used by the socket server -socket.send.buffer=1048576 - -# the receive buffer used by the socket server -socket.receive.buffer=1048576 - -# the maximum size of a log segment -log.file.size=536870912 - -# the interval between running cleanup on the logs -log.cleanup.interval.mins=1 - -# the minimum age of a log file to eligible for deletion -log.retention.hours=168 - -#the number of messages to accept without flushing the log to disk -log.flush.interval=600 - -#set the following properties to use zookeeper - -# enable connecting to zookeeper -enable.zookeeper=true - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -# time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 - -# default time based flush interval in ms -log.default.flush.interval.ms=1000 - -# time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 - Index: system_test/embedded_consumer/config/zookeeper_target.properties =================================================================== --- system_test/embedded_consumer/config/zookeeper_target.properties (revision 1342312) +++ system_test/embedded_consumer/config/zookeeper_target.properties (working copy) @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# the directory where the snapshot is stored. -dataDir=/tmp/zookeeper_target -# the port at which the clients will connect -clientPort=2182 Index: system_test/embedded_consumer/config/consumer.properties =================================================================== --- system_test/embedded_consumer/config/consumer.properties (revision 1342312) +++ system_test/embedded_consumer/config/consumer.properties (working copy) @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. Index: system_test/embedded_consumer/config/mirror_producer.properties =================================================================== --- system_test/embedded_consumer/config/mirror_producer.properties (revision 1342312) +++ system_test/embedded_consumer/config/mirror_producer.properties (working copy) @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2182 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -producer.type=async - -# to avoid dropping events if the queue is full, wait indefinitely -queue.enqueueTimeout.ms=-1 - Index: system_test/embedded_consumer/config/server_target1.properties =================================================================== --- system_test/embedded_consumer/config/server_target1.properties (revision 1342312) +++ system_test/embedded_consumer/config/server_target1.properties (working copy) @@ -1,78 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -# the id of the broker -brokerid=1 - -# hostname of broker. If not set, will pick up from the value returned -# from getLocalHost. If there are multiple interfaces getLocalHost -# may not be what you want. -# hostname= - -# number of logical partitions on this broker -num.partitions=1 - -# the port the socket server runs on -port=9093 - -# the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine -num.threads=8 - -# the directory in which to store log files -log.dir=/tmp/kafka-target1-logs - -# the send buffer used by the socket server -socket.send.buffer=1048576 - -# the receive buffer used by the socket server -socket.receive.buffer=1048576 - -# the maximum size of a log segment -log.file.size=536870912 - -# the interval between running cleanup on the logs -log.cleanup.interval.mins=1 - -# the minimum age of a log file to eligible for deletion -log.retention.hours=168 - -#the number of messages to accept without flushing the log to disk -log.flush.interval=600 - -#set the following properties to use zookeeper - -# enable connecting to zookeeper -enable.zookeeper=true - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2182 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -# time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 - -# default time based flush interval in ms -log.default.flush.interval.ms=1000 - -# time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 - -# topic partition count map -# topic.partition.count.map=topic1:3, topic2:4 Index: system_test/embedded_consumer/config/server_target2.properties =================================================================== --- system_test/embedded_consumer/config/server_target2.properties (revision 1342312) +++ system_test/embedded_consumer/config/server_target2.properties (working copy) @@ -1,78 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -# the id of the broker -brokerid=2 - -# hostname of broker. If not set, will pick up from the value returned -# from getLocalHost. If there are multiple interfaces getLocalHost -# may not be what you want. -# hostname= - -# number of logical partitions on this broker -num.partitions=1 - -# the port the socket server runs on -port=9094 - -# the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine -num.threads=8 - -# the directory in which to store log files -log.dir=/tmp/kafka-target2-logs - -# the send buffer used by the socket server -socket.send.buffer=1048576 - -# the receive buffer used by the socket server -socket.receive.buffer=1048576 - -# the maximum size of a log segment -log.file.size=536870912 - -# the interval between running cleanup on the logs -log.cleanup.interval.mins=1 - -# the minimum age of a log file to eligible for deletion -log.retention.hours=168 - -#the number of messages to accept without flushing the log to disk -log.flush.interval=600 - -#set the following properties to use zookeeper - -# enable connecting to zookeeper -enable.zookeeper=true - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2182 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -# time based topic flush intervals in ms -#topic.flush.intervals.ms=topic:1000 - -# default time based flush interval in ms -log.default.flush.interval.ms=1000 - -# time based topic flasher time rate in ms -log.default.flush.scheduler.interval.ms=1000 - -# topic partition count map -# topic.partition.count.map=topic1:3, topic2:4 Index: system_test/embedded_consumer/config/blacklisttest.consumer.properties =================================================================== --- system_test/embedded_consumer/config/blacklisttest.consumer.properties (revision 1342312) +++ system_test/embedded_consumer/config/blacklisttest.consumer.properties (working copy) @@ -1,29 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.consumer.ConsumerConfig for more details - -# zk connection string -# comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zk.connect=localhost:2181 - -# timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 - -#consumer group id -groupid=group1 - -mirror.topics.blacklist=test02,test03 - Index: system_test/embedded_consumer/config/zookeeper_source.properties =================================================================== --- system_test/embedded_consumer/config/zookeeper_source.properties (revision 1342312) +++ system_test/embedded_consumer/config/zookeeper_source.properties (working copy) @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# the directory where the snapshot is stored. -dataDir=/tmp/zookeeper_source -# the port at which the clients will connect -clientPort=2181 Index: system_test/embedded_consumer/expected.out =================================================================== --- system_test/embedded_consumer/expected.out (revision 1342312) +++ system_test/embedded_consumer/expected.out (working copy) @@ -1,11 +0,0 @@ -start the servers ... -start producing messages ... -Total Num Messages: 10000000 bytes: 1994374785 in 106.076 secs -Messages/sec: 94272.0314 -MB/sec: 17.9304 -[2011-05-02 11:50:29,022] INFO Disconnecting from localhost:9092 (kafka.producer.SyncProducer) -wait for consumer to finish consuming ... -test passed -bin/../../../bin/kafka-server-start.sh: line 11: 359 Terminated $(dirname $0)/kafka-run-class.sh kafka.Kafka $@ -bin/../../../bin/zookeeper-server-start.sh: line 9: 357 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@ -bin/../../../bin/zookeeper-server-start.sh: line 9: 358 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@ Index: system_test/embedded_consumer/bin/expected.out =================================================================== --- system_test/embedded_consumer/bin/expected.out (revision 1342312) +++ system_test/embedded_consumer/bin/expected.out (working copy) @@ -1,18 +0,0 @@ -start the servers ... -start producing messages ... -wait for consumer to finish consuming ... 
-[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool) -[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool) -[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool) -thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec -[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool) -[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer) -[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer) -[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer) -Total Num Messages: 400000 bytes: 79859641 in 22.93 secs -Messages/sec: 17444.3960 -MB/sec: 3.3214 -test passed -stopping the servers -bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@ -bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@ Index: system_test/embedded_consumer/bin/run-test.sh =================================================================== --- system_test/embedded_consumer/bin/run-test.sh (revision 1342312) +++ system_test/embedded_consumer/bin/run-test.sh (working copy) @@ -1,328 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -readonly num_messages=400000 -readonly message_size=400 -readonly action_on_fail="proceed" - -readonly test_start_time="$(date +%s)" - -readonly base_dir=$(dirname $0)/.. 
- -info() { - echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*" -} - -kill_child_processes() { - isTopmost=$1 - curPid=$2 - childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}') - for childPid in $childPids - do - kill_child_processes 0 $childPid - done - if [ $isTopmost -eq 0 ]; then - kill -15 $curPid 2> /dev/null - fi -} - -cleanup() { - info "cleaning up" - - pid_zk_source= - pid_zk_target= - pid_kafka_source1= - pid_kafka_source2= - pid_kafka_source3= - pid_kafka_target1= - pid_kafka_target2= - pid_producer= - - rm -rf /tmp/zookeeper_source - rm -rf /tmp/zookeeper_target - - rm -rf /tmp/kafka-source{1..3}-logs - # mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0 - # touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka - - rm -rf /tmp/kafka-target{1..2}-logs -} - -begin_timer() { - t_begin=$(date +%s) -} - -end_timer() { - t_end=$(date +%s) -} - -start_zk() { - info "starting zookeepers" - $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log & - pid_zk_source=$! - $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log & - pid_zk_target=$! -} - -start_source_servers() { - info "starting source cluster" - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log & - pid_kafka_source1=$! - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log & - pid_kafka_source2=$! - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log & - pid_kafka_source3=$! -} - -start_target_servers_for_whitelist_test() { - echo "starting mirror cluster" - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log & - pid_kafka_target1=$! - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log & - pid_kafka_target2=$! -} - -start_target_servers_for_blacklist_test() { - echo "starting mirror cluster" - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log & - pid_kafka_target1=$! - $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log & - pid_kafka_target2=$! 
-} - -shutdown_servers() { - info "stopping producer" - if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi - - info "shutting down target servers" - if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi - if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi - sleep 2 - - info "shutting down source servers" - if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi - if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi - if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi - - info "shutting down zookeeper servers" - if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi - if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi -} - -start_producer() { - topic=$1 - info "start producing messages for topic $topic ..." - $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log & - pid_producer=$! -} - -# In case the consumer does not consume, the test may exit prematurely (i.e., -# shut down the kafka brokers, and ProducerPerformance will start throwing ugly -# exceptions. So, wait for the producer to finish before shutting down. If it -# takes too long, the user can just hit Ctrl-c which is trapped to kill child -# processes. -# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+ -wait_partition_done() { - n_tuples=$(($# / 3)) - - i=1 - while (($#)); do - kafka_server[i]=$1 - topic[i]=$2 - partitionid[i]=$3 - prev_offset[i]=0 - info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}" - i=$((i+1)) - shift 3 - done - - all_done=0 - - # set -x - while [[ $all_done != 1 ]]; do - sleep 4 - i=$n_tuples - all_done=1 - for ((i=1; i <= $n_tuples; i++)); do - cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1) - if [ "x$cur_size" != "x${prev_offset[i]}" ]; then - all_done=0 - prev_offset[i]=$cur_size - fi - done - done - -} - -cmp_logs() { - topic=$1 - info "comparing source and target logs for topic $topic" - source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) - source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) - source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) - target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) - target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1) - if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi - if [ 
"x$target_part1_size" == "x" ]; then target_part1_size=0; fi - expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size)) - actual_size=$(($target_part0_size + $target_part1_size)) - if [ "x$expected_size" != "x$actual_size" ] - then - info "source size: $expected_size target size: $actual_size" - return 1 - else - return 0 - fi -} - -take_fail_snapshot() { - snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}" - mkdir $snapshot_dir - for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do - if [ -d $dir ]; then - cp -r $dir $snapshot_dir - fi - done -} - -# Usage: process_test_result -# result: last test result -# action_on_fail: (exit|wait|proceed) -# ("wait" is useful if you want to troubleshoot using zookeeper) -process_test_result() { - result=$1 - if [ $1 -eq 0 ]; then - info "test passed" - else - info "test failed" - case "$2" in - "wait") info "waiting: hit Ctrl-c to quit" - wait - ;; - "exit") shutdown_servers - take_fail_snapshot - exit $result - ;; - *) shutdown_servers - take_fail_snapshot - info "proceeding" - ;; - esac - fi -} - -test_whitelists() { - info "### Testing whitelists" - snapshot_prefix="whitelist-test" - - cleanup - start_zk - start_source_servers - start_target_servers_for_whitelist_test - sleep 4 - - begin_timer - - start_producer test01 - info "waiting for producer to finish producing ..." - wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0 - - info "waiting for consumer to finish consuming ..." - wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0 - - end_timer - info "embedded consumer took $((t_end - t_begin)) seconds" - - sleep 2 - - cmp_logs test01 - result=$? - - return $result -} - -test_blacklists() { - info "### Testing blacklists" - snapshot_prefix="blacklist-test" - cleanup - start_zk - start_source_servers - start_target_servers_for_blacklist_test - sleep 4 - - start_producer test02 - info "waiting for producer to finish producing test02 ..." - wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0 - - # start_producer test03 - # info "waiting for producer to finish producing test03 ..." - # wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0 - - begin_timer - - start_producer test01 - info "waiting for producer to finish producing ..." - wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0 - - info "waiting for consumer to finish consuming ..." - wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0 - - end_timer - - info "embedded consumer took $((t_end - t_begin)) seconds" - - sleep 2 - - cmp_logs test02 - result1=$? - # cmp_logs test03 - # result2=$? - # if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then - if [[ "x$result1" == "x0" ]]; then - result=1 - else - cmp_logs test01 - result=$? - fi - - return $result -} - -# main test begins - -echo "Test-$test_start_time" - -# Ctrl-c trap. Catches INT signal -trap "shutdown_servers; exit 0" INT - -test_whitelists -result=$? - -process_test_result $result $action_on_fail - -shutdown_servers - -sleep 2 - -test_blacklists -result=$? 
- -process_test_result $result $action_on_fail - -shutdown_servers - -exit $result - Index: system_test/embedded_consumer/README =================================================================== --- system_test/embedded_consumer/README (revision 1342312) +++ system_test/embedded_consumer/README (working copy) @@ -1,27 +0,0 @@ -This test replicates messages from 3 kafka brokers to 2 other kafka brokers -using the embedded consumer. At the end, the messages produced at the source -brokers should match that at the target brokers. - -To run this test, do -bin/run-test.sh - -The expected output is given in bin/expected.out. There is only 1 thing that's -important. -1. The output should have a line "test passed". - -In the event of failure, by default the brokers and zookeepers remain running -to make it easier to debug the issue - hit Ctrl-C to shut them down. You can -change this behavior by setting the action_on_fail flag in the script to "exit" -or "proceed", in which case a snapshot of all the logs and directories is -placed in the test's base directory. - -If you are making any changes that may affect the embedded consumer, it is a -good idea to run the test in a loop. E.g.: - -:>/tmp/embeddedconsumer_test.log -for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/embeddedconsumer_test.log; done -tail -F /tmp/embeddedconsumer_test.log - -grep -ic passed /tmp/embeddedconsumer_test.log -grep -ic failed /tmp/embeddedconsumer_test.log - Index: system_test/broker_failure/config/server_target1.properties =================================================================== --- system_test/broker_failure/config/server_target1.properties (revision 1342312) +++ system_test/broker_failure/config/server_target1.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9093 +port=9081 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer1.properties =================================================================== --- system_test/broker_failure/config/mirror_producer1.properties (revision 1342312) +++ system_test/broker_failure/config/mirror_producer1.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9093 +broker.list=0:localhost:9081 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target2.properties =================================================================== --- system_test/broker_failure/config/server_target2.properties (revision 1342312) +++ system_test/broker_failure/config/server_target2.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9094 +port=9082 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer2.properties =================================================================== --- system_test/broker_failure/config/mirror_producer2.properties (revision 1342312) +++ system_test/broker_failure/config/mirror_producer2.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. 
"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9094 +broker.list=0:localhost:9082 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/server_target3.properties =================================================================== --- system_test/broker_failure/config/server_target3.properties (revision 1342312) +++ system_test/broker_failure/config/server_target3.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9095 +port=9083 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/mirror_producer3.properties =================================================================== --- system_test/broker_failure/config/mirror_producer3.properties (revision 1342312) +++ system_test/broker_failure/config/mirror_producer3.properties (working copy) @@ -15,7 +15,7 @@ # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -broker.list=0:localhost:9095 +broker.list=0:localhost:9083 # timeout in ms for connecting to zookeeper zk.connectiontimeout.ms=1000000 Index: system_test/broker_failure/config/whitelisttest.consumer.properties =================================================================== --- system_test/broker_failure/config/whitelisttest.consumer.properties (revision 1342312) +++ system_test/broker_failure/config/whitelisttest.consumer.properties (working copy) @@ -25,5 +25,5 @@ #consumer group id groupid=group1 -mirror.topics.whitelist=test01 - +mirror.topics.whitelist=test_1,test_2 +autooffset.reset=smallest Index: system_test/broker_failure/config/server_source1.properties =================================================================== --- system_test/broker_failure/config/server_source1.properties (revision 1342312) +++ system_test/broker_failure/config/server_source1.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9092 +port=9091 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source2.properties =================================================================== --- system_test/broker_failure/config/server_source2.properties (revision 1342312) +++ system_test/broker_failure/config/server_source2.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9091 +port=9092 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source3.properties =================================================================== --- system_test/broker_failure/config/server_source3.properties (revision 1342312) +++ system_test/broker_failure/config/server_source3.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9090 +port=9093 # the number of processor threads the socket server uses. 
Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/server_source4.properties =================================================================== --- system_test/broker_failure/config/server_source4.properties (revision 1342312) +++ system_test/broker_failure/config/server_source4.properties (working copy) @@ -26,7 +26,7 @@ num.partitions=1 # the port the socket server runs on -port=9096 +port=9094 # the number of processor threads the socket server uses. Defaults to the number of cores on the machine num.threads=8 Index: system_test/broker_failure/config/log4j.properties =================================================================== --- system_test/broker_failure/config/log4j.properties (revision 1342312) +++ system_test/broker_failure/config/log4j.properties (working copy) @@ -12,30 +12,74 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=INFO, stdout +log4j.rootLogger=INFO, stdout, kafkaAppender + +# ==================================== +# messages going to kafkaAppender +# ==================================== +log4j.logger.kafka=DEBUG, kafkaAppender +log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, kafkaAppender +log4j.logger.org.apache.zookeeper=INFO, kafkaAppender + +# ==================================== +# messages going to zookeeperAppender +# ==================================== +# (comment out this line to redirect ZK-related messages to kafkaAppender +# to allow reading both Kafka and ZK debugging messages in a single file) +#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender + +# ==================================== +# stdout +# ==================================== log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n -#log4j.appender.fileAppender=org.apache.log4j.FileAppender -#log4j.appender.fileAppender.File=kafka-request.log -#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout -#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n +# ==================================== +# fileAppender +# ==================================== +log4j.appender.fileAppender=org.apache.log4j.FileAppender +log4j.appender.fileAppender.File=/tmp/kafka_all_request.log +log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.fileAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +# ==================================== +# kafkaAppender +# ==================================== +log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.kafkaAppender.File=/tmp/kafka.log +log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.additivity.kafka=true -# Turn on all our debugging info -#log4j.logger.kafka=INFO -log4j.logger.org.I0Itec.zkclient.ZkClient=INFO -log4j.logger.org.apache.zookeeper=INFO -log4j.logger.kafka.consumer=DEBUG -log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE -log4j.logger.kafka.server.KafkaRequestHandlers=TRACE +# ==================================== +# zookeeperAppender +# ==================================== +log4j.appender.zookeeperAppender=org.apache.log4j.DailyRollingFileAppender +log4j.appender.zookeeperAppender.File=/tmp/zookeeper.log 
+log4j.appender.zookeeperAppender.layout=org.apache.log4j.PatternLayout +log4j.appender.zookeeperAppender.layout.ConversionPattern=[%d] %p %m (%c)%n +log4j.additivity.org.apache.zookeeper=false + +# ==================================== +# other available debugging info +# ==================================== +#log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE +#log4j.logger.kafka.server.KafkaRequestHandlers=TRACE #log4j.logger.kafka.producer.async.AsyncProducer=TRACE #log4j.logger.kafka.producer.async.ProducerSendThread=TRACE -log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE +#log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE + +log4j.logger.kafka.consumer=DEBUG log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG # to print message checksum from ProducerPerformance -log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG +# to print socket buffer size validated by Kafka broker +log4j.logger.kafka.network.Acceptor=DEBUG + +# to print socket buffer size validated by SimpleConsumer +log4j.logger.kafka.consumer.SimpleConsumer=TRACE + Index: system_test/broker_failure/bin/run-test.sh =================================================================== --- system_test/broker_failure/bin/run-test.sh (revision 1342312) +++ system_test/broker_failure/bin/run-test.sh (working copy) @@ -68,12 +68,14 @@ readonly num_msg_per_batch=500 readonly batches_per_iteration=5 -readonly num_iterations=5 +readonly num_iterations=1 readonly zk_source_port=2181 readonly zk_mirror_port=2182 -readonly topic_1=test01 +readonly topic_prefix=test +readonly max_topic_id=2 +readonly unbalanced_start_id=2 readonly consumer_grp=group1 readonly source_console_consumer_grp=source readonly mirror_console_consumer_grp=mirror @@ -96,10 +98,16 @@ readonly wait_time_after_killing_broker=0 readonly wait_time_after_restarting_broker=5 -background_producer_pid= +readonly producer_4_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093,4:localhost:9094" +readonly producer_3_brokerinfo_str="broker.list=1:localhost:9091,2:localhost:9092,3:localhost:9093" + +background_producer_pid_1= +background_producer_pid_2= + no_bouncing=$# iter=1 +abort_test=false pid_zk_source= pid_zk_target= @@ -177,17 +185,29 @@ verify_consumer_rebalancing() { - info "Verifying consumer rebalancing operation" + info "Verifying consumer rebalancing operation" - $base_dir/bin/kafka-run-class.sh \ - kafka.tools.VerifyConsumerRebalance \ - --zk.connect=localhost:2181 \ - --group $consumer_grp \ - 2>&1 >> $consumer_rebalancing_log + CONSUMER_REBALANCING_RESULT=`$base_dir/bin/kafka-run-class.sh \ + kafka.tools.VerifyConsumerRebalance \ + --zk.connect=localhost:2181 \ + --group $consumer_grp` + echo "$CONSUMER_REBALANCING_RESULT" >> $consumer_rebalancing_log + + REBALANCE_STATUS_LINE=`grep "Rebalance operation" $consumer_rebalancing_log | tail -1` + # info "REBALANCE_STATUS_LINE: $REBALANCE_STATUS_LINE" + REBALANCE_STATUS=`echo $REBALANCE_STATUS_LINE | grep "Rebalance operation successful" || echo -n "Rebalance operation failed"` + info "REBALANCE_STATUS: $REBALANCE_STATUS" + + if [ "${REBALANCE_STATUS}_x" == "Rebalance operation failed_x" ]; then + info "setting abort_test to true due to Rebalance operation failed" + abort_test="true" + fi } wait_for_zero_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -196,7 +216,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh 
kafka.tools.ConsumerOffsetChecker \ --group $consumer_grp --zkconnect localhost:$zk_source_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -217,6 +237,8 @@ wait_for_zero_source_console_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -225,7 +247,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $source_console_consumer_grp --zkconnect localhost:$zk_source_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -246,6 +268,8 @@ wait_for_zero_mirror_console_consumer_lags() { + topic_id=$1 + # no of times to check for zero lagging no_of_zero_to_verify=3 @@ -254,7 +278,7 @@ TOTAL_LAG=0 CONSUMER_LAGS=`$base_dir/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group $mirror_console_consumer_grp --zkconnect localhost:$zk_mirror_port \ - --topic $topic_1 | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` + --topic ${topic_prefix}_${topic_id} | grep "Consumer lag" | tr -d ' ' | cut -f2 -d '='` for lag in $CONSUMER_LAGS; do @@ -321,6 +345,8 @@ rm -f $console_consumer_source_crc_sorted_log rm -f $console_consumer_mirror_crc_sorted_uniq_log rm -f $console_consumer_source_crc_sorted_uniq_log + + rm -f $consumer_rebalancing_log } start_zk() { @@ -380,41 +406,66 @@ } start_console_consumer_for_source_producer() { - info "starting console consumers for source producer" + topic_id=$1 + + info "starting console consumers for source producer on topic id [$topic_id]" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ --zookeeper localhost:$zk_source_port \ - --topic $topic_1 \ + --topic ${topic_prefix}_${topic_id} \ --group $source_console_consumer_grp \ - --from-beginning \ + --from-beginning --consumer-timeout-ms 5000 \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - 2>&1 > ${console_consumer_source_log} & - console_consumer_source_pid=$! - - info " -> console consumer source pid: $console_consumer_source_pid" + --property topic=${topic_prefix}_${topic_id} \ + 2>&1 >> ${console_consumer_source_log} } start_console_consumer_for_mirror_producer() { - info "starting console consumers for mirroring producer" + topic_id=$1 + + info "starting console consumers for mirroring producer on topic id [$topic_id]" + $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ --zookeeper localhost:$zk_mirror_port \ - --topic $topic_1 \ + --topic ${topic_prefix}_${topic_id} \ --group $mirror_console_consumer_grp \ - --from-beginning \ + --from-beginning --consumer-timeout-ms 5000 \ --formatter "kafka.consumer.ConsoleConsumer\$ChecksumMessageFormatter" \ - 2>&1 > ${console_consumer_mirror_log} & - console_consumer_mirror_pid=$! 
+ --property topic=${topic_prefix}_${topic_id} \ + 2>&1 >> ${console_consumer_mirror_log} +} - info " -> console consumer mirror pid: $console_consumer_mirror_pid" +consume_source_producer_messages() { + consumer_counter=1 + while [ $consumer_counter -le $max_topic_id ] + do + start_console_consumer_for_source_producer $consumer_counter + consumer_counter=$(( $consumer_counter + 1 )) + done } +consume_mirror_producer_messages() { + consumer_counter=1 + while [ $consumer_counter -le $max_topic_id ] + do + start_console_consumer_for_mirror_producer $consumer_counter + consumer_counter=$(( $consumer_counter + 1 )) + done +} + shutdown_producer() { info "shutting down producer" - if [ "x${background_producer_pid}" != "x" ]; then - # kill_child_processes 0 ${background_producer_pid}; - kill -TERM ${background_producer_pid} 2> /dev/null; + if [ "x${background_producer_pid_1}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid_1}; + kill -TERM ${background_producer_pid_1} 2> /dev/null; fi + + if [ "x${background_producer_pid_2}" != "x" ]; then + # kill_child_processes 0 ${background_producer_pid_2}; + kill -TERM ${background_producer_pid_2} 2> /dev/null; + fi } shutdown_servers() { @@ -450,13 +501,15 @@ } start_background_producer() { + bkrinfo_str=$1 + start_topic_id=$2 + end_topic_id=$3 batch_no=0 - curr_iter=0 + topic_id=${start_topic_id} - while [ $num_iterations -gt $curr_iter ] + while [ 'x' == 'x' ] do - topic=$1 sleeptime= get_random_range $sleep_min $sleep_max @@ -464,19 +517,24 @@ batch_no=$(($batch_no + 1)) + if [ $topic_id -gt $end_topic_id ]; then + topic_id=${start_topic_id} + fi + $base_dir/bin/kafka-run-class.sh \ kafka.perf.ProducerPerformance \ - --brokerinfo zk.connect=localhost:2181 \ - --topic $topic \ + --brokerinfo $bkrinfo_str \ + --topic ${topic_prefix}_${topic_id} \ --messages $num_msg_per_batch \ --message-size $message_size \ --batch-size 50 \ --vary-message-size \ --threads 1 \ - --reporting-interval $num_msg_per_batch \ - --async \ + --reporting-interval $num_msg_per_batch --async \ 2>&1 >> $base_dir/producer_performance.log # appending all producers' msgs + topic_id=$(( $topic_id + 1 )) + sleep $sleeptime done } @@ -485,9 +543,9 @@ cmp_result=0 - grep ^checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log - grep ^checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log - grep ^checksum $producer_performance_log | tr -d ' ' | cut -f2 -d ':' > $producer_performance_crc_log + grep checksum $console_consumer_source_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_source_crc_log + grep checksum $console_consumer_mirror_log | tr -d ' ' | cut -f2 -d ':' > $console_consumer_mirror_crc_log + grep checksum $producer_performance_log | tr -d ' ' | cut -f4 -d ':' | cut -f1 -d '(' > $producer_performance_crc_log sort $console_consumer_mirror_crc_log > $console_consumer_mirror_crc_sorted_log sort $console_consumer_source_crc_log > $console_consumer_source_crc_sorted_log @@ -555,6 +613,37 @@ echo "========================================================" >> $checksum_diff_log echo "${duplicate_mirror_crc}" >> $checksum_diff_log + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + # get producer topic counts + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $producer_performance_log` + echo "PRODUCER topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + 
topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_source_log` + echo "SOURCE consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + + topic_chksum_counter=1 + while [ $topic_chksum_counter -le $max_topic_id ] + do + this_chksum_count=`grep -c ${topic_prefix}_${topic_chksum_counter}\- $console_consumer_mirror_log` + echo "MIRROR consumer topic ${topic_prefix}_${topic_chksum_counter} count: ${this_chksum_count}" + + topic_chksum_counter=$(($topic_chksum_counter + 1)) + done + echo + return $cmp_result } @@ -567,15 +656,32 @@ start_target_servers_cluster sleep 2 - start_background_producer $topic_1 & - background_producer_pid=$! + start_background_producer $producer_4_brokerinfo_str 1 $(( $unbalanced_start_id - 1 )) & + background_producer_pid_1=$! info "==========================================" - info "Started background producer pid [${background_producer_pid}]" + info "Started background producer pid [${background_producer_pid_1}]" info "==========================================" - sleep 5 - + sleep 10 + + start_background_producer $producer_3_brokerinfo_str $unbalanced_start_id $max_topic_id & + background_producer_pid_2=$! + + info "==========================================" + info "Started background producer pid [${background_producer_pid_2}]" + info "==========================================" + + sleep 10 + + verify_consumer_rebalancing + + info "abort_test: [${abort_test}]" + if [ "${abort_test}_x" == "true_x" ]; then + info "aborting test" + iter=$((${num_iterations} + 1)) + fi + while [ $num_iterations -ge $iter ] do echo @@ -592,7 +698,6 @@ # even iterations -> bounce target kafka borker get_random_range 1 $num_kafka_target_server idx=$? 
- if [ "x${kafka_target_pids[$idx]}" != "x" ]; then echo info "#### Bouncing kafka TARGET broker ####" @@ -631,7 +736,15 @@ sleep $wait_time_after_restarting_broker fi fi + verify_consumer_rebalancing + + info "abort_test: [${abort_test}]" + if [ "${abort_test}_x" == "true_x" ]; then + info "aborting test" + iter=$((${num_iterations} + 1)) + fi + else info "No bouncing performed" fi @@ -670,8 +783,8 @@ start_test -start_console_consumer_for_source_producer -start_console_consumer_for_mirror_producer +consume_source_producer_messages +consume_mirror_producer_messages wait_for_zero_source_console_consumer_lags wait_for_zero_mirror_console_consumer_lags Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (working copy) @@ -19,13 +19,17 @@ import java.util.Properties import java.io.File +import kafka.consumer.SimpleConsumer +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{TestUtils, TestZKUtils,Utils, Logging} +import kafka.zk.EmbeddedZookeeper import junit.framework.Assert._ import kafka.api.FetchRequestBuilder import kafka.consumer.SimpleConsumer +import kafka.api.FetchRequest import kafka.message.Message import kafka.producer.async.MissingConfigException import kafka.serializer.Encoder -import kafka.server.{KafkaConfig, KafkaServer} import kafka.zk.ZooKeeperTestHarness import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.{PropertyConfigurator, Logger} Index: core/src/test/scala/unit/kafka/utils/UtilsTest.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/UtilsTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/utils/UtilsTest.scala (working copy) @@ -20,7 +20,9 @@ import org.apache.log4j.Logger import org.scalatest.junit.JUnitSuite import org.junit.Test +import org.junit.Assert._ + class UtilsTest extends JUnitSuite { private val logger = Logger.getLogger(classOf[UtilsTest]) @@ -29,5 +31,24 @@ def testSwallow() { Utils.swallow(logger.info, throw new IllegalStateException("test")) } - + + @Test + def testCircularIterator() { + val l = List(1, 2) + val itl = Utils.circularIterator(l) + assertEquals(1, itl.next()) + assertEquals(2, itl.next()) + assertEquals(1, itl.next()) + assertEquals(2, itl.next()) + assertFalse(itl.hasDefiniteSize) + + val s = Set(1, 2) + val its = Utils.circularIterator(s) + assertEquals(1, its.next()) + assertEquals(2, its.next()) + assertEquals(1, its.next()) + assertEquals(2, its.next()) + assertEquals(1, its.next()) + } + } Index: core/src/test/scala/unit/kafka/log/LogTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/log/LogTest.scala (working copy) @@ -182,8 +182,11 @@ assertEquals(curOffset, log.nextAppendOffset) // time goes by; the log file (which is empty) is deleted again - log.markDeletedWhile(_ => true) + val deletedSegments = log.markDeletedWhile(_ => true) + // we shouldn't delete the last empty log segment. 
+ assertTrue(deletedSegments.size == 0) + // we now have a new log assertEquals(curOffset, log.nextAppendOffset) } Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala (working copy) @@ -43,6 +43,7 @@ val props = TestUtils.createBrokerConfig(0, -1) config = new KafkaConfig(props) { override val logFileSize = 1024 + override val flushInterval = 100 } logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false) logManager.startup @@ -86,10 +87,13 @@ offset += set.sizeInBytes } log.flush - // Why this sleep is required ? File system takes some time to update the last modified time for a file. - // TODO: What is unknown is why 1 second or couple 100 milliseconds didn't work ? - Thread.sleep(2000) + assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) + + // update the last modified time of all log segments + val logSegments = log.segments.view + logSegments.foreach(s => s.file.setLastModified(time.currentMs)) + time.currentMs += maxLogAge + 3000 logManager.cleanupLogs() assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) @@ -114,8 +118,9 @@ Thread.sleep(100) config = new KafkaConfig(props) { override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages - override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over + override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over override val logRetentionHours = retentionHours + override val flushInterval = 100 } logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false) logManager.startup @@ -182,6 +187,7 @@ config = new KafkaConfig(props) { override val logFileSize = 256 override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2") + override val flushInterval = 100 } logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false) Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy) @@ -56,7 +56,7 @@ super.setUp fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null) fetcher.stopConnectionsToAllBrokers - fetcher.startConnections(topicInfos, cluster, null) + fetcher.startConnections(topicInfos, cluster) } override def tearDown() { Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (working copy) @@ -20,10 +20,10 @@ import junit.framework.Assert._ import java.util.Properties -import kafka.api.{FetchRequestBuilder, OffsetRequest} +import kafka.api.{FetchRequestBuilder, OffsetRequest,FetchRequest} import kafka.consumer.SimpleConsumer import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import junit.framework.Assert._ import org.apache.log4j.Logger import 
org.scalatest.junit.JUnit3Suite Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -17,13 +17,18 @@ package kafka.producer +import junit.framework.Assert +import kafka.server.{KafkaServer, KafkaConfig} +import org.apache.log4j.Logger +import org.scalatest.junit.JUnitSuite +import org.junit.{After, Before, Test} +import kafka.common.MessageSizeTooLargeException import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand import kafka.common.{ErrorMapping, MessageSizeTooLargeException} import kafka.integration.KafkaServerTestHarness -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} -import kafka.server.KafkaConfig +import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet} import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import org.junit.Test import org.scalatest.junit.JUnit3Suite @@ -71,7 +76,7 @@ } @Test - def testMessageSizeTooLarge() { + def testSingleMessageSizeTooLarge() { val server = servers.head val props = new Properties() props.put("host", "localhost") @@ -169,3 +174,31 @@ Assert.assertTrue((t2-t1) >= timeoutMs) } } + + @Test + def testCompressedMessageSizeTooLarge() { + val props = new Properties() + props.put("host", "localhost") + props.put("port", server.socketServer.port.toString) + props.put("buffer.size", "102400") + props.put("connect.timeout.ms", "300") + props.put("reconnect.interval", "500") + props.put("max.message.size", "100") + val producer = new SyncProducer(new SyncProducerConfig(props)) + val messages = new Array[Message](10) + import Array.fill + var a = 0 + for( a <- 0 to 9){ + val bytes = fill(20){a.asInstanceOf[Byte]} + messages(a) = new Message(bytes) + } + var failed = false + /** After compression, the compressed message has size 118 **/ + try { + producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = messages: _*)) + }catch { + case e: MessageSizeTooLargeException => failed = true + } + Assert.assertTrue(failed) + } +} Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -35,6 +35,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils} import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException} +import kafka.utils.TestZKUtils class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) @@ -68,6 +69,8 @@ props.put("producer.type", "async") props.put("queue.size", "10") props.put("batch.size", "1") + props.put("serializer.class", "kafka.producer.StringSerializer") + val config = new 
AsyncProducerConfig(props) val config = new ProducerConfig(props) val produceData = getProduceData(12) @@ -91,6 +94,11 @@ props.put("zk.connect", TestZKUtils.zookeeperConnect) props.put("producer.type", "async") props.put("batch.size", "1") + props.put("host", "localhost") + props.put("port", "9092") + props.put("queue.size", "10") + props.put("serializer.class", "kafka.producer.StringSerializer") + val config = new AsyncProducerConfig(props) val config = new ProducerConfig(props) val produceData = getProduceData(10) @@ -130,6 +138,7 @@ val producerSendThread = new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5) producerSendThread.start() + props.put("zk.connect", TestZKUtils.zookeeperConnect) for (producerData <- producerDataList) queue.put(producerData) @@ -181,6 +190,7 @@ val partition2Metadata = new PartitionMetadata(1, Some(broker2), List(broker1, broker2)) val topic1Metadata = new TopicMetadata("topic1", List(partition1Metadata, partition2Metadata)) val topic2Metadata = new TopicMetadata("topic2", List(partition1Metadata, partition2Metadata)) + props.put("zk.connect", TestZKUtils.zookeeperConnect) val intPartitioner = new Partitioner[Int] { def partition(key: Int, numPartitions: Int): Int = key % numPartitions @@ -242,6 +252,7 @@ partitioner = null.asInstanceOf[Partitioner[String]], encoder = new StringEncoder, producerPool) + asyncProducerProps.put("zk.connect", TestZKUtils.zookeeperConnect) val serializedData = handler.serialize(produceData) val decoder = new StringDecoder @@ -283,6 +294,7 @@ def testNoBroker() { val props = new Properties() props.put("zk.connect", zkConnect) + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new ProducerConfig(props) // create topic metadata with 0 partitions @@ -371,6 +383,7 @@ props.put("producer.type", "async") props.put("batch.size", "5") props.put("zk.connect", TestZKUtils.zookeeperConnect) + props.put("zk.connect", TestZKUtils.zookeeperConnect) val config = new ProducerConfig(props) Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala (working copy) @@ -18,10 +18,23 @@ package kafka.producer import org.scalatest.junit.JUnit3Suite +import async.AsyncProducer import java.util.Properties import kafka.admin.CreateTopicCommand import kafka.api.FetchRequestBuilder import kafka.common.FailedToSendMessageException +import org.apache.log4j.{Logger, Level} +import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig} +import kafka.zk.EmbeddedZookeeper +import org.junit.{After, Before, Test} +import junit.framework.Assert +import org.easymock.EasyMock +import java.util.concurrent.ConcurrentHashMap +import kafka.cluster.Partition +import org.scalatest.junit.JUnitSuite +import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException} +import kafka.utils.{TestUtils, TestZKUtils, Utils} +import kafka.serializer.{StringEncoder, Encoder} import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer} @@ -84,6 +97,304 @@ } @Test + def testSend() { + val props = new Properties() + props.put("partitioner.class", "kafka.producer.StaticPartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + 
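For context on what these producer tests are configuring: with this patch a producer is driven entirely by zk.connect (the ProducerConfig changes further down treat broker.list as deprecated), and an async producer additionally needs a serializer plus queue and batch sizing. A minimal sketch of such a configuration, using only property keys that already appear in this patch; the localhost ZooKeeper address is an assumed placeholder:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val producerProps = new Properties()
    producerProps.put("zk.connect", "localhost:2181")                      // required; assumed local ZooKeeper
    producerProps.put("serializer.class", "kafka.serializer.StringEncoder")
    producerProps.put("producer.type", "async")
    producerProps.put("queue.size", "10")
    producerProps.put("batch.size", "1")
    val producerConfig = new ProducerConfig(producerProps)                 // InvalidConfigException if zk.connect were missing
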
val config = new ProducerConfig(props) + val partitioner = new StaticPartitioner + val serializer = new StringSerializer + + // 2 sync producers + val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() + val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) + val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) + // it should send to partition 0 (first partition) on second broker i.e broker2 + syncProducer2.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes))) + EasyMock.expectLastCall + syncProducer1.close + EasyMock.expectLastCall + syncProducer2.close + EasyMock.expectLastCall + EasyMock.replay(syncProducer1) + EasyMock.replay(syncProducer2) + + syncProducers.put(brokerId1, syncProducer1) + syncProducers.put(brokerId2, syncProducer2) + + val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) + val producer = new Producer[String, String](config, partitioner, producerPool, false, null) + + producer.send(new ProducerData[String, String](topic, "test", Array("test1"))) + producer.close + + EasyMock.verify(syncProducer1) + EasyMock.verify(syncProducer2) + } + + @Test + def testSendSingleMessage() { + val props = new Properties() + props.put("serializer.class", "kafka.serializer.StringEncoder") + props.put("broker.list", "0:localhost:9092") + + + val config = new ProducerConfig(props) + val partitioner = new StaticPartitioner + val serializer = new StringSerializer + + // 2 sync producers + val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() + val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) + // it should send to a random partition due to use of broker.list + syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes()))) + EasyMock.expectLastCall + syncProducer1.close + EasyMock.expectLastCall + EasyMock.replay(syncProducer1) + + syncProducers.put(brokerId1, syncProducer1) + + val producerPool = new ProducerPool[String](config, serializer, syncProducers, + new ConcurrentHashMap[Int, AsyncProducer[String]]()) + val producer = new Producer[String, String](config, partitioner, producerPool, false, null) + + producer.send(new ProducerData[String, String](topic, "t")) + producer.close + + EasyMock.verify(syncProducer1) + } + + @Test + def testInvalidPartition() { + val props = new Properties() + props.put("partitioner.class", "kafka.producer.NegativePartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val config = new ProducerConfig(props) + + val richProducer = new Producer[String, String](config) + try { + richProducer.send(new ProducerData[String, String](topic, "test", Array("test"))) + Assert.fail("Should fail with InvalidPartitionException") + }catch { + case e: InvalidPartitionException => // expected, do nothing + }finally { + richProducer.close() + } + } + + @Test + def testDefaultEncoder() { + val props = new Properties() + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val config = new ProducerConfig(props) + + val stringProducer1 = new Producer[String, String](config) + try { + stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test"))) + fail("Should fail with ClassCastException due to incompatible Encoder") + } catch { + case e: ClassCastException => + }finally { + 
stringProducer1.close() + } + + props.put("serializer.class", "kafka.serializer.StringEncoder") + val stringProducer2 = new Producer[String, String](new ProducerConfig(props)) + stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test"))) + stringProducer2.close() + + val messageProducer1 = new Producer[String, Message](config) + try { + messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes)))) + } catch { + case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder") + }finally { + messageProducer1.close() + } + } + + @Test + def testSyncProducerPool() { + // 2 sync producers + val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() + val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) + val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) + syncProducer1.send("test-topic", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("test1".getBytes))) + EasyMock.expectLastCall + syncProducer1.close + EasyMock.expectLastCall + syncProducer2.close + EasyMock.expectLastCall + EasyMock.replay(syncProducer1) + EasyMock.replay(syncProducer2) + + syncProducers.put(brokerId1, syncProducer1) + syncProducers.put(brokerId2, syncProducer2) + + // default for producer.type is "sync" + val props = new Properties() + props.put("partitioner.class", "kafka.producer.NegativePartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, + syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) + producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) + + producerPool.close + EasyMock.verify(syncProducer1) + EasyMock.verify(syncProducer2) + } + + @Test + def testAsyncProducerPool() { + // 2 async producers + val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() + val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) + val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) + asyncProducer1.send(topic, "test1", 0) + EasyMock.expectLastCall + asyncProducer1.close + EasyMock.expectLastCall + asyncProducer2.close + EasyMock.expectLastCall + EasyMock.replay(asyncProducer1) + EasyMock.replay(asyncProducer2) + + asyncProducers.put(brokerId1, asyncProducer1) + asyncProducers.put(brokerId2, asyncProducer2) + + // change producer.type to "async" + val props = new Properties() + props.put("partitioner.class", "kafka.producer.NegativePartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, + new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) + producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) + + producerPool.close + EasyMock.verify(asyncProducer1) + EasyMock.verify(asyncProducer2) + } + + @Test + def testSyncUnavailableProducerException() { + val syncProducers = new ConcurrentHashMap[Int, SyncProducer]() + val syncProducer1 = EasyMock.createMock(classOf[SyncProducer]) + val syncProducer2 = EasyMock.createMock(classOf[SyncProducer]) + syncProducer2.close + EasyMock.expectLastCall + 
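These producer-pool tests all follow EasyMock's record, replay, verify cycle: expected calls are recorded on the mock, replay() switches the mock into checking mode, and verify() fails the test if an expected call never happened. A minimal self-contained sketch of that cycle, using a plain java.io.Closeable stand-in rather than the Kafka producer classes:

    import java.io.Closeable
    import org.easymock.EasyMock

    val resource = EasyMock.createMock(classOf[Closeable])
    resource.close()               // record: exactly one close() is expected
    EasyMock.expectLastCall()
    EasyMock.replay(resource)      // stop recording, start checking
    resource.close()               // the code under test would normally trigger this
    EasyMock.verify(resource)      // fails if the expected close() never happened
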
EasyMock.replay(syncProducer1) + EasyMock.replay(syncProducer2) + + syncProducers.put(brokerId2, syncProducer2) + + // default for producer.type is "sync" + val props = new Properties() + props.put("partitioner.class", "kafka.producer.NegativePartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, + syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]()) + try { + producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))) + Assert.fail("Should fail with UnavailableProducerException") + }catch { + case e: UnavailableProducerException => // expected + } + + producerPool.close + EasyMock.verify(syncProducer1) + EasyMock.verify(syncProducer2) + } + + @Test + def testAsyncUnavailableProducerException() { + val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() + val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) + val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) + asyncProducer2.close + EasyMock.expectLastCall + EasyMock.replay(asyncProducer1) + EasyMock.replay(asyncProducer2) + + asyncProducers.put(brokerId2, asyncProducer2) + + // change producer.type to "async" + val props = new Properties() + props.put("partitioner.class", "kafka.producer.NegativePartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("producer.type", "async") + props.put("zk.connect", TestZKUtils.zookeeperConnect) + val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer, + new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) + try { + producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))) + Assert.fail("Should fail with UnavailableProducerException") + }catch { + case e: UnavailableProducerException => // expected + } + + producerPool.close + EasyMock.verify(asyncProducer1) + EasyMock.verify(asyncProducer2) + } + + @Test + def testConfigBrokerPartitionInfoWithPartitioner { + val props = new Properties() + props.put("partitioner.class", "kafka.producer.StaticPartitioner") + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("producer.type", "async") + props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4 + "," + + brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4) + + var config: ProducerConfig = null + try { + config = new ProducerConfig(props) + fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list") + }catch { + case e: InvalidConfigException => // expected + } + } + + @Test + def testConfigBrokerPartitionInfo() { + val props = new Properties() + props.put("serializer.class", "kafka.producer.StringSerializer") + props.put("producer.type", "async") + props.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1) + + val config = new ProducerConfig(props) + val partitioner = new StaticPartitioner + val serializer = new StringSerializer + + // 2 async producers + val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() + val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) + // it should send to a random partition due to use of broker.list + asyncProducer1.send(topic, "test1", -1) + EasyMock.expectLastCall + asyncProducer1.close + EasyMock.expectLastCall + 
EasyMock.replay(asyncProducer1) + + asyncProducers.put(brokerId1, asyncProducer1) + + val producerPool = new ProducerPool(config, serializer, new ConcurrentHashMap[Int, SyncProducer](), asyncProducers) + val producer = new Producer[String, String](config, partitioner, producerPool, false, null) + + producer.send(new ProducerData[String, String](topic, "test1", Array("test1"))) + producer.close + + EasyMock.verify(asyncProducer1) + } + + @Test def testZKSendToNewTopic() { val props = new Properties() props.put("serializer.class", "kafka.serializer.StringEncoder") @@ -158,6 +469,8 @@ fail("Should fail since no leader exists for the partition.") } catch { case e => // success + }finally { + producer.close } // restart server 1 Index: core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (working copy) @@ -94,6 +94,10 @@ TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure the last offset after iteration is correct assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit) + + //make sure shallow iterator is the same as deep iterator + TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator), + TestUtils.getMessageIterator(messageSet.iterator)) } // test for compressed regular messages @@ -104,6 +108,8 @@ TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure the last offset after iteration is correct assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit) + + verifyShallowIterator(messageSet) } // test for mixed empty and non-empty messagesets uncompressed @@ -121,6 +127,10 @@ TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure the last offset after iteration is correct assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) + + //make sure shallow iterator is the same as deep iterator + TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator), + TestUtils.getMessageIterator(mixedMessageSet.iterator)) } // test for mixed empty and non-empty messagesets compressed @@ -138,7 +148,15 @@ TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure the last offset after iteration is correct assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) + + verifyShallowIterator(mixedMessageSet) } } + def verifyShallowIterator(messageSet: ByteBufferMessageSet) { + //make sure the offsets returned by a shallow iterator is a subset of that of a deep iterator + val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet + val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet + assertTrue(shallowOffsets.subsetOf(deepOffsets)) + } } Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (revision 1342312) +++ 
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (working copy) @@ -17,9 +17,13 @@ package kafka.server import java.io.File +import kafka.utils.Utils +import kafka.api.FetchRequest +import kafka.producer.{SyncProducer, SyncProducerConfig} import kafka.consumer.SimpleConsumer import java.util.Properties import org.junit.Test +import org.scalatest.junit.JUnitSuite import junit.framework.Assert._ import kafka.message.{Message, ByteBufferMessageSet} import org.scalatest.junit.JUnit3Suite Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala (working copy) @@ -20,7 +20,6 @@ import java.net._ import java.io._ import org.junit._ -import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import kafka.utils.TestUtils import java.util.Random Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -337,7 +337,7 @@ val iterator = messageStream.iterator for (i <- 0 until nMessages * 2) { assertTrue(iterator.hasNext()) - val message = iterator.next() + val message = iterator.next().message receivedMessages ::= message debug("received message: " + message) } @@ -425,14 +425,14 @@ messages.sortWith((s,t) => s.checksum < t.checksum) } - def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream[Message]]]): List[Message]= { + def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaStream[Message]]]): List[Message]= { var messages: List[Message] = Nil for ((topic, messageStreams) <- topicMessageStreams) { for (messageStream <- messageStreams) { val iterator = messageStream.iterator for (i <- 0 until nMessagesPerThread) { assertTrue(iterator.hasNext) - val message = iterator.next + val message = iterator.next.message messages ::= message debug("received message: " + Utils.toString(message.payload, "UTF-8")) } Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (revision 1342312) +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -22,14 +22,14 @@ import kafka.server._ import org.scalatest.junit.JUnit3Suite import scala.collection.JavaConversions._ -import kafka.consumer.{ConsumerConfig, KafkaMessageStream} import org.apache.log4j.{Level, Logger} import kafka.message._ import kafka.javaapi.producer.{ProducerData, Producer} import kafka.utils.TestUtils._ import kafka.utils.{Utils, Logging, TestUtils} +import kafka.consumer.{KafkaStream, ConsumerConfig} -class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging { +class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging { val zookeeperConnect = zkConnect val numNodes = 2 @@ -93,7 +93,7 @@ messages.sortWith((s,t) => s.checksum < t.checksum) } - def getMessages(nMessagesPerThread: Int, 
jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream[Message]]]) + def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[Message]]]) : List[Message]= { var messages: List[Message] = Nil val topicMessageStreams = asMap(jTopicMessageStreams) @@ -102,7 +102,7 @@ val iterator = messageStream.iterator for (i <- 0 until nMessagesPerThread) { assertTrue(iterator.hasNext) - val message = iterator.next + val message = iterator.next.message messages ::= message debug("received message: " + Utils.toString(message.payload, "UTF-8")) } Index: core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala =================================================================== --- core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (revision 1342312) +++ core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (working copy) @@ -56,13 +56,13 @@ } } -private class ConsumerThread(stream: KafkaMessageStream[Message]) extends Thread { +private class ConsumerThread(stream: KafkaStream[Message]) extends Thread { val shutdownLatch = new CountDownLatch(1) override def run() { println("Starting consumer thread..") - for (message <- stream) { - println("consumed: " + Utils.toString(message.payload, "UTF-8")) + for (messageAndMetadata <- stream) { + println("consumed: " + Utils.toString(messageAndMetadata.message.payload, "UTF-8")) } shutdownLatch.countDown println("thread shutdown !" ) Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1342312) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -25,6 +25,7 @@ import kafka.common._ import kafka.api.OffsetRequest import java.util._ +import kafka.server.BrokerTopicStat private[kafka] object Log { val FileSuffix = ".kafka" @@ -199,7 +200,7 @@ * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary. * Returns the offset at which the messages are written. */ - def append(messages: MessageSet): Unit = { + def append(messages: ByteBufferMessageSet): Unit = { // validate the messages var numberOfMessages = 0 for(messageAndOffset <- messages) { @@ -208,13 +209,25 @@ numberOfMessages += 1; } + BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages) + BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages) logStats.recordAppendedMessages(numberOfMessages) - + + // truncate the message set's buffer upto validbytes, before appending it to the on-disk log + val validByteBuffer = messages.getBuffer.duplicate() + val messageSetValidBytes = messages.validBytes + if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0) + throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + + " Message set cannot be appended to log. 
Possible causes are corrupted produce requests") + + validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int]) + val validMessages = new ByteBufferMessageSet(validByteBuffer) + // they are valid, insert them in the log lock synchronized { try { val segment = segments.view.last - segment.messageSet.append(messages) + segment.messageSet.append(validMessages) maybeFlush(numberOfMessages) maybeRoll(segment) } @@ -247,10 +260,18 @@ val deletable = view.takeWhile(predicate) for(seg <- deletable) seg.deleted = true - val numToDelete = deletable.size + var numToDelete = deletable.size // if we are deleting everything, create a new empty segment - if(numToDelete == view.size) - roll() + if(numToDelete == view.size) { + if (view(numToDelete - 1).size > 0) + roll() + else { + // If the last segment to be deleted is empty and we roll the log, the new segment will have the same + // file name. So simply reuse the last segment and reset the modified time. + view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds) + numToDelete -=1 + } + } segments.trunc(numToDelete) } } @@ -288,9 +309,12 @@ */ def roll() { lock synchronized { - val last = segments.view.last val newOffset = nextAppendOffset val newFile = new File(dir, Log.nameFromOffset(newOffset)) + if (newFile.exists) { + warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first") + newFile.delete() + } debug("Rolling log '" + name + "' to " + newFile.getName()) segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset)) } Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -18,11 +18,16 @@ package kafka.producer import kafka.api._ -import kafka.common.MessageSizeTooLargeException import kafka.message.MessageSet import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive} import kafka.utils._ +import java.util.Random +object SyncProducer { + val RequestKey: Short = 0 + val randomGenerator = new Random +} + /* * Send a message set. */ @@ -31,20 +36,45 @@ private val MaxConnectBackoffMs = 60000 private var sentOnConnection = 0 + /** make time-based reconnect starting at a random time **/ + private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval + private val lock = new Object() @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs) - debug("Instantiating Scala Sync Producer") + trace("Instantiating Scala Sync Producer") private def verifyRequest(request: Request) = { - if (logger.isTraceEnabled) { + /** + * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings + * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary + * data. 
So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level + */ + if (logger.isDebugEnabled) { val buffer = new BoundedByteBufferSend(request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() - if(requestTypeId == RequestKeys.Produce) { - val request = ProducerRequest.readFrom(buffer) - trace(request.toString) + if (requestTypeId == RequestKeys.Produce) { + try { + val request = ProducerRequest.readFrom(buffer) + trace(request.toString) + for (produce <- request.produces) { + try { + for (messageAndOffset <- produce.messages) + if (!messageAndOffset.message.isValid) + throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid") + } + catch { + case e: Throwable => + error("error iterating messages ", e) + } + } + } + catch { + case e: Throwable => + error("error verifying sendbuffer ", e) + } } } } @@ -71,9 +101,11 @@ } // TODO: do we still need this? sentOnConnection += 1 - if(sentOnConnection >= config.reconnectInterval) { + + if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) { reconnect() sentOnConnection = 0 + lastConnectionTime = System.currentTimeMillis } SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime) response Index: core/src/main/scala/kafka/producer/Producer.scala =================================================================== --- core/src/main/scala/kafka/producer/Producer.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/Producer.scala (working copy) @@ -23,11 +23,17 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean} import org.I0Itec.zkclient.ZkClient import kafka.common.{QueueFullException, InvalidConfigException} +import java.util.Properties +import kafka.cluster.{Partition, Broker} +import java.util.concurrent.atomic.AtomicBoolean +import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException} +import kafka.api.ProducerRequest class Producer[K,V](config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // for testing only extends Logging { private val hasShutdown = new AtomicBoolean(false) + if(!Utils.propertyExists(config.zkConnect)) throw new InvalidConfigException("zk.connect property must be specified in the producer") if (config.batchSize > config.queueSize) Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (working copy) @@ -36,7 +36,7 @@ .withRequiredArg .describedAs("connection_string") .ofType(classOf[String]) - val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed") val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") .withRequiredArg @@ -78,7 +78,7 @@ val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) - val async = options.has(asyncOpt) + val sync = options.has(syncOpt) val compress = options.has(compressOpt) val 
batchSize = options.valueOf(batchSizeOpt) val sendTimeout = options.valueOf(sendTimeoutOpt) @@ -89,10 +89,10 @@ val props = new Properties() props.put("zk.connect", zkConnect) props.put("compression.codec", DefaultCompressionCodec.codec.toString) - props.put("producer.type", if(async) "async" else "sync") + props.put("producer.type", if(sync) "sync" else "async") if(options.has(batchSizeOpt)) - props.put("batch.size", batchSize) - props.put("queue.enqueueTimeout.ms", sendTimeout.toString) + props.put("batch.size", batchSize.toString) + props.put("queue.time", sendTimeout.toString) props.put("serializer.class", encoderClass) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader] Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducerConfig.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala (working copy) @@ -40,6 +40,9 @@ val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) + /** negative reconnect time interval means disabling this time-based reconnect feature */ + var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10) + val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) /* the client application sending the producer requests */ Index: core/src/main/scala/kafka/producer/ProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/ProducerConfig.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/ProducerConfig.scala (working copy) @@ -29,13 +29,32 @@ * to pass in static broker and per-broker partition information. Format- * * brokerid1:host1:port1, brokerid2:host2:port2*/ val brokerList = Utils.getString(props, "broker.list", null) + if(brokerList != null) throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead") + /** + * If DefaultEventHandler is used, this specifies the number of times to + * retry if an error is encountered during send. Currently, it is only + * appropriate when broker.list points to a VIP. If the zk.connect option + * is used instead, this will not have any effect because with the zk-based + * producer, brokers are not re-selected upon retry. So retries would go to + * the same (potentially still down) broker. (KAFKA-253 will help address + * this.) 
+ */ + val numRetries = Utils.getInt(props, "num.retries", 0) + /** If both broker.list and zk.connect options are specified, throw an exception */ + if(zkConnect == null) throw new InvalidConfigException("zk.connect property is required") + if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect)) + throw new InvalidConfigException("only one of broker.list and zk.connect can be specified") + + if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList)) + throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") + /** the partitioner class for partitioning events amongst sub-topics */ val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner") Index: core/src/main/scala/kafka/producer/async/ProducerSendThread.scala =================================================================== --- core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (working copy) @@ -31,7 +31,7 @@ private val shutdownLatch = new CountDownLatch(1) private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]]) - override def run { + override def run = { try { processEvents @@ -49,7 +49,7 @@ info("Shutdown ProducerSendThread complete") } - private def processEvents() { + private def processEvents() = { var lastSend = SystemTime.milliseconds var events = new ListBuffer[ProducerData[K,V]] var full: Boolean = false @@ -70,10 +70,11 @@ trace("Dequeued item for topic %s, partition key: %s, data: %s" .format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString)) events += currentQueueItem + } - // check if the batch size is reached - full = events.size >= batchSize - } + // check if the batch size is reached + full = events.size >= batchSize + if(full || expired) { if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..") if(full) debug("Batch full. 
Sending..") @@ -88,7 +89,7 @@ .format(queue.size)) } - def tryToHandle(events: Seq[ProducerData[K,V]]) { + def tryToHandle(events: Seq[ProducerData[K,V]]) = { try { debug("Handling " + events.size + " events") if(events.size > 0) Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala =================================================================== --- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (revision 1342312) +++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (working copy) @@ -21,8 +21,8 @@ import org.apache.log4j.spi.LoggingEvent import org.apache.log4j.AppenderSkeleton import org.apache.log4j.helpers.LogLog -import kafka.utils.Logging import kafka.serializer.Encoder +import kafka.utils.Logging import java.util.{Properties, Date} import kafka.message.Message import scala.collection._ @@ -94,7 +94,3 @@ override def requiresLayout: Boolean = false } - -class DefaultStringEncoder extends Encoder[LoggingEvent] { - override def toMessage(event: LoggingEvent):Message = new Message(event.getMessage.asInstanceOf[String].getBytes) -} Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1342312) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -18,10 +18,10 @@ package kafka.message import kafka.utils.Logging -import kafka.common.{InvalidMessageSizeException, ErrorMapping} import java.nio.ByteBuffer import java.nio.channels._ import kafka.utils.IteratorTemplate +import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping} /** * A sequence of messages stored in a byte buffer @@ -36,8 +36,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, private val initialOffset: Long = 0L, private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging { - private var validByteCount = -1L private var shallowValidByteCount = -1L + if(sizeInBytes > Int.MaxValue) + throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue) def this(compressionCodec: CompressionCodec, messages: Message*) { this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError) @@ -59,7 +60,7 @@ private def shallowValidBytes: Long = { if(shallowValidByteCount < 0) { - val iter = deepIterator + val iter = this.internalIterator(true) while(iter.hasNext) { val messageAndOffset = iter.next shallowValidByteCount = messageAndOffset.offset @@ -70,12 +71,31 @@ } /** Write the messages in this set to the given channel */ - def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = - channel.write(buffer.duplicate) - - override def iterator: Iterator[MessageAndOffset] = deepIterator + def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = { + buffer.mark() + val written = channel.write(buffer) + buffer.reset() + written + } - private def deepIterator(): Iterator[MessageAndOffset] = { + /** default iterator that iterates over decompressed messages */ + override def iterator: Iterator[MessageAndOffset] = internalIterator() + + /** iterator over compressed messages without decompressing */ + def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true) + + def verifyMessageSize(maxMessageSize: Int){ + var shallowIter = internalIterator(true) + while(shallowIter.hasNext){ + var messageAndOffset = shallowIter.next + val payloadSize = 
messageAndOffset.message.payloadSize + if ( payloadSize > maxMessageSize) + throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize) + } + } + + /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/ + private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = { ErrorMapping.maybeThrowException(errorCode) new IteratorTemplate[MessageAndOffset] { var topIter = buffer.slice() @@ -106,33 +126,50 @@ message.limit(size) topIter.position(topIter.position + size) val newMessage = new Message(message) - newMessage.compressionCodec match { - case NoCompressionCodec => - debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) - innerIter = null - currValidBytes += 4 + size - trace("currValidBytes = " + currValidBytes) - new MessageAndOffset(newMessage, currValidBytes) - case _ => - debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) - innerIter = CompressionUtils.decompress(newMessage).deepIterator - if (!innerIter.hasNext) { - currValidBytes += 4 + lastMessageSize + if(!newMessage.isValid) + throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec + + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset) + + if(isShallow){ + currValidBytes += 4 + size + trace("shallow iterator currValidBytes = " + currValidBytes) + new MessageAndOffset(newMessage, currValidBytes) + } + else{ + newMessage.compressionCodec match { + case NoCompressionCodec => + debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) innerIter = null - } - makeNext() + currValidBytes += 4 + size + trace("currValidBytes = " + currValidBytes) + new MessageAndOffset(newMessage, currValidBytes) + case _ => + debug("Message is compressed. Valid byte count = %d".format(currValidBytes)) + innerIter = CompressionUtils.decompress(newMessage).internalIterator() + if (!innerIter.hasNext) { + currValidBytes += 4 + lastMessageSize + innerIter = null + } + makeNext() + } } } override def makeNext(): MessageAndOffset = { - val isInnerDone = innerDone() - isInnerDone match { - case true => makeNextOuter - case false => { - val messageAndOffset = innerIter.next - if (!innerIter.hasNext) - currValidBytes += 4 + lastMessageSize - new MessageAndOffset(messageAndOffset.message, currValidBytes) + if(isShallow){ + makeNextOuter + } + else{ + val isInnerDone = innerDone() + debug("makeNext() in internalIterator: innerDone = " + isInnerDone) + isInnerDone match { + case true => makeNextOuter + case false => { + val messageAndOffset = innerIter.next + if (!innerIter.hasNext) + currValidBytes += 4 + lastMessageSize + new MessageAndOffset(messageAndOffset.message, currValidBytes) + } } } } Index: core/src/main/scala/kafka/message/MessageAndOffset.scala =================================================================== --- core/src/main/scala/kafka/message/MessageAndOffset.scala (revision 1342312) +++ core/src/main/scala/kafka/message/MessageAndOffset.scala (working copy) @@ -13,11 +13,10 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.message -/** - * Represents message and offset of the next message. 
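To make the shallow/deep distinction introduced in ByteBufferMessageSet above concrete: the default iterator decompresses wrapper messages and yields every inner message, while shallowIterator stays at the outer level, which is what the new verifyMessageSize() relies on. A small usage sketch, assuming the kafka.message classes from this patch are on the classpath; the counts in the comments follow from the whole batch compressing into a single wrapper message:

    import kafka.message.{ByteBufferMessageSet, DefaultCompressionCodec, Message}

    val set = new ByteBufferMessageSet(DefaultCompressionCodec,
      new Message("a".getBytes), new Message("b".getBytes), new Message("c".getBytes))

    println(set.iterator.size)          // 3: deep iteration decompresses the wrapper
    println(set.shallowIterator.size)   // 1: shallow iteration sees only the compressed wrapper

    // verifyMessageSize walks the shallow iterator and throws
    // MessageSizeTooLargeException if any payload exceeds the limit
    set.verifyMessageSize(1000000)
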
This is used in the MessageSet to iterate over it - */ -case class MessageAndOffset(val message: Message, val offset: Long) + +case class MessageAndOffset(message: Message, offset: Long) + Index: core/src/main/scala/kafka/message/CompressionUtils.scala =================================================================== --- core/src/main/scala/kafka/message/CompressionUtils.scala (revision 1342312) +++ core/src/main/scala/kafka/message/CompressionUtils.scala (working copy) @@ -49,7 +49,7 @@ } override def read(a: Array[Byte]): Int = { - gzipIn.read(a) + gzipIn.read(a) } } Index: core/src/main/scala/kafka/message/InvalidMessageException.scala =================================================================== --- core/src/main/scala/kafka/message/InvalidMessageException.scala (revision 1342312) +++ core/src/main/scala/kafka/message/InvalidMessageException.scala (working copy) @@ -20,4 +20,6 @@ /** * Indicates that a message failed its checksum and is corrupt */ -class InvalidMessageException extends RuntimeException +class InvalidMessageException(message: String) extends RuntimeException(message) { + def this() = this(null) +} Index: core/src/main/scala/kafka/common/ErrorMapping.scala =================================================================== --- core/src/main/scala/kafka/common/ErrorMapping.scala (revision 1342312) +++ core/src/main/scala/kafka/common/ErrorMapping.scala (working copy) @@ -60,5 +60,4 @@ } class MessageSizeTooLargeException(message: String) extends RuntimeException(message) { - def this() = this(null) } Index: core/src/main/scala/kafka/consumer/KafkaMessageStream.scala =================================================================== --- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (working copy) @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer - -import java.util.concurrent.BlockingQueue -import kafka.serializer.Decoder - -/** - * All calls to elements should produce the same thread-safe iterator? Should have a separate thread - * that feeds messages into a blocking queue for processing. - */ -class KafkaMessageStream[T](val topic: String, - private val queue: BlockingQueue[FetchedDataChunk], - consumerTimeoutMs: Int, - private val decoder: Decoder[T]) - extends Iterable[T] with java.lang.Iterable[T]{ - - private val iter: ConsumerIterator[T] = - new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder) - - /** - * Create an iterator over messages in the stream. - */ - def iterator(): ConsumerIterator[T] = iter - - /** - * This method clears the queue being iterated during the consumer rebalancing. 
This is mainly - * to reduce the number of duplicates received by the consumer - */ - def clear() { - iter.clearCurrentChunk() - } - -} Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -19,36 +19,38 @@ import kafka.utils.{IteratorTemplate, Logging} import java.util.concurrent.{TimeUnit, BlockingQueue} -import kafka.message.MessageAndOffset import kafka.serializer.Decoder import java.util.concurrent.atomic.AtomicReference +import kafka.message.{MessageAndOffset, MessageAndMetadata} + /** * An iterator that blocks until a value can be read from the supplied queue. * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown * */ -class ConsumerIterator[T](private val topic: String, - private val channel: BlockingQueue[FetchedDataChunk], +class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, - private val decoder: Decoder[T]) - extends IteratorTemplate[T] with Logging { + private val decoder: Decoder[T], + val enableShallowIterator: Boolean) + extends IteratorTemplate[MessageAndMetadata[T]] with Logging { private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo:PartitionTopicInfo = null private var consumedOffset: Long = -1L - override def next(): T = { - val decodedMessage = super.next() + override def next(): MessageAndMetadata[T] = { + val item = super.next() if(consumedOffset < 0) throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset)) currentTopicInfo.resetConsumeOffset(consumedOffset) - trace("Setting consumed offset to %d".format(consumedOffset)) + val topic = currentTopicInfo.topic + trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) - decodedMessage + item } - protected def makeNext(): T = { + protected def makeNext(): MessageAndMetadata[T] = { var currentDataChunk: FetchedDataChunk = null // if we don't have an iterator, get one var localCurrent = current.get() @@ -74,16 +76,18 @@ .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset) } - localCurrent = currentDataChunk.messages.iterator + localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator + else currentDataChunk.messages.iterator current.set(localCurrent) } } val item = localCurrent.next() consumedOffset = item.offset - decoder.toEvent(item.message) + + new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic) } - def clearCurrentChunk() = { + def clearCurrentChunk() { try { info("Clearing the current data chunk for this consumer iterator") current.set(null) @@ -92,3 +96,4 @@ } class ConsumerTimeoutException() extends RuntimeException() + Index: core/src/main/scala/kafka/consumer/ConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConnector.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ConsumerConnector.scala (working copy) @@ -29,14 +29,30 @@ * Create a list of MessageStreams for each topic. 
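With KafkaMessageStream removed, consumers now iterate KafkaStream, whose iterator yields a message-plus-metadata object instead of the bare decoded message, which is why the tests above switched to iterator.next().message. A minimal consumption sketch against the updated ConsumerConnector API; it assumes a connector created elsewhere from a ConsumerConfig, uses only accessors shown in this patch, and the DefaultDecoder import path is an assumption:

    import kafka.consumer.{ConsumerConnector, KafkaStream}
    import kafka.message.Message
    import kafka.serializer.DefaultDecoder
    import kafka.utils.Utils

    def printMessages(connector: ConsumerConnector, topic: String) {
      // one stream for the topic; DefaultDecoder leaves the payload as a raw Message
      val stream: KafkaStream[Message] =
        connector.createMessageStreams(Map(topic -> 1), new DefaultDecoder)(topic).head
      for (messageAndMetadata <- stream)
        println(Utils.toString(messageAndMetadata.message.payload, "UTF-8"))
    }
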
* * @param topicCountMap a map of (topic, #streams) pair - * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the - * list is #streams. Each KafkaMessageStream supports an iterator of messages. + * @param decoder Decoder to decode each Message to type T + * @return a map of (topic, list of KafkaStream) pairs. + * The number of items in the list is #streams. Each stream supports + * an iterator over message/metadata pairs. */ def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T] = new DefaultDecoder) - : Map[String,List[KafkaMessageStream[T]]] + : Map[String,List[KafkaStream[T]]] /** + * Create a list of message streams for all topics that match a given filter. + * + * @param topicFilter Either a Whitelist or Blacklist TopicFilter object. + * @param numStreams Number of streams to return + * @param decoder Decoder to decode each Message to type T + * @return a list of KafkaStream each of which provides an + * iterator over message/metadata pairs over allowed topics. + */ + def createMessageStreamsByFilter[T](topicFilter: TopicFilter, + numStreams: Int = 1, + decoder: Decoder[T] = new DefaultDecoder) + : Seq[KafkaStream[T]] + + /** * Commit the offsets of all broker partitions connected by this connector. */ def commitOffsets Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy) @@ -36,11 +36,19 @@ def main(args: Array[String]) { val parser = new OptionParser - val topicIdOpt = parser.accepts("topic", "REQUIRED: The topic id to consume on.") + val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) + val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + "Multiple URLS can be given to allow fail-over.") .withRequiredArg .describedAs("urls") @@ -90,8 +98,20 @@ "skip it instead of halt.") val options: OptionSet = tryParse(parser, args) - checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt) + Utils.checkRequiredArgs(parser, options, zkConnectOpt) + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) { + error("Exactly one of whitelist/blacklist/topic is required.") + parser.printHelpOn(System.err) + System.exit(1) + } + val topicArg = options.valueOf(topicOrFilterOpt.head) + val filterSpec = if (options.has(blacklistOpt)) + new Blacklist(topicArg) + else + new Whitelist(topicArg) + val props = new Properties() props.put("groupid", options.valueOf(groupIdOpt)) props.put("socket.buffersize", options.valueOf(socketBufferSizeOpt).toString) @@ -104,7 +124,6 @@ val config = new ConsumerConfig(props) val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val topic = options.valueOf(topicIdOpt) val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) @@ -123,21 +142,20 @@ tryCleanupZookeeper(options.valueOf(zkConnectOpt), options.valueOf(groupIdOpt)) } }) - - var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0) - val iter = - if(maxMessages >= 0) - stream.slice(0, maxMessages) - else - stream + val stream = connector.createMessageStreamsByFilter(filterSpec).get(0) + val iter = if(maxMessages >= 0) + stream.slice(0, maxMessages) + else + stream + val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] formatter.init(formatterArgs) try { - for(message <- iter) { + for(messageAndTopic <- iter) { try { - formatter.writeTo(message, System.out) + formatter.writeTo(messageAndTopic.message, System.out) } catch { case e => if (skipMessageOnError) @@ -173,16 +191,6 @@ } } - def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { - for(arg <- required) { - if(!options.has(arg)) { - error("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - } - def tryParseFormatterArgs(args: Iterable[String]): Properties = { val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) if(!splits.forall(_.length == 2)) { @@ -210,9 +218,19 @@ } class ChecksumMessageFormatter extends MessageFormatter { + private var topicStr: String = _ + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + "-" + else + topicStr = "" + } + def writeTo(message: Message, output: PrintStream) { val chksum = message.checksum - output.println("checksum:" + chksum) + output.println(topicStr + "checksum:" + chksum) } } Index: core/src/main/scala/kafka/consumer/Fetcher.scala =================================================================== --- core/src/main/scala/kafka/consumer/Fetcher.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/Fetcher.scala (working copy) @@ -42,24 +42,24 @@ fetcherThreads = EMPTY_FETCHER_THREADS } - def clearFetcherQueues[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster, + def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster, queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]], - kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) { + 
messageStreams: Map[String,List[KafkaStream[_]]]) { // Clear all but the currently iterated upon chunk in the consumer thread's queue queuesTobeCleared.foreach(_.clear) info("Cleared all relevant queues for this fetcher") // Also clear the currently iterated upon chunk in the consumer threads - if(kafkaMessageStreams != null) - kafkaMessageStreams.foreach(_._2.foreach(s => s.clear())) + if(messageStreams != null) + messageStreams.foreach(_._2.foreach(s => s.clear())) info("Cleared the data chunks in all the consumer message iterators") } - def startConnections[T](topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster, - kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) { + def startConnections(topicInfos: Iterable[PartitionTopicInfo], + cluster: Cluster) { if (topicInfos == null) return Index: core/src/main/scala/kafka/consumer/ConsumerConfig.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ConsumerConfig.scala (working copy) @@ -20,7 +20,6 @@ import java.util.Properties import kafka.utils.{ZKConfig, Utils} import kafka.api.OffsetRequest -import kafka.common.InvalidConfigException object ConsumerConfig { val SocketTimeout = 30 * 1000 val SocketBufferSize = 64*1024 @@ -29,7 +28,7 @@ val DefaultFetcherBackoffMs = 1000 val AutoCommit = true val AutoCommitInterval = 10 * 1000 - val MaxQueuedChunks = 100 + val MaxQueuedChunks = 10 val MaxRebalanceRetries = 4 val AutoOffsetReset = OffsetRequest.SmallestTimeString val ConsumerTimeoutMs = -1 @@ -93,20 +92,10 @@ /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs) - /** Whitelist of topics for this mirror's embedded consumer to consume. At - * most one of whitelist/blacklist may be specified. */ - val mirrorTopicsWhitelist = Utils.getString( - props, MirrorTopicsWhitelistProp, MirrorTopicsWhitelist) - - /** Topics to skip mirroring. At most one of whitelist/blacklist may be - * specified */ - val mirrorTopicsBlackList = Utils.getString( - props, MirrorTopicsBlacklistProp, MirrorTopicsBlacklist) - - if (mirrorTopicsWhitelist.nonEmpty && mirrorTopicsBlackList.nonEmpty) - throw new InvalidConfigException("The embedded consumer's mirror topics configuration can only contain one of blacklist or whitelist") - - val mirrorConsumerNumThreads = Utils.getInt( - props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads) + /** Use shallow iterator over compressed messages directly. This feature should be used very carefully. + * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the + * overhead of decompression. 
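+ * Illustrative only (an assumed snippet, not prescribed by this patch): a mirroring consumer could set
+ *   shallowiterator.enable=true
+ * in its consumer properties; the flag is read below via Utils.getBoolean(props, "shallowiterator.enable", false).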
+ * */ + val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false) } Index: core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (working copy) @@ -21,11 +21,9 @@ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState -import kafka.server.KafkaServerStartable -import kafka.common.ConsumerRebalanceFailedException class ZookeeperTopicEventWatcher(val config:ConsumerConfig, - val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging { + val eventHandler: TopicEventHandler[String]) extends Logging { val lock = new Object() @@ -35,7 +33,7 @@ startWatchingTopicEvents() private def startWatchingTopicEvents() { - val topicEventListener = new ZkTopicEventListener(kafkaServerStartable) + val topicEventListener = new ZkTopicEventListener() ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath) zkClient.subscribeStateChanges( @@ -52,6 +50,7 @@ def shutdown() { lock.synchronized { + info("Shutting down topic event watcher.") if (zkClient != null) { stopWatchingTopicEvents() zkClient.close() @@ -62,7 +61,7 @@ } } - class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener { + class ZkTopicEventListener extends IZkChildListener { @throws(classOf[Exception]) def handleChildChange(parent: String, children: java.util.List[String]) { @@ -76,11 +75,8 @@ } } catch { - case e: ConsumerRebalanceFailedException => - fatal("can't rebalance in embedded consumer; proceed to shutdown", e) - kafkaServerStartable.shutdown() case e => - error("error in handling child changes in embedded consumer", e) + error("error in handling child changes", e) } } } Index: core/src/main/scala/kafka/consumer/TopicCount.scala =================================================================== --- core/src/main/scala/kafka/consumer/TopicCount.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/TopicCount.scala (working copy) @@ -19,33 +19,17 @@ import scala.collection._ import scala.util.parsing.json.JSON -import kafka.utils.Logging +import org.I0Itec.zkclient.ZkClient +import java.util.regex.Pattern +import kafka.utils.{ZKGroupDirs, ZkUtils, Logging} -private[kafka] object TopicCount extends Logging { - val myConversionFunc = {input : String => input.toInt} - JSON.globalNumberParser = myConversionFunc - def constructTopicCount(consumerIdSting: String, jsonString : String) : TopicCount = { - var topMap : Map[String,Int] = null - try { - JSON.parseFull(jsonString) match { - case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] - case None => throw new RuntimeException("error constructing TopicCount : " + jsonString) - } - } catch { - case e => - error("error parsing consumer json string " + jsonString, e) - throw e - } - - new TopicCount(consumerIdSting, topMap) - } - -} - -private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) { - - def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = { +private[kafka] trait TopicCount { + def getConsumerThreadIdsPerTopic: Map[String, Set[String]] + def dbString: String + + protected def makeConsumerThreadIdsPerTopic(consumerIdString: 
String, + topicCountMap: Map[String, Int]) = { val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() for ((topic, nConsumers) <- topicCountMap) { val consumerSet = new mutable.HashSet[String] @@ -56,11 +40,96 @@ } consumerThreadIdsPerTopicMap } +} +private[kafka] object TopicCount extends Logging { + + /* + * Example of whitelist topic count stored in ZooKeeper: + * Topics with whitetopic as prefix, and four streams: *4*whitetopic.* + * + * Example of blacklist topic count stored in ZooKeeper: + * Topics with blacktopic as prefix, and four streams: !4!blacktopic.* + */ + + val WHITELIST_MARKER = "*" + val BLACKLIST_MARKER = "!" + private val WHITELIST_PATTERN = + Pattern.compile("""\*(\p{Digit}+)\*(.*)""") + private val BLACKLIST_PATTERN = + Pattern.compile("""!(\p{Digit}+)!(.*)""") + + val myConversionFunc = {input : String => input.toInt} + JSON.globalNumberParser = myConversionFunc + + def constructTopicCount(group: String, + consumerId: String, + zkClient: ZkClient) : TopicCount = { + val dirs = new ZKGroupDirs(group) + val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId) + val hasWhitelist = topicCountString.startsWith(WHITELIST_MARKER) + val hasBlacklist = topicCountString.startsWith(BLACKLIST_MARKER) + + if (hasWhitelist || hasBlacklist) + info("Constructing topic count for %s from %s using %s as pattern." + .format(consumerId, topicCountString, + if (hasWhitelist) WHITELIST_PATTERN else BLACKLIST_PATTERN)) + + if (hasWhitelist || hasBlacklist) { + val matcher = if (hasWhitelist) + WHITELIST_PATTERN.matcher(topicCountString) + else + BLACKLIST_PATTERN.matcher(topicCountString) + require(matcher.matches()) + val numStreams = matcher.group(1).toInt + val regex = matcher.group(2) + val filter = if (hasWhitelist) + new Whitelist(regex) + else + new Blacklist(regex) + + new WildcardTopicCount(zkClient, consumerId, filter, numStreams) + } + else { + var topMap : Map[String,Int] = null + try { + JSON.parseFull(topicCountString) match { + case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] + case None => throw new RuntimeException("error constructing TopicCount : " + topicCountString) + } + } + catch { + case e => + error("error parsing consumer json string " + topicCountString, e) + throw e + } + + new StaticTopicCount(consumerId, topMap) + } + } + + def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) = + new StaticTopicCount(consumerIdString, topicCount) + + def constructTopicCount(consumerIdString: String, + filter: TopicFilter, + numStreams: Int, + zkClient: ZkClient) = + new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams) + +} + +private[kafka] class StaticTopicCount(val consumerIdString: String, + val topicCountMap: Map[String, Int]) + extends TopicCount { + + def getConsumerThreadIdsPerTopic = + makeConsumerThreadIdsPerTopic(consumerIdString, topicCountMap) + override def equals(obj: Any): Boolean = { obj match { case null => false - case n: TopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap + case n: StaticTopicCount => consumerIdString == n.consumerIdString && topicCountMap == n.topicCountMap case _ => false } } @@ -71,7 +140,7 @@ * "topic2" : 4 * } */ - def toJsonString() : String = { + def dbString = { val builder = new StringBuilder builder.append("{ ") var i = 0 @@ -82,6 +151,29 @@ i += 1 } builder.append(" }") - builder.toString + builder.toString() } } + +private[kafka] class WildcardTopicCount(zkClient: ZkClient, + 
consumerIdString: String, + topicFilter: TopicFilter, + numStreams: Int) extends TopicCount { + def getConsumerThreadIdsPerTopic = { + val wildcardTopics = ZkUtils.getChildrenParentMayNotExist( + zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_)) + makeConsumerThreadIdsPerTopic(consumerIdString, + Map(wildcardTopics.map((_, numStreams)): _*)) + } + + def dbString = { + val marker = topicFilter match { + case wl: Whitelist => TopicCount.WHITELIST_MARKER + case bl: Blacklist => TopicCount.BLACKLIST_MARKER + } + + "%s%d%s%s".format(marker, numStreams, marker, topicFilter.regex) + } + +} + Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1342312) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -19,6 +19,7 @@ import java.util.concurrent._ import java.util.concurrent.atomic._ +import locks.ReentrantLock import scala.collection._ import kafka.cluster._ import kafka.utils._ @@ -33,6 +34,7 @@ import kafka.utils.ZkUtils._ import kafka.common.{NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException} + /** * This class handles the consumers interaction with zookeeper * @@ -85,17 +87,41 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val enableFetcher: Boolean) // for testing only - extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging { - + extends ConsumerConnector with ZookeeperConsumerConnectorMBean + with Logging { private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[Fetcher] = None private var zkClient: ZkClient = null - private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] + + private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] // queues : (topic,consumerThreadId) -> queue private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]] + // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue + private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]] private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false) + private val messageStreamCreated = new AtomicBoolean(false) + private var sessionExpirationListener: ZKSessionExpireListener = null + private var loadBalancerListener: ZKRebalancerListener = null + + private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null + + val consumerIdString = { + var consumerUuid : String = null + config.consumerId match { + case Some(consumerId) // for testing only + => consumerUuid = consumerId + case None // generate unique consumerId automatically + => val uuid = UUID.randomUUID() + consumerUuid = "%s-%d-%s".format( + InetAddress.getLocalHost.getHostName, System.currentTimeMillis, + uuid.getMostSignificantBits().toHexString.substring(0,8)) + } + config.groupId + "_" + consumerUuid + } + this.logIdent = consumerIdString + " " + connectZk() createFetcher() if (config.autoCommit) { @@ -106,10 +132,18 @@ def this(config: ConsumerConfig) = this(config, true) def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T]) - : Map[String,List[KafkaMessageStream[T]]] = { + : Map[String,List[KafkaStream[T]]] = { + if (messageStreamCreated.getAndSet(true)) + throw new RuntimeException(this.getClass.getSimpleName + + 
" can create message streams at most once") consume(topicCountMap, decoder) } + def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = { + val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder) + wildcardStreamsHandler.streams + } + private def createFetcher() { if (enableFetcher) fetcher = Some(new Fetcher(config, zkClient)) @@ -124,6 +158,9 @@ val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { info("ZKConsumerConnector shutting down") + + if (wildcardTopicWatcher != null) + wildcardTopicWatcher.shutdown() try { scheduler.shutdownNow() fetcher match { @@ -146,13 +183,12 @@ } def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T]) - : Map[String,List[KafkaMessageStream[T]]] = { + : Map[String,List[KafkaStream[T]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") - val dirs = new ZKGroupDirs(config.groupId) - var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]] + val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap) var consumerUuid : String = null config.consumerId match { @@ -165,53 +201,36 @@ uuid.getMostSignificantBits().toHexString.substring(0,8) ) } val consumerIdString = config.groupId + "_" + consumerUuid - val topicCount = new TopicCount(consumerIdString, topicCountMap) - // create a queue per topic per consumer thread - val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic - for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) { - var streamList: List[KafkaMessageStream[T]] = Nil - for (threadId <- threadIdSet) { - val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) - queues.put((topic, threadId), stream) - streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder) - } - ret += (topic -> streamList) - debug("adding topic " + topic + " and stream to map..") - } + val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic - // listener to consumer and partition changes - val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret) + // make a list of (queue,stream) pairs, one pair for each threadId + val queuesAndStreams = topicThreadIds.values.map(threadIdSet => + threadIdSet.map(_ => { + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) + val stream = new KafkaStream[T]( + queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator) + (queue, stream) + }) + ).flatten.toList + + val dirs = new ZKGroupDirs(config.groupId) registerConsumerInZK(dirs, consumerIdString, topicCount) + reinitializeConsumer(topicCount, queuesAndStreams) - // register listener for session expired event - zkClient.subscribeStateChanges( - new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener)) - - zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) - - ret.foreach { topicAndStreams => - // register on broker partition path changes - val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) - } - - // explicitly trigger load balancing for this consumer - loadBalancerListener.syncedRebalance() - ret + loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]] } - // this API is used by unit tests only - def getTopicRegistry: Pool[String, 
Pool[Int, PartitionTopicInfo]] = topicRegistry - private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = { info("begin registering consumer " + consumerIdString + " in ZK") - createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString) + createEphemeralPathExpectConflict(zkClient, + dirs.consumerRegistryDir + "/" + consumerIdString, + topicCount.dbString) info("end registering consumer " + consumerIdString + " in ZK") } private def sendShutdownToAllQueues() = { - for (queue <- queues.values) { + for (queue <- topicThreadIdAndQueues.values) { debug("Clearing up queue") queue.clear() queue.put(ZookeeperConsumerConnector.shutdownCommand) @@ -330,10 +349,10 @@ producedOffset } - class ZKSessionExpireListener[T](val dirs: ZKGroupDirs, + class ZKSessionExpireListener(val dirs: ZKGroupDirs, val consumerIdString: String, val topicCount: TopicCount, - val loadBalancerListener: ZKRebalancerListener[T]) + val loadBalancerListener: ZKRebalancerListener) extends IZkStateListener { @throws(classOf[Exception]) def handleStateChanged(state: KeeperState) { @@ -355,10 +374,10 @@ * consumer in the consumer registry and trigger a rebalance. */ info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString) - loadBalancerListener.resetState + loadBalancerListener.resetState() registerConsumerInZK(dirs, consumerIdString, topicCount) // explicitly trigger load balancing for this consumer - loadBalancerListener.syncedRebalance + loadBalancerListener.syncedRebalance() // There is no need to resubscribe to child and state changes. // The child change watchers will be set inside rebalance when we read the children list. @@ -366,26 +385,75 @@ } - class ZKRebalancerListener[T](val group: String, val consumerIdString: String, - kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) - extends IZkChildListener { + class ZKRebalancerListener(val group: String, val consumerIdString: String, + val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]]) extends IZkChildListener { + private val dirs = new ZKGroupDirs(group) private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]() private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]() + private var isWatcherTriggered = false + private val lock = new ReentrantLock + private val cond = lock.newCondition() + private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { + override def run() { + info("starting watcher executor thread for consumer " + consumerIdString) + var doRebalance = false + while (!isShuttingDown.get) { + try { + lock.lock() + try { + if (!isWatcherTriggered) + cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag + } finally { + doRebalance = isWatcherTriggered + isWatcherTriggered = false + lock.unlock() + } + if (doRebalance) + syncedRebalance + } catch { + case t => error("error during syncedRebalance", t) + } + } + info("stopping watcher executor thread for consumer " + consumerIdString) + } + } + watcherExecutorThread.start() + @throws(classOf[Exception]) def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { - syncedRebalance + lock.lock() + try { + isWatcherTriggered = true + cond.signalAll() + } finally { + lock.unlock() + } } - private def releasePartitionOwnership()= 
{ + private def deletePartitionOwnershipFromZK(topic: String, partition: String) { + val topicDirs = new ZKGroupTopicDirs(group, topic) + val znode = topicDirs.consumerOwnerDir + "/" + partition + deletePath(zkClient, znode) + debug("Consumer " + consumerIdString + " releasing " + znode) + } + + private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= { info("Releasing partition ownership") - for ((topic, infos) <- topicRegistry) { - for(partition <- infos.keys) { - val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString) - deletePath(zkClient, partitionOwnerPath) - debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath) - } - } + for ((topic, infos) <- localTopicRegistry) { + for(partition <- infos.keys) + deletePartitionOwnershipFromZK(topic, partition.toString) + localTopicRegistry.remove(topic) + } } @@ -403,8 +471,6 @@ def resetState() { topicRegistry.clear - oldConsumersPerTopicMap.clear - oldPartitionsPerTopicMap.clear } def syncedRebalance() { @@ -422,8 +488,6 @@ * the value of a child. Just let this go since another rebalance will be triggered. **/ info("exception during rebalance ", e) - /* Explicitly make sure another rebalancing attempt will get triggered. */ - done = false } info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) { @@ -432,15 +496,9 @@ /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should * clear the cache */ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") - oldConsumersPerTopicMap.clear() - oldPartitionsPerTopicMap.clear() } - // commit offsets - commitOffsets() // stop all fetchers and clear all the queues to avoid data duplication - closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2)) - // release all partitions, reset state and retry - releasePartitionOwnership() + closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) Thread.sleep(config.rebalanceBackoffMs) } } @@ -449,17 +507,9 @@ private def rebalance(cluster: Cluster): Boolean = { - val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic + val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic val consumersPerTopicMap = getConsumersPerTopic(zkClient, group) val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator) - val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap) - if (relevantTopicThreadIdsMap.size <= 0) { - info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.". - format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap)) - debug("Partitions per topic cache " + oldPartitionsPerTopicMap) - debug("Consumers per topic cache " + oldConsumersPerTopicMap) - return true - } /** * fetchers must be stopped to avoid data duplication, since if the current @@ -467,15 +517,16 @@ * rebalancing attempt fails, the partitions that are released could be owned by another consumer. * But if we don't stop the fetchers first, this consumer would continue returning data for released * partitions in parallel. So, not stopping the fetchers leads to duplicate data. 
*/ - closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap) + closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) - releasePartitionOwnership() + releasePartitionOwnership(topicRegistry) var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]() - for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) { - topicRegistry.remove(topic) - topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) + var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] + for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) { + currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo]) + val topicDirs = new ZKGroupTopicDirs(group, topic) val curConsumers = consumersPerTopicMap.get(topic).get var curPartitions: Seq[String] = partitionsPerTopicMap.get(topic).get @@ -502,11 +553,9 @@ for (i <- startPart until startPart + nParts) { val partition = curPartitions(i) info(consumerThreadId + " attempting to claim partition " + partition) - val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId) - if (!ownPartition) - return false - else // record the partition ownership decision - partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) + addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId) + // record the partition ownership decision + partitionOwnershipDecision += ((topic, partition) -> consumerThreadId) } } } @@ -520,21 +569,21 @@ info("Updating the cache") debug("Partitions per topic cache " + partitionsPerTopicMap) debug("Consumers per topic cache " + consumersPerTopicMap) - oldPartitionsPerTopicMap = partitionsPerTopicMap - oldConsumersPerTopicMap = consumersPerTopicMap - updateFetcher(cluster, kafkaMessageStreams) + topicRegistry = currentTopicRegistry + updateFetcher(cluster) true - } else + }else { false + } } private def closeFetchersForQueues(cluster: Cluster, - kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]], + messageStreams: Map[String,List[KafkaStream[_]]], queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) { var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten fetcher match { case Some(f) => f.stopConnectionsToAllBrokers - f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams) + f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) info("Committing all offsets after clearing the fetcher queues") /** * here, we need to commit offsets before stopping the consumer from returning any more messages @@ -549,16 +598,15 @@ } } - private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]], + private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]], relevantTopicThreadIdsMap: Map[String, Set[String]]) { // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer // after this rebalancing attempt - val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2) - closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared) + val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2) + closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared) } - private def updateFetcher[T](cluster: Cluster, - kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) { + private def 
updateFetcher(cluster: Cluster) { // update partitions for fetcher var allPartitionInfos : List[PartitionTopicInfo] = Nil for (partitionInfos <- topicRegistry.values) @@ -569,33 +617,13 @@ fetcher match { case Some(f) => - f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams) + f.startConnections(allPartitionInfos, cluster) case None => } } - private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String, - topic: String, consumerThreadId: String) : Boolean = { - val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition - // check if some other consumer owns this partition at this time - val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath) - if(currentPartitionOwner != null) { - if(currentPartitionOwner.equals(consumerThreadId)) { - info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok") - addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId) - true - } - else { - info(partitionOwnerPath + " exists with value " + currentPartitionOwner) - false - } - } else { - addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId) - true - } - } - private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = { + var successfullyOwnedPartitions : List[(String, String)] = Nil val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner => val topic = partitionOwner._1._1 val partition = partitionOwner._1._2 @@ -604,6 +632,7 @@ try { createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) + successfullyOwnedPartitions ::= (topic, partition) true } catch { case e: ZkNodeExistsException => @@ -613,14 +642,20 @@ case e2 => throw e2 } } - val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1) - if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */ + val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1)) + /* even if one of the partition ownership attempt has failed, return false */ + if(hasPartitionOwnershipFailed > 0) { + // remove all paths that we have owned in ZK + successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2)) + false + } else true } - private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String, + private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], + topicDirs: ZKGroupTopicDirs, partition: String, topic: String, consumerThreadId: String) { - val partTopicInfoMap = topicRegistry.get(topic) + val partTopicInfoMap = currentTopicRegistry.get(topic) // find the leader for this partition val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt) @@ -647,7 +682,8 @@ else offset = offsetString.toLong - val queue = queues.get((topic, consumerThreadId)) + val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) + val consumedOffset = new AtomicLong(offset) val fetchedOffset = new AtomicLong(offset) val partTopicInfo = new PartitionTopicInfo(topic, @@ -661,5 +697,155 @@ debug(partTopicInfo + " selected new offset " + offset) } } + + private def reinitializeConsumer[T]( + topicCount: TopicCount, + queuesAndStreams: 
List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) { + + val dirs = new ZKGroupDirs(config.groupId) + + // listener to consumer and partition changes + if (loadBalancerListener == null) { + val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]] + loadBalancerListener = new ZKRebalancerListener( + config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]]) + } + + // register listener for session expired event + if (sessionExpirationListener == null) + sessionExpirationListener = new ZKSessionExpireListener( + dirs, consumerIdString, topicCount, loadBalancerListener) + + val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams + + // map of {topic -> Set(thread-1, thread-2, ...)} + val consumerThreadIdsPerTopic: Map[String, Set[String]] = + topicCount.getConsumerThreadIdsPerTopic + + /* + * This usage of map flatten breaks up consumerThreadIdsPerTopic into + * a set of (topic, thread-id) pairs that we then use to construct + * the updated (topic, thread-id) -> queues map + */ + implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _)) + + // iterator over (topic, thread-id) tuples + val topicThreadIds: Iterable[(String, String)] = + consumerThreadIdsPerTopic.flatten + + // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream)) + val threadQueueStreamPairs = topicCount match { + case wildTopicCount: WildcardTopicCount => + for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs) + case statTopicCount: StaticTopicCount => { + require(topicThreadIds.size == queuesAndStreams.size, + "Mismatch between thread ID count (%d) and queue count (%d)".format( + topicThreadIds.size, queuesAndStreams.size)) + topicThreadIds.zip(queuesAndStreams) + } + } + + threadQueueStreamPairs.foreach(e => { + val topicThreadId = e._1 + val q = e._2._1 + topicThreadIdAndQueues.put(topicThreadId, q) + }) + + val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1) + groupedByTopic.foreach(e => { + val topic = e._1 + val streams = e._2.map(_._2._2).toList + topicStreamsMap += (topic -> streams) + debug("adding topic %s and %d streams to map.".format(topic, streams.size)) + }) + + // listener to consumer and partition changes + zkClient.subscribeStateChanges(sessionExpirationListener) + + zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) + + topicStreamsMap.foreach { topicAndStreams => + // register on broker partition path changes + val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1 + zkClient.subscribeChildChanges(partitionPath, loadBalancerListener) + } + + // explicitly trigger load balancing for this consumer + loadBalancerListener.syncedRebalance() + } + + class WildcardStreamsHandler[T](topicFilter: TopicFilter, + numStreams: Int, + decoder: Decoder[T]) + extends TopicEventHandler[String] { + + if (messageStreamCreated.getAndSet(true)) + throw new RuntimeException("Each consumer connector can create " + + "message streams by filter at most once.") + + private val wildcardQueuesAndStreams = (1 to numStreams) + .map(e => { + val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) + val stream = new KafkaStream[T]( + queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator) + (queue, stream) + }).toList + + // bootstrap with existing topics + private var wildcardTopics = + getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) + 
.filter(topicFilter.isTopicAllowed) + + private val wildcardTopicCount = TopicCount.constructTopicCount( + consumerIdString, topicFilter, numStreams, zkClient) + + val dirs = new ZKGroupDirs(config.groupId) + registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount) + reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) + + if (!topicFilter.requiresTopicEventWatcher) { + info("Not creating event watcher for trivial whitelist " + topicFilter) + } + else { + info("Creating topic event watcher for whitelist " + topicFilter) + wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this) + + /* + * Topic events will trigger subsequent synced rebalances. Also, the + * consumer will get registered only after an allowed topic becomes + * available. + */ + } + + def handleTopicEvent(allTopics: Seq[String]) { + debug("Handling topic event") + + val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed) + + val addedTopics = updatedTopics filterNot (wildcardTopics contains) + if (addedTopics.nonEmpty) + info("Topic event: added topics = %s" + .format(addedTopics)) + + /* + * TODO: Deleted topics are interesting (and will not be a concern until + * 0.8 release). We may need to remove these topics from the rebalance + * listener's map in reinitializeConsumer. + */ + val deletedTopics = wildcardTopics filterNot (updatedTopics contains) + if (deletedTopics.nonEmpty) + info("Topic event: deleted topics = %s" + .format(deletedTopics)) + + wildcardTopics = updatedTopics + info("Topics to consume = %s".format(wildcardTopics)) + + if (addedTopics.nonEmpty || deletedTopics.nonEmpty) + reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams) + } + + def streams: Seq[KafkaStream[T]] = + wildcardQueuesAndStreams.map(_._2) + } } Index: core/src/main/scala/kafka/Kafka.scala =================================================================== --- core/src/main/scala/kafka/Kafka.scala (revision 1342312) +++ core/src/main/scala/kafka/Kafka.scala (working copy) @@ -17,8 +17,6 @@ package kafka -import consumer.ConsumerConfig -import producer.ProducerConfig import server.{KafkaConfig, KafkaServerStartable, KafkaServer} import utils.{Utils, Logging} import org.apache.log4j.jmx.LoggerDynamicMBean @@ -30,8 +28,8 @@ import org.apache.log4j.Logger Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName) - if (!List(1, 3).contains(args.length)) { - println("USAGE: java [options] %s server.properties [consumer.properties producer.properties]".format(classOf[KafkaServer].getSimpleName())) + if (args.length != 1) { + println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName())) System.exit(1) } @@ -39,14 +37,7 @@ val props = Utils.loadProps(args(0)) val serverConfig = new KafkaConfig(props) - val kafkaServerStartble = args.length match { - case 3 => - val consumerConfig = new ConsumerConfig(Utils.loadProps(args(1))) - val producerConfig = new ProducerConfig(Utils.loadProps(args(2))) - new KafkaServerStartable(serverConfig, consumerConfig, producerConfig) - case 1 => - new KafkaServerStartable(serverConfig) - } + val kafkaServerStartble = new KafkaServerStartable(serverConfig) // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread() { Index: core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala =================================================================== --- core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (revision 1342312) +++ 
core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (working copy) @@ -57,9 +57,9 @@ // check if the rebalancing operation succeeded. try { if(validateRebalancingOperation(zkClient, group)) - info("Rebalance operation successful !") + println("Rebalance operation successful !") else - error("Rebalance operation failed !") + println("Rebalance operation failed !") } catch { case e2: Throwable => error("Error while verifying current rebalancing operation", e2) } @@ -132,6 +132,4 @@ rebalanceSucceeded } - - -} \ No newline at end of file +} Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala =================================================================== --- core/src/main/scala/kafka/tools/ReplayLogProducer.scala (revision 1342312) +++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala (working copy) @@ -32,8 +32,6 @@ private val GROUPID: String = "replay-log-producer" def main(args: Array[String]) { - var isNoPrint = false; - val config = new Config(args) val executor = Executors.newFixedThreadPool(config.numThreads) @@ -151,7 +149,7 @@ } } - class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with Logging { + class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) val props = new Properties() val brokerInfoList = config.brokerInfo.split("=") @@ -180,9 +178,9 @@ stream.slice(0, config.numMessages) else stream - for (message <- iter) { + for (messageAndMetadata <- iter) { try { - producer.send(new ProducerData[Message, Message](config.outputTopic, message)) + producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message)) if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0) Thread.sleep(config.delayedMSBtwSend) messageCount += 1 Index: core/src/main/scala/kafka/tools/ConsumerShell.scala =================================================================== --- core/src/main/scala/kafka/tools/ConsumerShell.scala (revision 1342312) +++ core/src/main/scala/kafka/tools/ConsumerShell.scala (working copy) @@ -82,15 +82,15 @@ } } -class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with Logging { +class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) override def run() { println("Starting consumer thread..") var count: Int = 0 try { - for (message <- stream) { - println("consumed: " + message) + for (messageAndMetadata <- stream) { + println("consumed: " + messageAndMetadata.message) count += 1 } }catch { Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1342312) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -29,15 +29,17 @@ import kafka.message.{NoCompressionCodec, CompressionCodec} import org.I0Itec.zkclient.ZkClient import java.util.{Random, Properties} +import joptsimple.{OptionSpec, OptionSet, OptionParser} + /** * Helper functions! 
*/ object Utils extends Logging { + val random = new Random - def getNextRandomInt(): Int = random.nextInt - + def getNextRandomInt(): Int = random.nextInt def getNextRandomInt(upper: Int): Int = random.nextInt(upper) /** @@ -250,13 +252,47 @@ else value } - def getLong(props: Properties, name: String, default: Long): Long = + def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { + val value = buffer.getLong + if(value < range._1 || value > range._2) + throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".") + else value + } + + /** + * Read a required long property value or throw an exception if no such property is found + */ + def getLong(props: Properties, name: String): Long = { + if(props.containsKey(name)) + return getLong(props, name, -1) + else + throw new IllegalArgumentException("Missing required property '" + name + "'") + } + + /** + * Read a long from the properties instance + * @param props The properties to read from + * @param name The property name + * @param default The default value to use if the property is not found + * @return the long value + */ + def getLong(props: Properties, name: String, default: Long): Long = getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue)) + /** + * Read a long from the properties instance. Throw an exception + * if the value is not in the given range (inclusive) + * @param props The properties to read from + * @param name The property name + * @param default The default value to use if the property is not found + * @param range The range in which the value must fall (inclusive) + * @throws IllegalArgumentException If the value is not in the given range + * @return the long value + */ def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = { - val v = + val v = if(props.containsKey(name)) - props.getProperty(name).toInt + props.getProperty(name).toLong else default if(v < range._1 || v > range._2) @@ -265,14 +301,6 @@ v } - - def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = { - val value = buffer.getLong - if(value < range._1 || value > range._2) - throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".") - else value - } - /** * Read a boolean value from the properties instance * @param props The properties to read from @@ -720,6 +748,27 @@ builder.append(" }") builder.toString } + + def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) { + for(arg <- required) { + if(!options.has(arg)) { + error("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + } + + /** + * Create a circular (looping) iterator over a collection. + * @param coll An iterable over the underlying collection. + * @return A circular iterator over the collection. 
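+ *
+ * Illustrative example (not part of this patch):
+ *   circularIterator(List(1, 2, 3)).take(7).toList  // List(1, 2, 3, 1, 2, 3, 1)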
+ */ + def circularIterator[T](coll: Iterable[T]) = { + val stream: Stream[T] = + for (forever <- Stream.continually(1); t <- coll) yield t + stream.iterator + } } class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) { Index: core/src/main/scala/kafka/utils/ZkUtils.scala =================================================================== --- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1342312) +++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy) @@ -432,17 +432,11 @@ getChildren(zkClient, dirs.consumerRegistryDir) } - def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = { - val dirs = new ZKGroupDirs(group) - val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId) - TopicCount.constructTopicCount(consumerId, topicCountJson) - } - def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = { val dirs = new ZKGroupDirs(group) val consumersInGroup = getConsumersInGroup(zkClient, group) val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId, - ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId))) + ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient)) consumersInGroup.zip(topicCountMaps).toMap } @@ -451,8 +445,8 @@ val consumers = getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir) val consumersPerTopicMap = new mutable.HashMap[String, List[String]] for (consumer <- consumers) { - val topicCount = getTopicCount(zkClient, group, consumer) - for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) { + val topicCount = TopicCount.constructTopicCount(group, consumer, zkClient) + for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) { for (consumerThreadId <- consumerThreadIdSet) consumersPerTopicMap.get(topic) match { case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers) Index: core/src/main/scala/kafka/utils/Logging.scala =================================================================== --- core/src/main/scala/kafka/utils/Logging.scala (revision 1342312) +++ core/src/main/scala/kafka/utils/Logging.scala (working copy) @@ -23,17 +23,21 @@ val loggerName = this.getClass.getName lazy val logger = Logger.getLogger(loggerName) + protected var logIdent = "" + + private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg) + def trace(msg: => String): Unit = { if (logger.isTraceEnabled()) - logger.trace(msg) + logger.trace(msgWithLogIdent(msg)) } def trace(e: => Throwable): Any = { if (logger.isTraceEnabled()) - logger.trace("",e) + logger.trace(logIdent,e) } def trace(msg: => String, e: => Throwable) = { if (logger.isTraceEnabled()) - logger.trace(msg,e) + logger.trace(msgWithLogIdent(msg),e) } def swallowTrace(action: => Unit) { Utils.swallow(logger.trace, action) @@ -41,15 +45,15 @@ def debug(msg: => String): Unit = { if (logger.isDebugEnabled()) - logger.debug(msg) + logger.debug(msgWithLogIdent(msg)) } def debug(e: => Throwable): Any = { if (logger.isDebugEnabled()) - logger.debug("",e) + logger.debug(logIdent,e) } def debug(msg: => String, e: => Throwable) = { if (logger.isDebugEnabled()) - logger.debug(msg,e) + logger.debug(msgWithLogIdent(msg),e) } def swallowDebug(action: => Unit) { Utils.swallow(logger.debug, action) @@ -57,55 +61,54 @@ def info(msg: => String): Unit = { if (logger.isInfoEnabled()) - logger.info(msg) + 
logger.info(msgWithLogIdent(msg)) } def info(e: => Throwable): Any = { if (logger.isInfoEnabled()) - logger.info("",e) + logger.info(logIdent,e) } def info(msg: => String,e: => Throwable) = { if (logger.isInfoEnabled()) - logger.info(msg,e) + logger.info(msgWithLogIdent(msg),e) } def swallowInfo(action: => Unit) { Utils.swallow(logger.info, action) } def warn(msg: => String): Unit = { - logger.warn(msg) + logger.warn(msgWithLogIdent(msg)) } def warn(e: => Throwable): Any = { - logger.warn("",e) + logger.warn(logIdent,e) } def warn(msg: => String, e: => Throwable) = { - logger.warn(msg,e) + logger.warn(msgWithLogIdent(msg),e) } def swallowWarn(action: => Unit) { Utils.swallow(logger.warn, action) } def swallow(action: => Unit) = swallowWarn(action) - def error(msg: => String):Unit = { - logger.error(msg) + def error(msg: => String): Unit = { + logger.error(msgWithLogIdent(msg)) } def error(e: => Throwable): Any = { - logger.error("",e) + logger.error(logIdent,e) } def error(msg: => String, e: => Throwable) = { - logger.error(msg,e) + logger.error(msgWithLogIdent(msg),e) } def swallowError(action: => Unit) { Utils.swallow(logger.error, action) } def fatal(msg: => String): Unit = { - logger.fatal(msg) + logger.fatal(msgWithLogIdent(msg)) } def fatal(e: => Throwable): Any = { - logger.fatal("",e) + logger.fatal(logIdent,e) } def fatal(msg: => String, e: => Throwable) = { - logger.fatal(msg,e) + logger.fatal(msgWithLogIdent(msg),e) } - } \ No newline at end of file Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1342312) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -71,7 +71,7 @@ val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue)) /* the maximum size of the log before deleting it */ - val logRetentionSize = Utils.getInt(props, "log.retention.size", -1) + val logRetentionSize = Utils.getLong(props, "log.retention.size", -1) /* the number of hours to keep a log file before deleting it for some specific topic*/ val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", "")) Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala (revision 1342312) +++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala (working copy) @@ -20,6 +20,7 @@ import org.apache.log4j._ import kafka.network._ import kafka.utils._ +import java.util.concurrent.atomic.AtomicLong /** * A thread that answers kafka requests. 
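// Illustrative sketch (an assumption for demonstration; the topic name "test" and this call site are
// not part of this patch) of how the BrokerTopicStat registry added in the next hunk can be exercised:
val stat = BrokerTopicStat.getBrokerTopicStat("test")   // registers kafka:type=kafka.BrokerTopicStat.test on first use
stat.recordMessagesIn(1)
stat.recordBytesIn(1024L)
BrokerTopicStat.getBrokerAllTopicStat().recordBytesIn(1024L)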
@@ -60,3 +61,60 @@ } } + +trait BrokerTopicStatMBean { + def getMessagesIn: Long + def getBytesIn: Long + def getBytesOut: Long + def getFailedProduceRequest: Long + def getFailedFetchRequest: Long +} + +@threadsafe +class BrokerTopicStat extends BrokerTopicStatMBean { + private val numCumulatedMessagesIn = new AtomicLong(0) + private val numCumulatedBytesIn = new AtomicLong(0) + private val numCumulatedBytesOut = new AtomicLong(0) + private val numCumulatedFailedProduceRequests = new AtomicLong(0) + private val numCumulatedFailedFetchRequests = new AtomicLong(0) + + def getMessagesIn: Long = numCumulatedMessagesIn.get + + def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages) + + def getBytesIn: Long = numCumulatedBytesIn.get + + def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes) + + def getBytesOut: Long = numCumulatedBytesOut.get + + def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes) + + def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement + + def getFailedProduceRequest = numCumulatedFailedProduceRequests.get() + + def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement + + def getFailedFetchRequest = numCumulatedFailedFetchRequests.get() +} + +object BrokerTopicStat extends Logging { + private val stats = new Pool[String, BrokerTopicStat] + private val allTopicStat = new BrokerTopicStat + Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") + + def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat + + def getBrokerTopicStat(topic: String): BrokerTopicStat = { + var stat = stats.get(topic) + if (stat == null) { + stat = new BrokerTopicStat + if (stats.putIfNotExists(topic, stat) == null) + Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." 
+ topic) + else + stat = stats.get(topic) + } + return stat + } +} Index: core/src/main/scala/kafka/server/KafkaServer.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServer.scala (revision 1342312) +++ core/src/main/scala/kafka/server/KafkaServer.scala (working copy) @@ -34,6 +34,7 @@ val CleanShutdownFile = ".kafka_cleanshutdown" private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) + private val statsMBeanName = "kafka:type=kafka.SocketServerStats" var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null Index: core/src/main/scala/kafka/server/KafkaServerStartable.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaServerStartable.scala (revision 1342312) +++ core/src/main/scala/kafka/server/KafkaServerStartable.scala (working copy) @@ -17,36 +17,21 @@ package kafka.server -import kafka.utils.{Utils, Logging} -import kafka.consumer._ -import kafka.producer.{ProducerData, ProducerConfig, Producer} -import kafka.message.Message -import java.util.concurrent.CountDownLatch +import kafka.utils.Logging -import scala.collection.Map -class KafkaServerStartable(val serverConfig: KafkaConfig, - val consumerConfig: ConsumerConfig, - val producerConfig: ProducerConfig) extends Logging { +class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { private var server : KafkaServer = null - private var embeddedConsumer : EmbeddedConsumer = null init - def this(serverConfig: KafkaConfig) = this(serverConfig, null, null) - private def init() { server = new KafkaServer(serverConfig) - if (consumerConfig != null) - embeddedConsumer = - new EmbeddedConsumer(consumerConfig, producerConfig, this) } def startup() { try { server.startup() - if (embeddedConsumer != null) - embeddedConsumer.startup() } catch { case e => @@ -57,8 +42,6 @@ def shutdown() { try { - if (embeddedConsumer != null) - embeddedConsumer.shutdown() server.shutdown() } catch { @@ -73,153 +56,4 @@ } -class EmbeddedConsumer(private val consumerConfig: ConsumerConfig, - private val producerConfig: ProducerConfig, - private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging { - private val whiteListTopics = - consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim) - - private val blackListTopics = - consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim) - - // mirrorTopics should be accessed by handleTopicEvent only - private var mirrorTopics:Seq[String] = List() - - private var consumerConnector: ConsumerConnector = null - private var topicEventWatcher:ZookeeperTopicEventWatcher = null - - private val producer = new Producer[Null, Message](producerConfig) - - var threadList = List[MirroringThread]() - - private def isTopicAllowed(topic: String) = { - if (consumerConfig.mirrorTopicsWhitelist.nonEmpty) - whiteListTopics.contains(topic) - else - !blackListTopics.contains(topic) - } - - // TopicEventHandler call-back only - @Override - def handleTopicEvent(allTopics: Seq[String]) { - val newMirrorTopics = allTopics.filter(isTopicAllowed) - - val addedTopics = newMirrorTopics filterNot (mirrorTopics contains) - if (addedTopics.nonEmpty) - info("topic event: added topics = %s".format(addedTopics)) - - val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains) - if (deletedTopics.nonEmpty) - info("topic event: deleted topics = 
%s".format(deletedTopics)) - - mirrorTopics = newMirrorTopics - - if (addedTopics.nonEmpty || deletedTopics.nonEmpty) { - info("mirror topics = %s".format(mirrorTopics)) - startNewConsumerThreads(makeTopicMap(mirrorTopics)) - } - } - - private def makeTopicMap(mirrorTopics: Seq[String]) = { - if (mirrorTopics.nonEmpty) - Utils.getConsumerTopicMap(mirrorTopics.mkString( - "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads), - ":%d".format(consumerConfig.mirrorConsumerNumThreads))) - else - Utils.getConsumerTopicMap("") - } - - private def startNewConsumerThreads(topicMap: Map[String, Int]) { - if (topicMap.nonEmpty) { - if (consumerConnector != null) - consumerConnector.shutdown() - - /** - * Before starting new consumer threads for the updated set of topics, - * shutdown the existing mirroring threads. Since the consumer connector - * is already shutdown, the mirroring threads should finish their task almost - * instantaneously. If they don't, this points to an error that needs to be looked - * into, and further mirroring should stop - */ - threadList.foreach(_.shutdown) - - // KAFKA: 212: clear the thread list to remove the older thread references that are already shutdown - threadList = Nil - - consumerConnector = Consumer.create(consumerConfig) - val topicMessageStreams = consumerConnector.createMessageStreams(topicMap) - for ((topic, streamList) <- topicMessageStreams) - for (i <- 0 until streamList.length) - threadList ::= new MirroringThread(streamList(i), topic, i) - - threadList.foreach(_.start) - } - else - info("Not starting mirroring threads (mirror topic list is empty)") - } - - def startup() { - info("staring up embedded consumer") - topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable) - /* - * consumer threads are (re-)started upon topic events (which includes an - * initial startup event which lists the current topics) - */ - } - - def shutdown() { - // first shutdown the topic watcher to prevent creating new consumer streams - if (topicEventWatcher != null) - topicEventWatcher.shutdown() - info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers") - // stop pulling more data for mirroring - if (consumerConnector != null) - consumerConnector.shutdown() - info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads") - // wait for all mirroring threads to stop - threadList.foreach(_.shutdown) - info("Stopped all existing mirroring threads, now stopping the producer") - // only then, shutdown the producer - producer.close() - info("Successfully shutdown this Kafka mirror") - } - - class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with Logging { - val shutdownComplete = new CountDownLatch(1) - val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId) - this.setDaemon(false) - this.setName(name) - - - override def run = { - info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) - - try { - for (message <- stream) { - trace("Mirroring thread received message " + message.checksum) - val pd = new ProducerData[Null, Message](topic, message) - producer.send(pd) - } - } - catch { - case e => - fatal(topic + " stream " + threadId + " unexpectedly exited", e) - }finally { - shutdownComplete.countDown - info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId)) - } - } - - def shutdown = { - try { - shutdownComplete.await - 
}catch { - case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. " + - "Mirroring thread might leak data!") - } - } - } -} - - Index: core/src/main/scala/kafka/javaapi/message/MessageSet.scala =================================================================== --- core/src/main/scala/kafka/javaapi/message/MessageSet.scala (revision 1342312) +++ core/src/main/scala/kafka/javaapi/message/MessageSet.scala (working copy) @@ -17,8 +17,10 @@ package kafka.javaapi.message + import kafka.message.{MessageAndOffset, InvalidMessageException} + /** * A set of messages. A message set has a fixed serialized form, though the container * for the bytes could be either in-memory or on disk. A The format of each message is Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java =================================================================== --- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (revision 1342312) +++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (working copy) @@ -17,34 +17,53 @@ package kafka.javaapi.consumer; -import kafka.consumer.KafkaMessageStream; -import kafka.message.Message; -import kafka.serializer.Decoder; import java.util.List; import java.util.Map; +import kafka.consumer.KafkaStream; +import kafka.consumer.TopicFilter; +import kafka.message.Message; +import kafka.serializer.Decoder; public interface ConsumerConnector { - /** - * Create a list of MessageStreams of type T for each topic. - * - * @param topicCountMap a map of (topic, #streams) pair - * @param decoder a decoder that converts from Message to T - * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the - * list is #streams. Each KafkaMessageStream supports an iterator of messages. - */ - public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams( - Map<String, Integer> topicCountMap, Decoder<T> decoder); - public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams( - Map<String, Integer> topicCountMap); + /** + * Create a list of MessageStreams of type T for each topic. + * + * @param topicCountMap a map of (topic, #streams) pair + * @param decoder a decoder that converts from Message to T + * @return a map of (topic, list of KafkaStream) pairs. + * The number of items in the list is #streams. Each stream supports + * an iterator over message/metadata pairs. + */ + public <T> Map<String, List<KafkaStream<T>>> createMessageStreams( + Map<String, Integer> topicCountMap, Decoder<T> decoder); + public Map<String, List<KafkaStream<Message>>> createMessageStreams( + Map<String, Integer> topicCountMap); - /** - * Commit the offsets of all broker partitions connected by this connector. - */ - public void commitOffsets(); + /** + * Create a list of MessageAndTopicStreams containing messages of type T. + * + * @param topicFilter a TopicFilter that specifies which topics to + * subscribe to (encapsulates a whitelist or a blacklist). + * @param numStreams the number of message streams to return. + * @param decoder a decoder that converts from Message to T + * @return a list of KafkaStream. Each stream supports an + * iterator over its MessageAndMetadata elements. + */ + public <T> List<KafkaStream<T>> createMessageStreamsByFilter( + TopicFilter topicFilter, int numStreams, Decoder<T> decoder); + public List<KafkaStream<Message>> createMessageStreamsByFilter( + TopicFilter topicFilter, int numStreams); + public List<KafkaStream<Message>> createMessageStreamsByFilter( + TopicFilter topicFilter); - /** - * Commit the offsets of all broker partitions connected by this connector.
+ */ + public void commitOffsets(); + + /** + * Shut down the connector + */ + public void shutdown(); } Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (revision 1342312) +++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -16,10 +16,12 @@ */ package kafka.javaapi.consumer -import kafka.consumer.{KafkaMessageStream, ConsumerConfig} import kafka.message.Message import kafka.serializer.{DefaultDecoder, Decoder} +import kafka.consumer._ +import scala.collection.JavaConversions.asList + /** * This class handles the consumers interaction with zookeeper * @@ -68,14 +70,14 @@ def createMessageStreams[T]( topicCountMap: java.util.Map[String,java.lang.Integer], decoder: Decoder[T]) - : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = { + : java.util.Map[String,java.util.List[KafkaStream[T]]] = { import scala.collection.JavaConversions._ val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]]) val scalaReturn = underlying.consume(scalaTopicCountMap, decoder) - val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]] + val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]] for ((topic, streams) <- scalaReturn) { - var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]] + var javaStreamList = new java.util.ArrayList[KafkaStream[T]] for (stream <- streams) javaStreamList.add(stream) ret.put(topic, javaStreamList) @@ -85,10 +87,18 @@ def createMessageStreams( topicCountMap: java.util.Map[String,java.lang.Integer]) - : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] = + : java.util.Map[String,java.util.List[KafkaStream[Message]]] = createMessageStreams(topicCountMap, new DefaultDecoder) + def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = + asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder)) + def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = + createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder) + + def createMessageStreamsByFilter(topicFilter: TopicFilter) = + createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder) + def commitOffsets() { underlying.commitOffsets } Index: perf/src/main/scala/kafka/perf/ConsumerPerformance.scala =================================================================== --- perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (revision 1342312) +++ perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (working copy) @@ -17,15 +17,12 @@ package kafka.perf -import java.net.URI import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicLong import java.nio.channels.ClosedByInterruptException -import joptsimple._ import org.apache.log4j.Logger import kafka.message.Message -import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, Utils} +import kafka.utils.Utils import java.util.{Random, Properties} import kafka.consumer._ import java.text.SimpleDateFormat @@ -139,7 +136,7 @@ val hideHeader = options.has(hideHeaderOpt) } - class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaMessageStream[Message], + class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message], 
config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) extends Thread(name) { private val shutdownLatch = new CountDownLatch(1) @@ -157,9 +154,9 @@ var lastMessagesRead = 0L try { - for (message <- stream if messagesRead < config.numMessages) { + for (messageAndMetadata <- stream if messagesRead < config.numMessages) { messagesRead += 1 - bytesRead += message.payloadSize + bytesRead += messageAndMetadata.message.payloadSize if (messagesRead % config.reportingInterval == 0) { if(config.showDetailedStats) Index: perf/src/main/scala/kafka/perf/PerfConfig.scala =================================================================== --- perf/src/main/scala/kafka/perf/PerfConfig.scala (revision 1342312) +++ perf/src/main/scala/kafka/perf/PerfConfig.scala (working copy) @@ -18,8 +18,8 @@ package kafka.perf import joptsimple.OptionParser -import java.text.SimpleDateFormat + class PerfConfig(args: Array[String]) { val parser = new OptionParser val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala =================================================================== --- perf/src/main/scala/kafka/perf/ProducerPerformance.scala (revision 1342312) +++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala (working copy) @@ -20,18 +20,16 @@ import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong import kafka.producer._ -import async.DefaultEventHandler import org.apache.log4j.Logger -import joptsimple.OptionParser import kafka.message.{CompressionCodec, Message} -import kafka.serializer.DefaultEncoder import java.text.SimpleDateFormat -import java.util.{Date, Random, Properties} +import java.util.{Random, Properties} +import kafka.utils.Logging /** * Load test for the producer */ -object ProducerPerformance { +object ProducerPerformance extends Logging { def main(args: Array[String]) { @@ -141,7 +139,6 @@ val totalMessagesSent: AtomicLong, val allDone: CountDownLatch, val rand: Random) extends Runnable { - val logger = Logger.getLogger(getClass) val props = new Properties() val brokerInfoList = config.brokerInfo.split("=") if (brokerInfoList(0) == "zk.connect") { @@ -171,7 +168,7 @@ var lastReportTime = reportTime val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize else config.numMessages / config.numThreads - if(logger.isDebugEnabled) logger.debug("Messages per thread = " + messagesPerThread) + debug("Messages per thread = " + messagesPerThread) var messageSet: List[Message] = Nil if(config.isFixSize) { for(k <- 0 until config.batchSize) { @@ -203,11 +200,11 @@ rand.nextBytes(messageBytes) val message = new Message(messageBytes) producer.send(new ProducerData[Message,Message](config.topic, message)) - if(logger.isDebugEnabled) println("checksum:" + message.checksum) + debug(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize }else { producer.send(new ProducerData[Message,Message](config.topic, message)) - if(logger.isDebugEnabled) println("checksum:" + message.checksum) + debug(config.topic + "-checksum:" + message.checksum) bytesSent += message.payloadSize } nSends += 1 Index: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (revision 1342312) +++ 
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java (working copy) @@ -17,32 +17,24 @@ package kafka.etl.impl; -import java.io.IOException; + import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.Random; -import java.util.Map.Entry; import java.util.Properties; - -import kafka.message.NoCompressionCodec; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapred.JobConf; - +import java.util.Random; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLRequest; -import kafka.etl.KafkaETLUtils; import kafka.etl.Props; import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; import kafka.javaapi.producer.SyncProducer; +import kafka.message.Message; import kafka.producer.SyncProducerConfig; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapred.JobConf; /** * Use this class to produce test events to Kafka server. Each event contains a Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (revision 1342312) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLUtils.java (working copy) @@ -17,6 +17,7 @@ package kafka.etl; + import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -33,7 +34,6 @@ import java.util.Enumeration; import java.util.List; import java.util.Properties; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (revision 1342312) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (working copy) @@ -16,11 +16,18 @@ */ package kafka.etl; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.OffsetRequest; import kafka.common.ErrorMapping; import kafka.javaapi.FetchResponse; +import kafka.javaapi.MultiFetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.javaapi.message.MessageSet; @@ -31,12 +38,6 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.MultipleOutputs; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; - @SuppressWarnings({ "deprecation"}) public class KafkaETLContext { @@ -139,7 +140,7 @@ while ( !gotNext && _respIterator.hasNext()) { ByteBufferMessageSet msgSet = _respIterator.next(); if ( hasError(msgSet)) return false; - _messageIt = (Iterator) msgSet.iterator(); + _messageIt = msgSet.iterator(); gotNext = get(key, value); } } @@ -194,17 +195,17 @@ protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException { 
if (_messageIt != null && _messageIt.hasNext()) { - MessageAndOffset msgAndOffset = _messageIt.next(); + MessageAndOffset messageAndOffset = _messageIt.next(); - ByteBuffer buf = msgAndOffset.message().payload(); + ByteBuffer buf = messageAndOffset.message().payload(); int origSize = buf.remaining(); byte[] bytes = new byte[origSize]; - buf.get(bytes, buf.position(), origSize); + buf.get(bytes, buf.position(), origSize); value.set(bytes, 0, origSize); - key.set(_index, _offset, msgAndOffset.message().checksum()); + key.set(_index, _offset, messageAndOffset.message().checksum()); - _offset = msgAndOffset.offset(); //increase offset + _offset = messageAndOffset.offset(); //increase offset _count ++; //increase count return true; Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (revision 1342312) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (working copy) @@ -16,6 +16,7 @@ */ package kafka.etl; + import java.io.IOException; import java.net.URI; import java.util.Map; @@ -23,13 +24,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.lib.MultipleOutputs; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.lib.MultipleOutputs; @SuppressWarnings("deprecation") Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (revision 1342312) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLKey.java (working copy) @@ -16,11 +16,11 @@ */ package kafka.etl; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; -import kafka.etl.KafkaETLKey; public class KafkaETLKey implements WritableComparable{ Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java =================================================================== --- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (revision 1342312) +++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLJob.java (working copy) @@ -16,13 +16,13 @@ */ package kafka.etl; + import java.net.URI; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.lib.MultipleOutputs; Index: contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java =================================================================== --- contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (revision 1342312) +++ contrib/hadoop-producer/src/main/java/kafka/bridge/pig/AvroKafkaStorage.java (working copy) @@ -16,9 +16,12 @@ */ package kafka.bridge.pig; + +import 
java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; import kafka.bridge.hadoop.KafkaOutputFormat; import kafka.bridge.hadoop.KafkaRecordWriter; - import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.Encoder; import org.apache.hadoop.fs.Path; @@ -33,10 +36,6 @@ import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter; import org.apache.pig.piggybank.storage.avro.PigSchema2Avro; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; - public class AvroKafkaStorage extends StoreFunc { protected KafkaRecordWriter writer; Index: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java =================================================================== --- contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (revision 1342312) +++ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaOutputFormat.java (working copy) @@ -16,29 +16,46 @@ */ package kafka.bridge.hadoop; + +import java.io.IOException; +import java.net.URI; import java.util.Properties; - -import kafka.javaapi.producer.SyncProducer; -import kafka.producer.SyncProducerConfig; - +import kafka.javaapi.producer.Producer; +import kafka.message.Message; +import kafka.producer.ProducerConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.log4j.Logger; -import java.io.IOException; -import java.net.URI; - public class KafkaOutputFormat extends OutputFormat { + private Logger log = Logger.getLogger(KafkaOutputFormat.class); + public static final String KAFKA_URL = "kafka.output.url"; + /** Bytes to buffer before the OutputFormat does a send */ + public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; + + /** Default value for Kafka's connect.timeout.ms */ public static final int KAFKA_PRODUCER_CONNECT_TIMEOUT = 30*1000; + /** Default value for Kafka's reconnect.interval*/ public static final int KAFKA_PRODUCER_RECONNECT_INTERVAL = 1000; + /** Default value for Kafka's buffer.size */ public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64*1024; + /** Default value for Kafka's max.message.size */ public static final int KAFKA_PRODUCER_MAX_MESSAGE_SIZE = 1024*1024; - public static final int KAFKA_QUEUE_SIZE = 10*1024*1024; + /** Default value for Kafka's producer.type */ + public static final String KAFKA_PRODUCER_PRODUCER_TYPE = "sync"; + /** Default value for Kafka's compression.codec */ + public static final int KAFKA_PRODUCER_COMPRESSION_CODEC = 0; public KafkaOutputFormat() { @@ -77,40 +94,80 @@ Path outputPath = getOutputPath(context); if (outputPath == null) throw new IllegalArgumentException("no kafka output url specified"); - URI uri = outputPath.toUri(); + URI uri = URI.create(outputPath.toString()); Configuration job = context.getConfiguration(); - final String topic = uri.getPath().substring(1); // ignore the initial '/' in the path + Properties props = new Properties(); + String topic; final int queueSize = 
job.getInt("kafka.output.queue_size", KAFKA_QUEUE_SIZE); final int timeout = job.getInt("kafka.output.connect_timeout", KAFKA_PRODUCER_CONNECT_TIMEOUT); final int interval = job.getInt("kafka.output.reconnect_interval", KAFKA_PRODUCER_RECONNECT_INTERVAL); final int bufSize = job.getInt("kafka.output.bufsize", KAFKA_PRODUCER_BUFFER_SIZE); final int maxSize = job.getInt("kafka.output.max_msgsize", KAFKA_PRODUCER_MAX_MESSAGE_SIZE); + final String producerType = job.get("kafka.output.producer_type", KAFKA_PRODUCER_PRODUCER_TYPE); + final int compressionCodec = job.getInt("kafka.output.compression_codec", KAFKA_PRODUCER_COMPRESSION_CODEC); - job.set("kafka.output.server", String.format("%s:%d", uri.getHost(), uri.getPort())); - job.set("kafka.output.topic", topic); job.setInt("kafka.output.queue_size", queueSize); job.setInt("kafka.output.connect_timeout", timeout); job.setInt("kafka.output.reconnect_interval", interval); job.setInt("kafka.output.bufsize", bufSize); job.setInt("kafka.output.max_msgsize", maxSize); + job.set("kafka.output.producer_type", producerType); + job.setInt("kafka.output.compression_codec", compressionCodec); - if (uri.getHost().isEmpty()) - throw new IllegalArgumentException("missing kafka server"); - if (uri.getPath().isEmpty()) - throw new IllegalArgumentException("missing kafka topic"); - - Properties props = new Properties(); - props.setProperty("host", uri.getHost()); - props.setProperty("port", Integer.toString(uri.getPort())); + props.setProperty("producer.type", producerType); props.setProperty("buffer.size", Integer.toString(bufSize)); props.setProperty("connect.timeout.ms", Integer.toString(timeout)); props.setProperty("reconnect.interval", Integer.toString(interval)); props.setProperty("max.message.size", Integer.toString(maxSize)); + props.setProperty("compression.codec", Integer.toString(compressionCodec)); - SyncProducer producer = new SyncProducer(new SyncProducerConfig(props)); + if (uri.getScheme().equals("kafka+zk")) { + // Software load balancer: + // URL: kafka+zk://# + // e.g. kafka+zk://kafka-zk:2181/kafka#foobar + + String zkConnect = uri.getAuthority() + uri.getPath(); + + props.setProperty("zk.connect", zkConnect); + job.set("kafka.zk.connect", zkConnect); + + topic = uri.getFragment(); + if (topic == null) + throw new IllegalArgumentException("no topic specified in kafka uri fragment"); + + log.info(String.format("using kafka zk.connect %s (topic %s)", zkConnect, topic)); + } else if (uri.getScheme().equals("kafka")) { + // using the legacy direct broker list + // URL: kafka:/// + // e.g. 
kafka://kafka-server:9000,kafka-server2:9000/foobar + + // Just enumerate broker_ids, as it really doesn't matter what they are as long as they're unique + // (KAFKA-258 will remove the broker_id requirement) + StringBuilder brokerListBuilder = new StringBuilder(); + String delim = ""; + int brokerId = 0; + for (String serverPort : uri.getAuthority().split(",")) { + brokerListBuilder.append(delim).append(String.format("%d:%s", brokerId, serverPort)); + delim = ","; + brokerId++; + } + String brokerList = brokerListBuilder.toString(); + + props.setProperty("broker.list", brokerList); + job.set("kafka.broker.list", brokerList); + + if (uri.getPath() == null || uri.getPath().length() <= 1) + throw new IllegalArgumentException("no topic specified in kafka uri"); + + topic = uri.getPath().substring(1); // ignore the initial '/' in the path + job.set("kafka.output.topic", topic); + log.info(String.format("using kafka broker %s (topic %s)", brokerList, topic)); + } else + throw new IllegalArgumentException("missing scheme from kafka uri (must be kafka:// or kafka+zk://)"); + + Producer producer = new Producer(new ProducerConfig(props)); return new KafkaRecordWriter(producer, topic, queueSize); } } - Index: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java =================================================================== --- contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (revision 1342312) +++ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java (working copy) @@ -16,30 +16,28 @@ */ package kafka.bridge.hadoop; -import kafka.message.Message; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.javaapi.producer.SyncProducer; -import kafka.message.NoCompressionCodec; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import kafka.javaapi.producer.Producer; +import kafka.javaapi.producer.ProducerData; +import kafka.message.Message; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - public class KafkaRecordWriter extends RecordWriter { - protected SyncProducer producer; + protected Producer producer; protected String topic; - protected List msgList = new ArrayList(); + protected List> msgList = new LinkedList>(); protected int totalSize = 0; protected int queueSize; - public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize) + public KafkaRecordWriter(Producer producer, String topic, int queueSize) { this.producer = producer; this.topic = topic; @@ -49,8 +47,7 @@ protected void sendMsgList() { if (msgList.size() > 0) { - ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList); - producer.send(topic, msgSet); + producer.send(msgList); msgList.clear(); totalSize = 0; } @@ -60,10 +57,11 @@ public void write(NullWritable key, BytesWritable value) throws IOException, InterruptedException { Message msg = new Message(value.getBytes()); - msgList.add(msg); + msgList.add(new ProducerData(this.topic, msg)); totalSize += msg.size(); - if (totalSize > queueSize) + // MultiProducerRequest only supports sending up to Short.MAX_VALUE messages in one batch + if (totalSize > queueSize || msgList.size() >= Short.MAX_VALUE) sendMsgList(); } Index: 
contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java =================================================================== --- contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (revision 1342312) +++ contrib/hadoop-producer/src/main/java/kafka/bridge/examples/TextPublisher.java (working copy) @@ -16,8 +16,9 @@ */ package kafka.bridge.examples; -import kafka.bridge.hadoop.KafkaOutputFormat; +import java.io.IOException; +import kafka.bridge.hadoop.KafkaOutputFormat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; @@ -27,8 +28,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import java.io.IOException; - public class TextPublisher { public static void main(String[] args) throws Exception Index: contrib/hadoop-producer/README.md =================================================================== --- contrib/hadoop-producer/README.md (revision 1342312) +++ contrib/hadoop-producer/README.md (working copy) @@ -1,6 +1,14 @@ Hadoop to Kafka Bridge ====================== +What's new? +----------- + +* Now supports Kafka's software load balancer (Kafka URIs are specified with + kafka+zk as the scheme, as described below) +* Supports Kafka 0.7. Now uses the new Producer API, rather than the legacy + SyncProducer. + What is it? ----------- @@ -17,8 +25,10 @@ How do I use it? ---------------- -With this bridge, Kafka topics are URIs and are specified as -`kafka:///`. +With this bridge, Kafka topics are URIs and are specified in one of two +formats: `kafka+zk://#`, which uses the software load +balancer, or the legacy `kafka:///` to connect to a +specific Kafka broker. ### Pig ### @@ -27,17 +37,19 @@ with the Avro schema as its first argument. You'll need to register the appropriate Kafka JARs. Here is what an example Pig script looks like: - REGISTER hadoop-kafka-bridge-0.5.2.jar; + REGISTER hadoop-producer_2.8.0-0.7.0.jar; REGISTER avro-1.4.0.jar; REGISTER piggybank.jar; - REGISTER kafka-0.5.2.jar; + REGISTER kafka-0.7.0.jar; REGISTER jackson-core-asl-1.5.5.jar; REGISTER jackson-mapper-asl-1.5.5.jar; + REGISTER zkclient-20110412.jar; + REGISTER zookeeper-3.3.4.jar; REGISTER scala-library.jar; member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray); names = FOREACH member_info GENERATE name; - STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"'); + STORE member_info INTO 'kafka+zk://my-zookeeper:2181/kafka#member_info' USING kafka.bridge.AvroKafkaStorage('"string"'); That's it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert from Pig's data model to the specified Avro schema. @@ -46,8 +58,8 @@ multiple topics and brokers in the same job: SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000; - STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema'); - STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema'); + STORE early_adopters INTO 'kafka+zk://my-zookeeper:2181/kafka#early_adopters' USING AvroKafkaStorage('$schema'); + STORE others INTO 'kafka://my-broker:9092,my-broker2:9092/others' USING AvroKafkaStorage('$schema'); ### KafkaOutputFormat ### @@ -126,9 +138,10 @@ docs). Default is 64*1024 (64KB). * kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer docs). 
Default is 1024*1024 (1MB). +* kafka.output.compression_codec: The compression codec to use (see Kafka producer + docs). Default is 0 (no compression). -For easier debugging, the above values as well as the Kafka URI -(kafka.output.url), the output server (kafka.output.server), the topic -(kafka.output.topic), and the schema (kafka.output.schema) are injected into -the job's configuration. +For easier debugging, the above values as well as the Kafka broker information +(either kafka.zk.connect or kafka.broker.list), the topic (kafka.output.topic), +and the schema (kafka.output.schema) are injected into the job's configuration. Index: clients/cpp/src/encoder.hpp =================================================================== --- clients/cpp/src/encoder.hpp (revision 1342312) +++ clients/cpp/src/encoder.hpp (working copy) @@ -16,7 +16,7 @@ */ /* * encoder.hpp - * + */ #ifndef KAFKA_ENCODER_HPP_ #define KAFKA_ENCODER_HPP_ Index: examples/src/main/java/kafka/examples/Producer.java =================================================================== --- examples/src/main/java/kafka/examples/Producer.java (revision 1342312) +++ examples/src/main/java/kafka/examples/Producer.java (working copy) @@ -16,9 +16,10 @@ */ package kafka.examples; + +import java.util.Properties; import kafka.javaapi.producer.ProducerData; import kafka.producer.ProducerConfig; -import java.util.Properties; public class Producer extends Thread { Index: examples/src/main/java/kafka/examples/SimpleConsumerDemo.java =================================================================== --- examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (revision 1342312) +++ examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (working copy) @@ -19,17 +19,17 @@ import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.javaapi.FetchResponse; +import java.util.ArrayList; +import java.util.List; +import kafka.api.FetchRequest; +import kafka.javaapi.MultiFetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.javaapi.message.MessageSet; import kafka.message.MessageAndOffset; - -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; - public class SimpleConsumerDemo { private static void printMessages(ByteBufferMessageSet messageSet) { Index: examples/src/main/java/kafka/examples/Consumer.java =================================================================== --- examples/src/main/java/kafka/examples/Consumer.java (revision 1342312) +++ examples/src/main/java/kafka/examples/Consumer.java (working copy) @@ -16,17 +16,18 @@ */ package kafka.examples; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaMessageStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.Message; + public class Consumer extends Thread { private final ConsumerConnector consumer; @@ -55,10 +56,10 @@ public void run() { Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(1)); - Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaMessageStream stream = consumerMap.get(topic).get(0); + Map>> consumerMap = 
consumer.createMessageStreams(topicCountMap); + KafkaStream<Message> stream = consumerMap.get(topic).get(0); ConsumerIterator<Message> it = stream.iterator(); while(it.hasNext()) - System.out.println(ExampleUtils.getMessage(it.next())); + System.out.println(ExampleUtils.getMessage(it.next().message())); } } Index: examples/src/main/java/kafka/examples/ExampleUtils.java =================================================================== --- examples/src/main/java/kafka/examples/ExampleUtils.java (revision 1342312) +++ examples/src/main/java/kafka/examples/ExampleUtils.java (working copy) @@ -16,8 +16,8 @@ */ package kafka.examples; + import java.nio.ByteBuffer; - import kafka.message.Message; public class ExampleUtils
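The ConsumerConnector javadoc earlier in this patch describes the new filter-based API (createMessageStreamsByFilter) that replaces the embedded mirroring consumer. The sketch below shows how a client might call it; it is illustrative only and not part of the patch. The concrete filter class name (Whitelist), the topic pattern, and the connection properties are assumptions for the example.

    // Illustrative sketch only: consume every topic matching a whitelist pattern
    // through the filter-based API added by this patch.
    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
    import kafka.serializer.DefaultDecoder

    object FilteredConsumerSketch {
      def main(args: Array[String]) {
        val props = new Properties
        props.put("zk.connect", "localhost:2181")   // assumed ZooKeeper address
        props.put("groupid", "example-group")       // assumed consumer group id
        val connector = Consumer.create(new ConsumerConfig(props))

        // One stream covering all topics that match the pattern.
        val streams = connector.createMessageStreamsByFilter(new Whitelist("test.*"), 1, new DefaultDecoder)
        for (messageAndMetadata <- streams(0))      // each element pairs the message with its metadata
          println("received a message with checksum " + messageAndMetadata.message.checksum)

        connector.shutdown()
      }
    }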