Kafka / KAFKA-7628

KafkaStream is not closing


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 0.11.0.1
    • Fix Version/s: None
    • Component/s: streams
    • Labels: None
    • Environment: Macbook Pro

    Description

      I close a KafkaStreams instance whenever a certain condition is met:

      Closing:

       

      if (kafkaStream == null) {
          logger.info("KafkaStream already closed?");
      } else {
          // close() returns true only if all stream threads stopped within the timeout
          boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
          if (closed) {
              kafkaStream = null;
              logger.info("KafkaStream closed");
          } else {
              logger.info("KafkaStream could not be closed");
          }
      }
      

      Starting:

       

      if (kafkaStream == null) {
          logger.info("KafkaStream is starting");
          kafkaStream = KafkaManager.getInstance().getStream(
                  this.getConfigFilePath(),
                  this,
                  this.getTopic()
          );
          kafkaStream.start();
          logger.info("KafkaStream is started");
      }
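
Both snippets decide what to do by checking whether kafkaStream is null. If the close and start paths can run on different threads (the report does not say), it may help to keep that check and the reference update in one synchronized place. The following is only a minimal sketch under that assumption: StreamHandle and StreamLifecycle are hypothetical names, and StreamHandle merely stands in for the two KafkaStreams calls used above so the example runs without the Kafka library.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for the two KafkaStreams calls used in the report.
interface StreamHandle {
    void start();
    boolean close(long timeout, TimeUnit unit);
}

// Hypothetical holder that keeps the null check and the reference update together.
final class StreamLifecycle {
    private final Supplier<StreamHandle> factory;
    private StreamHandle current; // null means "no stream is held"

    StreamLifecycle(Supplier<StreamHandle> factory) {
        this.factory = factory;
    }

    // Starts a new stream only if none is held; returns true if one was started.
    synchronized boolean startIfStopped() {
        if (current != null) {
            return false;
        }
        StreamHandle created = factory.get();
        created.start();
        current = created;
        return true;
    }

    // Drops the reference only after close() reports success within the timeout.
    synchronized boolean stop(long timeout, TimeUnit unit) {
        if (current == null) {
            return true; // nothing to close
        }
        boolean closed = current.close(timeout, unit);
        if (closed) {
            current = null;
        }
        return closed;
    }

    synchronized boolean isRunning() {
        return current != null;
    }
}
```

With this shape, a close that times out leaves the reference in place, so a later startIfStopped() cannot create a second instance while the first one may still be running.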
      

       

       

      In my Processor implementation, process(String key, byte[] value) is still called even though the stream was closed successfully:

       

      public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
          private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
          private ProcessorContext context;
      
      
          private ProcessorContext getContext() {
              return context;
          }
      
          @Override
          public void init(ProcessorContext context) {
              this.context = context;
              this.context.schedule(1000);
          }
      
      
          @Override
          public void process(String key, byte[] value) {
              try {
                  String topic = key.split("-")[0];
                  byte[] uncompressed = GzipCompressionUtil.uncompress(value);
                  String json = new String(uncompressed, "UTF-8");
                  processRecord(topic, json);
                  this.getContext().commit();
              } catch (Exception e) {
                  logger.error("Error processing json", e);
              }
          }
      
          protected abstract void processRecord(String topic, String json);
      
          @Override
          public void punctuate(long timestamp) {
              this.getContext().commit();
          }
      
          @Override
          public void close() {
              this.getContext().commit();
          }
      }
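
GzipCompressionUtil is not included in the report. For readers who want to reproduce the decode step in process(), here is a minimal sketch that assumes the helper is a thin wrapper around java.util.zip. RecordDecodeSketch and its method names are hypothetical, and the key format ("topic-suffix") is inferred only from the split("-") call above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper mirroring the decode steps in process().
final class RecordDecodeSketch {
    // Returns the part of the key before the first '-' (the whole key if there is none).
    static String topicOf(String key) {
        return key.split("-")[0];
    }

    // Decompresses a gzip payload fully into memory.
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Compresses bytes with gzip; only needed here to build test input.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Gunzips the record value and decodes it as UTF-8 text.
    static String decodeJson(byte[] value) {
        return new String(gunzip(value), StandardCharsets.UTF_8);
    }
}
```

Note that split("-")[0] keeps only the text before the first hyphen, so a topic name that itself contains a hyphen would be truncated, and a null key would throw inside process() and be logged by its catch block.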
      

       

      My configuration for KafkaStreams:

       

      application.id=dv_ws_in_app_activity_dev4
      bootstrap.servers=VLXH1
      auto.offset.reset=latest
      num.stream.threads=1
      key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
      value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
      poll.ms = 100
      commit.interval.ms=1000
      state.dir=../../temp/kafka-state-dir
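
The report does not show how this file is loaded. Assuming it is read with java.util.Properties, the spaced line "poll.ms = 100" parses the same way as the unspaced entries, so it should not be a source of misconfiguration. A minimal sketch to check that (StreamsConfigSketch is a hypothetical name):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parses properties text the way Properties.load would read the file.
final class StreamsConfigSketch {
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("poll.ms = 100\ncommit.interval.ms=1000\n");
        // Whitespace around '=' is skipped by Properties.load.
        System.out.println("[" + props.getProperty("poll.ms") + "]");
        System.out.println("[" + props.getProperty("commit.interval.ms") + "]");
    }
}
```

Properties.load skips whitespace around the separator but keeps trailing whitespace in the value, so a stray space at the end of a line would still end up in the setting.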
      

      Version: 0.11.0.1 

       

      After calling close() on the streams instance, I see that these connections to the brokers on port 9092 are still established:

       

      $ sudo lsof -i -n -P | grep 9092
      java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
      java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
      java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
      java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
      java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
      java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
      

       

       

      People

        Assignee: Unassigned
        Reporter: lugrugzo Ozgur