commit edbbc9d3dc57d1e59483071c86d26d90bca630b8
Author: Alan Gates
Date:   Tue Dec 9 15:08:46 2014 -0800

    HIVE-8966 Change AcidUtils.getAcidState to ignore deltas with flush length files when compacting.

    HIVE-8966 Fixed it so that once an open delta is seen all further deltas are skipped.

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index bede378..1fbcc44 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -50,6 +50,13 @@ public boolean accept(Path path) {
       return path.getName().startsWith(DELTA_PREFIX);
     }
   };
+  public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+  public static final PathFilter deltaSideFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().endsWith(DELTA_SIDE_FILE_SUFFIX);
+    }
+  };
   public static final String BUCKET_PREFIX = "bucket_";
   public static final PathFilter bucketFileFilter = new PathFilter() {
     @Override
@@ -190,8 +197,7 @@ static long parseBase(Path path) {
     List getOriginalFiles();

     /**
-     * Get the list of base and delta directories that are valid and not
-     * obsolete.
+     * Get the list of delta directories that are valid and not obsolete.
      * @return the minimal list of current directories
      */
     List getCurrentDirectories();
@@ -209,11 +215,16 @@ static long parseBase(Path path) {
     final long minTransaction;
     final long maxTransaction;
     final FileStatus path;
+    /**
+     * Indicates that this delta is still open and being written to by a streaming process
+     */
+    final boolean open;

-    ParsedDelta(long min, long max, FileStatus path) {
+    ParsedDelta(long min, long max, FileStatus path, boolean open) {
       this.minTransaction = min;
       this.maxTransaction = max;
       this.path = path;
+      this.open = open;
     }

     public long getMinTransaction() {
@@ -293,14 +304,27 @@ public int compareTo(ParsedDelta parsedDelta) {
     return result;
   }

-  static ParsedDelta parseDelta(FileStatus path) {
+  static ParsedDelta parseDelta(FileSystem fs, FileStatus path, boolean compacting) throws
+      IOException {
     String filename = path.getPath().getName();
+    boolean open = false;
+    if (compacting) {
+      // Make sure this isn't a delta that is currently being written to. If so, we can't
+      // compact it.
+      FileStatus[] files = fs.listStatus(path.getPath(), AcidUtils.deltaSideFileFilter);
+      if (files != null && files.length > 0) {
+        //
+        LOG.debug("Found a flush length file while parsing deltas for the compactor, excluding " +
+            "directory " + path.getPath().getName());
+        open = true;
+      }
+    }
     if (filename.startsWith(DELTA_PREFIX)) {
       String rest = filename.substring(DELTA_PREFIX.length());
       int split = rest.indexOf('_');
       long min = Long.parseLong(rest.substring(0, split));
       long max = Long.parseLong(rest.substring(split + 1));
-      return new ParsedDelta(min, max, path);
+      return new ParsedDelta(min, max, path, open);
     }
     throw new IllegalArgumentException(path + " does not start with " +
         DELTA_PREFIX);
@@ -342,6 +366,27 @@ public static boolean isAcid(Path directory,
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
                                        ValidTxnList txnList
+                                       ) throws IOException {
+    return getAcidState(directory, conf, txnList, false);
+  }
+
+
+  /**
+   * Get the ACID state of the given directory. It finds the minimal set of
+   * base and diff directories. Note that because major compactions don't
+   * preserve the history, we can't use a base directory that includes a
+   * transaction id that we must exclude.
+   * @param directory the partition directory to analyze
+   * @param conf the configuration
+   * @param txnList the list of transactions that we are reading
+   * @param compacting true if this is being called from the compactor
+   * @return the state of the directory
+   * @throws IOException
+   */
+  public static Directory getAcidState(Path directory,
+                                       Configuration conf,
+                                       ValidTxnList txnList,
+                                       boolean compacting
                                        ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
     FileStatus bestBase = null;
@@ -368,10 +413,10 @@ public static Directory getAcidState(Path directory,
           obsolete.add(child);
         }
       } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
-        ParsedDelta delta = parseDelta(child);
-        if (txnList.isTxnRangeCommitted(delta.minTransaction,
-            delta.maxTransaction) !=
-            ValidTxnList.RangeResponse.NONE) {
+        ParsedDelta delta = parseDelta(fs, child, compacting);
+        if (delta.open ||
+            txnList.isTxnRangeCommitted(delta.minTransaction, delta.maxTransaction) !=
+            ValidTxnList.RangeResponse.NONE) {
           working.add(delta);
         }
       } else {
@@ -400,6 +445,12 @@ public static Directory getAcidState(Path directory,
     Collections.sort(working);
     long current = bestBaseTxn;
     for(ParsedDelta next: working) {
+      if (next.open) {
+        // Bail out, we don't want to put anything past this in the list, as we don't want to
+        // compact an open delta or anything beyond it.
+        LOG.debug("Stopping delta selection at " + next.toString() + " as it is open");
+        break;
+      }
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
         if (txnList.isTxnRangeCommitted(current+1, next.maxTransaction) !=
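
The core of the AcidUtils change above is the side-file check in parseDelta: a delta directory that still contains any *_flush_length file is treated as open, and an open delta (plus anything after it) is never handed to the compactor. The sketch below distills that check into a standalone helper. It assumes only the Hadoop FileSystem API; the class and method names (OpenDeltaCheck, isOpenDelta, SIDE_FILE_FILTER) are illustrative and not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class OpenDeltaCheck {
      private static final String SIDE_FILE_SUFFIX = "_flush_length";

      // Matches the *_flush_length side files that a streaming writer keeps
      // next to each bucket file until the delta is finally closed.
      private static final PathFilter SIDE_FILE_FILTER = new PathFilter() {
        @Override
        public boolean accept(Path path) {
          return path.getName().endsWith(SIDE_FILE_SUFFIX);
        }
      };

      // A delta directory is "open" (still being written to) if any of its
      // bucket files still has a *_flush_length side file.
      static boolean isOpenDelta(FileSystem fs, Path deltaDir) throws IOException {
        FileStatus[] sideFiles = fs.listStatus(deltaDir, SIDE_FILE_FILTER);
        return sideFiles != null && sideFiles.length > 0;
      }
    }
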
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 44bc391..b62aa17 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -129,7 +129,7 @@ String serialize() {
   }

   static Path getSideFile(Path main) {
-    return new Path(main + "_flush_length");
+    return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
   }

   static int getOperation(OrcStruct struct) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 3cdbc6c..fef83fc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -130,7 +130,7 @@ void run(HiveConf conf, String jobName, Table t, StorageDescriptor sd,
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns, true);
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
     if (isMajor) {
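
With compacting=true, as CompactorMR now passes above, getAcidState carries the open flag through, sorts the candidate deltas by transaction range, and stops at the first open delta, so neither the open delta nor anything after it is selected for compaction. A minimal standalone sketch of that selection rule follows; Delta and pickDeltasToCompact are illustrative stand-ins for AcidUtils.ParsedDelta and the loop in getAcidState, not the actual Hive classes.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class DeltaSelection {
      static class Delta {
        final long minTxn;
        final long maxTxn;
        final boolean open;   // true if a *_flush_length side file was found

        Delta(long minTxn, long maxTxn, boolean open) {
          this.minTxn = minTxn;
          this.maxTxn = maxTxn;
          this.open = open;
        }
      }

      // Returns the deltas that are safe to compact: everything up to, but not
      // including, the first delta that is still open.
      static List<Delta> pickDeltasToCompact(List<Delta> deltas) {
        List<Delta> sorted = new ArrayList<>(deltas);
        sorted.sort(Comparator.comparingLong(d -> d.minTxn));
        List<Delta> result = new ArrayList<>();
        for (Delta d : sorted) {
          if (d.open) {
            break;   // bail out: skip the open delta and everything beyond it
          }
          result.add(d);
        }
        return result;
      }
    }
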
diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index c0bdb00..8104b8e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -252,4 +252,53 @@ public void testOverlapingDelta() throws Exception {
     assertEquals("mock:/tbl/part1/delta_000062_62", delts.get(2).getPath().toString());
     assertEquals("mock:/tbl/part1/delta_0000063_63", delts.get(3).getPath().toString());
   }
+
+  @Test
+  public void deltasWithSideFilesIncludedInRead() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"));
+    List delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", delts.get(1).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithSideFilesNotIncludedInCompact() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"), true);
+    List delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithSideFilesNotIncludedInCompact2() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"), true);
+    List delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index d68e431..58b2751 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -153,25 +153,30 @@ protected long openTxn() throws MetaException {
   protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
       throws Exception {
-    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true, false);
+  }
+
+  protected void addDeltaWithFlushLength(Table t, Partition p, long minTxn, long maxTxn, int
+      numRecords) throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true, true);
   }

   protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
-    addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
+    addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true, false);
   }

   protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception {
-    addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
+    addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true, false);
   }

   protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
                               int numBuckets, boolean allBucketsPresent) throws Exception {
-    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent, false);
   }

   protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords,
                              int numBuckets, boolean allBucketsPresent) throws Exception {
-    addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
+    addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent, false);
   }

   protected List getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
@@ -253,7 +258,7 @@ private String getLocation(String tableName, String partValue) {

   private void addFile(Table t, Partition p, long minTxn, long maxTxn,
                        int numRecords, FileType type, int numBuckets,
-                       boolean allBucketsPresent) throws Exception {
+                       boolean allBucketsPresent, boolean addFlushLengthFile) throws Exception {
     String partValue = (p == null) ? null : p.getValues().get(0);
     Path location = new Path(getLocation(t.getTableName(), partValue));
     String filename = null;
@@ -267,12 +272,18 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
     for (int bucket = 0; bucket < numBuckets; bucket++) {
       if (bucket == 0 && !allBucketsPresent) continue; // skip one
       Path partFile = null;
+      Path flushLengthFile = null;
       if (type == FileType.LEGACY) {
         partFile = new Path(location, String.format(AcidUtils.BUCKET_DIGITS, bucket) + "_0");
       } else {
         Path dir = new Path(location, filename);
         fs.mkdirs(dir);
         partFile = AcidUtils.createBucketFile(dir, bucket);
+        if (type == FileType.DELTA && addFlushLengthFile) {
+          flushLengthFile = new Path(dir,
+              AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucket) +
+              AcidUtils.DELTA_SIDE_FILE_SUFFIX);
+        }
       }
       FSDataOutputStream out = fs.create(partFile);
       for (int i = 0; i < numRecords; i++) {
@@ -281,6 +292,11 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn,
         out.writeBytes("mary had a little lamb its fleece was white as snow\n");
       }
       out.close();
+      if (flushLengthFile != null) {
+        out = fs.create(flushLengthFile);
+        out.write(1);
+        out.close();
+      }
     }
   }
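
For reference, the test helper above lays an "open" delta out on disk as a normal bucket file plus a side file whose name is the bucket file name with the _flush_length suffix appended, which is also how OrcRecordUpdater.getSideFile derives the name. A small sketch of that naming follows; the example paths and the zero-padded "%05d" bucket format are assumptions for illustration, not values taken from the patch.

    import org.apache.hadoop.fs.Path;

    public class SideFileLayout {
      public static void main(String[] args) {
        // Hypothetical delta directory written by a streaming client.
        Path deltaDir = new Path("/warehouse/t/delta_23_24");
        int bucket = 0;
        // Bucket file name, assuming a zero-padded "%05d" bucket format.
        Path bucketFile = new Path(deltaDir, "bucket_" + String.format("%05d", bucket));
        // Side file: the bucket file name with the _flush_length suffix appended.
        Path sideFile = new Path(bucketFile + "_flush_length");
        System.out.println(bucketFile);  // /warehouse/t/delta_23_24/bucket_00000
        System.out.println(sideFile);    // /warehouse/t/delta_23_24/bucket_00000_flush_length
      }
    }
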
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index e525ef9..0ba36f3 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -29,6 +29,7 @@
 import java.io.*;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -274,7 +275,6 @@ public void minorTableWithBase() throws Exception {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    for (int i = 0; i < stat.length; i++) System.out.println("HERE: " + stat[i].getPath().toString());
     Assert.assertEquals(4, stat.length);

     // Find the new delta file and make sure it has the right contents
@@ -296,6 +296,42 @@ public void minorTableWithBase() throws Exception {
   }

   @Test
+  public void minorWithFlushLength() throws Exception {
+    LOG.debug("Starting minorWithFlushLength");
+    Table t = newTable("default", "mtwb", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaWithFlushLength(t, null, 23L, 24L, 2);
+    addDeltaFile(t, null, 25L, 30L, 2);
+
+    burnThroughTransactions(27);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+
+    startWorker();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be 5 directories in the location
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(5, stat.length);
+
+    // Find the new delta file and make sure it has the right contents
+    Arrays.sort(stat);
+    Assert.assertEquals("base_20", stat[0].getPath().getName());
+    Assert.assertEquals("delta_0000021_0000022", stat[1].getPath().getName());
+    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
+    Assert.assertEquals("delta_23_24", stat[3].getPath().getName());
+    Assert.assertEquals("delta_25_30", stat[4].getPath().getName());
+  }
+
+  @Test
   public void minorPartitionWithBase() throws Exception {
     Table t = newTable("default", "mpwb", true);
     Partition p = newPartition(t, "today");
@@ -474,6 +510,44 @@ public void majorPartitionWithBase() throws Exception {
   }

   @Test
+  public void majorWithFlushLength() throws Exception {
+    LOG.debug("Starting majorWithFlushLength");
+    Table t = newTable("default", "mapwb", true);
+    Partition p = newPartition(t, "today");
+
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addDeltaWithFlushLength(t, p, 25L, 30L, 2);
+
+    burnThroughTransactions(28);
+
+    CompactionRequest rqst = new CompactionRequest("default", "mapwb", CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+
+    startWorker();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+
+    // There should now be five directories in the location.
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
+    Assert.assertEquals(5, stat.length);
+
+    // Find the new base file and make sure it has the right contents
+    Arrays.sort(stat);
+    Assert.assertEquals("base_0000024", stat[0].getPath().getName());
+    Assert.assertEquals("base_20", stat[1].getPath().getName());
+    Assert.assertEquals("delta_21_22", stat[2].getPath().getName());
+    Assert.assertEquals("delta_23_24", stat[3].getPath().getName());
+    Assert.assertEquals("delta_25_30", stat[4].getPath().getName());
+  }
+
+  @Test
   public void majorTableNoBase() throws Exception {
     LOG.debug("Starting majorTableNoBase");
     Table t = newTable("default", "matnb", false);