Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.6.0, 1.5.2
Fix Version/s: None
Component/s: None
Description
While the Flume HDFS sink is writing data to HDFS, if the HDFS cluster hits an unknown error and the NameNode cannot offer a healthy node, the BucketWriter is never closed.
The roll scheduled on timedRollerPool is interrupted because close(true) throws an error. The current code is:
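    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch (Throwable t) {
            // Only logged: the writer stays in sfWriters if close(true) fails.
            LOG.error("Unexpected error", t);
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }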
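A minimal, self-contained sketch of the failure mode, under simplified assumptions: a hypothetical static `close(bucketPath, callCloseCallback)` that always throws (standing in for an HDFS cluster that cannot provide a healthy node), and a plain `ConcurrentHashMap` standing in for `HDFSEventSink`'s `sfWriters`. It assumes, as the proposed patch implies, that the writer is removed from `sfWriters` only by the close callback at the end of a successful `close(true)`. Because the roll task only logs the exception, the map entry survives the roll.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimedRollSketch {

  // Hypothetical stand-in for HDFSEventSink.sfWriters (bucket path -> writer).
  static final Map<String, Object> sfWriters = new ConcurrentHashMap<>();

  // Hypothetical stand-in for BucketWriter.close(true). A successful close would
  // end by removing the writer from sfWriters; here HDFS is unhealthy, so it throws first.
  static void close(String bucketPath, boolean callCloseCallback) throws IOException {
    throw new IOException("could not obtain a healthy datanode");
  }

  /** Runs one roll the way the current code does; returns whether the writer is still mapped. */
  static boolean writerStillMappedAfterRoll() {
    String bucketPath = "/flume/2015/08/20/events"; // hypothetical bucket path
    sfWriters.put(bucketPath, new Object());

    Callable<Void> action = () -> {
      try {
        // Roll the file and remove reference from sfWriters map.
        close(bucketPath, true);
      } catch (Throwable t) {
        // Current behaviour: the error is only logged; sfWriters is left untouched.
        System.err.println("Unexpected error: " + t);
      }
      return null;
    };

    ScheduledExecutorService timedRollerPool = Executors.newSingleThreadScheduledExecutor();
    try {
      // The real code waits rollInterval seconds; 100 ms keeps the demo short.
      timedRollerPool.schedule(action, 100, TimeUnit.MILLISECONDS).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      timedRollerPool.shutdown();
    }
    return sfWriters.containsKey(bucketPath);
  }

  public static void main(String[] args) {
    System.out.println(writerStillMappedAfterRoll()); // true: the stale writer is never removed
  }
}
```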
In my HDFS sink configuration, hdfs.rollSize=0, hdfs.rollCount=0 and hdfs.rollInterval=1200, and the directory layout is like YYYY/MM/DD, so files are rolled only by rollInterval. My channel is a memory channel. If close fails, HDFSEventSink's sfWriters map never removes the writer for the failed file, so the HDFS sink stops working and cannot consume data from the channel. If the channel capacity is small or the data volume is large, this may lead to:
1. the channel filling up quickly;
2. the HDFS sink continually processing data that belongs to the failed file, because the events' timestamps fall into that file's bucket.
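Point 2 follows from how writers are keyed by bucket path. A minimal sketch, assuming a hypothetical `/flume/yyyy/MM/dd/events` path computed from the event timestamp in UTC (the real sink builds the path from its configured hdfs.path escape sequences) and a plain map standing in for `sfWriters`: every event from the same day resolves to the same key and gets the stale writer back, and only an event from a new day gets a fresh one.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

class BucketLookupSketch {

  // Hypothetical daily layout standing in for a date-escaped hdfs.path.
  static final DateTimeFormatter DAY =
      DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

  // Hypothetical stand-in for HDFSEventSink.sfWriters: bucket path -> writer name.
  static final Map<String, String> sfWriters = new HashMap<>();

  /** Maps an event timestamp (epoch millis) to its daily bucket path. */
  static String bucketFor(long timestampMillis) {
    return "/flume/" + DAY.format(Instant.ofEpochMilli(timestampMillis)) + "/events";
  }

  /** Returns the cached writer for the event's bucket; creates one only if none is mapped. */
  static String writerFor(long timestampMillis) {
    return sfWriters.computeIfAbsent(bucketFor(timestampMillis), path -> "writer@" + path);
  }

  public static void main(String[] args) {
    long morning = Instant.parse("2015-08-20T08:00:00Z").toEpochMilli();
    long evening = Instant.parse("2015-08-20T20:00:00Z").toEpochMilli();
    long nextDay = Instant.parse("2015-08-21T08:00:00Z").toEpochMilli();

    // A writer whose timed close failed is left in the map for 2015/08/20.
    sfWriters.put(bucketFor(morning), "FAILED writer");

    System.out.println(writerFor(evening)); // FAILED writer
    System.out.println(writerFor(nextDay)); // writer@/flume/2015/08/21/events
  }
}
```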
So I think that when closing the file throws an exception, the writer should still be removed from sfWriters, like this:
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
              bucketPath, rollInterval);
          try {
            // Roll the file and remove reference from sfWriters map.
            close(true);
          } catch (Throwable t) {
            LOG.error("Unexpected error", t);
            // Proposed: even if close fails, run the close callback so the
            // writer is removed from sfWriters, and mark this writer closed.
            runCloseAction();
            closed = true;
          }
          return null;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval,
          TimeUnit.SECONDS);
    }
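The proposed change can be exercised with a minimal sketch under the same simplified assumptions: a hypothetical `Writer` whose `close(true)` always throws, a `runCloseAction()` stand-in that removes the writer from a map playing the role of `sfWriters`, and a `closed` flag. After the failed roll, the writer is marked closed and no longer mapped, so a later event for that bucket would get a new writer.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PatchedRollSketch {

  // Hypothetical stand-in for HDFSEventSink.sfWriters.
  static final Map<String, Object> sfWriters = new ConcurrentHashMap<>();

  // Hypothetical stand-in for a BucketWriter whose close always fails.
  static class Writer {
    final String bucketPath;
    volatile boolean closed = false;

    Writer(String bucketPath) { this.bucketPath = bucketPath; }

    void close(boolean callCloseCallback) throws IOException {
      // Simulated HDFS failure: the normal close path never reaches the callback.
      throw new IOException("could not obtain a healthy datanode");
    }

    // Stand-in for the sink's close callback: drop this writer from sfWriters.
    void runCloseAction() {
      sfWriters.remove(bucketPath);
    }

    // The proposed roll task: on failure, still run the close action and mark closed.
    Callable<Void> rollAction() {
      return () -> {
        try {
          close(true);
        } catch (Throwable t) {
          System.err.println("Unexpected error: " + t); // mirrors LOG.error(...)
          runCloseAction();
          closed = true;
        }
        return null;
      };
    }
  }

  /** Runs one failed roll; returns true if the writer was marked closed and unmapped. */
  static boolean writerRemovedAfterFailedRoll() {
    String bucketPath = "/flume/2015/08/20/events"; // hypothetical bucket path
    Writer writer = new Writer(bucketPath);
    sfWriters.put(bucketPath, writer);
    ScheduledExecutorService timedRollerPool = Executors.newSingleThreadScheduledExecutor();
    try {
      timedRollerPool.schedule(writer.rollAction(), 100, TimeUnit.MILLISECONDS).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      timedRollerPool.shutdown();
    }
    return writer.closed && !sfWriters.containsKey(bucketPath);
  }

  public static void main(String[] args) {
    System.out.println(writerRemovedAfterFailedRoll()); // true
  }
}
```

One design consequence worth noting: if the failed file is simply dropped from sfWriters, any data the old writer could not flush stays wherever the failed close left it; the sketch only shows that the sink is no longer stuck on the stale entry.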
Is this OK?