diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 5856cfd..d7f1b42 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -1181,6 +1181,10 @@ public boolean isUseBucketizedHiveInputFormat() {
     return useBucketizedHiveInputFormat;
   }
 
+  /**
+   * Before setting this to {@code true}, make sure the operator is not reading an ACID table.
+   * @param useBucketizedHiveInputFormat whether to read input via BucketizedHiveInputFormat
+   */
   public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
     this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index 76cc540..7cb0f15 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -215,6 +216,9 @@ private boolean checkTable(Table table,
   private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
     Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
     for (int pos = 0; pos < srcs.length; pos++) {
+      if (!srcs[pos].isFile()) {
+        throw new RuntimeException("Was expecting '" + srcs[pos].getPath() + "' to be a bucket file.");
+      }
       bucketFileNameMapping.put(srcs[pos].getPath().getName(), pos);
     }
     tsOp.getConf().setBucketFileNameMapping(bucketFileNameMapping);
@@ -376,6 +380,14 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         return null;
       }
 
+      if (stack.get(0) instanceof TableScanOperator) {
+        TableScanOperator tso = (TableScanOperator) stack.get(0);
+        if (SemanticAnalyzer.isAcidTable(tso.getConf().getTableMetadata())) {
+          /* ACID tables have a complex directory layout and require merging of delta files
+           * on read, so we should not try to read bucket files directly. */
+          return null;
+        }
+      }
       // Support for dynamic partitions can be added later
       if (fsOp.getConf().getDynPartCtx() != null) {
         return null;
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 8e65b59..1d2c764 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -12036,7 +12036,7 @@ else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
   // Even if the table is of Acid type, if we aren't working with an Acid compliant TxnManager
   // then return false.
-  private boolean isAcidTable(Table tab) {
+  public static boolean isAcidTable(Table tab) {
     if (tab == null) return false;
     if (!SessionState.get().getTxnMgr().supportsAcid()) return false;
     String tableIsTransactional =
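For illustration (not part of the patch): the hunks above establish a contract — BucketizedHiveInputFormat must not be enabled for ACID reads — and the optimizer now enforces it by bailing out before rewriting the plan. A minimal sketch of the calling pattern, assuming a TableScanOperator `tsOp` is in scope as in the optimizer's process() method:

    // Sketch only: guard the bucketized-read fast path the way the patched
    // optimizer does. ACID data lives in base/delta subdirectories that are
    // merged on read, so bucket files cannot be read directly.
    if (SemanticAnalyzer.isAcidTable(tsOp.getConf().getTableMetadata())) {
      return null; // leave the plan untouched; the regular ACID read path applies
    }
    tsOp.setUseBucketizedHiveInputFormat(true); // safe: not an ACID table
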
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 1431e19..3c987dd 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -98,8 +98,8 @@ public void tearDown() throws Exception {
         d.destroy();
         d.close();
         d = null;
-        TxnDbUtil.cleanDb();
       }
+      TxnDbUtil.cleanDb();
     } finally {
       FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
     }
@@ -142,6 +142,22 @@ public void testDeleteIn() throws Exception {
     Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2);
   }
+  /**
+   * https://issues.apache.org/jira/browse/HIVE-10151
+   */
+  @Test
+  public void testBucketizedInputFormat() throws Exception {
+    int[][] tableData = {{1, 2}};
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1");
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL); // no order by as it's just 1 row
+    Assert.assertEquals("Insert into " + Table.ACIDTBL + " didn't match:", stringifyValues(tableData), rs);
+
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1");
+    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); // no order by as it's just 1 row
+    Assert.assertEquals("Insert into " + Table.NONACIDORCTBL + " didn't match:", stringifyValues(tableData), rs2);
+  }
 
   @Test
   public void testInsertOverwriteWithSelfJoin() throws Exception {
     int[][] part1Data = {{1,7}};
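For illustration (not part of the patch): the now-public isAcidTable(Table) combines two checks — the session's TxnManager must support ACID, and the table's "transactional" property must be true. The SemanticAnalyzer hunk truncates at the property lookup; the sketch below shows only the property half, as an assumption-labeled reconstruction rather than the patch's exact code. It assumes the standard metastore constant hive_metastoreConstants.TABLE_IS_TRANSACTIONAL (the "transactional" key):

    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    import org.apache.hadoop.hive.ql.metadata.Table;

    final class AcidTableCheckSketch {
      // Illustrative stand-in: a table counts as ACID only if its
      // "transactional" table property is set to "true".
      static boolean hasTransactionalProperty(Table tab) {
        if (tab == null) {
          return false;
        }
        String tableIsTransactional =
            tab.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
        return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
      }
    }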