Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6052

Windows: Consumers not polling when isolation.level=read_committed

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.11.0.0, 1.0.1
    • 1.1.1, 2.0.0
    • consumer, producer
    • Windows 10. All processes running in embedded mode.

    Description

      The same code is running fine in Linux. I am trying to send a transactional record with exactly once schematics. These are my producer, consumer and broker setups.
      public void sendWithTTemp(String topic, EHEvent event) {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "localhost:9092,localhost:9093,localhost:9094");
      // props.put("bootstrap.servers", "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
      props.put(ProducerConfig.ACKS_CONFIG, "all");
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
      props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
      props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
      props.put("transactional.id", "TID" + transactionId.incrementAndGet());
      props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");

      Producer<String, String> producer =
      new KafkaProducer<>(props,
      new StringSerializer(),
      new StringSerializer());

      Logger.log(this, "Initializing transaction...");

      producer.initTransactions();

      Logger.log(this, "Initializing done.");

      try

      { Logger.log(this, "Begin transaction..."); producer.beginTransaction(); Logger.log(this, "Begin transaction done."); Logger.log(this, "Sending events..."); producer.send(new ProducerRecord<>(topic, event.getKey().toString(), event.getValue().toString())); Logger.log(this, "Sending events done."); Logger.log(this, "Committing..."); producer.commitTransaction(); Logger.log(this, "Committing done."); }

      catch (ProducerFencedException | OutOfOrderSequenceException

      AuthorizationException e) { producer.close(); e.printStackTrace(); }

      catch (KafkaException e)

      { producer.abortTransaction(); e.printStackTrace(); }

      producer.close();
      }

      In Consumer
      I have set isolation.level=read_committed
      In 3 Brokers
      I'm running with the following properties
      Properties props = new Properties();
      props.setProperty("broker.id", "" + i);
      props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
      props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "
      B" + i);
      props.setProperty("num.partitions", "1");
      props.setProperty("zookeeper.connect", "localhost:2181");
      props.setProperty("zookeeper.connection.timeout.ms", "6000");
      props.setProperty("min.insync.replicas", "2");
      props.setProperty("offsets.topic.replication.factor", "2");
      props.setProperty("offsets.topic.num.partitions", "1");
      props.setProperty("transaction.state.log.num.partitions", "2");
      props.setProperty("transaction.state.log.replication.factor", "2");
      props.setProperty("transaction.state.log.min.isr", "2");

      I am not getting any records in the consumer. When I set isolation.level=read_uncommitted, I get the records. I assume that the records are not getting commited. What could be the problem? log attached

      Attachments

        1. Separate_Logs.zip
          598 kB
          Ansel Zandegran
        2. Prducer_Consumer.log
          606 kB
          Ansel Zandegran
        3. logFile.log
          58.50 MB
          Ansel Zandegran
        4. kafka-logs.zip
          232 kB
          Ansel Zandegran

        Issue Links

          Activity

            People

              vahid Vahid Hashemian
              zandegran Ansel Zandegran
              Votes:
              4 Vote for this issue
              Watchers:
              10 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: