diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java index 2dc2351793..314b8b47c2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; @@ -208,8 +209,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, if(!VirtualColumn.ROWID.getTypeInfo().equals(ci.getType())) { throw new IllegalStateException("expected 1st column to be ROW__ID but got wrong type: " + ci.toString()); } - //HIVE-17328: not sure this is correct... I don't think is gets wrapped in UDFToInteger.... 
- bucketColumns.add(new ExprNodeColumnDesc(ci)); + //add a cast(ROW__ID as int) so that the ROW__ID bucket column is wrapped in UDFToInteger() + bucketColumns.add(ParseUtils.createConversionCast(new ExprNodeColumnDesc(ci), TypeInfoFactory.intTypeInfo)); } else { if (!destTable.getSortCols().isEmpty()) { // Sort columns specified by table diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index a25406ddb2..833e63745f 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -222,4 +222,65 @@ public void testAcidMetaColumsDecode() throws Exception { rs = runStatementOnDriver("select a, b from T order by a, b"); Assert.assertEquals(stringifyValues(dataAll), rs); } + + /** + * Test that rows are routed to the proper bucket files based on bucket col/ROW__ID, for every combination of vectorization on/off, HIVEOPTSORTDYNAMICPARTITION on/off and bucketing_version 1 and 2. + * Only the Vectorized Acid Reader checks if bucketId in ROW__ID inside the file + * matches the file name, and only for files in delete_delta. + */ + @Test + public void testSdpoBucketed() throws Exception { + testSdpoBucketed(true, true, 1); + testSdpoBucketed(true, false, 1); + testSdpoBucketed(false, true, 1); + testSdpoBucketed(false, false,1); + + testSdpoBucketed(true, true, 2); + testSdpoBucketed(true, false, 2); + testSdpoBucketed(false, true, 2); + testSdpoBucketed(false, false,2); + } + private void testSdpoBucketed(boolean isVectorized, boolean isSdpo, int bucketing_version) + throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION, isSdpo); + runStatementOnDriver("drop table if exists acid_uap"); + runStatementOnDriver("create transactional table acid_uap(a int, b varchar(128)) " + + "partitioned by (ds string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES " + + "('bucketing_version'='" + bucketing_version + "')"); + runStatementOnDriver("insert into table acid_uap 
partition (ds='tomorrow') " + + "values (1, 'bah'),(2, 'yah')"); + runStatementOnDriver("insert into table acid_uap partition (ds='today') " + + "values (1, 'bah'),(2, 'yah')"); + runStatementOnDriver("select a,b, ds from acid_uap order by a,b, ds"); + + String testQuery = isVectorized ? + "select ROW__ID, a, b, ds from acid_uap order by ds, a, b" : + "select ROW__ID, a, b, ds, INPUT__FILE__NAME from acid_uap order by ds, a, b"; + String[][] expected = new String[][]{ + {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttoday", + "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00001"}, + {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttoday", + "warehouse/acid_uap/ds=today/delta_0000002_0000002_0000/bucket_00000"}, + + {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\tbah\ttomorrow", + "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00001"}, + {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\tyah\ttomorrow", + "warehouse/acid_uap/ds=tomorrow/delta_0000001_0000001_0000/bucket_00000"}}; + checkResult(expected, testQuery, isVectorized, "after insert", LOG); + + runStatementOnDriver("update acid_uap set b = 'fred'"); + + String[][] expected2 = new String[][]{ + {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttoday", + "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00001"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttoday", + "warehouse/acid_uap/ds=today/delta_0000003_0000003_0000/bucket_00000"}, + + {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t1\tfred\ttomorrow", + "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00001"}, + {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\tfred\ttomorrow", + "warehouse/acid_uap/ds=tomorrow/delta_0000003_0000003_0000/bucket_00000"}}; + checkResult(expected2, testQuery, isVectorized, "after update", LOG); + } }