diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 52dadb7..6ed5b13 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -449,6 +449,8 @@ IMPORT_SEMANTIC_ERROR(10324, "Import Semantic Analyzer Error"), INVALID_FK_SYNTAX(10325, "Invalid Foreign Key syntax"), INVALID_PK_SYNTAX(10326, "Invalid Primary Key syntax"), + ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}). " + + "Oldest available base: {2}", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 36f38f6..c150ec5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.hive.metastore.api.DataOperationType; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -475,7 +476,9 @@ public static Directory getAcidState(Path directory, /** State class for getChildState; cannot modify 2 things in a method. */ private static class TxnBase { private FileStatus status; - private long txn; + private long txn = 0; + private long oldestBaseTxnId = Long.MAX_VALUE; + private Path oldestBase = null; } /** @@ -571,6 +574,21 @@ else if(next.maxTransaction == current && lastStmtId >= 0) { } } + if(bestBase.oldestBase != null && bestBase.status == null) { + /** + * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given + * {@link txnList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * cannot have any data for an open txn. We could check {@link deltas} has files to cover + * [1,n] w/o gaps but this would almost never happen...*/ + //todo: this should only care about 'open' tnxs (HIVE-14211) + long[] exceptions = txnList.getInvalidTransactions(); + String minOpenTxn = exceptions != null && exceptions.length > 0 ? + Long.toString(exceptions[0]) : "x"; + throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format( + Long.toString(txnList.getHighWatermark()), + minOpenTxn, bestBase.oldestBase.toString())); + } + final Path base = bestBase.status == null ? null : bestBase.status.getPath(); LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " + deltas.size()); @@ -598,7 +616,26 @@ public Path getBaseDirectory() { } }; } - + /** + * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view) + * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct + * snapshot for this reader. + * Note that such base is NOT obsolete. Obsolete files are those that are "covered" by other + * files within the snapshot. + */ + private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { + /*This implementation is suboptimal. It considers open/aborted txns invalid while we are only + * concerned with 'open' ones. (Compaction removes any data that belongs to aborted txns and + * reads skip anything that belongs to aborted txn, thus base_7 is still OK if the only exception + * is txn 5 which is aborted). So this implementation can generate false positives. (HIVE-14211) + * */ + if(baseTxnId == Long.MIN_VALUE) { + //such base is created by 1st compaction in case of non-acid to acid table conversion + //By definition there are no open txns with id < 1. + return true; + } + return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId); + } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, List original, List obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) { @@ -606,13 +643,22 @@ private static void getChildState(FileStatus child, HdfsFileStatusWithId childWi String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { long txn = parseBase(p); + if(bestBase.oldestBaseTxnId > txn) { + //keep track for error reporting + bestBase.oldestBase = p; + bestBase.oldestBaseTxnId = txn; + } if (bestBase.status == null) { - bestBase.status = child; - bestBase.txn = txn; + if(isValidBase(txn, txnList)) { + bestBase.status = child; + bestBase.txn = txn; + } } else if (bestBase.txn < txn) { - obsolete.add(bestBase.status); - bestBase.status = child; - bestBase.txn = txn; + if(isValidBase(txn, txnList)) { + obsolete.add(bestBase.status); + bestBase.status = child; + bestBase.txn = txn; + } } else { obsolete.add(child); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java index f5eb8a1..08fcff4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java @@ -47,6 +47,7 @@ public String getServiceDescription() { return "Count number of open transactions"; } private static final class OpenTxnsCounter implements Runnable { + private static volatile long lastLogTime = 0; private final TxnStore txnHandler; private final AtomicInteger isAliveCounter; private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) { @@ -59,7 +60,12 @@ public void run() { long startTime = System.currentTimeMillis(); txnHandler.countOpenTxns(); int count = isAliveCounter.incrementAndGet(); - LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + if(System.currentTimeMillis() - lastLogTime > 60*1000) { + //don't flood the logs with too many msgs + LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 + + "seconds. isAliveCounter=" + count); + lastLogTime = System.currentTimeMillis(); + } } catch(Throwable t) { LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 5745dee..2ff8f0d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -27,8 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem; import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; +import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.List; import static org.junit.Assert.assertEquals; @@ -217,25 +219,107 @@ public void testBestBase() throws Exception { new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); - assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); + assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_120_130", + dir.getCurrentDirectories().get(0).getPath().toString()); List obsoletes = dir.getObsolete(); assertEquals(4, obsoletes.size()); assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); - assertEquals("mock:/tbl/part1/base_100", obsoletes.get(1).getPath().toString()); - assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString()); - assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString()); + assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); assertEquals(0, dir.getOriginalFiles().size()); - assertEquals(0, dir.getCurrentDirectories().size()); - // we should always get the latest base + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:")); - assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString()); - } + assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); + assertEquals(0, dir.getCurrentDirectories().size()); + obsoletes = dir.getObsolete(); + assertEquals(1, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString()); + assertEquals(0, dir.getOriginalFiles().size()); + + /*Single statemnt txns only: since we don't compact a txn range that includes an open txn, + the existence of delta_120_130 implies that 121 in the exception list is aborted unless + delta_120_130 is from streaming ingest in which case 121 can be open + (and thus 122-130 are open too) + For multi-statment txns, see HIVE-13369*/ + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121")); + assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_120_130", + dir.getCurrentDirectories().get(0).getPath().toString()); + obsoletes = dir.getObsolete(); + assertEquals(4, obsoletes.size()); + assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString()); + assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString()); + assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString()); + assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); + + boolean gotException = false; + try { + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (125,5). Oldest available base: " + + "mock:/tbl/part1/base_5", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + + fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + try { + gotException = false; + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " + + "mock:/tbl/part1/base_25", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + + fs = new MockFileSystem(conf, + new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + try { + gotException = false; + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + } + catch(IOException e) { + gotException = true; + Assert.assertEquals("Not enough history available for (150,7). Oldest available base: " + + "mock:/tbl/part1/base_25", e.getMessage()); + } + Assert.assertTrue("Expected exception", gotException); + fs = new MockFileSystem(conf, + //non-acid to acid table conversion + new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), + new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); + part = new MockPath(fs, "/tbl/part1"); + //note that we don't include current txn of the client in exception list to read-you-writes + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:")); + assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString()); + assertEquals(1, dir.getCurrentDirectories().size()); + assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString()); + assertEquals(0, dir.getObsolete().size()); + } @Test public void testObsoleteOriginals() throws Exception { Configuration conf = new Configuration();