Uploaded image for project: 'Flume'
  1. Flume
  2. FLUME-2741

hdfs sink could not close file,should remove writer from sfWriters

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 1.6.0, 1.5.2
    • None
    • Sinks+Sources
    • None

    Description

      while flume hdfs sink write data to hdfs ,the hdfs cluster come unkown error and namenode could not offer a healthy node, the BucketWriter will not close。
      timedRollerPool will Interrupt because of close(true) has error! Code is:

      if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
      public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
      bucketPath, rollInterval);
      try

      { // Roll the file and remove reference from sfWriters map. close(true); } catch(Throwable t) { LOG.error("Unexpected error", t); }
      return null;
      }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
      }

      And in my hdfs sink conf ,hdfs.rollSize=0,hdfs.rollCount=0,hdfs.rollInterval=1200,and directory like YYYY/MM/DD, so only depend on rollInterval time to roll hdfs file, my channel is memory channel, if colse fail, HDFSEventSink‘s sfWriters will not remove the error file, so hdfs sink will not work ,can't consume data in channel, if channel capacity is small or data more ,
      maybe :1、channel will be full come soon ;2、 hdfs sink always process the data correspond the error file,because the data's timestamp is belong to the error file...

      so , i think while close file has exception remove the writer in sfWriters like :

      if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
      public Void call() throws Exception {
      LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
      bucketPath, rollInterval);
      try { // Roll the file and remove reference from sfWriters map. close(true); }

      catch(Throwable t)

      { LOG.error("Unexpected error", t); runCloseAction(); closed = true; }

      return null;
      }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
      TimeUnit.SECONDS);
      }

      OK ?

      Attachments

        Activity

          People

            Unassigned Unassigned
            liu xiaofei liu xiaofei
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: