Kafka / KAFKA-52

Consumer Code documentation


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • None
    • 0.6
    • None
    • None
    • web

    Description

      The example code for the "Consumer Code" section on http://sna-projects.com/kafka/quickstart.php seems to contain a couple of errors.

      Here's the working code:

       
      // specify some consumer properties
      Properties props = new Properties();
      props.put("zk.connect", "localhost:2181");
      props.put("zk.connectiontimeout.ms", "1000000");
      props.put("groupid", "test_group");

      // create the connection to the cluster
      ConsumerConfig consumerConfig = new ConsumerConfig(props);
      ConsumerConnector consumerConnector = Consumer.create(consumerConfig);

      // create 4 partitions of the stream for topic "test", to allow 4 threads to consume
      Map<String, List<KafkaMessageStream>> topicMessageStreams =
          consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));

      // get the list of 4 streams for topic "test" and a pool of 4 threads to consume them
      List<KafkaMessageStream> streams = topicMessageStreams.get("test");
      ExecutorService executor = Executors.newFixedThreadPool(4);

      // consume the messages in the threads, one stream per thread
      for (final KafkaMessageStream stream : streams) {
          executor.submit(new Runnable() {
              public void run() {
                  for (Message message : stream) {
                      // process message
                  }
              }
          });
      }
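
      The one-thread-per-stream pattern above can be tried without a broker. The following is a minimal standalone sketch, not the Kafka API: it substitutes a plain Iterable<String> for KafkaMessageStream, and the class name StreamFanOut and the method consumeAll are hypothetical names chosen for illustration. It also adds an orderly executor shutdown, which the snippet above leaves out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the consumer loop; Iterable<String> plays the
// role of KafkaMessageStream so the pattern can run without a cluster.
public class StreamFanOut {

    // Consumes each stream on its own pool thread and returns the total
    // number of messages processed across all streams.
    public static int consumeAll(List<? extends Iterable<String>> streams) {
        final AtomicInteger processed = new AtomicInteger();
        // newFixedThreadPool rejects a size of 0, so use at least one thread.
        ExecutorService executor =
            Executors.newFixedThreadPool(Math.max(1, streams.size()));

        for (final Iterable<String> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (String message : stream) {
                        processed.incrementAndGet(); // "process message"
                    }
                }
            });
        }

        // Stop accepting work, then wait for the submitted tasks to finish.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<List<String>> streams = new ArrayList<List<String>>();
        streams.add(Arrays.asList("a", "b", "c"));
        streams.add(Arrays.asList("d", "e"));
        streams.add(Arrays.asList("f"));
        System.out.println(consumeAll(streams)); // prints 6
    }
}
```

      The in-memory lists here are finite, so every task returns on its own. A live message stream would normally keep blocking for new messages, in which case the timeout branch and shutdownNow() are what end the threads.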
      

      It might also be worth specifying the imports:

       
      import kafka.consumer.*; 
      import kafka.message.Message; 
      
      import java.util.Properties; 
      import java.util.Map; 
      import java.util.List; 
      import java.util.concurrent.ExecutorService; 
      import java.util.concurrent.Executors; 
      
      import com.google.common.collect.ImmutableMap; 
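
      The ImmutableMap import is the only Guava dependency in the example. If Guava is not on the classpath, the topic-to-stream-count map could be built with the JDK instead. This is a small sketch under the assumption that createMessageStreams accepts any Map<String, Integer>, as the call above suggests; the class name TopicCounts and the method single are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: builds the topic -> stream-count map without Guava.
public class TopicCounts {

    // Returns an immutable single-entry map, e.g. {"test" -> 4}.
    public static Map<String, Integer> single(String topic, int streams) {
        return Collections.singletonMap(topic, streams);
    }

    public static void main(String[] args) {
        Map<String, Integer> topicCounts = single("test", 4);
        System.out.println(topicCounts); // prints {test=4}
    }
}
```

      With this in place, the call would read consumerConnector.createMessageStreams(TopicCounts.single("test", 4)), and the com.google.common.collect.ImmutableMap import could be dropped.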
      


          People

            Assignee: Unassigned
            Reporter: Anonymous


              Time Tracking

                Original Estimate: 5m
                Remaining Estimate: 5m
                Time Spent: Not Specified