Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-945

Problem with test to send a message and then consume it

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Invalid
    • 0.8.0
    • None
    • None
    • None

    Description

      A simple test, which sends on message synchronously, and then consumes it, is failing, with the latest 0.8 beta release candidate (produced from sha: 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4

      Note this problem did not occur with a previous version of the 0.8 branch (e.g. it seems to work fine for sha: 988d4d8e65a14390abd748318a64e281e4a37c19).

      Essentially, it appears that the message never gets sent (after complaining about missing partition leader, etc.).

      To reproduce, run the sample zookeeper and kafka scripts, that come with the distribution (but first delete all pre-existing state by removing the data directories that zookeeper and kafka use:

      rm -rf /tmp/zookeeper
      rm -rf/tmp/kafka_logs
      ./bin/zookeeper-server-start.sh config/zookeeper.properties
      ./bin/kafka-server-start.sh config/server.properties

      Then execute the simple test code (I will attach a tarball which you should be able to unpack and run this example).

      @Test public void produceAndConsumeMessage() throws Exception {

      // assumes zookeeper and kafka server are running.
      String zkConnect = "localhost:2181";
      int port = 9092;

      Properties pProps = new Properties();
      pProps.put("metadata.broker.list", "localhost:" + port);
      pProps.put("serializer.class", "kafka.serializer.StringEncoder");
      ProducerConfig pConfig = new ProducerConfig(pProps);
      Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
      KeyedMessage<Integer, String> data =
      new KeyedMessage<Integer, String>("test-topic", "test-message");
      producer.send(data);
      producer.close();

      Properties cProps = new Properties();
      cProps.put("zookeeper.connect", zkConnect);
      cProps.put("group.id", "group1");
      ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
      ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

      Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
      consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
      List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test-topic");
      final KafkaStream<byte[], byte[]> stream = streams.get(0);
      final ConsumerIterator<byte[], byte[]> iter = stream.iterator();

      // run in a separate thread
      final AtomicBoolean success = new AtomicBoolean(false);
      Thread consumeThread = new Thread(new Runnable() {
      public void run() {
      while (iter.hasNext())

      { byte[] msg = iter.next().message(); String msgStr = new String(msg); success.set(msgStr.equals("test-message")); break; }

      }
      });

      consumeThread.start();
      consumeThread.join();

      consumerConnector.shutdown();
      assertTrue(success.get());
      }

      Attachments

        1. kafka-945.tar.gz
          8.72 MB
          Jason Rosenberg
        2. kafak-945.out
          9 kB
          Jason Rosenberg

        Activity

          People

            Unassigned Unassigned
            jbrosenberg Jason Rosenberg
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: