diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index d0b5cf6047..ee318c211f 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -88,7 +89,6 @@ public void setUp() throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); @@ -179,14 +179,14 @@ public void testNonStandardConversion01() throws Exception { runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored as ORC TBLPROPERTIES('transactional'='false') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); - List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME", confForTez); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/1/000000_0"}, - {"5\t6", "/2/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -195,9 +195,9 @@ public void testNonStandardConversion01() throws Exception { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1])); } //make the table ACID - runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')", confForTez); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after ctas:"); for (String s : rs) { LOG.warn(s); @@ -206,12 +206,12 @@ public void testNonStandardConversion01() throws Exception { /* * Expected result 0th 
entry i the RecordIdentifier + data. 1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -220,17 +220,17 @@ public void testNonStandardConversion01() throws Exception { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); } //perform some Update/Delete - runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7"); - runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5"); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7", confForTez); + runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5", confForTez); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after update/delete:"); for (String s : rs) { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); @@ -255,9 +255,9 @@ public void testNonStandardConversion01() throws Exception { Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); } //run Minor compaction - 
runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'"); + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact minor:"); for (String s : rs) { LOG.warn(s); @@ -285,9 +285,9 @@ public void testNonStandardConversion01() throws Exception { Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); } //run Major compaction - runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'"); + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact major:"); for (String s : rs) { LOG.warn(s); @@ -306,8 +306,12 @@ public void testNonStandardConversion01() throws Exception { * How to do this? CTAS is the only way to create data files which are not immediate children * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy * data files in directly. + * + * Actually Insert Into ... select ... union all ... with + * HIVE_OPTIMIZE_UNION_REMOVE (and HIVEFETCHTASKCONVERSION="none"?) will create subdirs + * but if writing to non acid table there is a merge task on MR (but not on Tez) */ - @Ignore("HIVE-17214") +// @Ignore("HIVE-17214")//this consistently works locally but never in ptest.... 
@Test public void testNonStandardConversion02() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf @@ -320,13 +324,13 @@ public void testNonStandardConversion02() throws Exception { "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez); List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + - Table.NONACIDNONBUCKET + " order by a, b"); + Table.NONACIDNONBUCKET + " order by a, b", confForTez); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/3/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -338,7 +342,7 @@ public void testNonStandardConversion02() throws Exception { FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); //ensure there is partition dir - runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)"); + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)", confForTez); //creates more files in that partition for(FileStatus stat : status) { int limit = 5; @@ -357,29 +361,29 @@ public void testNonStandardConversion02() throws Exception { nonacidpart/ └── p=1 ├── 000000_0 - ├── 1 + ├── HIVE_UNION_SUBDIR__1 │   └── 000000_0 - ├── 2 + ├── HIVE_UNION_SUBDIR_2 │   └── 000000_0 - └── 3 + └── HIVE_UNION_SUBDIR_3 └── 000000_0 4 directories, 4 files **/ //make the table ACID - runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')"); - rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')", confForTez); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID", confForTez); LOG.warn("after acid conversion:"); for (String s : rs) { LOG.warn(s); } String[][] expected = { {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"} + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + 
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} }; Assert.assertEquals("Wrong row count", expected.length, rs.size()); //verify data and layout @@ -389,9 +393,9 @@ public void testNonStandardConversion02() throws Exception { } //run Major compaction - runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'"); + runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID", confForTez); LOG.warn("after major compaction:"); for (String s : rs) { LOG.warn(s); @@ -406,33 +410,42 @@ public void testNonStandardConversion02() throws Exception { } /** * CTAS + Tez + Union creates a non-standard layout in table dir - * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc - * The way this currently works is that CTAS creates an Acid table but the insert statement writes - * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion. - * Longer term CTAS should create acid layout from the get-go. + * Each leg of the union places data into a subdir of the table/partition. + * Subdirs are named HIVE_UNION_SUBDIR_1/, HIVE_UNION_SUBDIR_2/, etc + * For Acid tables the writer for each dir must have a different statementId ensured by + * {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor}. + * {@link org.apache.hadoop.hive.ql.exec.MoveTask#execute(DriverContext)} drops the union subdirs + * since each delta file has a unique name. 
*/ @Test public void testCtasTezUnion() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + confForTez.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); setupTez(confForTez); //CTAS with ACID target table + List rs0 = runStatementOnDriver("explain create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + LOG.warn("explain ctas:");//TezEdgeProperty.EdgeType + for (String s : rs0) { + LOG.warn(s); + } runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); - List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after ctas:"); for (String s : rs) { LOG.warn(s); } Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); /* - * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/ + * Expected result 0th entry is the RecordIdentifier + data. 1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -441,18 +454,18 @@ public void testCtasTezUnion() throws Exception { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); } //perform some Update/Delete - runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7"); - runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5"); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7", confForTez); + runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5", confForTez); + rs = 
runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after update/delete:"); for (String s : rs) { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"} + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -464,7 +477,7 @@ public void testCtasTezUnion() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"}; + String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -476,9 +489,9 @@ public void testCtasTezUnion() throws Exception { Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); } //run Minor compaction - runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'"); + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact minor:"); for (String s : rs) { LOG.warn(s); @@ -493,7 +506,7 @@ public void testCtasTezUnion() throws Exception { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"}; + String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -506,9 +519,9 @@ public void testCtasTezUnion() throws Exception { Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); } //run Major compaction - runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + 
" compact 'major'"); + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact major:"); for (String s : rs) { LOG.warn(s); @@ -517,7 +530,178 @@ public void testCtasTezUnion() throws Exception { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000")); + } + } + /** + * 1. Insert into regular unbucketed table from Union all - union is removed and data is placed in + * subdirs of target table. + * 2. convert to acid table and check data + * 3. compact and check data + * Compare with {@link #testAcidInsertWithRemoveUnion()} where T is transactional=true + */ + @Test + public void testInsertWithRemoveUnion() throws Exception { + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + runStatementOnDriver("drop table if exists T", confForTez); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')", confForTez); + /* +ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/ +└── -ext-10000 + ├── HIVE_UNION_SUBDIR_1 + │   └── 000000_0 + ├── HIVE_UNION_SUBDIR_2 + │   └── 000000_0 + └── HIVE_UNION_SUBDIR_3 + └── 000000_0 + +4 directories, 3 files + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME", confForTez); + LOG.warn(testName.getMethodName() + ": before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + String[][] expected = { + {"1\t2","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"7\t8","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} + }; + Assert.assertEquals("Unexpected row count after conversion", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + 
i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //make the table ACID + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')", confForTez); + rs = runStatementOnDriver("select a,b from T order by a, b", confForTez); + Assert.assertEquals("After to Acid conversion", TestTxnCommands2.stringifyValues(values), rs); + + //run Major compaction + runStatementOnDriver("alter table T compact 'major'", confForTez); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID", confForTez); + LOG.warn(testName.getMethodName() + ": after compact major of T:"); + for (String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_-9223372036854775808/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + } + /** + * 1. Insert into unbucketed acid table from Union all - union is removed and data is placed in + * subdirs of target table. + * 2. convert to acid table and check data + * 3. 
compact and check data + * Compare with {@link #testInsertWithRemoveUnion()} where T is transactional=false + */ + @Test + public void testAcidInsertWithRemoveUnion() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("drop table if exists T", confForTez); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')", confForTez); + /*On Tez, below (T is transactional), we get the following layout +ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/ +└── -ext-10000 + ├── HIVE_UNION_SUBDIR_1 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000019_0000019_0001 + │   └── bucket_00000 + ├── HIVE_UNION_SUBDIR_2 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000019_0000019_0002 + │   └── bucket_00000 + └── HIVE_UNION_SUBDIR_3 + └── 000000_0 + ├── _orc_acid_version + └── delta_0000019_0000019_0003 + └── bucket_00000 + +10 directories, 6 files */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b", confForTez); + LOG.warn(testName.getMethodName() + ": reading acid table T"); + for(String s : rs) { + LOG.warn(s); + } + + String[][] expected2 = { + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000019_0000019_0003/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + } + @Test + public void testBucketedAcidInsertWithRemoveUnion() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("delete from " + Table.ACIDTBL, confForTez); + runStatementOnDriver("insert into " + Table.ACIDTBL + TestTxnCommands2.makeValuesClause(values));//make sure both buckets are not empty + runStatementOnDriver("drop table if exists T", confForTez); + /* + With bucketed target table Union All is not removed + + ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ 
tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/ +└── -ext-10000 + ├── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000021_0000021_0000 + │   └── bucket_00000 + └── 000001_0 + ├── _orc_acid_version + └── delta_0000021_0000021_0000 + └── bucket_00001 + +5 directories, 4 files +*/ + runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')", confForTez); + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b", confForTez); + LOG.warn(testName.getMethodName() + ": reading bucketed acid table T"); + for(String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"} + }; + Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); } } // Ideally test like this should be a qfile test. 
However, the explain output from qfile is always @@ -613,6 +797,7 @@ private void setupMapJoin(HiveConf conf) { private List runStatementOnDriver(String stmt, HiveConf conf) throws Exception { Driver driver = new Driver(conf); + driver.setMaxRows(10000); CommandProcessorResponse cpr = driver.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f184..a3bbf00042 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1199,7 +1199,11 @@ private int acquireLocks() { } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { - for (FileSinkDesc desc : plan.getAcidSinks()) { + List acidSinks = new ArrayList<>(plan.getAcidSinks()); + //sorting makes tests easier to write since file names and ROW__IDs depend on statementId + //so this makes (file name -> data) mapping stable + acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName())); + for (FileSinkDesc desc : acidSinks) { desc.setTransactionId(txnMgr.getCurrentTxnId()); //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index cde2805142..a1e4f96628 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -269,9 +269,18 @@ public int execute(DriverContext driverContext) { if (lfd != null) { Path targetPath = lfd.getTargetDir(); Path sourcePath = lfd.getSourcePath(); - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { + //'targetPath' is table root of un-partitioned table/partition + //'sourcePath' result of 'select ...' part of CTAS statement + assert lfd.getIsDfsDir(); + FileSystem srcFs = sourcePath.getFileSystem(conf); + List newFiles = new ArrayList<>(); + Hive.moveAcidFiles(srcFs, srcFs.globStatus(sourcePath), targetPath, newFiles); + } + else { + moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + } } - // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. 
LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index fa0ba63c25..c64bc8c72c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -1142,6 +1144,16 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + public static boolean isAcidTable(CreateTableDesc table) { + if (table == null || table.getTblProps() == null) { + return false; + } + String tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } /** * Sets the acidOperationalProperties in the configuration object argument. diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 9f98b69b18..54a31b7016 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -130,6 +130,7 @@ import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; @@ -3469,12 +3470,14 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta|delete_delta/bucket - // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta|delete_delta/bucket. So we need to move that into - // the above structure. For the first mover there will be no delta directory, + // We will always only be writing delta files ( except IOW which writes base_X/ ). + // In the buckets created by FileSinkOperator + // it will look like original_bucket/delta|delete_delta/bucket + // (e.g. .../-ext-10004/000000_0/delta_0000014_0000014_0000/bucket_00000). So we need to + // move that into the above structure. For the first mover there will be no delta directory, // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. 
@@ -3489,6 +3492,30 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, FileStatus[] origBucketStats = null; try { origBucketStats = fs.listStatus(srcPath, AcidUtils.originalBucketFilter); + if(origBucketStats == null || origBucketStats.length == 0) { + /* + check if we are dealing with data produced by a Union All query which looks like + └── -ext-10000 + ├── HIVE_UNION_SUBDIR_1 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000019_0000019_0001 + │   └── bucket_00000 + ├── HIVE_UNION_SUBDIR_2 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000019_0000019_0002 + │   └── bucket_00000 + The assumption is that we either have all data in subdirs or root of srcPath but not both + For Union case, we expect delta dirs to have unique names + */ + FileStatus[] unionSubdirs = fs.globStatus(new Path(srcPath, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "[0-9]*")); + List buckets = new ArrayList<>(); + for(FileStatus unionSubdir : unionSubdirs) { + Collections.addAll(buckets, fs.listStatus(unionSubdir.getPath(), AcidUtils.originalBucketFilter)); + } + origBucketStats = buckets.toArray(new FileStatus[buckets.size()]); + } } catch (IOException e) { String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString(); LOG.error(msg); @@ -3502,7 +3529,7 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, fs, dst, origBucketPath, createdDeltaDirs, newFiles); moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, fs, dst,origBucketPath, createdDeltaDirs, newFiles); - moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, + moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java new file mode 100644 index 0000000000..04df427a82 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -0,0 +1,83 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.parse.GenTezProcContext; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; + +/** + * Finds Acid FileSinkDesc objects which can be created in the physical (disconnected) plan, e.g. 
+ * {@link org.apache.hadoop.hive.ql.parse.GenTezUtils#removeUnionOperators(GenTezProcContext, BaseWork, int)} + * so that statementId can be properly assigned to ensure unique ROW__IDs + */ +public class QueryPlanPostProcessor { + private static final Logger LOG = LoggerFactory.getLogger(QueryPlanPostProcessor.class); + + public QueryPlanPostProcessor(List> rootTasks, Set acidSinks) { + for(Task t : rootTasks) { + //Work + Object work = t.getWork(); + if(work instanceof TezWork) { + for(BaseWork bw : ((TezWork)work).getAllWorkUnsorted()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof BaseWork) { + collectFileSinkDescs(((BaseWork)work).getAllLeafOperators(), acidSinks); + } + else if(work instanceof MapredWork) { + MapredWork w = (MapredWork)work; + if(w.getMapWork() != null) { + collectFileSinkDescs(w.getMapWork().getAllLeafOperators(), acidSinks); + } + if(w.getReduceWork() != null) { + collectFileSinkDescs(w.getReduceWork().getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof SparkWork) { + for(BaseWork bw : ((SparkWork)work).getRoots()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof MapredLocalWork) { + Set fileSinkOperatorSet = OperatorUtils.findOperators(((MapredLocalWork)work).getAliasToWork().values(), FileSinkOperator.class); + for(FileSinkOperator fsop : fileSinkOperatorSet) { + collectFileSinkDescs(fsop, acidSinks); + } + } + else { + throw new IllegalArgumentException("Unexpected Work object: " + work.getClass()); + } + } + } + private void collectFileSinkDescs(Operator leaf, Set acidSinks) { + if(leaf instanceof FileSinkOperator) { + FileSinkDesc fsd = ((FileSinkOperator) leaf).getConf(); + if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) { + if(acidSinks.add(fsd)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Found Acid Sink: " + fsd.getDirName()); + } + } + } + } + } + private void collectFileSinkDescs(Set> leaves, Set acidSinks) { + for(Operator leaf : leaves) { + collectFileSinkDescs(leaf, acidSinks); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index 2a7f3d4f03..2fbadd3a31 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -24,6 +24,7 @@ import java.util.Stack; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -219,7 +220,7 @@ private void pushOperatorsAboveUnion(UnionOperator union, for (Operator parent : parents) { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); - fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); + fileSinkDesc.setDirName(new Path(parentDirName, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + parent.getIdentifier())); fileSinkDesc.setLinkedFileSink(true); fileSinkDesc.setParentDir(parentDirName); parent.setChildOperators(null); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1c74779dec..6f1c9ea8d0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.Optimizer; +import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor; import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; @@ -7129,6 +7130,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) viewDesc.setSchema(new ArrayList(field_schemas)); } + destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc); + boolean isDestTempFile = true; if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString()); @@ -7139,8 +7142,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes)); - + colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID)); +//todo: what about WriteEntity.writeType? does it matter here? We could lock the DB of the new table so that it doesn't get dropped.... if (tblDesc == null) { if (viewDesc != null) { table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); @@ -7171,7 +7174,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } else { table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes); } - +//todo: should this WriteEntity have writeType for locks acquisition? if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_path.toUri().toString())); @@ -7248,10 +7251,22 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); - String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); - Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { - fileSinkDesc.setInsertOverwrite(true); + switch (dest_type) { + case QBMetaData.DEST_PARTITION: + //fall through + case QBMetaData.DEST_TABLE: + //INSERT [OVERWRITE] path + String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); + Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); + if (iowMap.containsKey(destTableFullName)) { + fileSinkDesc.setInsertOverwrite(true); + } + break; + case QBMetaData.DEST_DFS_FILE: + //CTAS path + break; + default: + throw new IllegalStateException("Unexpected dest_type=" + dest_tab); } acidFileSinks.add(fileSinkDesc); } @@ -11497,6 +11512,8 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce compiler.compile(pCtx, rootTasks, inputs, outputs); fetchTask = pCtx.getFetchTask(); } + //find all Acid FileSinkOperatorS + QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks); LOG.info("Completed plan generation"); // 10. 
put accessed columns to readEntity diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 03202fb7f5..d451413237 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.PTFUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; /** * LoadFileDesc. @@ -36,6 +36,14 @@ private String columnTypes; private String destinationCreateTable; + //todo: push up to parent? (and LoadTableDesc) + // Need to remember whether this is an acid compliant operation, and if so whether it is an + // insert, update, or delete. + private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; + public AcidUtils.Operation getWriteType() { + return writeType; + } + public LoadFileDesc() { } @@ -51,7 +59,7 @@ public LoadFileDesc(final LoadFileDesc o) { public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc, final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes) { + final String columns, final String columnTypes, AcidUtils.Operation writeType) { this(sourcePath, targetDir, isDfsDir, columns, columnTypes); if (createTableDesc != null && createTableDesc.getDatabaseName() != null && createTableDesc.getTableName() != null) { @@ -63,6 +71,7 @@ public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc // qualified. destinationCreateTable = createViewDesc.getViewName(); } + this.writeType = writeType; } public LoadFileDesc(final Path sourcePath, final Path targetDir, diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index b695f0f809..bbed9be82e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -22,12 +22,12 @@ public class TezEdgeProperty { - public enum EdgeType { - SIMPLE_EDGE, + public enum EdgeType {//todo: HIVE-15549 + SIMPLE_EDGE,//SORT_PARTITION_EDGE BROADCAST_EDGE, - CONTAINS, - CUSTOM_EDGE, - CUSTOM_SIMPLE_EDGE, + CONTAINS,//used for union (all?) 
+ CUSTOM_EDGE,//CO_PARTITION_EDGE + CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE ONE_TO_ONE_EDGE } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 1f0c269f5e..d2a8c74595 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -5,7 +5,10 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +23,8 @@ File.separator + TestTxnNoBuckets.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); + @Rule + public TestName testName = new TestName(); @Override String getTestDataDir() { return TEST_DATA_DIR; @@ -65,17 +70,7 @@ public void testNoBuckets() throws Exception { Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); - /*todo: WTF? - RS for update seems to spray randomly... is that OK? maybe as long as all resultant files have different names... will they? - Assuming we name them based on taskId, we should create bucketX and bucketY. - we delete events can be written to bucketX file it could be useful for filter delete for a split by file name since the insert - events seem to be written to a proper bucketX file. In fact this may reduce the number of changes elsewhere like compactor... maybe - But this limits the parallelism - what is worse, you don't know what the parallelism should be until you have a list of all the - input files since bucket count is no longer a metadata property. Also, with late Update split, the file name has already been determined - from taskId so the Insert part won't end up matching the bucketX property necessarily. - With early Update split, the Insert can still be an insert - i.e. go to appropriate bucketX. But deletes will still go wherever (random shuffle) - unless you know all the bucketX files to be read - may not be worth the trouble. - * 2nd: something in FS fails. 
ArrayIndexOutOfBoundsException: 1 at FileSinkOperator.process(FileSinkOperator.java:779)*/ + runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)"); rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID"); LOG.warn("after update"); @@ -154,7 +149,7 @@ public void testNoBuckets() throws Exception { /** * all of these pass but don't do exactly the right thing * files land as if it's not an acid table "warehouse/myctas4/000000_0" - * even though in {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires + * even though {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires * and sees it as transactional table * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer * @@ -169,6 +164,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); + //todo: look at the files - make sure CTAS did an Acid write List rs = runStatementOnDriver("select * from myctas order by a, b"); Assert.assertEquals(stringifyValues(values), rs); @@ -191,8 +187,143 @@ public void testCTAS() throws Exception { Assert.assertEquals(stringifyValues(values), rs); } /** + * Insert into unbucketed acid table from union all query + * Union All is flattened so nested subdirs are created and acid move drops them since + * delta dirs have unique names + */ + @Test + public void testInsertToAcidWithUnionRemove() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + d.close(); + d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); + /* + So Union All removal kicks in and we get 3 subdirs in staging.
+ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505516390532/warehouse/t/.hive-staging_hive_2017-09-15_16-05-06_895_1123322677843388168-1/ +└── -ext-10000 + ├── HIVE_UNION_SUBDIR_19 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000016_0000016_0001 + ├── HIVE_UNION_SUBDIR_20 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000016_0000016_0002 + └── HIVE_UNION_SUBDIR_21 + └── 000000_0 + ├── _orc_acid_version + └── delta_0000016_0000016_0003*/ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + LOG.warn(testName.getMethodName() + ": read data"); + for(String s : rs) { + LOG.warn(s); + } + String expected[][] = { + {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000016_0000016_0001/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000016_0000016_0001/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":0}\t7\t8", "/delta_0000016_0000016_0002/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":1}\t5\t6", "/delta_0000016_0000016_0002/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000016_0000016_0003/bucket_00000"}, + }; + Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + } + + /** + * There seem to be various issues around HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE which is + * false by default (e.g. HIVE-17505). Even if those are fixed, the optimization causes an extra + * level of subdirectories created under table/partition - same as CTAS+Tez+Union + * {@link TestAcidOnTez#testCtasTezUnion()}. Acid tables will not handle this. + * (Hive.copyFiles(), Hive.move() don't deal with dirs below table/partition root) + * So if the default for this param is changed this test will flag it. + * @throws Exception + */ + @Test + public void testUnionRemoveOption() throws Exception { + Assert.assertFalse("This will break Acid", hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE)); + } + + /** + * The idea here is to create a non acid table that was written by multiple writers, i.e. + * unbucketed table that has 000000_0 & 000001_0, for example.
+    Assert.assertEquals("Unexpected row count after insert", expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1]));
+    }
+  }
+
+  /**
+   * There seem to be various issues around HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, which is
+   * false by default (e.g. HIVE-17505).  Even if those are fixed, the optimization creates an extra
+   * level of subdirectories under the table/partition - same as CTAS+Tez+Union,
+   * {@link TestAcidOnTez#testCtasTezUnion()}.  Acid tables will not handle this
+   * (Hive.copyFiles(), Hive.move() don't deal with dirs below the table/partition root),
+   * so if the default for this param is changed this test will flag it.
+   * @throws Exception
+   */
+  @Test
+  public void testUnionRemoveOption() throws Exception {
+    Assert.assertFalse("This will break Acid", hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE));
+  }
+
+  /**
+   * The idea here is to create a non-acid table that was written by multiple writers, i.e. an
+   * unbucketed table that has 000000_0 & 000001_0, for example.  Unfortunately this doesn't work
+   * due to 'merge' logic - see comments in the method.
+   */
+  @Ignore
+  @Test
+  public void testToAcidConversionMultiBucket() throws Exception {
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values));
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')");
+    /*T is non-acid and non-bucketed - 3 writers are created and then followed by a merge to create a single output file,
+    though how the data from the union is split between the writers is a mystery
+    (bucketed tables don't do merge)
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10000/000000_0 [length: 515]
+{"a":6,"b":8}
+{"a":9,"b":10}
+{"a":5,"b":6}
+{"a":1,"b":2}
+{"a":2,"b":4}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000000_0 [length: 242]
+{"a":6,"b":8}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000001_0 [length: 244]
+{"a":9,"b":10}
+{"a":5,"b":6}
+________________________________________________________________________________________________________________________
+
+Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000002_0 [length: 242]
+{"a":1,"b":2}
+{"a":2,"b":4}
+ */
+    runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9");
+    List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    LOG.warn("before converting to acid");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+  }
+  @Test
+  public void testInsertFromUnion() throws Exception {
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + makeValuesClause(values));
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')");//todo: try with T bucketed
+    runStatementOnDriver("insert into T(a,b) select a, b from " + Table.NONACIDNONBUCKET + " where a between 1 and 3 group by a, b union all select a, b from " + Table.NONACIDNONBUCKET + " where a between 5 and 7 union all select a, b from " + Table.NONACIDNONBUCKET + " where a >= 9");
+    List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    LOG.warn("before converting to acid");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
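+    //Editorial note (assumption): in the dump below the "bucket" field appears to be BucketCodec V1
+    //encoded with the bucket id in bits 16-27, so 536870912 would be bucket 0 and 536936448
+    //(0x20010000) bucket 1, matching bucket_00000 and bucket_00001.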
+    /*
+    The number of writers seems to be based on the number of MR jobs for the src query.  todo: check number of FileSinks
+    warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000000_0/delta_0000016_0000016_0000/bucket_00000 [length: 648]
+    {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":0,"currentTransaction":16,"row":{"_col0":1,"_col1":2}}
+    {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":1,"currentTransaction":16,"row":{"_col0":2,"_col1":4}}
+    ________________________________________________________________________________________________________________________
+    warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000001_0/delta_0000016_0000016_0000/bucket_00001 [length: 658]
+    {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":0,"currentTransaction":16,"row":{"_col0":5,"_col1":6}}
+    {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":1,"currentTransaction":16,"row":{"_col0":6,"_col1":8}}
+    {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":2,"currentTransaction":16,"row":{"_col0":9,"_col1":10}}
+    */
+    rs = runStatementOnDriver("select a, b from T order by a, b");
+    Assert.assertEquals(stringifyValues(values), rs);
+  }
+  /**
    * see HIVE-16177
-   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} todo need test with > 1 bucket file
+   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
    */
   @Test
   public void testToAcidConversion02() throws Exception {