diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 138e56eb0c..54ba0b68f4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -24,6 +24,7 @@
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -36,9 +37,13 @@
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -56,6 +61,9 @@
  * directly read from the base files/insert_only deltas in vectorized row batches. The deleted
  * rows can then be easily indicated via the 'selected' field of the vectorized row batch.
  * Refer HIVE-14233 for more details.
+ *
+ *
+ * TODO: annotate the query plan to indicate which reader is used?
  */
 public class VectorizedOrcAcidRowBatchReader
     implements org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> {
@@ -72,6 +80,12 @@
   private boolean addPartitionCols = true;
   private ValidTxnList validTxnList;
   private DeleteEventRegistry deleteEventRegistry;
+  private final boolean isOriginal;
+  private final boolean needRowIds;
+  //partition root
+  private final Path rootPath;
+  private final long rowIdOffset;
+  private final int bucketProperty;
 
   public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
                                          Reporter reporter) throws IOException {
@@ -90,6 +104,9 @@
     final OrcSplit orcSplit = (OrcSplit) inputSplit;
 
     rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    isOriginal = orcSplit.isOriginal();
+    needRowIds = needRowIds(rbCtx);
+    rootPath = orcSplit.getRootDir();
     reporter.setStatus(orcSplit.toString());
 
     Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
@@ -137,36 +154,112 @@
       // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
       this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions);
     }
+
+    OffsetAndBucketProperty r = computeOffsetAndBucket(orcSplit, conf, validTxnList);
+    rowIdOffset = r.offset;
+    bucketProperty = r.bucketProperty;
   }
+  private static final class OffsetAndBucketProperty {
+    private final long offset;
+    private final int bucketProperty;
+    private OffsetAndBucketProperty(long offset, int bucketProperty) {
+      this.offset = offset;
+      this.bucketProperty = bucketProperty;
+    }
+  }
+  /**
+   * Computes the ROW__ID offset and bucket property for a split of an "original" file (i.e. a
+   * file written before the table was converted to ACID) whose rows have to be decorated with a
+   * synthetic ROW__ID.  The offset is the number of rows in all files of the same logical bucket
+   * that sort before the split's file; e.g. if bucket 0 consists of 000000_0 (100 rows) and
+   * 000000_0_copy_1, a split of 000000_0_copy_1 gets offset 100.
+   *
+   * This calls getAcidState(), i.e. lists the partition directory, once per split, which could
+   * put pressure on the NameNode.  OrcRawRecordMerger currently does the same, but that is not a
+   * justification.  A cheaper alternative is to compute the directory state (or just the
+   * per-split offset) once and store it in OrcSplit; that is worth doing to make sure there is
+   * no perf regression.
+   */
+  private OffsetAndBucketProperty computeOffsetAndBucket(OrcSplit split, JobConf conf,
+      ValidTxnList validTxnList) throws IOException {
+    if (!(split.isOriginal() && needRowIds)) {
+      return new OffsetAndBucketProperty(0, 0);
+    }
+    long rowIdOffset = 0;
+    int bucketId = AcidUtils.parseBaseOrDeltaBucketFilename(split.getPath(), conf).getBucketId();
+    int bucketProperty = BucketCodec.V1.encode(
+        new AcidOutputFormat.Options(conf).statementId(0).bucket(bucketId));
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState(split.getRootDir(), conf, validTxnList);
+    for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
+      AcidOutputFormat.Options bucketOptions =
+          AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf);
+      if (bucketOptions.getBucketId() != bucketId) {
+        continue;
+      }
+      if (f.getFileStatus().getPath().equals(split.getPath())) {
+        //'f' is the file this split was created from
+        break;
+      }
+      Reader reader = OrcFile.createReader(f.getFileStatus().getPath(),
+          OrcFile.readerOptions(conf));
+      rowIdOffset += reader.getNumberOfRows();
+    }
+    return new OffsetAndBucketProperty(rowIdOffset, bucketProperty);
+  }
   /**
    * Returns whether it is possible to create a valid instance of this class for a given split.
    * @param conf is the job configuration
    * @param inputSplit
    * @return true if it is possible, else false.
    */
-  public static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) {
+  static boolean canCreateVectorizedAcidRowBatchReaderOnSplit(JobConf conf, InputSplit inputSplit) throws IOException {
     if (!(inputSplit instanceof OrcSplit)) {
       return false; // must be an instance of OrcSplit.
     }
-    // First check if we are reading any original files in the split.
-    // To simplify the vectorization logic, the vectorized acid row batch reader does not handle
-    // original files for now as they have a different schema than a regular ACID file.
     final OrcSplit split = (OrcSplit) inputSplit;
-    if (AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate() && !split.isOriginal()) {
+    if (!AcidUtils.getAcidOperationalProperties(conf).isSplitUpdate()) {
       // When split-update is turned on for ACID, a more optimized vectorized batch reader
-      // can be created. But still only possible when we are *NOT* reading any originals.
-      return true;
+      // can be created.
+      return false;
     }
-    return false; // no split-update or possibly reading originals!
+    if (split.isOriginal()) {
+      Path[] deleteEvents = getDeleteDeltaDirsFromSplit(split);
+      if (deleteEvents.length > 0) {
+        //if we have delete events then we need ROW__IDs to apply them; bail for now
+        return false;
+      }
+      VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      if (rbCtx == null) {
+        //means this split can't be vectorized (e.g. INPUT__FILE__NAME is projected)
+        return false;
+      }
+    }
+    return true;
   }
+  private static boolean needRowIds(VectorizedRowBatchCtx rbCtx) {
+    if (rbCtx.getVirtualColumnCount() == 0) {
+      return false;
+    }
+    for (VirtualColumn vc : rbCtx.getNeededVirtualColumns()) {
+      if (vc == VirtualColumn.ROWID) {
+        //The query needs ROW__ID: either it is projected explicitly or this is a Delete statement.
+        //Either way, "original" rows have to be decorated with ROW__ID.
+        return true;
+      }
+    }
+    return false;
+  }
+  /**
+   * TODO: refactor/merge with {@link OrcInputFormat#getReader(InputSplit, AcidInputFormat.Options)}
+   */
   private static Path[] getDeleteDeltaDirsFromSplit(OrcSplit orcSplit) throws IOException {
     Path path = orcSplit.getPath();
     Path root;
     if (orcSplit.hasBase()) {
       if (orcSplit.isOriginal()) {
-        root = path.getParent();
+        root = orcSplit.getRootDir();
       } else {
         root = path.getParent().getParent();
       }
@@ -218,8 +311,11 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
       selectedBitSet.set(0, vectorizedRowBatchBase.size, true);
     }
 
-    // Case 1- find rows which belong to transactions that are not valid.
-    findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+    //all "original" data belongs to txnid:0 and is always valid/committed for every reader
+    if (!isOriginal) {
+      // Case 1- find rows which belong to transactions that are not valid.
+      findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet);
+    }
 
     // Case 2- find rows which have been deleted.
     this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet);
@@ -242,6 +338,39 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
       }
     }
 
+    if (isOriginal) {
+      /* To decorate with ROW__ID we have to set the synthetic ACID columns in 'value' and copy
+       * the user data over from vectorizedRowBatchBase. */
+      System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount());
+      if (needRowIds) {
+        StructColumnVector rowIds = (StructColumnVector) value.cols[value.cols.length - 1];
+        //set noNulls/isRepeating on the struct and on each of rowIds.fields (each is a
+        //LongColumnVector); preFlattenIsRepeating (default false) and preFlattenNoNulls
+        //(default true) are left at their defaults - todo: confirm that is correct
+
+        //originalTransactionId
+        rowIds.fields[0].noNulls = true;
+        rowIds.fields[0].isNull[0] = false;//since isRepeating=true, the 1st slot has the value
+        rowIds.fields[0].isRepeating = true;
+        ((LongColumnVector) rowIds.fields[0]).vector[0] = 0;//all "original" data is txnid:0
+
+        //bucketId (i.e. bucket property)
+        rowIds.fields[1].noNulls = true;
+        rowIds.fields[1].isNull[0] = false;
+        rowIds.fields[1].isRepeating = true;
+        ((LongColumnVector) rowIds.fields[1]).vector[0] = bucketProperty;
+
+        //rowId
+        rowIds.fields[2].noNulls = true;
+        rowIds.fields[2].isNull[0] = false;
+        rowIds.fields[2].isRepeating = false;
+        long[] rowIdVector = ((LongColumnVector) rowIds.fields[2]).vector;
+        for (int i = 0; i < vectorizedRowBatchBase.size; i++) {
+          //baseReader.getRowNumber() seems to point at the start of the batch
+          rowIdVector[i] = this.rowIdOffset + baseReader.getRowNumber() + i;
+        }
+      }
+      progress = baseReader.getProgress();
+      return true;
+    }
     // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
     // NOTE: We only link up the user columns and not the ACID metadata columns because this
     // vectorized code path is not being used in cases of update/delete, when the metadata columns
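The ROW__ID synthesis in next() above boils down to a small amount of vector bookkeeping. The following self-contained sketch (not part of the patch; the class and method names SyntheticRowIdSketch/syntheticRowIds are made up, and it assumes size <= VectorizedRowBatch.DEFAULT_SIZE) shows the same idea in isolation: every row of an "original" file gets originalTransactionId 0, the split's constant bucket property, and consecutive row ids starting at rowIdOffset + batchStartRow.

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class SyntheticRowIdSketch {
  /**
   * Builds a (originalTransactionId, bucketProperty, rowId) struct the way the reader decorates
   * "original" rows: txnid is always 0, the bucket property is constant for the split, and row
   * ids are consecutive within the logical bucket file.
   */
  static StructColumnVector syntheticRowIds(int size, long rowIdOffset, long batchStartRow,
      int bucketProperty) {
    LongColumnVector txnIds = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    LongColumnVector buckets = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    LongColumnVector rowIds = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);

    txnIds.isRepeating = true;       // same value for every row in the batch
    txnIds.vector[0] = 0;            // all "original" data belongs to txnid:0

    buckets.isRepeating = true;      // one bucket property per split
    buckets.vector[0] = bucketProperty;

    for (int i = 0; i < size; i++) { // consecutive row ids within the logical bucket
      rowIds.vector[i] = rowIdOffset + batchStartRow + i;
    }
    return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, txnIds, buckets, rowIds);
  }
}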
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 39d6b2b414..8a5bcc82bc 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
@@ -37,7 +36,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -57,8 +55,8 @@
  *
  * Can also test, calling commit in AC=true mode, etc, toggling AC...
  *
- * Tests here are for multi-statement transactions (WIP) and those that don't need to
- * run with Acid 2.0 (see subclasses of TestTxnCommands2)
+ * Tests here are for multi-statement transactions (WIP) and others.
+ * Mostly uses bucketed tables.
  */
 public class TestTxnCommands extends TxnCommandsBaseForTests {
   static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index c827dc4a0e..0b4df21abd 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -522,5 +523,50 @@ public void testCtasBucketed() throws Exception {
 //    Assert.assertEquals("Wrong msg", ErrorMsg.CTAS_PARCOL_COEXISTENCE.getErrorCode(), cpr.getErrorCode());
     Assert.assertTrue(cpr.getErrorMessage().contains("CREATE-TABLE-AS-SELECT does not support"));
   }
+  @Ignore("Gopal's use case")
+  @Test
+  public void testNonAcidCtasToAcid() throws Exception {
+    runStatementOnDriver("drop table if exists customer");
+//    runStatementOnDriver("create table customer stored as orc as select * from tpcds_bin_partitioned_orc_1000.customer");
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));
+    runStatementOnDriver("create table customer stored as orc as select * from " + Table.ACIDTBL);
+    runStatementOnDriver("alter table customer SET TBLPROPERTIES('transactional'='true')");
+    runStatementOnDriver("alter table customer compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from customer order by a, b, INPUT__FILE__NAME");
+    String[][] expected = {
+    };
+    checkExpected(rs, expected, "");
+  }
+  @Test
+  public void testNonAcidToAcidVectorized() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    //this enables vectorization of ROW__ID
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);//HIVE-12631
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T(a int, b int) stored as orc");
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into T(a, b) " + makeValuesClause(values));
+    //, 'transactional_properties'='default'
+    runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')");
+    //this uses VectorizedOrcAcidRowBatchReader
+    List<String> rs = runStatementOnDriver("select ROW__ID, a from T where b > 6 order by a");//why isn't PPD working?
+    Assert.assertEquals("", 2, rs.size());
+    Assert.assertEquals(Integer.toString(6), rs.get(0));
+    Assert.assertEquals(Integer.toString(9), rs.get(1));
+    //this next query doesn't vectorize, but "select ROW__ID, a ..." can vectorize - we just don't do it yet
+    //TODO: how do you verify the right acid reader is used?
+    rs = runStatementOnDriver("select ROW__ID, a, INPUT__FILE__NAME from T where b > 6 order by a");
+    Assert.assertEquals("", 2, rs.size());
+    String[][] expected = {
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t6", "warehouse/t/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9", "warehouse/t/000000_0"}
+    };
+    checkExpected(rs, expected, "After non-vectorized read");
+    Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
+  }
 }
diff --git ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 3e4f6f6675..5828d66a80 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -93,12 +93,23 @@ void setUpInternal() throws Exception {
     d = new Driver(hiveConf);
     d.setMaxRows(10000);
     dropTables();
-    runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
-    runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + "(a int, b int) stored as orc");
+    runStatementOnDriver("create table " + Table.ACIDTBL
+        + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.ACIDTBLPART
+        + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL
+        + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDORCTBL2
+        + "(a int, b int) clustered by (a) into " + BUCKET_COUNT
+        + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create temporary table " + Table.ACIDTBL2
+        + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT
+        + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + Table.NONACIDNONBUCKET
+        + "(a int, b int) stored as orc");
   }
   private void dropTables() throws Exception {
     for(TxnCommandsBaseForTests.Table t : TxnCommandsBaseForTests.Table.values()) {
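The literal 536870912 in the expected ROW__ID strings and in the BucketCodec assertion above is 1 << 29: with bucket id 0 and statement id 0, the V1 encoding leaves only the version bits set. A minimal standalone check of that arithmetic, using only the BucketCodec and AcidOutputFormat.Options calls that already appear in this patch (the class name BucketPropertyCheck is made up for illustration):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketPropertyCheck {
  public static void main(String[] args) {
    // encode bucket 0 / statement 0 the same way computeOffsetAndBucket() does
    int bucketProperty = BucketCodec.V1.encode(
        new AcidOutputFormat.Options(new HiveConf()).statementId(0).bucket(0));
    System.out.println(bucketProperty);   // expected: 536870912
    System.out.println(1 << 29);          // 536870912: only the version bits are set
    // decoding recovers the writer/bucket id the test asserts on
    System.out.println(BucketCodec.determineVersion(bucketProperty)
        .decodeWriterId(bucketProperty)); // expected: 0
  }
}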