diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 138e56eb0c..128cdf1699 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -24,6 +24,7 @@
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -36,9 +37,11 @@
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -56,6 +59,9 @@
  * directly read from the base files/insert_only deltas in vectorized row batches. The deleted
  * rows can then be easily indicated via the 'selected' field of the vectorized row batch.
  * Refer HIVE-14233 for more details.
+ *
+ *
+ * todo: annotate the plan to indicate which reader is used?
  */
 public class VectorizedOrcAcidRowBatchReader
     implements org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> {
@@ -72,6 +78,7 @@ private boolean addPartitionCols = true;
   private boolean addPartitionCols = true;
   private ValidTxnList validTxnList;
   private DeleteEventRegistry deleteEventRegistry;
+  private final boolean isOriginal;
 
   public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
       Reporter reporter) throws IOException {
@@ -88,6 +95,7 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
       OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
     }
     final OrcSplit orcSplit = (OrcSplit) inputSplit;
+    isOriginal = orcSplit.isOriginal();
 
     rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
 
@@ -145,7 +153,7 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
    * @param inputSplit
    * @return true if it is possible, else false.
    */
-  public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
+  static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) throws IOException {
     if (!(inputSplit instanceof OrcSplit)) {
       return false; // must be an instance of OrcSplit.
     }
@@ -153,20 +161,45 @@ public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf,
     // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
     // original files for now as they have a different schema than a regular ACID file.
     final OrcSplit split = (OrcSplit) inputSplit;
-    if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
+    if(!AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()) {
       // When split-update is turned on for ACID, a more optimized vectorized batch reader
       // can be created. But still only possible when we are *NOT* reading any originals.
-      return true;
+      //todo: fix above comment
+      return false;
+    }
+    if(split.isOriginal()) {
+      Path[] deleteEvents = getDeleteDeltaDirsFromSplit(split);
+      if(deleteEvents.length > 0) {
+        //if we have delete events then we need ROW__IDs to apply them; bail for now
+        return false;
+      }
+      VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      if(rbCtx == null) {
+        //means this can't be vectorized (e.g. INPUT__FILE__NAME is projected)
+        return false;
+      }
+      if(rbCtx.getVirtualColumnCount() > 0) {
+        for(VirtualColumn vc : rbCtx.getNeededVirtualColumns()) {
+          if(vc == VirtualColumn.ROWID) {
+            //The query needs ROW__ID: maybe explicitly asked, maybe it's a Delete statement.
+            //Either way, we need to decorate "original" rows with ROW__ID, so bail for now.
+            return false;
+          }
+        }
+      }
+    }
-    return false; // no split-update or possibly reading originals!
+    return true;
   }
 
+  /**
+   * ToDo: refactor/merge with {@link OrcInputFormat#getReader(InputSplit, AcidInputFormat.Options)}
+   */
   private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
     Path path = orcSplit.getPath();
     Path root;
     if (orcSplit.hasBase()) {
       if (orcSplit.isOriginal()) {
-        root = path.getParent();
+        root = orcSplit.getRootDir();
       } else {
         root = path.getParent().getParent();
       }
@@ -218,8 +251,11 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
     }
 
-    // Case 1- find rows which belong to transactions that are not valid.
-    findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+    //all "original" data belongs to txnid:0 and is always valid/committed for every reader
+    if(!isOriginal) {
+      // Case 1- find rows which belong to transactions that are not valid.
+      findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+    }
 
     // Case 2- find rows which have been deleted.
     this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet);
@@ -242,6 +278,11 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti
       }
     }
 
+    if(isOriginal) {
+      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
+      progress = baseReader.getProgress();
+      return true;
+    }
     // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
     // NOTE: We only link up the user columns and not the ACID metadata columns because this
    // vectorized code path is not being used in cases of update/delete, when the metadata columns
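For orientation, this is roughly how the reworked gate is meant to be consulted when a reader is picked for a split. A minimal sketch only: chooseReader and fallbackAcidReader are illustrative names, not the actual OrcInputFormat call site, which this patch does not touch.

  // Sketch: use the vectorized ACID reader whenever the split qualifies. With this patch the gate
  // also admits "original" files, provided there are no delete deltas and ROW__ID is not projected.
  static org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> chooseReader(
      OrcSplit split, JobConf conf, Reporter reporter) throws IOException {
    if (VectorizedOrcAcidRowBatchReader.canCreateVectorizedAcidRowBatchReaderOnSplit(conf, split)) {
      return new VectorizedOrcAcidRowBatchReader(split, conf, reporter);
    }
    return fallbackAcidReader(split, conf, reporter); // hypothetical: the existing row-at-a-time path
  }
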
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 39d6b2b414..8a5bcc82bc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
@@ -37,7 +36,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -57,8 +55,8 @@
  *
  * Can also test, calling commit in AC=true mode, etc, toggling AC...
  *
- * Tests here are for multi-statement transactions (WIP) and those that don't need to
- *   run with Acid 2.0 (see subclasses of TestTxnCommands2)
+ * Tests here are for multi-statement transactions (WIP) and others
+ * Mostly uses bucketed tables
  */
 public class TestTxnCommands extends TxnCommandsBaseForTests {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index c827dc4a0e..bf370c0182 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -522,5 +522,32 @@ public void testCtasBucketed() throws Exception {
     //    Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
   }
+  @Test
+  public void testNonAcidToAcidVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    //this enables vectorization of ROW__ID
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631
+    runStatementOnDriver("create table T(a int, b int) stored as orc");
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values));
+    //, 'transactional_properties'='default'
+    runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')");
+    //this uses VectorizedOrcAcidRowBatchReader
+    List<String> rs = runStatementOnDriver("select a from T where b > 6 order by a");
+    Assert.assertEquals("", 2, rs.size());
+    Assert.assertEquals(Integer.toString(6), rs.get(0));
+    Assert.assertEquals(Integer.toString(9), rs.get(1));
+    //this doesn't vectorize yet; "select ROW__ID, a ..." could be vectorized, we just don't do it yet
+    //TODO: how do we verify which acid reader was actually used?
+    rs = runStatementOnDriver("select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a");
+    Assert.assertEquals("", 2, rs.size());
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"}
+    };
+    checkExpected(rs, expected, "After non-vectorized read");
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+  }
 
 }
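The ROW__IDs the new test expects follow from how rows in pre-acid ("original") files are addressed: every row belongs to transactionid 0, the bucket property is a BucketCodec-encoded int, and the rowid reflects the row's position in the file (the fourth and fifth inserted rows get rowid 3 and 4 above). A minimal sketch of reading that encoding back, using only the BucketCodec calls the test itself exercises; the expected result mirrors the test's own assertion.

  // 536870912 == 0x20000000, the bucketid seen in the expected ROW__IDs above.
  int bucketProperty = 536870912;
  BucketCodec codec = BucketCodec.determineVersion(bucketProperty);
  int writerId = codec.decodeWriterId(bucketProperty); // expected: 0, matching bucket file 000000_0
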
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3e4f6f6675..5828d66a80 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -93,12 +93,23 @@ void setUpInternal() throws Exception {
     d = new Driver(hiveConf);
     d.setMaxRows(10000);
     dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
+    runStatementOnDriver("create table " + Table.ACIDTBL +
+      "(a int, b int) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBLPART +
+      "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL +
+      "(a int, b int) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 +
+      "(a int, b int) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create temporary table " + Table.ACIDTBL2 +
+      "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET +
+      "(a int, b int) stored as orc");
   }
   private void dropTables() throws Exception {
     for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) {