diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index d0b5cf6047..b0416f3f42 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -88,7 +89,6 @@ public void setUp() throws Exception { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); @@ -181,12 +181,12 @@ public void testNonStandardConversion01() throws Exception { List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME"); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/1/000000_0"}, - {"5\t6", "/2/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -206,12 +206,12 @@ public void testNonStandardConversion01() throws Exception { /* * Expected result 0th entry i the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -228,9 +228,9 @@ public void testNonStandardConversion01() throws Exception { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); @@ -306,8 +306,12 @@ public void testNonStandardConversion01() throws Exception { * How to do this? CTAS is the only way to create data files which are not immediate children * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy * data files in directly. + * + * Actually Insert Into ... select ... union all ... with + * HIVE_OPTIMIZE_UNION_REMOVE (and HIVEFETCHTASKCONVERSION="none"?) 
will create subdirs + */ - @Ignore("HIVE-17214") +// @Ignore("HIVE-17214") @Test public void testNonStandardConversion02() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf @@ -322,11 +326,11 @@ public void testNonStandardConversion02() throws Exception { List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b"); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/3/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -357,11 +361,11 @@ public void testNonStandardConversion02() throws Exception { nonacidpart/ └── p=1 ├── 000000_0 - ├── 1 + ├── HIVE_UNION_SUBDIR__1 │   └── 000000_0 - ├── 2 + ├── HIVE_UNION_SUBDIR_2 │   └── 000000_0 - └── 3 + └── HIVE_UNION_SUBDIR_3 └── 000000_0 4 directories, 4 files @@ -375,11 +379,11 @@ public void testNonStandardConversion02() throws Exception { } String[][] expected = { {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"} + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/HIVE_UNION_SUBDIR_1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/HIVE_UNION_SUBDIR_2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/HIVE_UNION_SUBDIR_2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/HIVE_UNION_SUBDIR_3/000000_0"} }; Assert.assertEquals("Wrong row count", expected.length, rs.size()); //verify data and layout @@ -407,6 +411,7 @@ public void testNonStandardConversion02() throws Exception { /** * CTAS + Tez + Union creates a non-standard layout in table dir * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc + * todo: fix comment * The way this currently works is that CTAS creates an Acid table but the insert statement writes * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion. * Longer term CTAS should create acid layout from the get-go. 
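(Editorial note, not part of the patch.) The hunks above all verify results the same way: each expected entry pairs a ROW__ID/data prefix with an INPUT__FILE__NAME suffix, and the suffix for union legs is built from AbstractFileMergeOperator.UNION_SUDBIR_PREFIX (the constant behind the HIVE_UNION_SUBDIR_N directories). A minimal sketch of that assertion pattern, using a hypothetical checkRows helper that does not exist in the test class:

    // Sketch only: mirrors the startsWith/endsWith loops used by the tests above.
    private static void checkRows(List<String> actual, String[][] expected) {
      Assert.assertEquals("Unexpected row count", expected.length, actual.size());
      for (int i = 0; i < expected.length; i++) {
        Assert.assertTrue("Actual line " + i + ": " + actual.get(i),
            actual.get(i).startsWith(expected[i][0]));  // ROW__ID + column values
        Assert.assertTrue("Actual file " + i + ": " + actual.get(i),
            actual.get(i).endsWith(expected[i][1]));    // e.g. .../HIVE_UNION_SUBDIR_1/000000_0
      }
    }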
@@ -414,8 +419,15 @@ public void testNonStandardConversion02() throws Exception { @Test public void testCtasTezUnion() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + confForTez.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); setupTez(confForTez); //CTAS with ACID target table + List rs0 = runStatementOnDriver("explain create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + LOG.warn("explain ctas:");//TezEdgeProperty.EdgeType + for (String s : rs0) { + LOG.warn(s); + } runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); @@ -425,14 +437,14 @@ public void testCtasTezUnion() throws Exception { } Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); /* - * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/ + * Expected result 0th entry is the RecordIdentifier + data. 1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -449,10 +461,10 @@ public void testCtasTezUnion() throws Exception { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"} + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + 
{"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -464,7 +476,7 @@ public void testCtasTezUnion() throws Exception { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000"}; + String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -493,7 +505,7 @@ public void testCtasTezUnion() throws Exception { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"}; + String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -517,8 +529,71 @@ public void testCtasTezUnion() throws Exception { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000")); + } + } + @Test + public void testUnionRemove() throws Exception {//todo HIVE-17505 + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); +// hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); +// hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); +// d.close(); +// d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("drop table if exists T", hiveConf); + //todo: 9/12/2017 - this works for both acid and non aicd tables but neither does union removal( or at least no subdirs) - it does + //create 2 writers in both cases but non acid case has a merge stage at the end but not for acid + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')", hiveConf);//todo: try with T bucketd + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.NONACIDORCTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.NONACIDORCTBL + " where a between 5 and 7 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 9", hiveConf); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME", hiveConf); + LOG.warn("before converting to acid"); + for(String 
s : rs) { + LOG.warn(s); + } + //todo: test file names, etc + rs = runStatementOnDriver("select * from T order by a, b", hiveConf); + Assert.assertEquals("", TestTxnCommands2.stringifyValues(values), rs); + } + //@Ignore("HIVE-17505") + @Test + public void testToAcidConversionMultiBucket() throws Exception {//todo HIVE-17505 + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + runStatementOnDriver("drop table if exists T", hiveConf); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')", hiveConf);//todo: try with T bucketd + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9", hiveConf); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME", hiveConf); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals(5, rs.size()); + //this duplicates data + } + @Test + public void testToAcidConversionMultiBucket02() throws Exception {//todo HIVE-17505 + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("drop table if exists T", hiveConf); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')", hiveConf);//todo: try with T bucketd + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9", hiveConf); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME", hiveConf); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); } + Assert.assertEquals(5, rs.size()); + //this duplicates data } // Ideally test like this should be a qfile test. 
However, the explain output from qfile is always // slightly different depending on where the test is run, specifically due to file size estimation @@ -613,6 +688,7 @@ private void setupMapJoin(HiveConf conf) { private List runStatementOnDriver(String stmt, HiveConf conf) throws Exception { Driver driver = new Driver(conf); + driver.setMaxRows(10000); CommandProcessorResponse cpr = driver.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 4e7c80f184..a3bbf00042 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1199,7 +1199,11 @@ private int acquireLocks() { } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { - for (FileSinkDesc desc : plan.getAcidSinks()) { + List acidSinks = new ArrayList<>(plan.getAcidSinks()); + //sorting makes tests easier to write since file names and ROW__IDs depend on statementId + //so this makes (file name -> data) mapping stable + acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName())); + for (FileSinkDesc desc : acidSinks) { desc.setTransactionId(txnMgr.getCurrentTxnId()); //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index 2ddabd9ec6..b21c4738b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -159,7 +159,7 @@ public boolean hasAcidResourcesInQuery() { /** * @return Collection of FileSinkDesc representing writes to Acid resources */ - Set getAcidSinks() { + public Set getAcidSinks() { return acidSinks; } public String getQueryStr() { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index cde2805142..d122cf02ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -269,9 +270,36 @@ public int execute(DriverContext driverContext) { if (lfd != null) { Path targetPath = lfd.getTargetDir(); Path sourcePath = lfd.getSourcePath(); - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { + //'targetPath' is table root of un-partitioned table + //'sourcePath' result of 'select ...' part of CTAS statement + //todo: can sourcePath have > 1 file? currently, CTAS is not supported for + //bucketed tables so number of writers should be same as number of files... 
+ assert lfd.getIsDfsDir(); + FileSystem srcFs = sourcePath.getFileSystem(conf); + List newFiles = new ArrayList<>(); + /* for Tez+Union ctas + acid target sourcePath looks like + -ext-10002 + ├── HIVE_UNION_SUBDIR_1 + │   └── 000000_0 + │   ├── _orc_acid_version + │   └── delta_0000000_0000000 + │   └── bucket_00000 + └── HIVE_UNION_SUBDIR_2 + └── 000000_0 + ├── _orc_acid_version + └── delta_0000000_0000000 + └── bucket_00000 + */ + List sources = new ArrayList<>(); + Collections.addAll(sources, srcFs.globStatus(new Path(sourcePath , AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "[0-9]*"))); + sources.add(srcFs.globStatus(sourcePath)[0]);//todo: perhaps this should do either union or 'simple' but not both + Hive.moveAcidFiles(srcFs, sources.toArray(new FileStatus[sources.size()]), targetPath, newFiles); + } + else { + moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + } } - // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index fa0ba63c25..8aa2c4b18f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -1142,6 +1144,26 @@ public static boolean isAcidTable(Table table) { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + public static boolean isAcidTable(TableDesc table) { + if (table == null || table.getProperties() == null) { + return false; + } + String tableIsTransactional = table.getProperties().getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getProperties().getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } + public static boolean isAcidTable(CreateTableDesc table) { + if (table == null || table.getTblProps() == null) { + return false; + } + String tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } /** * Sets the acidOperationalProperties in the configuration object argument. diff --git ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 9f98b69b18..e4836293dc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -3463,18 +3463,41 @@ static protected void copyFiles(HiveConf conf, Path srcf, Path destf, // If we're moving files around for an ACID write then the rules and paths are all different. // You can blame this on Owen. 
if (isAcid) { + if(false) { + List sources = new ArrayList<>(); + Path pathPattern = new Path(srcf, "[0-9]*");//todo: make patter so that it's at least 1 digit? otherwise it seems to pick up foo/0000_0 and foo/00001_0 when using foo/[0-9]*... + try { + /** + * {@link HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE}=true can create a different data + * layout in the table/parition dir: + * T/1/ + * T/2/ + * T/3/.... etc + */ + Collections.addAll(sources, srcFs.globStatus(pathPattern)); + if(!sources.isEmpty()) { + Collections.addAll(sources, srcs); + srcs = sources.toArray(new FileStatus[sources.size()]); + } + } + catch(IOException ex) { + LOG.error("Unable to glob nested dirs: " + pathPattern); + } + } moveAcidFiles(srcFs, srcs, destf, newFiles); } else { copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles); } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List newFiles) throws HiveException { // The layout for ACID files is table|partname/base|delta|delete_delta/bucket - // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta|delete_delta/bucket. So we need to move that into - // the above structure. For the first mover there will be no delta directory, + // We will always only be writing delta files ( except IOW which writes base_X/ ). + // In the buckets created by FileSinkOperator + // it will look like original_bucket/delta|delete_delta/bucket + // (e.g. .../-ext-10004/000000_0/delta_0000014_0000014_0000/bucket_00000). So we need to + // move that into the above structure. For the first mover there will be no delta directory, // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. 
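(Editorial sketch, not the patch's implementation.) The comment above describes the relocation moveAcidFiles() performs: each writer directory (000000_0, 000001_0, ...) under the staging dir contains delta_X_X_S/bucket_M files that must land directly under the table or partition directory. A simplified version, assuming the fs/stagingDir/tableDir names and ignoring delete_delta, base (IOW) and newFiles bookkeeping:

    // Sketch: the first mover renames the whole delta dir; later movers rename their
    // bucket files into the existing delta dir so all writers share table/delta_X_X_S/.
    static void moveDeltasUnderTable(FileSystem fs, Path stagingDir, Path tableDir)
        throws IOException {
      for (FileStatus writerDir : fs.listStatus(stagingDir)) {        // e.g. 000000_0
        for (FileStatus delta : fs.listStatus(writerDir.getPath())) {
          if (!delta.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)) {
            continue;                                                 // only delta_* dirs
          }
          Path dstDelta = new Path(tableDir, delta.getPath().getName());
          if (!fs.exists(dstDelta)) {
            fs.rename(delta.getPath(), dstDelta);                     // first mover: whole dir
          } else {
            for (FileStatus bucket : fs.listStatus(delta.getPath())) {
              fs.rename(bucket.getPath(), new Path(dstDelta, bucket.getPath().getName()));
            }
          }
        }
      }
    }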
@@ -3502,7 +3525,7 @@ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, fs, dst, origBucketPath, createdDeltaDirs, newFiles); moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, fs, dst,origBucketPath, createdDeltaDirs, newFiles); - moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, + moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java new file mode 100644 index 0000000000..5d5d2fadb8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -0,0 +1,183 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; +import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; +import org.apache.hadoop.hive.ql.lib.Dispatcher; +import org.apache.hadoop.hive.ql.lib.GraphWalker; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.lib.Rule; +import org.apache.hadoop.hive.ql.lib.RuleRegExp; +import org.apache.hadoop.hive.ql.parse.GenTezProcContext; +import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Stack; + +/** + * This runs rules after the final plan is generated. This should not make modifications to + * the plan. 
+ */ +public class QueryPlanPostProcessor { + private ParseContext pctx; + private List transformations = new ArrayList<>(); + private static final Logger LOG = LoggerFactory.getLogger(QueryPlanPostProcessor.class); + + + public QueryPlanPostProcessor(QueryPlan plan) { + this(plan.getRootTasks(), plan.getAcidSinks()); + } + public QueryPlanPostProcessor(List> rootTasks, Set acidSinks) { + for(Task t : rootTasks) { + //Work + Object work = t.getWork(); + if(work instanceof TezWork) { + for(BaseWork bw : ((TezWork)work).getAllWorkUnsorted()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof BaseWork) { + collectFileSinkDescs(((BaseWork)work).getAllLeafOperators(), acidSinks); + } + else if(work instanceof MapredWork) { + MapredWork w = (MapredWork)work; + if(w.getMapWork() != null) { + collectFileSinkDescs(w.getMapWork().getAllLeafOperators(), acidSinks); + } + if(w.getReduceWork() != null) { + collectFileSinkDescs(w.getReduceWork().getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof SparkWork) { + for(BaseWork bw : ((SparkWork)work).getRoots()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof MapredLocalWork) { + Set fileSinkOperatorSet = OperatorUtils.findOperators(((MapredLocalWork)work).getAliasToWork().values(), FileSinkOperator.class); + for(FileSinkOperator fsop : fileSinkOperatorSet) { + collectFileSinkDescs(fsop, acidSinks); + } + } + else { + throw new IllegalArgumentException("Unexpected Work object: " + work.getClass()); + } + } + } + private void collectFileSinkDescs(Operator leaf, Set acidSinks) { + if(leaf instanceof FileSinkOperator) { + FileSinkDesc fsd = ((FileSinkOperator) leaf).getConf(); + if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) { + acidSinks.add(fsd); + } + } + } + private void collectFileSinkDescs(Set> leaves, Set acidSinks) { + for(Operator leaf : leaves) { + collectFileSinkDescs(leaf, acidSinks); + } + } + + public QueryPlanPostProcessor(ParseContext pCtx) { + this.pctx = pCtx; + } + public void initialize(HiveConf hiveConf) { + transformations.add(new FileSinkDescCollector()); + } + public ParseContext optimize() throws SemanticException { + for (Transform t : transformations) { + t.beginPerfLogging(); + pctx = t.transform(pctx); + t.endPerfLogging(t.toString()); + } + return pctx; + } + private static class FileSinkDescCollector extends Transform { + @Override + public ParseContext transform(ParseContext pctx) throws SemanticException { + Map opRules = new LinkedHashMap(); + + String FS = FileSinkOperator.getOperatorName() + "%"; + + opRules.put(new RuleRegExp("FileSinkDesc Collector", FS), new MyNodeProcessor(pctx)); + + Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null); + GraphWalker ogw = new DefaultGraphWalker(disp); + + ArrayList topNodes = new ArrayList<>(); + topNodes.addAll(pctx.getTopOps().values()); + ogw.startWalking(topNodes, null); + for(Operator n : pctx.getTopOps().values()) { + Set fsinks = OperatorUtils.findOperators(n, FileSinkOperator.class); + for(FileSinkOperator fsop : fsinks) { + pctx.getAcidSinks().add(fsop.getConf()); + } + } + + return pctx; + } + private class MyNodeProcessor implements NodeProcessor { + + private final Logger LOG = LoggerFactory.getLogger(MyNodeProcessor.class); + protected ParseContext parseCtx; + + private MyNodeProcessor(ParseContext pCtx) { + this.parseCtx = pCtx; + } + + /** + * There are optimizations like removing the UnionAll operator that create new FileSinkOperatorS + * 
For example, + * {@link org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory.UnionNoProcessFile} + * {@link org.apache.hadoop.hive.ql.parse.GenTezUtils#removeUnionOperators(GenTezProcContext, BaseWork, int)} + * + * So here we need to find all of them doing Acid writes to make sure + */ + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + Object... nodeOutputs) throws SemanticException { + FileSinkOperator fsOp = (FileSinkOperator) nd; + if(fsOp.getConf().getWriteType() != AcidUtils.Operation.NOT_ACID) { + parseCtx.getAcidSinks().add(fsOp.getConf()); + } + if(fsOp.getConf().isLinkedFileSink()) { + /* + There is something fishy here: Consider TestTxnNoBuckets.testUnionRemove() + process() does not visit all the FileSinkOperatorS in the plan somehow + The plan must be disconnected? todo: print it out in trasnform()*/ + for(FileSinkDesc fsd : fsOp.getConf().getLinkedFileSinkDesc()) { + if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) { + parseCtx.getAcidSinks().add(fsd); + } + } + } + return null; + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1c74779dec..3d42043029 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -131,6 +132,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.Optimizer; +import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor; import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; @@ -7129,6 +7131,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) viewDesc.setSchema(new ArrayList(field_schemas)); } + destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc); + boolean isDestTempFile = true; if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString()); @@ -7139,8 +7143,8 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes)); - + colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID)); +//todo: what about WriteEntity.writeType? does it matter here? We could lock the DB of the new table so that it doesn't get dropped.... if (tblDesc == null) { if (viewDesc != null) { table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); @@ -7171,7 +7175,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } else { table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes); } - +//todo: should this WriteEntity have writeType for locks acquisition? 
if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES .getMsg(dest_path.toUri().toString())); @@ -7210,7 +7214,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) } RowSchema fsRS = new RowSchema(vecCol); - +//todo: what does this mean? concat? // The output files of a FileSink can be merged if they are either not being written to a table // or are being written to a table which is not bucketed // and table the table is not sorted @@ -7248,10 +7252,22 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); - String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); - Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { - fileSinkDesc.setInsertOverwrite(true); + switch (dest_type) { + case QBMetaData.DEST_PARTITION: + //fall through + case QBMetaData.DEST_TABLE: + //INSERT [OVERWRITE] path + String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); + Map iowMap = qb.getParseInfo().getInsertOverwriteTables(); + if (iowMap.containsKey(destTableFullName)) { + fileSinkDesc.setInsertOverwrite(true); + } + break; + case QBMetaData.DEST_DFS_FILE: + //CTAS path + break; + default: + throw new IllegalStateException("Unexpected dest_type=" + dest_tab); } acidFileSinks.add(fileSinkDesc); } @@ -11497,6 +11513,7 @@ void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticExce compiler.compile(pCtx, rootTasks, inputs, outputs); fetchTask = pCtx.getFetchTask(); } + QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks); LOG.info("Completed plan generation"); // 10. put accessed columns to readEntity diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index 15836ecf48..01580076ff 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -559,6 +559,7 @@ protected void generateTaskTree(List> rootTasks, Pa for (BaseWork w: procCtx.workWithUnionOperators) { GenTezUtils.removeUnionOperators(procCtx, w, indexForTezUnion++); } +// pCtx.getAcidSinks().addAll(procCtx.parseContext.getAcidSinks()); // then we make sure the file sink operators are set up right for (FileSinkOperator fileSink: procCtx.fileSinkSet) { diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 03202fb7f5..d451413237 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.PTFUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils; /** * LoadFileDesc. @@ -36,6 +36,14 @@ private String columnTypes; private String destinationCreateTable; + //todo: push up to parent? (and LoadTableDesc) + // Need to remember whether this is an acid compliant operation, and if so whether it is an + // insert, update, or delete. 
+ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; + public AcidUtils.Operation getWriteType() { + return writeType; + } + public LoadFileDesc() { } @@ -51,7 +59,7 @@ public LoadFileDesc(final LoadFileDesc o) { public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc, final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes) { + final String columns, final String columnTypes, AcidUtils.Operation writeType) { this(sourcePath, targetDir, isDfsDir, columns, columnTypes); if (createTableDesc != null && createTableDesc.getDatabaseName() != null && createTableDesc.getTableName() != null) { @@ -63,6 +71,7 @@ public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc // qualified. destinationCreateTable = createViewDesc.getViewName(); } + this.writeType = writeType; } public LoadFileDesc(final Path sourcePath, final Path targetDir, diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index b695f0f809..bbed9be82e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -22,12 +22,12 @@ public class TezEdgeProperty { - public enum EdgeType { - SIMPLE_EDGE, + public enum EdgeType {//todo: HIVE-15549 + SIMPLE_EDGE,//SORT_PARTITION_EDGE BROADCAST_EDGE, - CONTAINS, - CUSTOM_EDGE, - CUSTOM_SIMPLE_EDGE, + CONTAINS,//used for union (all?) + CUSTOM_EDGE,//CO_PARTITION_EDGE + CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE ONE_TO_ONE_EDGE } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 1f0c269f5e..e1ef32da05 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -5,6 +5,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,7 @@ public void testNoBuckets() throws Exception { Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); - /*todo: WTF? + /*todo: WTF? Fix comment RS for update seems to spray randomly... is that OK? maybe as long as all resultant files have different names... will they? Assuming we name them based on taskId, we should create bucketX and bucketY. 
we delete events can be written to bucketX file it could be useful for filter delete for a split by file name since the insert @@ -154,7 +155,7 @@ public void testNoBuckets() throws Exception { /** * all of these pass but don't do exactly the right thing * files land as if it's not an acid table "warehouse/myctas4/000000_0" - * even though in {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires + * even though {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires * and sees it as transactional table * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer * @@ -169,6 +170,7 @@ public void testCTAS() throws Exception { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); + //todo: look at the files - make sure CTAS did an Acid write List rs = runStatementOnDriver("select * from myctas order by a, b"); Assert.assertEquals(stringifyValues(values), rs); @@ -190,9 +192,118 @@ public void testCTAS() throws Exception { rs = runStatementOnDriver("select * from myctas4 order by a, b"); Assert.assertEquals(stringifyValues(values), rs); } + @Ignore("HIVE-17505") + @Test + public void testUnionRemove() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + d.close(); + d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); + /* + with T non-acid, see HIVE-17505: we get data in subdirs (i.e. union is removed) but it's not moved to T + With T acid, we end up writing each leg to a different delta (by suffix) though this may be due to transient changes in HIVE-15899.03.patch + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + //todo: test file names, etc + rs = runStatementOnDriver("select * from T order by a, b"); + Assert.assertEquals("", TestTxnCommands2.stringifyValues(values), rs); + } + + /** + * There seem to be various issues around HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE which is + * false by default (e.g. HIVE-17505). Even if those are fixed, the optimization causes extra + * level of subdirectories created under table/partion - same as CTAS+Tez+Union + * (@link TestAcidOnTez.testCtasTezUnion()}. Acid tables will not handle this. + * (Hive.copyFiles(), Hive.move() don't deal with dirs below table/partition root) + * So if the default for this param is changed this tests will flag it. 
+ * @throws Exception + */ + @Test + public void testUnionRemoveOption() throws Exception { + Assert.assertFalse("This will break Acid", hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE)); + } + + /** + * The idea here is to create a non acid table that was written by multiple writers, i.e. + * unbucketed table that has 000000_0 & 000001_0, for example. Unfortunately this doesn't work + * due to 'merge' logic - see comments in the method + */ + @Ignore + @Test + public void testToAcidConversionMultiBucket() throws Exception { + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')"); + /*T non-acid + non bucketd - 3 writers are created and then followed by merge to create a single output file + though how the data from union is split between writers is a mystery + (bucketed tables don't do merge) + Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10000/000000_0 [length: 515] +{"a":6,"b":8} +{"a":9,"b":10} +{"a":5,"b":6} +{"a":1,"b":2} +{"a":2,"b":4} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000000_0 [length: 242] +{"a":6,"b":8} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000001_0 [length: 244] +{"a":9,"b":10} +{"a":5,"b":6} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000002_0 [length: 242] +{"a":1,"b":2} +{"a":2,"b":4} + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9"); + List rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + } + @Test + public void testInsertFromUnion() throws Exception { + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')");//todo: try with T bucketd + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.NONACIDNONBUCKET + " where a between 1 and 3 group by a, b union all select a, b from " + 
Table.NONACIDNONBUCKET + " where a between 5 and 7 union all select a, b from " + Table.NONACIDNONBUCKET + " where a >= 9"); + List rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + /* + The number of writers seems to be based on number of MR jobs for the src query. todo check number of FileSinks + warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000000_0/delta_0000016_0000016_0000/bucket_00000 [length: 648] + {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":0,"currentTransaction":16,"row":{"_col0":1,"_col1":2}} + {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":1,"currentTransaction":16,"row":{"_col0":2,"_col1":4}} + ________________________________________________________________________________________________________________________ + warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000001_0/delta_0000016_0000016_0000/bucket_00001 [length: 658] + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":0,"currentTransaction":16,"row":{"_col0":5,"_col1":6}} + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":1,"currentTransaction":16,"row":{"_col0":6,"_col1":8}} + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":2,"currentTransaction":16,"row":{"_col0":9,"_col1":10}} + */ + rs = runStatementOnDriver("select a, b from T order by a, b"); + Assert.assertEquals(stringifyValues(values), rs); + } /** * see HIVE-16177 - * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} todo need test with > 1 bucket file + * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} */ @Test public void testToAcidConversion02() throws Exception {