Details
-
Bug
-
Status: Patch Available
-
Critical
-
Resolution: Unresolved
-
1.8.0, 1.9.0
-
None
-
apache-flume-bin-1.8.0
java-1.8.92
Description
如果skipToEnd为true,file A 处理正在读取处理中时生成file B,等fileA处理完下次处理时file B 从末尾开始读取,而不是从头读取导致数据丢失
public List<Long> updateTailFiles(boolean skipToEnd) throws IOException { updateTime = System.currentTimeMillis(); List<Long> updatedInodes = Lists.newArrayList(); for (TaildirMatcher taildir : taildirCache) { Map<String, String> headers = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { long inode; try { inode = getInode(f); } catch (NoSuchFileException e) { logger.info("File has been deleted in the meantime: " + e.getMessage()); continue; } TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos); } else { boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length(); if (updated) { if (tf.getRaf() == null) { tf = openFile(f, headers, inode, tf.getPos()); } if (f.length() < tf.getPos()) { logger.info("Pos " + tf.getPos() + " is larger than file size! " + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode); tf.updatePos(tf.getPath(), inode, 0); } } tf.setNeedTail(updated); } tailFiles.put(inode, tf); updatedInodes.add(inode); } } return updatedInodes; }