diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index fd6d2ad..8649ccf 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -127,7 +127,7 @@ /** * When picking the hosts for a split that crosses block boundaries, - * any drop any host that has fewer than MIN_INCLUDED_LOCATION of the + * drop any host that has fewer than MIN_INCLUDED_LOCATION of the * number of bytes available on the host with the most. * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB the * split will contain host2 (100% of host2) and host3 (90% of host2). Host1 @@ -1283,6 +1283,15 @@ public float getProgress() throws IOException { } else { bucket = (int) split.getStart(); reader = null; + if(deltas != null && deltas.length > 0) { + /* w/o schema evolution (which ACID doesn't support yet) all delta + files have the same schema, so choosing the 1st one*/ + final List<OrcProto.Type> types = + OrcFile.createReader(AcidUtils.createBucketFile(deltas[0], bucket), + OrcFile.readerOptions(conf)).getTypes(); + readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); + setSearchArgument(readOptions, types, conf, split.isOriginal()); + } } String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 58c2fca..9294c86 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -107,7 +107,6 @@ private void dropTables() throws Exception { public void tearDown() throws Exception { try { if (d != null) { - // runStatementOnDriver("set autocommit true"); dropTables(); d.destroy(); d.close(); @@ -126,13 +125,51 @@ public void testOrcPPD() throws Exception { public void 
testOrcNoPPD() throws Exception { testOrcPPD(false); } - private void testOrcPPD(boolean enablePPD) throws Exception { + + /** + * this is run 2 times: 1 with PPD on, 1 with off + * Also, the queries are such that if we were to push predicate down to an update/delete delta, + * the test would produce wrong results + * @param enablePPD + * @throws Exception + */ + private void testOrcPPD(boolean enablePPD) throws Exception { boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD - int[][] tableData = {{1,2},{3,4}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); - List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"); - runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + //create delta_0001_0001_0000 (should push predicate here) + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}})); + List<String> explain; + String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3"; + if (enablePPD) { + explain = runStatementOnDriver("explain " + query); + /* + here is a portion of the above "explain". The "filterExpr:" in the TableScan is the pushed predicate + w/o PPD, the line is simply not there, otherwise the plan is the same + Map Operator Tree:, + TableScan, + alias: acidtbl, + filterExpr: (a = 3) (type: boolean), + Filter Operator, + predicate: (a = 3) (type: boolean), + Select Operator, + ... + */ + assertPredicateIsPushed("filterExpr: (a = 3)", explain); + } + //create delta_0002_0002_0000 (can't push predicate) + runStatementOnDriver(query); + query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b"; + if (enablePPD) { + /*at this point we have 2 delta files, 1 for insert 1 for update + * we should push predicate into 1st one but not 2nd. 
If the following 'select' were to + * push into the 'update' delta, we'd filter out {3,5} before doing merge and thus + * produce {3,4} as the value for 2nd row. The right result is 0-rows.*/ + explain = runStatementOnDriver("explain " + query); + assertPredicateIsPushed("filterExpr: (b = 4)", explain); + } + List<String> rs0 = runStatementOnDriver(query); + Assert.assertEquals("Read failed", 0, rs0.size()); + runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'"); Worker t = new Worker(); t.setThreadId((int) t.getId()); t.setHiveConf(hiveConf); @@ -142,18 +179,37 @@ private void testOrcPPD(boolean enablePPD) throws Exception { t.init(stop, looped); t.run(); //now we have base_0001 file - int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}}; + int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2)); - //now we have delta_0002_0002_0000 with inserts only (ok to push predicate) + //now we have delta_0003_0003_0000 with inserts only (ok to push predicate) + if (enablePPD) { + explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + " where a=7 and b=8"); + assertPredicateIsPushed("filterExpr: ((a = 7) and (b = 8))", explain); + } runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8"); - //now we have delta_0003_0003_0000 with delete events (can't push predicate) - runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9"); - //and another delta with update op - List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"); - int [][] resultData = {{3,4},{5,6},{9,11}}; + //now we have delta_0004_0004_0000 with delete events + + /*(can't push predicate to 'delete' delta) + * if we were to push to 'delete' delta, we'd filter out all rows since the 'row' is always NULL for + * delete events and we'd produce data as if the delete never happened*/ + query = "select a,b from " + 
Table.ACIDTBL + " where a > 1 order by a,b"; + if(enablePPD) { + explain = runStatementOnDriver("explain " + query); + assertPredicateIsPushed("filterExpr: (a > 1)", explain); + } + List<String> rs1 = runStatementOnDriver(query); + int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}}; Assert.assertEquals("Update failed", stringifyValues(resultData), rs1); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd); } + private static void assertPredicateIsPushed(String ppd, List<String> queryPlan) { + for(String line : queryPlan) { + if(line != null && line.contains(ppd)) { + return; + } + } + Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true); + } @Ignore("alter table") @Test public void testAlterTable() throws Exception {