Description
I have the following test code to check Kafka functionality:
package com.company;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;

public class Main {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.9.3:9092");
        props.put("serializer.class", "com.company.KafkaMessageSerializer");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        // The first is the type of the Partition key, the second the type of the message.
        Producer<String, String> messagesProducer = new Producer<String, String>(config);

        // Send
        String topicName = "my_messages";
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);

        try {
            System.out.println(new Date() + ": sending...");
            messagesProducer.send(data);
            System.out.println(new Date() + ": sent");
        } catch (FailedToSendMessageException e) {
            System.out.println("e: " + e);
            e.printStackTrace();
        } catch (Exception exc) {
            System.out.println("e: " + exc);
            exc.printStackTrace();
        }
    }
}
package com.company;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

/**
 * Created by igorkhomenko on 2/2/15.
 */
public class KafkaMessageSerializer implements Encoder<String> {

    public KafkaMessageSerializer(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(String entity) {
        byte[] serializedMessage = doCustomSerialization(entity);
        return serializedMessage;
    }

    private byte[] doCustomSerialization(String entity) {
        return entity.getBytes();
    }
}
The code is also available on GitHub: https://github.com/soulfly/Kafka-java-producer
The program simply hangs on the following line:
messagesProducer.send(data)
When I changed the broker list to
props.put("metadata.broker.list", "localhost:9092");
then I got an exception:
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
which is the behavior I would expect.
Why does it hang with the wrong broker list? Any ideas?
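One way to narrow this down is to test the TCP connection separately from Kafka. This assumes the difference comes from how the two addresses fail: a refused connection on localhost returns immediately, while an unreachable address may block until the connect attempt times out. That is only a guess, not a confirmed cause. The sketch below is a plain socket probe with an explicit timeout. The class name `BrokerProbe`, the helper `canConnect` and the 3000 ms timeout are made up for illustration and are not part of the Kafka API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of Kafka.
class BrokerProbe {

    // Returns true if a TCP connection to host:port is established within timeoutMs.
    // Returns false on refusal, timeout, or any other I/O failure.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers both ConnectException (refused) and SocketTimeoutException.
            return false;
        }
    }

    public static void main(String[] args) {
        // The two broker addresses from the test above; the timeout is an arbitrary choice.
        String[] hosts = {"192.168.9.3", "localhost"};
        for (String host : hosts) {
            long start = System.currentTimeMillis();
            boolean ok = canConnect(host, 9092, 3000);
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(host + ":9092 reachable=" + ok + " after " + elapsed + " ms");
        }
    }
}
```

If the probe to 192.168.9.3 takes the full timeout to fail while localhost fails almost instantly, that would point at the network level rather than at the producer code itself.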