diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 880329dee8..5cb78a7b25 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -1,11 +1,11 @@
 package org.apache.hadoop.hive.ql;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -252,46 +252,125 @@ private void checkExpected(List<String> rs, String[][] expected, String msg) {
   }
   /**
    * The idea here is to create a non acid table that was written by multiple writers, i.e.
-   * unbucketed table that has 000000_0 & 000001_0, for example. Unfortunately this doesn't work
-   * due to 'merge' logic - see comments in the method
+   * unbucketed table that has 000000_0 & 000001_0, for example.
+   * Also, checks that we can handle a case when data files can be at multiple levels (subdirs)
+   * of the table.
    */
-  @Ignore
   @Test
   public void testToAcidConversionMultiBucket() throws Exception {
+    //need to disable these so that automatic merge doesn't merge the files
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPFILES, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGEMAPREDFILES, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
+    d.close();
+    d = new Driver(hiveConf);
+
     int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
     runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values));
     runStatementOnDriver("drop table if exists T");
     runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')");
-    /*T non-acid + non bucketd - 3 writers are created and then followed by merge to create a single output file
-    though how the data from union is split between writers is a mystery (bucketed tables don't do merge)
-
-Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10000/000000_0 [length: 515]
-{"a":6,"b":8}
-{"a":9,"b":10}
-{"a":5,"b":6}
-{"a":1,"b":2}
-{"a":2,"b":4}
-________________________________________________________________________________________________________________________
+    runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL +
+      " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL +
+      " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9");
+    runStatementOnDriver("insert into T values(12,12)");
 
-Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000000_0 [length: 242]
-{"a":6,"b":8}
-________________________________________________________________________________________________________________________
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    //previous insert+union creates 3 data files (0-3)
+    //insert (12,12) creates 000000_0_copy_1
+    String expected[][] = {
+      {"1\t2", "warehouse/t/000002_0"},
+      {"2\t4", "warehouse/t/000002_0"},
+      {"5\t6", "warehouse/t/000001_0"},
+      {"6\t8", "warehouse/t/000000_0"},
+      {"9\t10", "warehouse/t/000001_0"},
+      {"12\t12", "warehouse/t/000000_0_copy_1"}
+    };
+    checkExpected(rs, expected,"before converting to acid");
 
-Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000001_0 [length: 244]
-{"a":9,"b":10}
-{"a":5,"b":6}
-________________________________________________________________________________________________________________________
+    //now do Insert from Union here to create data files in sub dirs
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    d.close();
+    d = new Driver(hiveConf);
+    runStatementOnDriver("insert into T(a,b) select a * 10, b * 10 from " + Table.ACIDTBL +
+      " where a between 1 and 3 group by a, b union all select a * 10, b * 10 from " + Table.ACIDTBL +
+      " where a between 5 and 7");
+    //now we have a table with data files at multiple different levels.
+    String expected1[][] = {
+      {"1\t2", "warehouse/t/000002_0"},
+      {"2\t4", "warehouse/t/000002_0"},
+      {"5\t6", "warehouse/t/000001_0"},
+      {"6\t8", "warehouse/t/000000_0"},
+      {"9\t10", "warehouse/t/000001_0"},
+      {"10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"12\t12", "warehouse/t/000000_0_copy_1"},
+      {"20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
+      {"60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"}
+    };
+    rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    checkExpected(rs, expected1,"before converting to acid (with multi level data layout)");
 
-Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000002_0 [length: 242]
-{"a":1,"b":2}
-{"a":2,"b":4}
-    */
-    runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9");
-    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
-    LOG.warn("before converting to acid");
-    for(String s : rs) {
-      LOG.warn(s);
-    }
+    //make it an Acid table and make sure we assign ROW__IDs correctly
+    runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    /**
+     now that T is Acid, data for each writerId is treated like a logical bucket (though T is not
+     bucketed), so rowid are assigned per logical bucket (e.g. 000000_0 + 000000_0_copy_1 + subdirs).
+     {@link AcidUtils.Directory#getOriginalFiles()} ensures consistent ordering of files within
+     logical bucket (tranche)
+     */
+    String expected2[][] = {
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"},
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/000001_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t6\t8", "warehouse/t/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/000001_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":2}\t60\t80", "warehouse/t/HIVE_UNION_SUBDIR_16/000001_0"},
+    };
+    checkExpected(rs, expected2,"after converting to acid (no compaction)");
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+    Assert.assertEquals(2, BucketCodec.determineVersion(537001984).decodeWriterId(537001984));
+    Assert.assertEquals(1, BucketCodec.determineVersion(536936448).decodeWriterId(536936448));
+
+    runStatementOnDriver("update T set b = 88 where b = 80");
+    runStatementOnDriver("delete from T where b = 8");
+    String expected3[][] = {
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/000002_0"},
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/000002_0"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/000001_0"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/000001_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/000000_0_copy_1"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/HIVE_UNION_SUBDIR_16/000000_0"},
+      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/delta_0000023_0000023_0000/bucket_00000"},
+    };
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    checkExpected(rs, expected3,"after converting to acid (no compaction with updates)");
+
+    //major compaction + check data + files
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+
+    /*Compaction preserves location of rows wrt buckets/tranches (for now)*/
+    String expected4[][] = {
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2", "warehouse/t/base_0000024/bucket_00002"},
+      {"{\"transactionid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4", "warehouse/t/base_0000024/bucket_00002"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/base_0000024/bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536936448,\"rowid\":0}\t9\t10", "warehouse/t/base_0000024/bucket_00001"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t10\t20", "warehouse/t/base_0000024/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t12\t12", "warehouse/t/base_0000024/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t20\t40", "warehouse/t/base_0000024/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t50\t60", "warehouse/t/base_0000024/bucket_00000"},
+      {"{\"transactionid\":23,\"bucketid\":536870912,\"rowid\":0}\t60\t88", "warehouse/t/base_0000024/bucket_00000"},
+    };
+    checkExpected(rs, expected4,"after major compact");
   }
 
   @Test
   public void testInsertFromUnion() throws Exception {
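Note (illustrative, not part of the patch): the three Assert.assertEquals calls in the new test verify how ROW__ID.bucketid records the writer that produced each "original" file once the table is converted to acid. The asserted values differ only in the field that decodeWriterId() extracts: 536870912 = 0x20000000 decodes to writer 0, 536936448 = 0x20010000 to writer 1, and 537001984 = 0x20020000 to writer 2, while the high bits carry the codec version that BucketCodec.determineVersion() inspects. The standalone sketch below decodes those values using only the two BucketCodec methods that appear in the diff; the class name BucketPropertyDecodeSketch and the main() harness are hypothetical additions for illustration.

import org.apache.hadoop.hive.ql.io.BucketCodec;

// Illustrative sketch only -- not part of the patch.
public class BucketPropertyDecodeSketch {
  public static void main(String[] args) {
    // bucketid values asserted in testToAcidConversionMultiBucket
    int[] bucketProperties = {536870912, 536936448, 537001984};
    for (int bp : bucketProperties) {
      // determineVersion() selects the codec from the high bits of the encoded value;
      // decodeWriterId() extracts the writer id, i.e. the "logical bucket" (tranche)
      BucketCodec codec = BucketCodec.determineVersion(bp);
      System.out.printf("bucketProperty=%d (0x%08X) -> writerId=%d%n",
          bp, bp, codec.decodeWriterId(bp));
    }
  }
}

Run as a plain Java main, this prints writer ids 0, 1 and 2 for 536870912, 536936448 and 537001984 respectively, matching the expectations hard-coded in expected2 through expected4 above.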