  1. Flume
  2. FLUME-2741

HDFS sink cannot close file; writer should be removed from sfWriters


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0, 1.5.2
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None

    Description

      While the Flume HDFS sink is writing data to HDFS, if the HDFS cluster hits an unknown error and the NameNode cannot offer a healthy node, the BucketWriter is never closed.
      The roll task scheduled on timedRollerPool is interrupted because close(true) fails. The code is:

      if (rollInterval > 0) {
        Callable<Void> action = new Callable<Void>() {
          public Void call() throws Exception {
            LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
                bucketPath, rollInterval);
            try {
              // Roll the file and remove reference from sfWriters map.
              close(true);
            } catch (Throwable t) {
              LOG.error("Unexpected error", t);
            }
            return null;
          }
        };
        timedRollFuture = timedRollerPool.schedule(action, rollInterval,
            TimeUnit.SECONDS);
      }

      My HDFS sink configuration sets hdfs.rollSize=0, hdfs.rollCount=0, and hdfs.rollInterval=1200, with a directory layout like YYYY/MM/DD, so files roll only on the rollInterval timer. My channel is a memory channel. If the close fails, HDFSEventSink's sfWriters does not remove the writer for the failed file, so the HDFS sink stops working and cannot consume data from the channel. If the channel capacity is small or the data volume is high, then possibly:
        1. the channel fills up quickly;
        2. the HDFS sink keeps trying to process the data that maps to the failed file, because the data's timestamp belongs to that file...
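The failure mode described above can be illustrated with a small, self-contained model. This is only a sketch: `FakeWriter`, `StuckWriterDemo`, and the example path are hypothetical stand-ins, not Flume's actual BucketWriter or HDFSEventSink, and the HDFS error is simulated with a flag. It shows that when close() throws and the error is only logged, the entry stays in the sfWriters-style map.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the reported failure. All names here are hypothetical
// stand-ins; this is not Flume's BucketWriter or HDFSEventSink code.
class StuckWriterDemo {

    /** Stand-in for a bucket writer registered in a path-to-writer map. */
    static class FakeWriter {
        final String path;
        final Map<String, FakeWriter> registry;
        final boolean hdfsHealthy;
        boolean closed = false;

        FakeWriter(String path, Map<String, FakeWriter> registry, boolean hdfsHealthy) {
            this.path = path;
            this.registry = registry;
            this.hdfsHealthy = hdfsHealthy;
        }

        /** Removes the writer from the map only when the close succeeds. */
        void close() throws IOException {
            if (!hdfsHealthy) {
                throw new IOException("simulated: no healthy node available");
            }
            closed = true;
            registry.remove(path);
        }

        /** Mirrors the timed roll in the report: the error is logged and swallowed. */
        void timedRoll() {
            try {
                close();
            } catch (Throwable t) {
                System.err.println("Unexpected error: " + t.getMessage());
            }
        }
    }

    /** Returns true if the writer is still registered and open after a failed roll. */
    static boolean writerStillRegisteredAfterFailedRoll() {
        Map<String, FakeWriter> sfWriters = new HashMap<>();
        String path = "/flume/YYYY/MM/DD/events"; // hypothetical bucket path
        FakeWriter writer = new FakeWriter(path, sfWriters, false);
        sfWriters.put(path, writer);
        writer.timedRoll();
        return sfWriters.containsKey(path) && !writer.closed;
    }

    public static void main(String[] args) {
        System.out.println("stale writer still registered: "
                + writerStillRegisteredAfterFailedRoll());
    }
}
```

In this model the sink would keep looking up the same broken writer for every event whose timestamp maps to that path, which matches the behavior reported here.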

      So I think that when closing the file throws an exception, the writer should be removed from sfWriters, like this:

      if (rollInterval > 0) {
        Callable<Void> action = new Callable<Void>() {
          public Void call() throws Exception {
            LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
                bucketPath, rollInterval);
            try {
              // Roll the file and remove reference from sfWriters map.
              close(true);
            } catch (Throwable t) {
              LOG.error("Unexpected error", t);
              runCloseAction();
              closed = true;
            }
            return null;
          }
        };
        timedRollFuture = timedRollerPool.schedule(action, rollInterval,
            TimeUnit.SECONDS);
      }

      Is this OK?
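The effect of the proposed change can be sketched in isolation. The model below is an assumption-laden illustration, not the real Flume classes: `FakeWriter` and `EvictOnCloseFailureDemo` are hypothetical, the `onClose` callback plays the role that runCloseAction() has in the snippet above, and close() always fails to simulate the HDFS error.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed behavior: evict the writer even when close() fails.
// All names are hypothetical; this is not Flume's BucketWriter.
class EvictOnCloseFailureDemo {

    static class FakeWriter {
        final Runnable onClose; // stands in for runCloseAction()
        boolean closed = false;

        FakeWriter(Runnable onClose) {
            this.onClose = onClose;
        }

        /** Always fails, simulating an HDFS cluster that cannot complete the close. */
        void close() throws IOException {
            throw new IOException("simulated close failure");
        }

        /** Timed roll with the proposed catch block: log, run the callback, mark closed. */
        void timedRoll() {
            try {
                close();
            } catch (Throwable t) {
                System.err.println("Unexpected error: " + t.getMessage());
                onClose.run();
                closed = true;
            }
        }
    }

    /** Returns how many writers remain registered after a failed timed roll. */
    static int writersLeftAfterFailedRoll() {
        Map<String, FakeWriter> sfWriters = new HashMap<>();
        String path = "/flume/YYYY/MM/DD/events"; // hypothetical bucket path
        sfWriters.put(path, new FakeWriter(() -> sfWriters.remove(path)));
        sfWriters.get(path).timedRoll();
        return sfWriters.size();
    }

    public static void main(String[] args) {
        System.out.println("writers left: " + writersLeftAfterFailedRoll());
    }
}
```

With the callback in the catch block, the map is empty after the failed roll, so the next event for that path would get a fresh writer. This sketch only covers eviction from the map; it does not address what state the underlying file is left in after a failed close.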


          People

            Assignee: Unassigned
            Reporter: liu xiaofei
