Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-3364

TailDir BackOff Method always blocks the thread

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Critical
    • Resolution: Unresolved
    • 1.9.0
    • None
    • Sinks+Sources
    • None
    • On macOS. The backoff method blocks the thread even though data has been read. 

    Description

      The backoff method in taildir source is invalid because of a code bug.

      This part should not default the status to Status.BACKOFF:

      //代码占位符
      /**
       * Runs one polling pass over the files currently known to the reader.
       *
       * <p>The inode set is refreshed from {@code reader.updateTailFiles()}, every file
       * whose {@code needTail()} is true is handed to {@code tailFileProcess}, and
       * {@code closeTailFiles()} is called afterwards.
       *
       * @return {@code Status.READY} if {@code tailFileProcess} returned {@code true} for at
       *     least one file; otherwise {@code Status.BACKOFF}. {@code Status.BACKOFF} is also
       *     returned when any {@code Throwable} is raised during the pass.
       */
      public Status process() {
        // Start pessimistic: only a file that reports more pending lines flips this to READY.
        Status result = Status.BACKOFF;
        try {
          existingInodes.clear();
          existingInodes.addAll(reader.updateTailFiles());
          for (long inode : existingInodes) {
            TailFile tailFile = reader.getTailFiles().get(inode);
            if (!tailFile.needTail()) {
              continue;
            }
            if (tailFileProcess(tailFile, true)) {
              result = Status.READY;
            }
          }
          closeTailFiles();
        } catch (Throwable t) {
          // Any failure is logged and counted, and discards a READY set earlier in this pass.
          logger.error("Unable to tail files", t);
          sourceCounter.incrementEventReadFail();
          result = Status.BACKOFF;
        }
        return result;
      }
      

       

      //代码占位符
      /**
       * Reads batches of events from a single tailed file and hands them to the channel
       * processor until the file yields nothing more for now or the per-file batch limit
       * is reached.
       *
       * <p>When the channel processor throws {@code ChannelException}, the method logs a
       * warning, sleeps for {@code retryInterval} milliseconds, doubles the interval (capped
       * at {@code maxRetryInterval}) and starts the loop again with a fresh read from the
       * reader. After a successful hand-off the interval is reset to 1000.
       *
       * @param tf the file to read from; it is set as the reader's current file on each pass
       * @param backoffWithoutNL passed through unchanged to {@code reader.readEvents}
       * @return {@code false} if the reader returned no events or the accepted batch was
       *     smaller than {@code batchSize}; {@code true} once {@code maxBatchCount} batches
       *     have been accepted from this file in this call
       * @throws IOException declared for the reader calls made here
       * @throws InterruptedException if the retry sleep is interrupted
       */
      private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
          throws IOException, InterruptedException {
        long acceptedBatches = 0;
        for (;;) {
          reader.setCurrentFile(tf);
          List<Event> batch = reader.readEvents(batchSize, backoffWithoutNL);
          if (batch.isEmpty()) {
            return false;
          }
          sourceCounter.addToEventReceivedCount(batch.size());
          sourceCounter.incrementAppendBatchReceivedCount();
          try {
            getChannelProcessor().processEventBatch(batch);
            reader.commit();
          } catch (ChannelException ex) {
            logger.warn("The channel is full or unexpected failure. " +
                "The source will try again after " + retryInterval + " ms");
            sourceCounter.incrementChannelWriteFail();
            // Sleep with the current interval first, then grow it for the next failure.
            TimeUnit.MILLISECONDS.sleep(retryInterval);
            retryInterval = Math.min(retryInterval << 1, maxRetryInterval);
            continue;
          }
          // Successful hand-off: reset the retry delay before accounting for the batch.
          retryInterval = 1000;
          sourceCounter.addToEventAcceptedCount(batch.size());
          sourceCounter.incrementAppendBatchAcceptedCount();
          if (batch.size() < batchSize) {
            logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize);
            return false;
          }
          acceptedBatches++;
          if (acceptedBatches >= maxBatchCount) {
            logger.debug("The batches read from the same file is larger than " + maxBatchCount );
            return true;
          }
        }
      }
      

      Attachments

        1. image-2020-04-26-21-44-01-785.png
          481 kB
          wang qun
        2. image-2020-04-26-21-43-52-084.png
          393 kB
          wang qun

        Activity

          People

            Unassigned Unassigned
            wangqunASF wang qun
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - 1h
                1h
                Remaining:
                Remaining Estimate - 1h
                1h
                Logged:
                Time Spent - Not Specified
                Not Specified