Details
Bug
Status: Resolved
Minor
Resolution: Fixed
web
Description
The example code for the "Consumer Code" section on http://sna-projects.com/kafka/quickstart.php seems to contain a couple of errors.
Here's the working code:
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.create(consumerConfig);

// create 4 partitions of the stream for topic "test", to allow 4 threads to consume
Map<String, List<KafkaMessageStream>> topicMessageStreams =
    consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));

// create list of 4 threads to consume from each of the partitions
List<KafkaMessageStream> streams = topicMessageStreams.get("test");
ExecutorService executor = Executors.newFixedThreadPool(4);

// consume the messages in the threads
for (final KafkaMessageStream stream : streams) {
    executor.submit(new Runnable() {
        //final KafkaMessageStream stream = topicStream.getValue();
        public void run() {
            for (Message message : stream) {
                // process message
            }
        }
    });
}
It might also be worth specifying the imports:
import kafka.consumer.*;
import kafka.message.Message;
import java.util.Properties;
import java.util.Map;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap;
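For anyone who wants to try the threading pattern from the consumer code without a running broker, here is a minimal sketch that uses only the JDK. It is not part of the Kafka API: the class name StreamFanOutSketch, the method consumeAll, and the use of plain Iterable<String> lists as stand-ins for KafkaMessageStream are all assumptions made for illustration. It shows one pool thread per stream, each iterating its own stream, as in the loop above.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the quickstart consumer: no Kafka classes are used.
public class StreamFanOutSketch {

    // Consumes each stream on its own pool thread and returns the total
    // number of messages seen. Requires at least one stream, because
    // Executors.newFixedThreadPool rejects a pool size of zero.
    static int consumeAll(List<? extends Iterable<String>> streams) throws InterruptedException {
        final AtomicInteger processed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());

        for (final Iterable<String> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (String message : stream) {
                        processed.incrementAndGet(); // placeholder for real message processing
                    }
                }
            });
        }

        // The stand-in streams are finite, so the tasks end on their own;
        // wait for them before reading the counter.
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Four stand-in "partitions" holding six messages in total (List.of needs Java 9+).
        List<List<String>> streams = List.of(
            List.of("a", "b"), List.of("c"), List.of(), List.of("d", "e", "f"));
        System.out.println(consumeAll(streams)); // prints 6
    }
}
```

Unlike these finite lists, a real message stream is expected to block while waiting for new messages, so the worker threads in the consumer code would normally keep running until the consumer is shut down.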