diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c16880ed2b..41e639b338 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1838,7 +1838,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), - HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0, + HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 1, "Sets the operational properties that control the appropriate behavior for various\n" + "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n" + "for ACID, while setting it to one will enable a split-update feature found in the newer\n" diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 5e8fe6263b..73c43bdf8f 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -27,6 +27,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -44,7 +45,7 @@ import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.conf.Validator; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnState; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; @@ -97,6 +99,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.BUCKET_COUNT; + public class TestStreaming { private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class); @@ -448,7 +452,9 @@ public void testTableValidation() throws Exception { } } - + /** + * @deprecated use {@link #checkDataWritten2(Path, long, long, int, String, String...)} + */ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... 
records) throws Exception { ValidTxnList txns = msClient.getValidTxns(); @@ -473,13 +479,14 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int InputFormat inf = new OrcInputFormat(); JobConf job = new JobConf(); job.set("mapred.input.dir", partitionPath.toString()); - job.set("bucket_count", Integer.toString(buckets)); + job.set(BUCKET_COUNT, Integer.toString(buckets)); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg"); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string"); - job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true"); + AcidUtils.setTransactionalTableScan(job,true); + job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = inf.getSplits(job, buckets); - Assert.assertEquals(buckets, splits.length); + Assert.assertEquals(numExpectedFiles, splits.length); org.apache.hadoop.mapred.RecordReader rr = inf.getRecordReader(splits[0], job, Reporter.NULL); @@ -491,6 +498,43 @@ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int } Assert.assertEquals(false, rr.next(key, value)); } + /** + * @param validationQuery query to read from table to compare data against {@code records} + * @param records expected data. each row is a CSV list of values + */ + private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles, + String validationQuery, String... records) throws Exception { + ValidTxnList txns = msClient.getValidTxns(); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); + Assert.assertEquals(0, dir.getObsolete().size()); + Assert.assertEquals(0, dir.getOriginalFiles().size()); + List current = dir.getCurrentDirectories(); + System.out.println("Files found: "); + for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString()); + Assert.assertEquals(numExpectedFiles, current.size()); + + // find the absolute minimum and maximum transaction + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (AcidUtils.ParsedDelta pd : current) { + if (pd.getMaxTransaction() > max) max = pd.getMaxTransaction(); + if (pd.getMinTransaction() < min) min = pd.getMinTransaction(); + } + Assert.assertEquals(minTxn, min); + Assert.assertEquals(maxTxn, max); +//todo: should this also try w/Vectorization? + String currStrategy = conf.getVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY); + for(String strategy : ((Validator.StringSet)HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.getValidator()).getExpected()) { + //run it with each split strategy - make sure there are no differences in the results + conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, strategy.toUpperCase()); + List actualResult = queryTable(driver, validationQuery); + for (int i = 0; i < actualResult.size(); i++) { + Assert.assertEquals("diff at [" + i + "]. actual=" + actualResult + " expected=" + + Arrays.toString(records), records[i], actualResult.get(i)); + } + } + conf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, currStrategy); + } private void checkNothingWritten(Path partitionPath) throws Exception { ValidTxnList txns = msClient.getValidTxns(); @@ -1016,15 +1060,15 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.beginNextTransaction(); txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - - checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}"); + String validationQuery = "select id, msg from " + dbName + "." 
+ tblName + " order by id, msg"; + checkDataWritten2(partLoc, 15, 24, 1, validationQuery, "1\tHello streaming"); txnBatch.beginNextTransaction(); txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + checkDataWritten2(partLoc, 15, 24, 1, validationQuery, "1\tHello streaming", + "2\tWelcome to streaming"); txnBatch.close(); @@ -1034,16 +1078,16 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("3,Hello streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); + checkDataWritten2(partLoc, 15, 40, 2, validationQuery, "1\tHello streaming", + "2\tWelcome to streaming", "3\tHello streaming - once again"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}", - "{4, Welcome to streaming - once again}"); + checkDataWritten2(partLoc, 15, 40, 2, validationQuery, "1\tHello streaming", + "2\tWelcome to streaming", "3\tHello streaming - once again", + "4\tWelcome to streaming - once again"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -1053,7 +1097,6 @@ public void testMultipleTransactionBatchCommits() throws Exception { connection.close(); } - @Test public void testInterleavedTransactionBatchCommits() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName, tblName, @@ -1078,32 +1121,69 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch2.commit(); - checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}"); + String validationQuery = "select id, msg from " + dbName + "." + tblName + " order by id, msg"; + checkDataWritten2(partLoc, 24, 33, 1, + validationQuery, "3\tHello streaming - once again"); txnBatch1.commit(); - - checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + /*now both batches have committed (but not closed) so for each primary file we expect a side + file to exist and indicate the true length of the primary file*/ + FileSystem fs = partLoc.getFileSystem(conf); + AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns()); + for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { + for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { + Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); + Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile)); + long lengthFileSize = fs.getFileStatus(lengthFile).getLen(); + Assert.assertTrue("Expected " + lengthFile + " to be non empty. 
length=" + + lengthFileSize, lengthFileSize > 0); + long logicalLength = AcidUtils.getLogicalLength(fs, stat); + long actualLength = stat.getLen(); + Assert.assertTrue("logical length should equal actual length once the batch is committed", logicalLength == actualLength); + } + } + checkDataWritten2(partLoc, 14, 33, 2, + validationQuery,"1\tHello streaming", "3\tHello streaming - once again"); txnBatch1.beginNextTransaction(); txnBatch1.write("2,Welcome to streaming".getBytes()); txnBatch2.beginNextTransaction(); txnBatch2.write("4,Welcome to streaming - once again".getBytes()); - - checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + //here each batch has written data and committed (to bucket0 since the table only has 1 bucket) + //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0 + //has now received more data (logically - it's buffered) but it is not yet committed. + //let's check that side files exist, etc. + dir = AcidUtils.getAcidState(partLoc, conf, msClient.getValidTxns()); + for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) { + for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) { + Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath()); + Assert.assertTrue(lengthFile + " missing", fs.exists(lengthFile)); + long lengthFileSize = fs.getFileStatus(lengthFile).getLen(); + Assert.assertTrue("Expected " + lengthFile + " to be non empty. length=" + + lengthFileSize, lengthFileSize > 0); + long logicalLength = AcidUtils.getLogicalLength(fs, stat); + long actualLength = stat.getLen(); + Assert.assertTrue("logical length should not exceed actual length", logicalLength <= actualLength); + } + } + checkDataWritten2(partLoc, 14, 33, 2, + validationQuery, "1\tHello streaming", "3\tHello streaming - once again"); txnBatch1.commit(); - checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", - "{3, Hello streaming - once again}"); + checkDataWritten2(partLoc, 14, 33, 2, + validationQuery, "1\tHello streaming", + "2\tWelcome to streaming", + "3\tHello streaming - once again"); txnBatch2.commit(); - checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", - "{3, Hello streaming - once again}", - "{4, Welcome to streaming - once again}"); + checkDataWritten2(partLoc, 14, 33, 2, + validationQuery, "1\tHello streaming", + "2\tWelcome to streaming", + "3\tHello streaming - once again", + "4\tWelcome to streaming - once again"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch1.getCurrentTransactionState()); @@ -2035,11 +2115,12 @@ private static boolean runDDL(Driver driver, String sql) throws QueryFailedExcep public static ArrayList queryTable(Driver driver, String query) throws CommandNeedRetryException, IOException { - driver.run(query); + CommandProcessorResponse cpr = driver.run(query); + if(cpr.getResponseCode() != 0) { + throw new RuntimeException(query + " failed: " + cpr); + } ArrayList res = new ArrayList(); driver.getResults(res); - if(res.isEmpty()) - System.err.println(driver.getErrorMsg()); return res; } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index de41d344c1..d5429fbbd6 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -27,12 +27,12 @@ import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.AcidInputFormat.AcidRecordReader; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -123,35 +123,50 @@ public void assertMaxTransactionId(long expectedMaxTransactionId) { } List readRecords() throws Exception { + return readRecords(1); + } + + /** + * TODO: this would be more flexible doing a SQL select statement rather than using InputFormat directly + * see {@link org.apache.hive.hcatalog.streaming.TestStreaming#checkDataWritten2(Path, long, long, int, String, String...)} + * @param numSplitsExpected + * @return + * @throws Exception + */ + List readRecords(int numSplitsExpected) throws Exception { if (currentDeltas.isEmpty()) { throw new AssertionError("No data"); } InputFormat inputFormat = new OrcInputFormat(); JobConf job = new JobConf(); job.set("mapred.input.dir", partitionLocation.toString()); - job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets())); + job.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(table.getSd().getNumBuckets())); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "id,msg"); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "bigint:string"); - job.set(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, "true"); + AcidUtils.setTransactionalTableScan(job,true); + job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = inputFormat.getSplits(job, 1); - assertEquals(1, splits.length); - - final AcidRecordReader recordReader = (AcidRecordReader) inputFormat - .getRecordReader(splits[0], job, Reporter.NULL); + assertEquals(numSplitsExpected, splits.length); - NullWritable key = recordReader.createKey(); - OrcStruct value = recordReader.createValue(); List records = new ArrayList<>(); - while (recordReader.next(key, value)) { - RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier(); - Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(), + for(InputSplit is : splits) { + final AcidRecordReader recordReader = (AcidRecordReader) inputFormat + .getRecordReader(is, job, Reporter.NULL); + + NullWritable key = recordReader.createKey(); + OrcStruct value = recordReader.createValue(); + + while (recordReader.next(key, value)) { + RecordIdentifier recordIdentifier = recordReader.getRecordIdentifier(); + Record record = new Record(new RecordIdentifier(recordIdentifier.getTransactionId(), recordIdentifier.getBucketProperty(), recordIdentifier.getRowId()), value.toString()); - System.out.println(record); - records.add(record); + System.out.println(record); + records.add(record); + } + recordReader.close(); } - recordReader.close(); return records; } diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java index ab9f313686..5bfa04dbe8 100644 --- 
hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/TestMutations.java @@ -529,22 +529,23 @@ public void testUpdatesAndDeletes() throws Exception { StreamingAssert indiaAssertions = assertionFactory.newStreamingAssert(table, ASIA_INDIA); indiaAssertions.assertMinTransactionId(1L); indiaAssertions.assertMaxTransactionId(2L); - List indiaRecords = indiaAssertions.readRecords(); + List indiaRecords = indiaAssertions.readRecords(2); assertThat(indiaRecords.size(), is(3)); assertThat(indiaRecords.get(0).getRow(), is("{1, Namaste streaming 1}")); assertThat(indiaRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, encodeBucket(0), 0L))); assertThat(indiaRecords.get(1).getRow(), is("{2, UPDATED: Namaste streaming 2}")); - assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(1L, - encodeBucket(0), 1L))); + assertThat(indiaRecords.get(1).getRecordIdentifier(), is(new RecordIdentifier(2L, + encodeBucket(0), 0L)));//with split update, new version of the row is a new insert assertThat(indiaRecords.get(2).getRow(), is("{20, Namaste streaming 3}")); assertThat(indiaRecords.get(2).getRecordIdentifier(), is(new RecordIdentifier(2L, - encodeBucket(0), 0L))); + encodeBucket(0), 1L))); StreamingAssert ukAssertions = assertionFactory.newStreamingAssert(table, EUROPE_UK); ukAssertions.assertMinTransactionId(1L); ukAssertions.assertMaxTransactionId(2L); - List ukRecords = ukAssertions.readRecords(); + //1 split since mutateTransaction txn just does deletes + List ukRecords = ukAssertions.readRecords(1); assertThat(ukRecords.size(), is(1)); assertThat(ukRecords.get(0).getRow(), is("{4, Hello streaming 2}")); assertThat(ukRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, @@ -553,11 +554,11 @@ public void testUpdatesAndDeletes() throws Exception { StreamingAssert franceAssertions = assertionFactory.newStreamingAssert(table, EUROPE_FRANCE); franceAssertions.assertMinTransactionId(1L); franceAssertions.assertMaxTransactionId(2L); - List franceRecords = franceAssertions.readRecords(); + List franceRecords = franceAssertions.readRecords(2); assertThat(franceRecords.size(), is(1)); assertThat(franceRecords.get(0).getRow(), is("{6, UPDATED: Bonjour streaming 2}")); - assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(1L, - encodeBucket(0), 1L))); + assertThat(franceRecords.get(0).getRecordIdentifier(), is(new RecordIdentifier(2L, + encodeBucket(0), 0L)));//with split update, new version of the row is a new insert client.close(); } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index 0f08f434e7..023d703543 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -39,7 +39,6 @@ // These constants are also imported by org.apache.hadoop.hive.ql.io.AcidUtils. 
public static final String DEFAULT_TRANSACTIONAL_PROPERTY = "default"; - public static final String LEGACY_TRANSACTIONAL_PROPERTY = "legacy"; TransactionalValidationListener(Configuration conf) { super(conf); @@ -276,7 +275,6 @@ private String validateTransactionalProperties(String transactionalProperties) { boolean isValid = false; switch (transactionalProperties) { case DEFAULT_TRANSACTIONAL_PROPERTY: - case LEGACY_TRANSACTIONAL_PROPERTY: isValid = true; break; default: diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 1e33424e24..6245ec550c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.Ref; +import org.apache.orc.impl.OrcAcidUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,18 @@ public boolean accept(Path path) { }; public static final String DELTA_PREFIX = "delta_"; public static final String DELETE_DELTA_PREFIX = "delete_delta_"; + /** + * Acid Streaming Ingest writes multiple transactions to the same file. It also maintains a + * {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of + * the primary file as of the last commit (flush). That is the 'logical length'. + * Once the primary is closed, the side file is deleted (logical length = actual length) but if + * the writer dies or the primary file is being read while it's still being written to, anything + * past the logical length should be ignored. + * + * @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX + * @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path) + * @see #getLogicalLength(FileSystem, FileStatus) + */ public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; public static final PathFilter deltaFileFilter = new PathFilter() { @Override @@ -167,7 +180,7 @@ public static String deltaSubdir(long min, long max, int statementId) { * This is format of delete delta dir name prior to Hive 2.2.x */ @VisibleForTesting - static String deleteDeltaSubdir(long min, long max) { + public static String deleteDeltaSubdir(long min, long max) { return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" + String.format(DELTA_DIGITS, max); } @@ -371,21 +384,10 @@ public HdfsFileStatusWithId getHdfsFileStatusWithId() { public static final int HASH_BASED_MERGE_BIT = 0x02; public static final String HASH_BASED_MERGE_STRING = "hash_merge"; public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY; - public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY; private AcidOperationalProperties() { } - /** - * Returns an acidOperationalProperties object that represents ACID behavior for legacy tables - * that were created before ACID type system using operational properties was put in place. - * @return the acidOperationalProperties object - */ - public static AcidOperationalProperties getLegacy() { - AcidOperationalProperties obj = new AcidOperationalProperties(); - // In legacy mode, none of these properties are turned on. 
- return obj; - } /** * Returns an acidOperationalProperties object that represents default ACID behavior for tables @@ -406,14 +408,11 @@ public static AcidOperationalProperties getDefault() { */ public static AcidOperationalProperties parseString(String propertiesStr) { if (propertiesStr == null) { - return AcidOperationalProperties.getLegacy(); + return AcidOperationalProperties.getDefault(); } if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { return AcidOperationalProperties.getDefault(); } - if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { - return AcidOperationalProperties.getLegacy(); - } AcidOperationalProperties obj = new AcidOperationalProperties(); String[] options = propertiesStr.split("\\|"); for (String option : options) { @@ -1119,7 +1118,12 @@ public static void setTransactionalTableScan(Map parameters, boo public static void setTransactionalTableScan(Configuration conf, boolean isAcidTable) { HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isAcidTable); } - + /** + * @param p - not null + */ + public static boolean isDeleteDelta(Path p) { + return p.getName().startsWith(DELETE_DELTA_PREFIX); + } /** Checks if a table is a valid ACID table. * Note, users are responsible for using the correct TxnManager. We do not look at * SessionState.get().getTxnMgr().supportsAcid() here @@ -1172,7 +1176,7 @@ public static AcidOperationalProperties getAcidOperationalProperties(Table table hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); if (transactionalProperties == null) { // If the table does not define any transactional properties, we return a legacy type. - return AcidOperationalProperties.getLegacy(); + return AcidOperationalProperties.getDefault(); } return AcidOperationalProperties.parseString(transactionalProperties); } @@ -1184,7 +1188,7 @@ public static AcidOperationalProperties getAcidOperationalProperties(Table table */ public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) { // If the conf does not define any transactional properties, the parseInt() should receive - // a value of zero, which will set AcidOperationalProperties to a legacy type and return that. + // a value of 1, which will set AcidOperationalProperties to a default type and return that. return AcidOperationalProperties.parseInt( HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES)); } @@ -1197,8 +1201,8 @@ public static AcidOperationalProperties getAcidOperationalProperties(Configurati public static AcidOperationalProperties getAcidOperationalProperties(Properties props) { String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); if (resultStr == null) { - // If the properties does not define any transactional properties, we return a legacy type. - return AcidOperationalProperties.getLegacy(); + // If the properties does not define any transactional properties, we return a default type. + return AcidOperationalProperties.getDefault(); } return AcidOperationalProperties.parseString(resultStr); } @@ -1212,9 +1216,42 @@ public static AcidOperationalProperties getAcidOperationalProperties( Map parameters) { String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); if (resultStr == null) { - // If the parameters does not define any transactional properties, we return a legacy type. - return AcidOperationalProperties.getLegacy(); + // If the parameters does not define any transactional properties, we return a default type. 
+ return AcidOperationalProperties.getDefault(); } return AcidOperationalProperties.parseString(resultStr); } + /** + * See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}. + + * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out + * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all. + * @param file - data file to split + * @return logical length of file + */ + public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException { + Path lengths = OrcAcidUtils.getSideFile(file.getPath()); + if(!fs.exists(lengths)) { + /** + * if here for delta_x_y that means y is resolved and all files in this delta are closed so + * they should all have a valid ORC footer and info from NameNode about length is good + */ + return file.getLen(); + } + long len = OrcAcidUtils.getLastFlushLength(fs, file.getPath()); + if(len >= 0) { + /** + * if here something is still writing to delta_x_y so read only as far as the last commit, + * i.e. where last footer was written + */ + return len; + } + /** + * if here, side file is there but we couldn't read it. We want to avoid a read where + * (file.getLen() < 'value from side file' which may happen if file is not closed) because this + * means some committed data from 'file' would be skipped. + * This should be very unusual. + */ + throw new IOException(lengths + " found but is not readable. Consider waiting or orcfiledump --recover"); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index de49fc84bb..6462e94ee3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -641,6 +641,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), // & therefore we should be able to retrieve them here and determine appropriate behavior. // Note that this will be meaningless for non-acid tables & will be set to null. + //this is set by Utilities.copyTablePropertiesToConf() boolean isTableTransactional = conf.getBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false); String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); this.acidOperationalProperties = isTableTransactional ? @@ -977,11 +978,15 @@ public BISplitStrategy(Context context, FileSystem fs, if (fileKey == null && allowSyntheticFileIds) { fileKey = new SyntheticFileId(fileStatus); } + long logicalLen = AcidUtils.getLogicalLength(fs, fileStatus); TreeMap blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus); for (Map.Entry entry : blockOffsets.entrySet()) { + if(entry.getKey() + entry.getValue().getLength() > logicalLen) { + continue; + } OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(), entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true, - deltas, -1, fileStatus.getLen()); + deltas, -1, logicalLen); splits.add(orcSplit); } } @@ -1000,6 +1005,11 @@ public String toString() { /** * ACID split strategy is used when there is no base directory (when transactions are enabled). + * todo: after HIVE-17089 should this be removed? Compaction doesn't use OrcInputFormat to generate splits + * and for regular read we should never have a situation where we have delete_delta w/o a + * split of some base. 
In other words there can't be any uncovered splits with Acid 2.0. + Maybe not. For an unbucketed table we may have delete events in delta_20_20/bucket0 but perhaps + some base_18/ doesn't have bucket0... */ static class ACIDSplitStrategy implements SplitStrategy { Path dir; @@ -1034,6 +1044,9 @@ public ACIDSplitStrategy(Path dir, int numBuckets, List deltas, b // This happens in the case where a bucket just has deltas and no // base. if (!deltas.isEmpty()) { + //todo: assert false? + // if there is no base why would we have a split with only delete events? + //it's completely pointless and we should make sure not to generate it for (int b = 0; b < numBuckets; ++b) { if (!covered[b]) { splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1, -1)); @@ -1133,7 +1146,7 @@ private AcidDirInfo callInternal() throws IOException { if (val == null || val) { try { List insertDeltaFiles = - SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter); + SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); for (HdfsFileStatusWithId fileId : insertDeltaFiles) { baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); } @@ -1149,7 +1162,7 @@ private AcidDirInfo callInternal() throws IOException { } } // Fall back to regular API and create statuses without ID. - List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.hiddenFileFilter); + List children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), AcidUtils.bucketFileFilter); for (FileStatus child : children) { HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child); baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.INSERT_DELTA)); @@ -1402,7 +1415,7 @@ public String toString() { populateAndCacheStripeDetails(); boolean[] includeStripe = null; // We can't eliminate stripes if there are deltas because the - // deltas may change the rows making them match the predicate. + // deltas may change the rows making them match the predicate. todo: See HIVE-14516. if ((deltas == null || deltas.isEmpty()) && context.sarg != null) { String[] colNames = extractNeededColNames((readerTypes == null ? 
fileTypes : readerTypes), @@ -1516,7 +1529,7 @@ private void populateAndCacheStripeDetails() throws IOException { Reader orcReader = OrcFile.createReader(file.getPath(), OrcFile.readerOptions(context.conf) .filesystem(fs) - .maxLength(file.getLen())); + .maxLength(AcidUtils.getLogicalLength(fs, file))); orcTail = new OrcTail(orcReader.getFileTail(), orcReader.getSerializedFileFooter(), file.getModificationTime()); if (context.cacheStripeDetails) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 814782a503..10818c759e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -865,6 +865,10 @@ Path getBucketPath() { eventOptions.range(0, Long.MAX_VALUE); if (deltaDirectory != null) { for(Path delta: deltaDirectory) { + if(!mergerOptions.isCompacting() && !AcidUtils.isDeleteDelta(delta)) { + //all inserts should be in baseReader for normal read so this should always be delete delta if not compacting + throw new IllegalStateException(delta + " is not delete delta and is not compacting."); + } ReaderKey key = new ReaderKey(); Path deltaFile = AcidUtils.createBucketFile(delta, bucket); AcidUtils.ParsedDelta deltaDir = AcidUtils.parsedDelta(delta); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index c30e8fe75a..135042a299 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -348,7 +348,7 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId, this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? 
null : row)); - indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); + indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);//todo: so this rowId is not what we want for if (writer == null) { writer = OrcFile.createWriter(path, writerOptions); } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index c50c1a8726..dc6b70a110 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -867,7 +867,7 @@ public void testNonAcidToAcidConversion01() throws Exception { Assert.assertTrue(rs.get(2), rs.get(2).startsWith("{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t5")); Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nonacidorctbl/000001_0_copy_1")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":14,\"bucketid\":536936448,\"rowid\":0}\t1\t17")); - Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/000001_0_copy_1")); + Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nonacidorctbl/delta_0000014_0000014_0000/bucket_00001")); //run Compaction runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'"); TestTxnCommands2.runWorker(hiveConf); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 408c08921b..e80a4f9d94 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -64,6 +64,7 @@ import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +114,8 @@ String getPartitionColumns() { this.partitionColumns = partitionColumns; } } + @Rule + public ExpectedException expectedException = ExpectedException.none(); @Before public void setUp() throws Exception { @@ -366,7 +369,7 @@ public void testNonAcidToAcidConversion02() throws Exception { {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":0}\t0\t13", "bucket_00000"}, {"{\"transactionid\":18,\"bucketid\":536870912,\"rowid\":0}\t0\t15", "bucket_00000"}, {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t0\t17", "bucket_00000"}, - {"{\"transactionid\":0,\"bucketid\":0,\"rowid\":1}\t0\t120", "bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t0\t120", "bucket_00000"}, {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":1}\t1\t2", "bucket_00001"}, {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":3}\t1\t4", "bucket_00001"}, {"{\"transactionid\":0,\"bucketid\":1,\"rowid\":2}\t1\t5", "bucket_00001"}, @@ -392,11 +395,23 @@ public void testNonAcidToAcidConversion02() throws Exception { } //make sure they are the same before and after compaction } - /** - * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * In current implementation of ACID, altering the value of transactional_properties or trying to + * set a value for previously unset value for an acid table will throw an exception. 
+ * @throws Exception + */ + @Test + public void testFailureOnAlteringTransactionalProperties() throws Exception { + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); + runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); + } + /** + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table + * 2. Convert Non-ACID to ACID table with split-update enabled * 3. Insert a row to ACID table * 4. Perform Major compaction * 5. Clean @@ -410,7 +425,7 @@ public void testNonAcidToAcidConversion1() throws Exception { // 1. Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -424,9 +439,10 @@ public void testNonAcidToAcidConversion1() throws Exception { Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); // 2. Convert NONACIDORCTBL to ACID table - runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -442,24 +458,23 @@ public void testNonAcidToAcidConversion1() throws Exception { // 3. Insert another row to newly-converted ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. 
- // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) + // The delta directory should also have only 1 bucket file (bucket_00001) Assert.assertEquals(3, status.length); boolean sawNewDelta = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1, buckets.length); // only one bucket file + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); } else { Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); } } Assert.assertTrue(sawNewDelta); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); resultData = new int[][] {{1, 2}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); @@ -472,16 +487,15 @@ public void testNonAcidToAcidConversion1() throws Exception { // There should be 1 new directory: base_xxxxxxx. // Original bucket files and delta directory should stay until Cleaner kicks in. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(4, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { sawNewBase = true; FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); } } Assert.assertTrue(sawNewBase); @@ -495,13 +509,13 @@ public void testNonAcidToAcidConversion1() throws Exception { // 5. Let Cleaner delete obsolete files/dirs // Note, here we create a fake directory along with fake files as original directories/files String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + - "/subdir/000000_0"; + "/subdir/000000_0"; String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + - "/subdir/000000_1"; + "/subdir/000000_1"; fs.create(new Path(fakeFile0)); fs.create(new Path(fakeFile1)); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // Before Cleaner, there should be 5 items: // 2 original files, 1 original directory, 1 base directory and 1 delta directory Assert.assertEquals(5, status.length); @@ -509,13 +523,12 @@ public void testNonAcidToAcidConversion1() throws Exception { // There should be only 1 directory left: base_xxxxxxx. // Original bucket files and delta directory should have been cleaned up. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -525,9 +538,10 @@ public void testNonAcidToAcidConversion1() throws Exception { } /** - * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table + * 2. Convert Non-ACID to ACID table with split update enabled. * 3. Update the existing row in ACID table * 4. Perform Major compaction * 5. Clean @@ -541,7 +555,7 @@ public void testNonAcidToAcidConversion2() throws Exception { // 1. Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -550,14 +564,15 @@ public void testNonAcidToAcidConversion2() throws Exception { List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); int [][] resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); // 2. 
Convert NONACIDORCTBL to ACID table - runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -566,29 +581,39 @@ public void testNonAcidToAcidConversion2() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 2}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); // 3. Update the existing row in newly-converted ACID table runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory + // and one delete_delta directory. When split-update is enabled, an update event is split into + // a combination of delete and insert, that generates the delete_delta directory. // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) - Assert.assertEquals(3, status.length); + // and so should the delete_delta directory. + Assert.assertEquals(4, status.length); boolean sawNewDelta = false; + boolean sawNewDeleteDelta = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { sawNewDelta = true; FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + sawNewDeleteDelta = true; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); } else { Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); } } Assert.assertTrue(sawNewDelta); + Assert.assertTrue(sawNewDeleteDelta); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -602,8 +627,8 @@ public void testNonAcidToAcidConversion2() throws Exception { // There should be 1 new directory: base_0000001. // Original bucket files and delta directory should stay until Cleaner kicks in. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(4, status.length); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(5, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -623,15 +648,15 @@ public void testNonAcidToAcidConversion2() throws Exception { // 5. Let Cleaner delete obsolete files/dirs status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Before Cleaner, there should be 4 items: - // 2 original files, 1 delta directory and 1 base directory - Assert.assertEquals(4, status.length); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // Before Cleaner, there should be 5 items: + // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory + Assert.assertEquals(5, status.length); runCleaner(hiveConf); // There should be only 1 directory left: base_0000001. - // Original bucket files and delta directory should have been cleaned up. + // Original bucket files, delta directory and delete_delta should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); @@ -646,9 +671,10 @@ public void testNonAcidToAcidConversion2() throws Exception { } /** - * Test the query correctness and directory layout after ACID table conversion and MAJOR compaction + * Test the query correctness and directory layout for ACID table conversion with split-update + * enabled. * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table + * 2. Convert Non-ACID to ACID table with split-update enabled * 3. Perform Major compaction * 4. Insert a new row to ACID table * 5. Perform another Major compaction @@ -663,7 +689,7 @@ public void testNonAcidToAcidConversion3() throws Exception { // 1. Insert a row to Non-ACID table runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // There should be 2 original bucket files in the location (000000_0 and 000001_0) Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -676,10 +702,11 @@ public void testNonAcidToAcidConversion3() throws Exception { int resultCount = 1; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - // 2. Convert NONACIDORCTBL to ACID table - runStatementOnDriver("alter table " + Table.NONACIDORCTBL + " SET TBLPROPERTIES ('transactional'='true')"); + // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. 
(txn_props=default) + runStatementOnDriver("alter table " + Table.NONACIDORCTBL + + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // Everything should be same as before Assert.assertEquals(BUCKET_COUNT, status.length); for (int i = 0; i < status.length; i++) { @@ -698,7 +725,7 @@ public void testNonAcidToAcidConversion3() throws Exception { // There should be 1 new directory: base_-9223372036854775808 // Original bucket files should stay until Cleaner kicks in. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(3, status.length); boolean sawNewBase = false; for (int i = 0; i < status.length; i++) { @@ -722,12 +749,14 @@ public void testNonAcidToAcidConversion3() throws Exception { runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, - // plus two new delta directories - Assert.assertEquals(5, status.length); + // plus two new delta directories and one delete_delta directory that would be created due to + // the update statement (remember split-update U=D+I)! 
+ Assert.assertEquals(6, status.length); int numDelta = 0; + int numDeleteDelta = 0; sawNewBase = false; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("delta_.*")) { @@ -740,9 +769,17 @@ public void testNonAcidToAcidConversion3() throws Exception { Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } else if (numDelta == 2) { Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName()); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); - Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); + } + } else if (status[i].getPath().getName().matches("delete_delta_.*")) { + numDeleteDelta++; + FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Arrays.sort(buckets); + if (numDeleteDelta == 1) { + Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName()); + Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } } else if (status[i].getPath().getName().matches("base_.*")) { Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); @@ -755,11 +792,13 @@ public void testNonAcidToAcidConversion3() throws Exception { } } Assert.assertEquals(2, numDelta); + Assert.assertEquals(1, numDeleteDelta); Assert.assertTrue(sawNewBase); + rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); @@ -767,11 +806,12 @@ public void testNonAcidToAcidConversion3() throws Exception { runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); runWorker(hiveConf); // There should be 1 new base directory: base_0000001 - // Original bucket files, delta directories and the previous base directory should stay until Cleaner kicks in. + // Original bucket files, delta directories, delete_delta directories and the + // previous base directory should stay until Cleaner kicks in. 
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(status); - Assert.assertEquals(6, status.length); + Assert.assertEquals(7, status.length); int numBase = 0; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName().matches("base_.*")) { @@ -785,9 +825,8 @@ public void testNonAcidToAcidConversion3() throws Exception { } else if (numBase == 2) { // The new base dir now has two bucket files, since the delta dir has two bucket files Assert.assertEquals("base_0000023", status[i].getPath().getName()); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); - Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); } } } @@ -795,28 +834,27 @@ public void testNonAcidToAcidConversion3() throws Exception { rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL);//todo: what is the point of this if we just did select *? + rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); // 6. Let Cleaner delete obsolete files/dirs status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); // Before Cleaner, there should be 6 items: - // 2 original files, 2 delta directories and 2 base directories - Assert.assertEquals(6, status.length); + // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories + Assert.assertEquals(7, status.length); runCleaner(hiveConf); // There should be only 1 directory left: base_0000001. // Original bucket files, delta directories and previous base directory should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); Assert.assertEquals("base_0000023", status[0].getPath().getName()); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); - Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); @@ -824,9 +862,6 @@ public void testNonAcidToAcidConversion3() throws Exception { resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } - - - @Test public void testValidTxnsBookkeeping() throws Exception { // 1. 
Run a query against a non-ACID table, and we shouldn't have txn logged in conf diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java deleted file mode 100644 index 520e958af3..0000000000 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ /dev/null @@ -1,545 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql; - -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.FileUtils; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -/** - * Same as TestTxnCommands2 but tests ACID tables with 'transactional_properties' set to 'default'. - * This tests whether ACID tables with split-update turned on are working correctly or not - * for the same set of tests when it is turned off. Of course, it also adds a few tests to test - * specific behaviors of ACID tables with split-update turned on. - */ -public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 { - - public TestTxnCommands2WithSplitUpdate() { - super(); - } - - @Rule - public ExpectedException expectedException = ExpectedException.none(); - - @Override - @Before - public void setUp() throws Exception { - setUpWithTableProperties("'transactional'='true','transactional_properties'='default'"); - } - - @Override - @Test - public void testInitiatorWithMultipleFailedCompactions() throws Exception { - // Test with split-update turned on. - testInitiatorWithMultipleFailedCompactionsForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); - } - - @Override - @Test - public void writeBetweenWorkerAndCleaner() throws Exception { - writeBetweenWorkerAndCleanerForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); - } - - @Override - @Test - public void testACIDwithSchemaEvolutionAndCompaction() throws Exception { - testACIDwithSchemaEvolutionForVariousTblProperties("'transactional'='true','transactional_properties'='default'"); - } - - /** - * In current implementation of ACID, altering the value of transactional_properties or trying to - * set a value for previously unset value for an acid table will throw an exception. 
- * @throws Exception - */ - @Test - public void testFailureOnAlteringTransactionalProperties() throws Exception { - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("TBLPROPERTIES with 'transactional_properties' cannot be altered after the table is created"); - runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')"); - } - /** - * Test the query correctness and directory layout for ACID table conversion with split-update - * enabled. - * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table with split-update enabled - * 3. Insert a row to ACID table - * 4. Perform Major compaction - * 5. Clean - * @throws Exception - */ - @Test - @Override - public void testNonAcidToAcidConversion1() throws Exception { - FileSystem fs = FileSystem.get(hiveConf); - FileStatus[] status; - - // 1. Insert a row to Non-ACID table - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - int [][] resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - int resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 2. Convert NONACIDORCTBL to ACID table - runStatementOnDriver("alter table " + Table.NONACIDORCTBL - + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 3. Insert another row to newly-converted ACID table - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files (000000_0 and 000001_0), plus a new delta directory. 
- // The delta directory should also have only 1 bucket file (bucket_00001) - Assert.assertEquals(3, status.length); - boolean sawNewDelta = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("delta_.*")) { - sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, buckets.length); // only one bucket file - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); - } else { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - } - Assert.assertTrue(sawNewDelta); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); - resultData = new int[][] {{1, 2}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 4. Perform a major compaction - runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); - runWorker(hiveConf); - // There should be 1 new directory: base_xxxxxxx. - // Original bucket files and delta directory should stay until Cleaner kicks in. - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(4, status.length); - boolean sawNewBase = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("base_.*")) { - sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); - } - } - Assert.assertTrue(sawNewBase); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 5. Let Cleaner delete obsolete files/dirs - // Note, here we create a fake directory along with fake files as original directories/files - String fakeFile0 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + - "/subdir/000000_0"; - String fakeFile1 = TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDORCTBL).toString().toLowerCase() + - "/subdir/000000_1"; - fs.create(new Path(fakeFile0)); - fs.create(new Path(fakeFile1)); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Before Cleaner, there should be 5 items: - // 2 original files, 1 original directory, 1 base directory and 1 delta directory - Assert.assertEquals(5, status.length); - runCleaner(hiveConf); - // There should be only 1 directory left: base_xxxxxxx. - // Original bucket files and delta directory should have been cleaned up. 
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, status.length); - Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - } - - /** - * Test the query correctness and directory layout for ACID table conversion with split-update - * enabled. - * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table with split update enabled. - * 3. Update the existing row in ACID table - * 4. Perform Major compaction - * 5. Clean - * @throws Exception - */ - @Test - @Override - public void testNonAcidToAcidConversion2() throws Exception { - FileSystem fs = FileSystem.get(hiveConf); - FileStatus[] status; - - // 1. Insert a row to Non-ACID table - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - int [][] resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - int resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 2. Convert NONACIDORCTBL to ACID table - runStatementOnDriver("alter table " + Table.NONACIDORCTBL - + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 3. Update the existing row in newly-converted ACID table - runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files (000000_0 and 000001_0), plus one delta directory - // and one delete_delta directory. 
When split-update is enabled, an update event is split into - // a combination of delete and insert, that generates the delete_delta directory. - // The delta directory should also have 2 bucket files (bucket_00000 and bucket_00001) - // and so should the delete_delta directory. - Assert.assertEquals(4, status.length); - boolean sawNewDelta = false; - boolean sawNewDeleteDelta = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("delta_.*")) { - sawNewDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - } else if (status[i].getPath().getName().matches("delete_delta_.*")) { - sawNewDeleteDelta = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); - } else { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - } - Assert.assertTrue(sawNewDelta); - Assert.assertTrue(sawNewDeleteDelta); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 4. Perform a major compaction - runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); - runWorker(hiveConf); - // There should be 1 new directory: base_0000001. - // Original bucket files and delta directory should stay until Cleaner kicks in. - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(5, status.length); - boolean sawNewBase = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("base_.*")) { - sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); - } - } - Assert.assertTrue(sawNewBase); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 5. Let Cleaner delete obsolete files/dirs - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Before Cleaner, there should be 5 items: - // 2 original files, 1 delta directory, 1 delete_delta directory and 1 base directory - Assert.assertEquals(5, status.length); - runCleaner(hiveConf); - // There should be only 1 directory left: base_0000001. - // Original bucket files, delta directory and delete_delta should have been cleaned up. 
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, status.length); - Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_00001")); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - } - - /** - * Test the query correctness and directory layout for ACID table conversion with split-update - * enabled. - * 1. Insert a row to Non-ACID table - * 2. Convert Non-ACID to ACID table with split-update enabled - * 3. Perform Major compaction - * 4. Insert a new row to ACID table - * 5. Perform another Major compaction - * 6. Clean - * @throws Exception - */ - @Test - @Override - public void testNonAcidToAcidConversion3() throws Exception { - FileSystem fs = FileSystem.get(hiveConf); - FileStatus[] status; - - // 1. Insert a row to Non-ACID table - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(1,2)"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // There should be 2 original bucket files in the location (000000_0 and 000001_0) - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - int [][] resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - int resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 2. Convert NONACIDORCTBL to ACID table with split_update enabled. (txn_props=default) - runStatementOnDriver("alter table " + Table.NONACIDORCTBL - + " SET TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Everything should be same as before - Assert.assertEquals(BUCKET_COUNT, status.length); - for (int i = 0; i < status.length; i++) { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 3. Perform a major compaction - runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); - runWorker(hiveConf); - // There should be 1 new directory: base_-9223372036854775808 - // Original bucket files should stay until Cleaner kicks in. 
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(3, status.length); - boolean sawNewBase = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("base_.*")) { - Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); - sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } - } - Assert.assertTrue(sawNewBase); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 2}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 1; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 4. Update the existing row, and insert another row to newly-converted ACID table - runStatementOnDriver("update " + Table.NONACIDORCTBL + " set b=3 where a=1"); - runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(3,4)"); - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(status); // make sure delta_0000001_0000001_0000 appears before delta_0000002_0000002_0000 - // There should be 2 original bucket files (000000_0 and 000001_0), a base directory, - // plus two new delta directories and one delete_delta directory that would be created due to - // the update statement (remember split-update U=D+I)! - Assert.assertEquals(6, status.length); - int numDelta = 0; - int numDeleteDelta = 0; - sawNewBase = false; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("delta_.*")) { - numDelta++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(buckets); - if (numDelta == 1) { - Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName()); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } else if (numDelta == 2) { - Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName()); - Assert.assertEquals(1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } - } else if (status[i].getPath().getName().matches("delete_delta_.*")) { - numDeleteDelta++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(buckets); - if (numDeleteDelta == 1) { - Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName()); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } - } else if (status[i].getPath().getName().matches("base_.*")) { - Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); - sawNewBase = true; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } else { - Assert.assertTrue(status[i].getPath().getName().matches("00000[01]_0")); - } - } - Assert.assertEquals(2, 
numDelta); - Assert.assertEquals(1, numDeleteDelta); - Assert.assertTrue(sawNewBase); - - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 5. Perform another major compaction - runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'"); - runWorker(hiveConf); - // There should be 1 new base directory: base_0000001 - // Original bucket files, delta directories, delete_delta directories and the - // previous base directory should stay until Cleaner kicks in. - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(status); - Assert.assertEquals(7, status.length); - int numBase = 0; - for (int i = 0; i < status.length; i++) { - if (status[i].getPath().getName().matches("base_.*")) { - numBase++; - FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(buckets); - if (numBase == 1) { - Assert.assertEquals("base_-9223372036854775808", status[i].getPath().getName()); - Assert.assertEquals(BUCKET_COUNT - 1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } else if (numBase == 2) { - // The new base dir now has two bucket files, since the delta dir has two bucket files - Assert.assertEquals("base_0000023", status[i].getPath().getName()); - Assert.assertEquals(1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - } - } - } - Assert.assertEquals(2, numBase); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - - // 6. Let Cleaner delete obsolete files/dirs - status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - // Before Cleaner, there should be 6 items: - // 2 original files, 2 delta directories, 1 delete_delta directory and 2 base directories - Assert.assertEquals(7, status.length); - runCleaner(hiveConf); - // There should be only 1 directory left: base_0000001. - // Original bucket files, delta directories and previous base directory should have been cleaned up. 
- status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - Assert.assertEquals(1, status.length); - Assert.assertEquals("base_0000023", status[0].getPath().getName()); - FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); - Arrays.sort(buckets); - Assert.assertEquals(1, buckets.length); - Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); - rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); - resultData = new int[][] {{1, 3}, {3, 4}}; - Assert.assertEquals(stringifyValues(resultData), rs); - rs = runStatementOnDriver("select count(*) from " + Table.NONACIDORCTBL); - resultCount = 2; - Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); - } -} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java index 44a94127ec..c76d654b95 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java @@ -23,11 +23,11 @@ import org.junit.Test; /** - * Same as TestTxnCommands2WithSplitUpdate but tests ACID tables with vectorization turned on by + * Same as TestTxnCommands2 but tests ACID tables with vectorization turned on by * default, and having 'transactional_properties' set to 'default'. This specifically tests the * fast VectorizedOrcAcidRowBatchReader for ACID tables with split-update turned on. */ -public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2WithSplitUpdate { +public class TestTxnCommands2WithSplitUpdateAndVectorization extends TestTxnCommands2 { public TestTxnCommands2WithSplitUpdateAndVectorization() { super(); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index 44ff65c77d..7247711fb2 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -669,21 +669,12 @@ public void testDeleteEventDeltaDirPathFilter() throws Exception { @Test public void testAcidOperationalProperties() throws Exception { - AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getLegacy(); - assertsForAcidOperationalProperties(testObj, "legacy"); - - testObj = AcidUtils.AcidOperationalProperties.getDefault(); + AcidUtils.AcidOperationalProperties testObj = AcidUtils.AcidOperationalProperties.getDefault(); assertsForAcidOperationalProperties(testObj, "default"); - testObj = AcidUtils.AcidOperationalProperties.parseInt(0); - assertsForAcidOperationalProperties(testObj, "legacy"); - testObj = AcidUtils.AcidOperationalProperties.parseInt(1); assertsForAcidOperationalProperties(testObj, "split_update"); - testObj = AcidUtils.AcidOperationalProperties.parseString("legacy"); - assertsForAcidOperationalProperties(testObj, "legacy"); - testObj = AcidUtils.AcidOperationalProperties.parseString("default"); assertsForAcidOperationalProperties(testObj, "default"); @@ -699,12 +690,6 @@ private void assertsForAcidOperationalProperties(AcidUtils.AcidOperationalProper assertEquals(1, testObj.toInt()); assertEquals("|split_update", testObj.toString()); break; - case "legacy": - assertEquals(false, testObj.isSplitUpdate()); - assertEquals(false, 
testObj.isHashBasedMerge()); - assertEquals(0, testObj.toInt()); - assertEquals("", testObj.toString()); - break; default: break; } @@ -716,7 +701,7 @@ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception { Configuration testConf = new Configuration(); // Test setter for configuration object. AcidUtils.setAcidOperationalProperties(testConf, oprProps); - assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, 0)); + assertEquals(1, testConf.getInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1)); // Test getter for configuration object. assertEquals(oprProps.toString(), AcidUtils.getAcidOperationalProperties(testConf).toString()); @@ -726,10 +711,7 @@ public void testAcidOperationalPropertiesSettersAndGetters() throws Exception { assertEquals(oprProps.toString(), parameters.get(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname)); // Test getter for map object. - // Calling a get on the 'parameters' will still return legacy type because the setters/getters - // for map work on different string keys. The setter will set the HIVE_TXN_OPERATIONAL_PROPERTIES - // while the getter will try to read the key TABLE_TRANSACTIONAL_PROPERTIES. - assertEquals(0, AcidUtils.getAcidOperationalProperties(parameters).toInt()); + assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt()); parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, oprProps.toString()); // Set the appropriate key in the map and test that we are able to read it back correctly. assertEquals(1, AcidUtils.getAcidOperationalProperties(parameters).toInt()); diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b004cf5de7..107313b7ea 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -2690,9 +2690,11 @@ public void testSplitGenReadOps() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: open - mock:/mocktable/0_0 - // call-3: open - mock:/mocktable/0_1 - assertEquals(3, readOpsDelta); + // call-2: check existence of side file for mock:/mocktable/0_0 + // call-3: open - mock:/mocktable/0_0 + // call-4: check existence of side file for mock:/mocktable/0_1 + // call-5: open - mock:/mocktable/0_1 + assertEquals(5, readOpsDelta); assertEquals(2, splits.length); // revert back to local fs @@ -2748,9 +2750,11 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - // call-2: open - mock:/mocktbl/0_0 - // call-3: open - mock:/mocktbl/0_1 - assertEquals(3, readOpsDelta); + // call-2: check existence of side file for mock:/mocktbl/0_0 + // call-3: open - mock:/mocktbl/0_0 + // call-4: check existence of side file for mock:/mocktbl/0_1 + // call-5: open - mock:/mocktbl/0_1 + assertEquals(5, readOpsDelta); // force BI to avoid reading footers conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI"); @@ -2768,7 +2772,9 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - assertEquals(1, readOpsDelta); + // call-2: check existence of side file for mock:/mocktbl/0_0 + // call-3: check existence of side file for mock:/mocktbl/0_1 + assertEquals(3, readOpsDelta); // enable cache and use default strategy 
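The revised call counts in these TestInputOutputFormat hunks all follow one formula: one listLocatedStatus on the directory, then for every bucket file one existence check for its *_flush_length side file, plus one open whenever the chosen split strategy reads footers (the BI strategy does not). A small sketch of that arithmetic; expectedReadOps is a hypothetical helper, not a Hive API:

public class SplitGenReadOpsSketch {
  // 1 directory listing + per file: side-file existence check, plus an open when footers are read.
  static int expectedReadOps(int numFiles, boolean readsFooters) {
    return 1 + numFiles * (readsFooters ? 2 : 1);
  }
  public static void main(String[] args) {
    System.out.println(expectedReadOps(2, true));  // 5, as asserted for the footer-reading cases
    System.out.println(expectedReadOps(2, false)); // 3, as asserted once the BI strategy is forced
    System.out.println(expectedReadOps(1, true));  // 3, the single stale cache entry after one file changes
  }
}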
conf.set(ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_MEMORY_SIZE.varname, "10Mb"); @@ -2787,9 +2793,11 @@ public void testSplitGenReadOpsLocalCache() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktbl - // call-2: open - mock:/mocktbl/0_0 - // call-3: open - mock:/mocktbl/0_1 - assertEquals(3, readOpsDelta); + // call-2: check existence of side file for mock:/mocktbl/0_0 + // call-3: open - mock:/mocktbl/0_0 + // call-4: check existence of side file for mock:/mocktbl/0_1 + // call-5: open - mock:/mocktbl/0_1 + assertEquals(5, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -2859,9 +2867,11 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: open - mock:/mocktbl1/0_0 - // call-3: open - mock:/mocktbl1/0_1 - assertEquals(3, readOpsDelta); + // call-2: check side file for mock:/mocktbl1/0_0 + // call-3: open - mock:/mocktbl1/0_0 + // call-4: check side file for mock:/mocktbl1/0_1 + // call-5: open - mock:/mocktbl1/0_1 + assertEquals(5, readOpsDelta); // change file length and look for cache misses @@ -2898,9 +2908,11 @@ public void testSplitGenReadOpsLocalCacheChangeFileLen() throws Exception { } } // call-1: listLocatedStatus - mock:/mocktable - // call-2: open - mock:/mocktbl1/0_0 - // call-3: open - mock:/mocktbl1/0_1 - assertEquals(3, readOpsDelta); + // call-2: check side file for mock:/mocktbl1/0_0 + // call-3: open - mock:/mocktbl1/0_0 + // call-4: check side file for mock:/mocktbl1/0_1 + // call-5: open - mock:/mocktbl1/0_1 + assertEquals(5, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { @@ -2971,9 +2983,11 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: open - mock:/mocktbl2/0_0 - // call-3: open - mock:/mocktbl2/0_1 - assertEquals(3, readOpsDelta); + // call-2: check side file for mock:/mocktbl2/0_0 + // call-3: open - mock:/mocktbl2/0_0 + // call-4: check side file for mock:/mocktbl2/0_1 + // call-5: open - mock:/mocktbl2/0_1 + assertEquals(5, readOpsDelta); // change file modification time and look for cache misses FileSystem fs1 = FileSystem.get(conf); @@ -2993,8 +3007,9 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: open - mock:/mocktbl2/0_1 - assertEquals(2, readOpsDelta); + // call-2: check side file for mock:/mocktbl2/0_1 + // call-3: open - mock:/mocktbl2/0_1 + assertEquals(3, readOpsDelta); // touch the next file fs1 = FileSystem.get(conf); @@ -3014,8 +3029,9 @@ public void testSplitGenReadOpsLocalCacheChangeModificationTime() throws Excepti } } // call-1: listLocatedStatus - mock:/mocktbl2 - // call-2: open - mock:/mocktbl2/0_0 - assertEquals(2, readOpsDelta); + // call-2: check side file for mock:/mocktbl2/0_0 + // call-3: open - mock:/mocktbl2/0_0 + assertEquals(3, readOpsDelta); for (FileSystem.Statistics statistics : FileSystem.getAllStatistics()) { if (statistics.getScheme().equalsIgnoreCase("mock")) { diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index ba8d6756ef..d2936a8bd4 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.orc.CompressionKind; import org.apache.orc.MemoryManager; @@ -524,6 +525,7 @@ public void testEmpty() throws Exception { AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) .inspector(inspector).bucket(BUCKET).writingBase(true) .maximumTransactionId(100).finalDestination(root); + //this doesn't create an empty file and thus no base_x of.getRecordUpdater(root, options).close(false); ValidTxnList txnList = new ValidReadTxnList("200:" + Long.MAX_VALUE); @@ -1009,20 +1011,33 @@ public synchronized void addedRow(int rows) throws IOException { ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY)); } } - ru.delete(100, new BigRow(9, 0, BUCKET_PROPERTY)); + ru.delete(1, new BigRow(9, 0, BUCKET_PROPERTY)); ru.close(false); // write a delta - options.minimumTransactionId(2).maximumTransactionId(2); + options.minimumTransactionId(100).maximumTransactionId(100); ru = of.getRecordUpdater(root, options); values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; - for(int i=0; i < values.length; ++i) { + for(int i=0; i < values.length - 1; ++i) { if (values[i] != null) { - ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY)); + ru.update(100, new BigRow(i, i, values[i], i, i, i, 0, BUCKET_PROPERTY)); } } + //do this before next update so that delete_delta is properly sorted ru.delete(100, new BigRow(8, 0, BUCKET_PROPERTY)); + //because row 8 was updated and thus has a different RecordIdentifier now + ru.update(100, new BigRow(7, 7, values[values.length - 1], 7, 7, 2, 1, BUCKET_PROPERTY)); ru.close(false); + MyResult[] expected = new MyResult[10]; + int k = 0; + expected[k++] = new MyResult(0, "0.0"); + expected[k++] = new MyResult(1, "0.1"); + expected[k++] = new MyResult(2, "1.0"); + expected[k++] = new MyResult(3, "1.1"); + expected[k++] = new MyResult(4, "2.0"); + expected[k++] = new MyResult(5, "2.1"); + expected[k++] = new MyResult(6, "3.0"); + expected[k] = new MyResult(7, "3.1"); InputFormat inf = new OrcInputFormat(); JobConf job = new JobConf(); @@ -1031,11 +1046,41 @@ public synchronized void addedRow(int rows) throws IOException { job.set("mapred.input.dir", root.toString()); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty()); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty()); - HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); + AcidUtils.setTransactionalTableScan(job,true); + job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); InputSplit[] splits = inf.getSplits(job, 5); - assertEquals(5, splits.length); + //base has 10 rows, so 5 splits, 1 delta has 2 rows so 1 split, and 1 delta has 3 so 2 splits + assertEquals(8, splits.length); org.apache.hadoop.mapred.RecordReader rr; - +/** + * OrcRecordUpdater.parseKeyIndex(reader) returns {originalTxn: 0, bucket: 537591808(1.11.0), row: 3} when reading + * Hive ORC Reader(file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/testRecordReaderNewBaseAndDelta/delta_0000001_0000001_0000/bucket_00011, 2095) + * how does this happen? there is no row with otxnid 0 in this file....
so then base reader returns nothing + */ + for(int j = 0; j < splits.length; j++) { + InputSplit split = splits[j]; + rr = inf.getRecordReader(split, job, Reporter.NULL); + NullWritable key = rr.createKey(); + OrcStruct value = rr.createValue(); + while(rr.next(key, value)) { + MyResult mr = new MyResult(Integer.parseInt(value.getFieldValue(0).toString()), value.getFieldValue(2).toString()); + int i = 0; + for(; i < expected.length; i++) { + if(mr.equals(expected[i])) { + expected[i] = null; + break; + } + } + if(i >= expected.length) { + //not found + assertTrue("Found unexpected row: " + mr, false ); + } + } + } + for(MyResult mr : expected) { + assertTrue("Expected " + mr + " not found in any InputSplit", mr == null); + } +/* // loop through the 5 splits and read each for(int i=0; i < 4; ++i) { System.out.println("starting split " + i + " = " + splits[i]); @@ -1054,8 +1099,28 @@ public synchronized void addedRow(int rows) throws IOException { } rr = inf.getRecordReader(splits[4], job, Reporter.NULL); assertEquals(false, rr.next(rr.createKey(), rr.createValue())); + */ + } + private static final class MyResult { + private final int myInt; + private final String myText; + MyResult(int myInt, String myText) { + this.myInt = myInt; + this.myText = myText; + } + @Override + public boolean equals(Object t) { + if(!(t instanceof MyResult)) { + return false; + } + MyResult that = (MyResult)t; + return myInt == that.myInt && myText.equals(that.myText); + } + @Override + public String toString() { + return "(" + myInt + "," + myText +")"; + } } - /** * Test the RecordReader when there is a new base and a delta. * @throws Exception @@ -1174,11 +1239,13 @@ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Except job.set("bucket_count", "2"); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, MyRow.getColumnNamesProperty()); job.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, MyRow.getColumnTypesProperty()); - HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); + AcidUtils.setTransactionalTableScan(job,true); + job.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true); // read the keys before the delta is flushed InputSplit[] splits = inf.getSplits(job, 1); - assertEquals(2, splits.length); + //1 split since we only have 1 bucket file in base/. delta is not flushed (committed) yet, i.e. empty + assertEquals(1, splits.length); org.apache.hadoop.mapred.RecordReader rr = inf.getRecordReader(splits[0], job, Reporter.NULL); NullWritable key = rr.createKey(); @@ -1201,17 +1268,24 @@ private void testRecordReaderIncompleteDelta(boolean use130Format) throws Except splits = inf.getSplits(job, 1); assertEquals(2, splits.length); - rr = inf.getRecordReader(splits[0], job, Reporter.NULL); Path sideFile = new Path(root + "/" + (use130Format ? 
AcidUtils.deltaSubdir(10,19,0) : AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length"); assertEquals(true, fs.exists(sideFile)); assertEquals(24, fs.getFileStatus(sideFile).getLen()); - for(int i=1; i < 11; ++i) { + rr = inf.getRecordReader(splits[0], job, Reporter.NULL); + for(int i=1; i <= 5; ++i) { assertEquals(true, rr.next(key, value)); assertEquals(Integer.toString(i), value.getFieldValue(0).toString()); } assertEquals(false, rr.next(key, value)); + + rr = inf.getRecordReader(splits[1], job, Reporter.NULL); + for(int i=6; i < 11; ++i) { + assertEquals("i="+ i, true, rr.next(key, value)); + assertEquals(Integer.toString(i), value.getFieldValue(0).toString()); + } + assertEquals(false, rr.next(key, value)); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java index be155176fc..a601a32607 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java @@ -266,26 +266,46 @@ public void testUpdates() throws Exception { Reader reader = OrcFile.createReader(bucketPath, new OrcFile.ReaderOptions(conf).filesystem(fs)); - assertEquals(2, reader.getNumberOfRows()); + assertEquals(1, reader.getNumberOfRows()); RecordReader rows = reader.rows(); // check the contents of the file assertEquals(true, rows.hasNext()); OrcStruct row = (OrcStruct) rows.next(null); - assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + assertEquals(OrcRecordUpdater.INSERT_OPERATION, OrcRecordUpdater.getOperation(row)); assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row)); - assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(20, OrcRecordUpdater.getBucket(row)); - assertEquals(30, OrcRecordUpdater.getRowId(row)); + assertEquals(100, OrcRecordUpdater.getOriginalTransaction(row)); + int bucketProperty = OrcRecordUpdater.getBucket(row); + assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty)); + assertEquals(0, OrcRecordUpdater.getRowId(row)); assertEquals("update", OrcRecordUpdater.getRow(row).getFieldValue(0).toString()); + rows.close(); + + options.writingDeleteDelta(true); + bucketPath = AcidUtils.createFilename(root, options); + reader = OrcFile.createReader(bucketPath, + new OrcFile.ReaderOptions(conf).filesystem(fs)); + assertEquals(2, reader.getNumberOfRows()); + + rows = reader.rows(); + assertEquals(true, rows.hasNext()); + row = (OrcStruct) rows.next(null); + assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row)); + assertEquals(10, OrcRecordUpdater.getOriginalTransaction(row)); + bucketProperty = OrcRecordUpdater.getBucket(row); + assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty)); + assertEquals(30, OrcRecordUpdater.getRowId(row)); + assertNull(OrcRecordUpdater.getRow(row)); + assertEquals(true, rows.hasNext()); row = (OrcStruct) rows.next(null); assertEquals(100, OrcRecordUpdater.getCurrentTransaction(row)); assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row)); - assertEquals(20, OrcRecordUpdater.getBucket(row)); + bucketProperty = OrcRecordUpdater.getBucket(row); + assertEquals(bucket, BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty)); assertEquals(60, OrcRecordUpdater.getRowId(row)); assertNull(OrcRecordUpdater.getRow(row)); assertEquals(false, rows.hasNext()); diff --git 
ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java index 439ec9bb8a..43e0a4a431 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java @@ -246,10 +246,6 @@ private void testVectorizedOrcAcidRowBatchReader(String deleteEventRegistry) thr @Test public void testCanCreateVectorizedAcidRowBatchReaderOnSplit() throws Exception { OrcSplit mockSplit = Mockito.mock(OrcSplit.class); - conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, - AcidUtils.AcidOperationalProperties.getLegacy().toInt()); - // Test false when trying to create a vectorized ACID row batch reader for a legacy table. - assertFalse(VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, mockSplit)); conf.setInt(HiveConf.ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, AcidUtils.AcidOperationalProperties.getDefault().toInt()); diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index bbed591144..081340ab7c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -409,6 +409,12 @@ public ObjectInspector getObjectInspector() { return null; } + /** + * This is bogus especially with split update acid tables. This causes compaction to create + * delete_delta_x_y where none existed before + * @param value + * @return + */ @Override public boolean isDelete(Text value) { // Alternate between returning deleted and not. 
This is easier than actually @@ -552,4 +558,7 @@ String makeDeltaDirName(long minTxnId, long maxTxnId) { String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) { return AcidUtils.deltaSubdir(minTxnId, maxTxnId); } + String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) { + return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index efd6ed8fe7..a15ea23a4e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -289,7 +289,7 @@ public void minorTableWithBase() throws Exception { CompactionRequest rqst = new CompactionRequest("default", "mtwb", CompactionType.MINOR); txnHandler.compact(rqst); - startWorker(); + startWorker();//adds delta and delete_delta ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -299,7 +299,7 @@ public void minorTableWithBase() throws Exception { // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + Assert.assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents boolean sawNewDelta = false; @@ -310,9 +310,19 @@ public void minorTableWithBase() throws Exception { Assert.assertEquals(2, buckets.length); Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); - Assert.assertEquals(208L, buckets[0].getLen()); - Assert.assertEquals(208L, buckets[1].getLen()); - } else { + Assert.assertEquals(104L, buckets[0].getLen()); + Assert.assertEquals(104L, buckets[1].getLen()); + } + if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) { + sawNewDelta = true; + FileStatus[] buckets = fs.listStatus(stat[i].getPath()); + Assert.assertEquals(2, buckets.length); + Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]")); + Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]")); + Assert.assertEquals(104L, buckets[0].getLen()); + Assert.assertEquals(104L, buckets[1].getLen()); + } + else { LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName()); } } @@ -348,14 +358,15 @@ public void minorWithOpenInMiddle() throws Exception { // There should still now be 5 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(4, stat.length); + Assert.assertEquals(5, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName()); + Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22), stat[1].getPath().getName()); + Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(26, 27), 
stat[4].getPath().getName()); } @Test @@ -380,18 +391,19 @@ public void minorWithAborted() throws Exception { Assert.assertEquals(1, compacts.size()); Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); - // There should still now be 5 directories in the location + // There should still now be 6 directories in the location FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation())); - Assert.assertEquals(5, stat.length); + Assert.assertEquals(6, stat.length); // Find the new delta file and make sure it has the right contents Arrays.sort(stat); Assert.assertEquals("base_20", stat[0].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName()); - Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[2].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(23, 25), stat[3].getPath().getName()); - Assert.assertEquals(makeDeltaDirName(26, 27), stat[4].getPath().getName()); + Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 27), stat[1].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(21, 22), stat[2].getPath().getName()); + Assert.assertEquals(makeDeltaDirNameCompacted(21, 27), stat[3].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName()); + Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName()); } @Test @@ -409,7 +421,7 @@ public void minorPartitionWithBase() throws Exception { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - startWorker(); + startWorker();//this will create delta_20_24 and delete_delta_20_24. See MockRawReader ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List compacts = rsp.getCompacts(); @@ -419,7 +431,7 @@ public void minorPartitionWithBase() throws Exception { // There should still be four directories in the location. 
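The halved bucket file sizes in these TestWorker hunks (208L down to 104L) and the extra directory are two sides of the same change: CompactorTest's MockRawReader flags every other record as a delete, and with split-update the minor compactor appears to route those delete events into a delete_delta_* directory of their own rather than folding them into the compacted delta_*. A rough sketch of that routing, with purely illustrative record ids:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: split a stream of records, every other one flagged as a delete
// (as MockRawReader does), the way a split-update minor compaction separates them.
public class SplitMinorCompactionSketch {
  public static void main(String[] args) {
    List<Integer> deltaEvents = new ArrayList<>();
    List<Integer> deleteDeltaEvents = new ArrayList<>();
    for (int recordId = 0; recordId < 8; recordId++) {
      boolean isDelete = recordId % 2 == 1; // MockRawReader alternates deleted / not deleted
      (isDelete ? deleteDeltaEvents : deltaEvents).add(recordId);
    }
    // Half of the events land in each directory, so each compacted bucket file holds
    // half the records it used to, which lines up with the 208L -> 104L assertions.
    System.out.println("delta_*: " + deltaEvents.size() + " events, delete_delta_*: " + deleteDeltaEvents.size() + " events");
  }
}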
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(4, stat.length);
+    Assert.assertEquals(5, stat.length);
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -430,9 +442,18 @@ public void minorPartitionWithBase() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
-      } else {
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 24))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
     }
@@ -462,7 +483,7 @@ public void minorTableNoBase() throws Exception {
     // There should still now be 5 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(3, stat.length);
+    Assert.assertEquals(4, stat.length);
     // Find the new delta file and make sure it has the right contents
     boolean sawNewDelta = false;
@@ -473,8 +494,17 @@ public void minorTableNoBase() throws Exception {
         Assert.assertEquals(2, buckets.length);
         Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
+      }
+      if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(1, 4))) {
+        sawNewDelta = true;
+        FileStatus[] buckets = fs.listStatus(stat[i].getPath());
+        Assert.assertEquals(2, buckets.length);
+        Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the delta file you are looking for " + stat[i].getPath().getName());
       }
@@ -534,6 +564,17 @@ public void minorNoBaseLotsOfDeltas() throws Exception {
   public void majorNoBaseLotsOfDeltas() throws Exception {
     compactNoBaseLotsOfDeltas(CompactionType.MAJOR);
   }
+
+  /**
+   * These tests are starting to be a hack.  The files written by addDeltaFile() are not proper
+   * Acid files and the {@link CompactorTest.MockRawReader} performs no merging of delta files and
+   * fakes isDelete() as a shortcut.  This makes the files created on disk not representative of
+   * what they should look like in a real system.
+   * Making {@link org.apache.hadoop.hive.ql.txn.compactor.CompactorTest.MockRawReader} do proper
+   * delete event handling would mean duplicating either OrcRawRecordMerger or VectorizedOrcAcidRowBatchReader.
+   * @param type
+   * @throws Exception
+   */
   private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
     conf.setIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA, 2);
     Table t = newTable("default", "mapwb", true);
@@ -570,45 +611,55 @@ private void compactNoBaseLotsOfDeltas(CompactionType type) throws Exception {
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(p.getSd().getLocation()));
-    Assert.assertEquals(9, stat.length);
+    /* delete_delta_21_23 and delete_delta_25_33 which are created as a result of compacting */
+    int numFilesExpected = 11 + (type == CompactionType.MINOR ? 1 : 0);
+    Assert.assertEquals(numFilesExpected, stat.length);
     // Find the new delta file and make sure it has the right contents
-    BitSet matchesFound = new BitSet(9);
-    for (int i = 0; i < stat.length; i++) {
+    BitSet matchesFound = new BitSet(numFilesExpected);
+    for (int i = 0, j = 0; i < stat.length; i++) {
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21,23))) {
+        matchesFound.set(j++);
+      }
+      if(stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(25,33))) {
+        matchesFound.set(j++);
+      }
       if(stat[i].getPath().getName().equals(makeDeltaDirName(21,21))) {
-        matchesFound.set(0);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(23, 23))) {
-        matchesFound.set(1);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25, 29))) {
-        matchesFound.set(2);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 32))) {
-        matchesFound.set(3);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(31, 33))) {
-        matchesFound.set(4);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirName(35, 35))) {
-        matchesFound.set(5);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,23))) {
-        matchesFound.set(6);
+        matchesFound.set(j++);
       }
       else if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(25,33))) {
-        matchesFound.set(7);
+        matchesFound.set(j++);
       }
       switch (type) {
-        //yes, both do set(8)
        case MINOR:
          if(stat[i].getPath().getName().equals(makeDeltaDirNameCompacted(21,35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
+          }
+          if (stat[i].getPath().getName().equals(makeDeleteDeltaDirNameCompacted(21, 35))) {
+            matchesFound.set(j++);
          }
          break;
        case MAJOR:
          if(stat[i].getPath().getName().equals(AcidUtils.baseDir(35))) {
-            matchesFound.set(8);
+            matchesFound.set(j++);
          }
          break;
        default: