Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-1913

App hungs when calls producer.send to wrong IP of Kafka broker

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Duplicate
    • 0.8.1.1
    • 0.9.0.0
    • producer
    • None
    • OS X 10.10.1, Java 7, AWS Linux

    Description

      I have next test code to check the Kafka functionality:

      package com.company;
      
      import kafka.common.FailedToSendMessageException;
      import kafka.javaapi.producer.Producer;
      import kafka.producer.KeyedMessage;
      import kafka.producer.ProducerConfig;
      
      import java.util.Date;
      import java.util.Properties;
      
      public class Main {
      
          public static void main(String[] args) {
      
              Properties props = new Properties();
              props.put("metadata.broker.list", "192.168.9.3:9092");
              props.put("serializer.class", "com.company.KafkaMessageSerializer");
              props.put("request.required.acks", "1");
      
              ProducerConfig config = new ProducerConfig(props);
      
              // The first is the type of the Partition key, the second the type of the message.
              Producer<String, String> messagesProducer = new Producer<String, String>(config);
      
              // Send
              String topicName = "my_messages";
              String message = "hello world";
              KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
      
              try {
                  System.out.println(new Date() + ": sending...");
      
                  messagesProducer.send(data);
      
                  System.out.println(new Date() +  ": sent");
      
              }catch (FailedToSendMessageException e){
                  System.out.println("e: " + e);
                  e.printStackTrace();
      
              }catch (Exception exc){
                  System.out.println("e: " + exc);
                  exc.printStackTrace();
              }
          }
      }
      
      package com.company;
      
      import kafka.serializer.Encoder;
      import kafka.utils.VerifiableProperties;
      
      /**
       * Created by igorkhomenko on 2/2/15.
       */
      public class KafkaMessageSerializer implements Encoder<String> {
      
          public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
              /* This constructor must be present for successful compile. */
          }
      
          @Override
          public byte[] toBytes(String entity) {
              byte [] serializedMessage = doCustomSerialization(entity);
              return serializedMessage;
          }
      
          private byte[] doCustomSerialization(String entity) {
              return entity.getBytes();
          }
      }
      

      Here is also GitHub version https://github.com/soulfly/Kafka-java-producer

      So it just hungs on next line:

      messagesProducer.send(data)
      

      When I replaced the brokerlist to

      props.put("metadata.broker.list", "localhost:9092");
      

      then I got an exception:

      kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
      

      so it's okay

      Why it hungs with wrong brokerlist? Any ideas?

      Attachments

        Activity

          People

            junrao Jun Rao
            igor.quickblox Igor Khomenko
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: