diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 1d3cf00..b325005 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -162,6 +162,19 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
       return;
     }
 
+    // Guard BEFORE dereferencing: removeNotCompactableDeltas() sorts the list,
+    // so a null list would NPE if checked only afterwards.  Then drop the
+    // newest delta when its transaction batch is still open, and bail out
+    // early if nothing remains to compact.
+    if (parsedDeltas == null || parsedDeltas.isEmpty()) {
+      LOG.info("no delta file ready for compaction in " + sd.getLocation());
+      return;
+    }
+    removeNotCompactableDeltas(parsedDeltas, conf);
+    if (parsedDeltas.isEmpty()) {
+      LOG.info("no delta file ready for compaction in " + sd.getLocation());
+      return;
+    }
+
     StringableList deltaDirs = new StringableList();
     long minTxn = Long.MAX_VALUE;
     long maxTxn = Long.MIN_VALUE;
@@ -184,6 +197,40 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
     JobClient.runJob(job).waitForCompletion();
     su.gatherStats();
   }
+
+  /**
+   * Removes the most recent delta directory from {@code deltas} when it
+   * contains a non-bucket file.  While a transaction batch is still open,
+   * its delta directory holds a non-bucket file, so that delta must not be
+   * compacted yet.  Only the newest delta is inspected; per the original
+   * intent, older deltas are assumed to belong to closed batches.
+   *
+   * @param deltas list of parsed delta directories; sorted in place and
+   *               possibly shortened by one element
+   * @param conf   configuration used to resolve the delta's FileSystem
+   *               (the job conf rather than a fresh {@code Configuration},
+   *               so custom filesystem settings are honored)
+   * @throws IOException if listing the delta directory fails
+   */
+  private void removeNotCompactableDeltas(List<AcidUtils.ParsedDelta> deltas,
+      Configuration conf) throws IOException {
+    java.util.Collections.sort(deltas);
+    for (int i = deltas.size() - 1; i >= 0; i--) {
+      Path dir = deltas.get(i).getPath();
+      if (dir.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+        FileSystem fs = dir.getFileSystem(conf);
+        for (FileStatus file : fs.listStatus(dir, AcidUtils.bucketFileFilter)) {
+          Matcher matcher = AcidUtils.BUCKET_DIGIT_PATTERN.matcher(file.getPath().getName());
+          if (!matcher.find()) {
+            LOG.info("Directory: "+dir.getName()+" includes non bucket file:"+file.getPath().getName()+", ignore this directory");
+            deltas.remove(i);
+            return;
+          }
+        }
+        // Newest delta contains only bucket files; it is compactable, done.
+        return;
+      }
+    }
+  }
 
   /**
    * Set the column names and types into the job conf for the input format