diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 170dcd7..181bb35 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -443,6 +443,8 @@ CANNOT_CHANGE_COLUMN_TYPE(10312, "Changing from type {0} to {1} is not supported for column {2}. SerDe may be incompatible", true), REPLACE_CANNOT_DROP_COLUMNS(10313, "Replacing columns cannot drop columns for table {0}. SerDe may be incompatible", true), REPLACE_UNSUPPORTED_TYPE_CONVERSION(10314, "Replacing columns with unsupported type conversion (from {0} to {1}) for column {2}. SerDe may be incompatible", true), + ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}). " + + "Oldest available base: {2}", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 7d1517d..fc63f20 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; @@ -468,7 +469,7 @@ public static Directory getAcidState(Path directory, return getAcidState(directory, conf, txnList, false); } - /** + /** * Get the ACID state of the given directory. It finds the minimal set of * base and diff directories. 
Note that because major compactions don't * preserve the history, we can't use a base directory that includes a @@ -487,6 +488,8 @@ public static Directory getAcidState(Path directory, FileSystem fs = directory.getFileSystem(conf); FileStatus bestBase = null; long bestBaseTxn = 0; + long oldestBaseTxnId = Long.MAX_VALUE; + Path oldestBase = null; final List deltas = new ArrayList(); List working = new ArrayList(); List originalDirectories = new ArrayList(); @@ -499,13 +502,22 @@ public static Directory getAcidState(Path directory, String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { long txn = parseBase(p); + if (oldestBaseTxnId > txn) { + //keep track for error reporting + oldestBase = p; + oldestBaseTxnId = txn; + } if (bestBase == null) { - bestBase = child; - bestBaseTxn = txn; + if (isValidBase(txn, txnList)) { + bestBase = child; + bestBaseTxn = txn; + } } else if (bestBaseTxn < txn) { - obsolete.add(bestBase); - bestBase = child; - bestBaseTxn = txn; + if (isValidBase(txn, txnList)) { + obsolete.add(bestBase); + bestBase = child; + bestBaseTxn = txn; + } } else { obsolete.add(child); } @@ -572,6 +584,20 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { } } + if(oldestBase != null && bestBase == null) { + /** + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link txnList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen...*/ + //todo: this should only care about 'open' txns (HIVE-14211) + long[] exceptions = txnList.getInvalidTransactions(); + String minOpenTxn = exceptions != null && exceptions.length > 0 ? 
+ Long.toString(exceptions[0]) : "x"; + throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format( + Long.toString(txnList.getHighWatermark()), + minOpenTxn, oldestBase.toString())); + } final Path base = bestBase == null ? null : bestBase.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); @@ -599,6 +625,26 @@ public Path getBaseDirectory() { } }; } + /** + * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) + * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct + * snapshot for this reader. + * Note that such base is NOT obsolete. Obsolete files are those that are "covered" by other + * files within the snapshot. + */ + private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { + /*This implementation is suboptimal. It considers open/aborted txns invalid while we are only + * concerned with 'open' ones. (Compaction removes any data that belongs to aborted txns and + * reads skip anything that belongs to aborted txn, thus base_7 is still OK if the only exception + * is txn 5 which is aborted). So this implementation can generate false positives. (HIVE-14211) + * */ + if(baseTxnId == Long.MIN_VALUE) { + //such base is created by 1st compaction in case of non-acid to acid table conversion + //By definition there are no open txns with id < 1. 
+ return true; + } + return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId); + } /** * Find the original files (non-ACID layout) recursively under the partition diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java index f5eb8a1..08fcff4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java @@ -47,6 +47,7 @@ public String getServiceDescription() { return "Count number of open transactions"; } private static final class OpenTxnsCounter implements Runnable { + private static volatile long lastLogTime = 0; private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) { @@ -59,7 +60,12 @@ public void run() { long startTime = System.currentTimeMillis(); txnHandler.countOpenTxns(); int count = isAliveCounter.incrementAndGet(); - LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + if(System.currentTimeMillis() - lastLogTime > 60*1000) { + //don't flood the logs with too many msgs + LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 + + "seconds. 
isAliveCounter=" + count); + lastLogTime = System.currentTimeMillis(); + } } catch(Throwable t) { LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 3172723..22126e0 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -26,8 +26,10 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.List; import static org.junit.Assert.assertEquals; @@ -214,25 +216,107 @@ public void testBestBase() throws Exception { new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); - assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); + assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_120_130", + dir.getCurrentDirectories().get(0).getPath().toString()); List obsoletes = dir.getObsolete(); assertEquals(4, obsoletes.size()); assertEquals("mock:/tbl/part1/base_10", 
obsoletes.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_100", obsoletes.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); assertEquals(0, dir.getOriginalFiles().size()); - assertEquals(0, dir.getCurrentDirectories().size()); - // we should always get the latest base + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:")); - assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); - } + assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); + assertEquals(0, dir.getCurrentDirectories().size()); + obsoletes = dir.getObsolete(); + assertEquals(1, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString()); + assertEquals(0, dir.getOriginalFiles().size()); + /*Single statement txns only: since we don't compact a txn range that includes an open txn, + the existence of delta_120_130 implies that 121 in the exception list is aborted unless + delta_120_130 is from streaming ingest in which case 121 can be open + (and thus 122-130 are open too) + For multi-statement txns, see HIVE-13369*/ + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121")); + assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_120_130", + dir.getCurrentDirectories().get(0).getPath().toString()); + obsoletes = dir.getObsolete(); + assertEquals(4, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); + 
assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); + + boolean gotException = false; + try { + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (125,5). Oldest available base: " + + "mock:/tbl/part1/base_5", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + + fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + try { + gotException = false; + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " + + "mock:/tbl/part1/base_25", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + + fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + try { + gotException = false; + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (150,7). 
Oldest available base: " + + "mock:/tbl/part1/base_25", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + + fs = new MockFileSystem(conf, + //non-acid to acid table conversion + new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + //note that we don't include current txn of the client in exception list to allow read-your-writes + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:")); + assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString()); + assertEquals(0, dir.getObsolete().size()); + } @Test public void testObsoleteOriginals() throws Exception { Configuration conf = new Configuration();