diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 9aced9fc02..3575dc8584 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1203,6 +1203,8 @@ private int acquireLocks() { //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes desc.setStatementId(txnMgr.getWriteIdAndIncrement()); + /*so if we have > 1 FS in the same plan we may be writing to the same partition in parallel*/ + desc.setNumBuckets(); } } /*It's imperative that {@code acquireLocks()} is called for all commands so that diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 3e0943251e..a475d7236b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -769,7 +769,7 @@ public void process(Object row, int tag) throws HiveException { ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector; Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField); int bucketNum = - bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)); + bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField)) % conf.getNumBuckets(); if (fpaths.acidLastBucket != bucketNum) { fpaths.acidLastBucket = bucketNum; // Switch files diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index b85b827424..6e605510a1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -51,6 +51,7 @@ private long minimumTransactionId; private long maximumTransactionId; private int bucket; + private int numBuckets;//"total number of buckets from cluster by xxx into N buckets" private PrintStream dummyStream = null; private boolean oldStyle = false; private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id @@ -178,7 +179,10 @@ public Options bucket(int bucket) { this.bucket = bucket; return this; } - + public Options numBuckets(int numBuckets) { + this.numBuckets = numBuckets; + return this; + } /** * Whether it should use the old style (0000000_0) filenames. 
* @param value should use the old style names @@ -278,6 +282,7 @@ public boolean isWritingDeleteDelta() { public int getBucket() { return bucket; } + public int getNumBuckets() { return numBuckets; } public int getRecordIdColumn() { return recIdCol; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index cc69c7eab8..c69d38be3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -344,6 +344,7 @@ private static RecordUpdater getRecordUpdater(JobConf jc, .minimumTransactionId(conf.getTransactionId()) .maximumTransactionId(conf.getTransactionId()) .bucket(bucket) + .numBuckets(conf.getNumBuckets()) .inspector(inspector) .recordIdColumn(rowIdColNum) .statementId(conf.getStatementId()) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 65f4a24750..57dcde152f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -48,10 +48,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** * A RecordUpdater where the files are stored as ORC. + * + * A note on the various record structures: the {@code row} coming in (as in {@link #insert(long, Object)} + * for example) is a struct like <RecordIdentifier, f1, ... fn>, but what is written to the file + * is <operation, originalTransaction, bucket, rowId, currentTransaction, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}). + * So there are OIs here to make the translation. */ public class OrcRecordUpdater implements RecordUpdater { @@ -93,8 +96,28 @@ private final IntWritable operation = new IntWritable(); private final LongWritable currentTransaction = new LongWritable(-1); private final LongWritable originalTransaction = new LongWritable(-1); + /** + * This is the bucket id used in the data file name. + * Invariant: (0 <= bucketFileId < numBuckets) + */ + private final int bucketFileId; + /** + * This is the ID used for RecordIdentifier.Field.bucketId. + * Invariant: (logicalBucketId % numBuckets == bucketFileId) + */ + private final int logicalBucketId; + private final int numBuckets; + /** + * This contains the value for {@link RecordIdentifier.Field#bucketId}. For INSERT it should + * always be {@link #logicalBucketId}. For UPDATE/DELETE it should always come from the row + * being written. + * Invariant: (bucket.get() % numBuckets == bucketFileId) + */ private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); + /** + * Per txn; used to generate RecordIdentifier.rowId. + */ private long insertedRows = 0; private long rowIdOffset = 0; // This records how many rows have been inserted or deleted.
It is separate from insertedRows @@ -102,15 +125,16 @@ private long rowCountDelta = 0; private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder(); private KeyIndexBuilder deleteEventIndexBuilder; - private StructField recIdField = null; // field to look for the record identifier in + private StructField recordIdentifierField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in private StructField originalTxnField = null; // field inside recId to look for original txn in private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row - private StructObjectInspector recIdInspector; // OI for the record identifier struct + private StructObjectInspector recordIdentifierInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier private LongObjectInspector origTxnInspector; // OI for the original txn inside the record // identifer + private IntObjectInspector bucketIdInspector; static int getOperation(OrcStruct struct) { return ((IntWritable) struct.getFieldValue(OPERATION)).get(); @@ -200,7 +224,16 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { this.acidOperationalProperties = AcidUtils.getAcidOperationalProperties(options.getConfiguration()); } - this.bucket.set(options.getBucket()); + numBuckets = options.getNumBuckets(); + bucketFileId = options.getBucket(); + assert numBuckets > 0 : "expected table to be bucketed: " + path; + assert options.getStatementId() >= 0 : "Negative statementId"; + + logicalBucketId = bucketFileId + numBuckets * options.getStatementId(); + //logicalBucketId = options.getBucket(); + assert logicalBucketId >= 0 && logicalBucketId % numBuckets == bucketFileId : + "lb=" + logicalBucketId + " b=" + bucketFileId + " stId=" + options.getStatementId(); + this.bucket.set(logicalBucketId); this.path = AcidUtils.createFilename(path, options); this.deleteEventWriter = null; this.deleteEventPath = null; @@ -221,7 +254,7 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) { } } if (options.getMinimumTransactionId() != options.getMaximumTransactionId() - && !options.isWritingBase()){ + && !options.isWritingBase()){//note: compactor doesn't use OrcRecordUpdater flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), true, 8, options.getReporter()); } else { @@ -330,19 +363,20 @@ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) { } else { RecIdStrippingObjectInspector newInspector = new RecIdStrippingObjectInspector(inspector, rowIdColNum); - recIdField = newInspector.getRecId(); + recordIdentifierField = newInspector.getRecId(); List fields = - ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs(); + ((StructObjectInspector) recordIdentifierField.getFieldObjectInspector()).getAllStructFieldRefs(); // Go by position, not field name, as field names aren't guaranteed. 
The order of fields // in RecordIdentifier is transactionId, bucketId, rowId originalTxnField = fields.get(0); origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); bucketField = fields.get(1); + bucketIdInspector = (IntObjectInspector)bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector(); - recIdInspector = (StructObjectInspector) recIdField.getFieldObjectInspector(); + recordIdentifierInspector = (StructObjectInspector) recordIdentifierField.getFieldObjectInspector(); return newInspector; } } @@ -355,23 +389,27 @@ private void addSimpleEvent(int operation, long currentTransaction, long rowId, // it will be reset by the following if anyway. long originalTransaction = currentTransaction; if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { - Object rowIdValue = rowInspector.getStructFieldData(row, recIdField); + Object recordIdentifier = rowInspector.getStructFieldData(row, recordIdentifierField); originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); - rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); + recordIdentifierInspector.getStructFieldData(recordIdentifier, originalTxnField)); + rowId = rowIdInspector.get(recordIdentifierInspector.getStructFieldData(recordIdentifier, rowIdField)); + //if this throws, something in the shuffle went wrong + setBucketId(bucketIdInspector.get(recordIdentifierInspector.getStructFieldData(recordIdentifier, bucketField)), this.path); } else if(operation == INSERT_OPERATION) { + assertInvariants(this.path);//if this fails, restoreBucketId() wasn't called somewhere rowId += rowIdOffset; } this.rowId.set(rowId); this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); - indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); + updateIndex(indexBuilder, operation, originalTransaction, rowId); if (writer == null) { writer = OrcFile.createWriter(path, writerOptions); } writer.addRow(item); + restoreBucketId(); } private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) @@ -383,13 +421,14 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro } this.operation.set(operation); this.currentTransaction.set(currentTransaction); - Object rowValue = rowInspector.getStructFieldData(row, recIdField); + Object recordIdentifier = rowInspector.getStructFieldData(row, recordIdentifierField); long originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowValue, originalTxnField)); + recordIdentifierInspector.getStructFieldData(recordIdentifier, originalTxnField)); rowId = rowIdInspector.get( - recIdInspector.getStructFieldData(rowValue, rowIdField)); + recordIdentifierInspector.getStructFieldData(recordIdentifier, rowIdField)); if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { + setBucketId(bucketIdInspector.get(recordIdentifierInspector.getStructFieldData(recordIdentifier, bucketField)), deleteEventPath); // Initialize a deleteEventWriter if not yet done. (Lazy initialization) if (deleteEventWriter == null) { // Initialize an indexBuilder for deleteEvents. 
@@ -412,8 +451,9 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro this.originalTransaction.set(originalTransaction); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. - deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + updateIndex(deleteEventIndexBuilder, DELETE_OPERATION, originalTransaction, rowId); deleteEventWriter.addRow(item); + restoreBucketId(); } if (operation == UPDATE_OPERATION) { @@ -422,6 +462,31 @@ private void addSplitUpdateEvent(int operation, long currentTransaction, long ro } } + /** + * If these don't match we'll have data loss on read - make the write fail-fast + * @param writerPath - target file to make error msg more meaningful + */ + private void assertInvariants(Path writerPath) { + if(!(bucketFileId == bucket.get() % numBuckets)) { + throw new IllegalStateException("bucketFileId=" + bucketFileId + + " bucket.get()=" + bucket.get() + " numBuckets=" + numBuckets + + " logicalBucketId=" + logicalBucketId + " for " + writerPath); + } + } + /** + * all calls to this method must have a matching call to {@link #restoreBucketId()} + */ + private void setBucketId(int id, Path writerPath) { + bucket.set(id); + assertInvariants(writerPath); + } + private void restoreBucketId() { + bucket.set(logicalBucketId); + } + private void updateIndex(KeyIndexBuilder index, int operation, long originalTxnId, long rowId) { + index.addKey(operation, originalTxnId, bucket.get(), rowId); + } + @Override public void insert(long currentTransaction, Object row) throws IOException { if (this.currentTransaction.get() != currentTransaction) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index 75c7680e26..7ee176c49c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -551,6 +551,9 @@ public boolean next(DeleteRecordKey deleteRecordKey) throws IOException { if (indexPtrInBatch >= batch.size) { // We have exhausted our current batch, read the next batch. if (recordReader.nextBatch(batch)) { + /* + * todo: this check only happens for 2nd batch + * */ // Whenever we are reading a batch, we must ensure that all the records in the batch // have the same bucket id as the bucket id of the split. If not, throw exception. // NOTE: this assertion might not hold, once virtual bucketing is in place. 
However, diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 0541a4035a..78c511b016 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -696,15 +696,7 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) if(numWhenMatchedUpdateClauses > 1) { throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd()); } - assert numInsertClauses < 2; - if(numInsertClauses == 1 && numWhenMatchedUpdateClauses == 1) { - if(AcidUtils.getAcidOperationalProperties(targetTable).isSplitUpdate()) { - throw new IllegalStateException("Tables with " + - hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "=" + - TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY + " currently do not " + - "support MERGE with both Insert and Update clauses."); - } - } + assert numInsertClauses < 2: "too many Insert clauses"; } if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) { throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd()); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 4716adc945..c107b46e5f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -92,6 +92,7 @@ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID; private long txnId = 0; // transaction id for this operation private int statementId = -1; + private int numBuckets = -1; private transient Table table; private Path destPath; @@ -509,4 +510,13 @@ public FileSinkOperatorExplainVectorization getFileSinkVectorization() { } return new FileSinkOperatorExplainVectorization(vectorDesc); } + public void setNumBuckets() { + if(table == null) { + throw new IllegalStateException("no 'table' object"); + } + this.numBuckets = table.getNumBuckets(); + } + public int getNumBuckets() { + return numBuckets; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java index 461ef86b83..74b3acaa9d 100755 --- ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/UDFToInteger.java @@ -220,7 +220,7 @@ public IntWritable evaluate(RecordIdentifier i) { if (i == null) { return null; } else { - intWritable.set(i.getBucketId()); + intWritable.set(i.getBucketId());//TODO: should this now do i.getBucketId()%numBuckets ???? or should that already be in ReduceSinkOperator.... 
I think the latter but VERIFY return intWritable; } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 7c66955e14..2d96d247ac 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -492,53 +492,10 @@ private static void pause(int timeMillis) { * sorts rows in dictionary order */ private List stringifyValues(int[][] rowsIn) { - assert rowsIn.length > 0; - int[][] rows = rowsIn.clone(); - Arrays.sort(rows, new RowComp()); - List rs = new ArrayList(); - for(int[] row : rows) { - assert row.length > 0; - StringBuilder sb = new StringBuilder(); - for(int value : row) { - sb.append(value).append("\t"); - } - sb.setLength(sb.length() - 1); - rs.add(sb.toString()); - } - return rs; - } - private static final class RowComp implements Comparator { - @Override - public int compare(int[] row1, int[] row2) { - assert row1 != null && row2 != null && row1.length == row2.length; - for(int i = 0; i < row1.length; i++) { - int comp = Integer.compare(row1[i], row2[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } + return TestTxnCommands2.stringifyValues(rowsIn); } private String makeValuesClause(int[][] rows) { - assert rows.length > 0; - StringBuilder sb = new StringBuilder("values"); - for(int[] row : rows) { - assert row.length > 0; - if(row.length > 1) { - sb.append("("); - } - for(int value : row) { - sb.append(value).append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - if(row.length > 1) { - sb.append(")"); - } - sb.append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - return sb.toString(); + return TestTxnCommands2.makeValuesClause(rows); } private List runStatementOnDriver(String stmt) throws Exception { @@ -558,7 +515,6 @@ private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throw throw new RuntimeException("Didn't get expected failure!"); } -// @Ignore @Test public void exchangePartition() throws Exception { runStatementOnDriver("create database ex1"); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 5786c4f659..a47377f538 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -87,19 +87,27 @@ protected Driver d; protected static enum Table { ACIDTBL("acidTbl"), - ACIDTBLPART("acidTblPart"), + ACIDTBLPART("acidTblPart", "p"), NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDPART("nonAcidPart"), - NONACIDPART2("nonAcidPart2"), - ACIDNESTEDPART("acidNestedPart"); + NONACIDPART("nonAcidPart", "p"), + NONACIDPART2("nonAcidPart2", "p2"), + ACIDNESTEDPART("acidNestedPart", "p,q"); private final String name; + private final String partitionColumns; @Override public String toString() { return name; } + String getPartitionColumns() { + return partitionColumns; + } Table(String name) { + this(name, null); + } + Table(String name, String partitionColumns) { this.name = name; + this.partitionColumns = partitionColumns; } } @@ -1438,12 +1446,13 @@ public void testMergeWithPredicate() throws Exception { String query = "merge into " + Table.ACIDTBL + " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " + "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " + - "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)"; + "WHEN NOT MATCHED and s.b2 >= 8 THEN INSERT VALUES(s.a2, s.b2)"; 
runStatementOnDriver(query); r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}}; + int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{8,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1471,6 +1480,7 @@ public void testMerge2() throws Exception { r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); + assertUniqueID(Table.ACIDTBL); } /** @@ -1497,27 +1507,21 @@ public void testMerge3() throws Exception { int[][] rExpected = {{7,8},{11,11}}; Assert.assertEquals(stringifyValues(rExpected), r); } - /** - * https://hortonworks.jira.com/browse/BUG-66580 - * @throws Exception - */ - @Ignore @Test public void testMultiInsert() throws Exception { - runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " + - "partitioned by (z int) clustered by (a) into 2 buckets " + - "stored as orc tblproperties('transactional'='true')"); runStatementOnDriver("create temporary table if not exists data1 (x int)"); -// runStatementOnDriver("create temporary table if not exists data2 (x int)"); - runStatementOnDriver("insert into data1 values (1),(2),(3)"); -// runStatementOnDriver("insert into data2 values (4),(5),(6)"); d.destroy(); hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); d = new Driver(hiveConf); - List r = runStatementOnDriver(" from data1 " + - "insert into srcpart partition(z) select 0,0,1,x " + - "insert into srcpart partition(z=1) select 0,0,1"); + + runStatementOnDriver(" from data1 " + + "insert into " + Table.ACIDTBLPART + " partition(p) select 0,x, 'p' || x " + + + "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0,x"); + List r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[p1\t0\t1, p1\t0\t1, p1\t0\t2, p1\t0\t3, p2\t0\t2, p3\t0\t3]", r.toString()); + assertUniqueID(Table.ACIDTBLPART); } /** * Investigating DP and WriteEntity, etc @@ -1583,6 +1587,7 @@ public void testDynamicPartitionsMerge() throws Exception { r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); String result= r1.toString(); Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result); + assertUniqueID(Table.ACIDTBLPART); } /** * Using nested partitions and thus DummyPartition @@ -1605,6 +1610,7 @@ public void testDynamicPartitionsMerge2() throws Exception { "when not matched then insert values(s.a, s.b, 3,4)"); r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b"); Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1); + assertUniqueID(Table.ACIDNESTEDPART); } @Ignore("Covered elsewhere") @Test @@ -1661,7 +1667,7 @@ public void testValuesSource() throws Exception { } return rs; } - private static final class RowComp implements Comparator { + static class RowComp implements Comparator { @Override public int compare(int[] row1, int[] row2) { assert row1 != null && row2 != null && row1.length == row2.length; @@ -1674,7 +1680,7 @@ public int compare(int[] row1, int[] row2) { return 0; } } - String makeValuesClause(int[][] rows) { + static String makeValuesClause(int[][] rows) { assert rows.length > 0; StringBuilder sb = new 
StringBuilder("values"); for(int[] row : rows) { @@ -1705,4 +1711,19 @@ String makeValuesClause(int[][] rows) { d.getResults(rs); return rs; } + final void assertUniqueID(Table table) throws Exception { + String partCols = table.getPartitionColumns(); + //check to make sure there are no duplicate ROW__IDs - HIVE-16832 + StringBuilder sb = new StringBuilder("select "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append(" ROW__ID, count(*) from ").append(table).append(" group by "); + if(partCols != null && partCols.length() > 0) { + sb.append(partCols).append(","); + } + sb.append("ROW__ID having count(*) > 1"); + List r = runStatementOnDriver(sb.toString()); + Assert.assertTrue(r.toString(),r.size() == 0); + } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java index ea5ecbc842..520e958af3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java @@ -542,45 +542,4 @@ public void testNonAcidToAcidConversion3() throws Exception { resultCount = 2; Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0))); } - @Ignore("HIVE-14947") - @Test - @Override - public void testDynamicPartitionsMerge() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testDynamicPartitionsMerge2() throws Exception {} - @Ignore("HIVE-14947") - @Test - @Override - public void testMerge() throws Exception {} - - /** - * todo: remove this test once HIVE-14947 is done (parent class has a better version) - */ - @Test - @Override - public void testMerge2() throws Exception { - int[][] baseValsOdd = {{5,5},{11,11}}; - int[][] baseValsEven = {{2,2},{4,44}}; - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd)); - runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven)); - int[][] vals = {{2,1},{4,3},{5,6},{7,8}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals)); - List r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - Assert.assertEquals(stringifyValues(vals), r); - String query = "merge into " + Table.ACIDTBL + - " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " + - "WHEN MATCHED THEN UPDATE set b = source.b2 "; - r = runStatementOnDriver(query); - - r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); - int[][] rExpected = {{2,2},{4,44},{5,5},{7,8}}; - Assert.assertEquals(stringifyValues(rExpected), r); - - } - @Ignore("HIVE-14947") - @Test - @Override - public void testMergeWithPredicate() throws Exception {} } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java index 44a94127ec..34df95d29c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdateAndVectorization.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hive.ql; import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.List; + /** * Same as TestTxnCommands2WithSplitUpdate but tests ACID 
tables with vectorization turned on by * default, and having 'transactional_properties' set to 'default'. This specifically tests the @@ -48,5 +51,28 @@ public void testFailureOnAlteringTransactionalProperties() throws Exception { // To not override this test, that temporary table needs to be renamed. However, as // mentioned this does not serve any purpose, as this test does not relate to vectorization. } + @Test + public void testMultiInsertVectorized() throws Exception { + runStatementOnDriver("create temporary table if not exists data1 (x int)"); + runStatementOnDriver("insert into data1 values (1)"); + d.destroy(); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + d = new Driver(hiveConf); + + runStatementOnDriver(" from data1 " + + "insert into " + Table.ACIDTBLPART + " partition(p) select 0, 0, 'p' || x " + + + "insert into " + Table.ACIDTBLPART + " partition(p='p1') select 0, 1"); + /*w/o the fix we'd have 2 rows with the same ID in the same bucket of the same partition + * acidtblpart/p=p1/delta_0000015_0000015_0000/bucket_00000 [length: 578] {"operation":0,"originalTransaction":15,"bucket":0,"rowId":0,"currentTransaction":15,"row":{"_col0":0,"_col1":1}} + acidtblpart/p=p1/delta_0000015_0000015_0001/bucket_00000 [length: 568] {"operation":0,"originalTransaction":15,"bucket":0,"rowId":0,"currentTransaction":15,"row":{"_col0":0,"_col1":0}}*/ + List r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[p1\t0\t0, p1\t0\t1]", r.toString()); + assertUniqueID(Table.ACIDTBLPART); + runStatementOnDriver("delete from " + Table.ACIDTBLPART); + r = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b"); + Assert.assertEquals("[]", r.toString()); + + } }
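
For context, a minimal sketch of the bucket-id encoding that the new logicalBucketId field in OrcRecordUpdater and the "% conf.getNumBuckets()" change in FileSinkOperator.process() rely on. This is not part of the patch; the class and method names below are illustrative only. The physical bucket file id stays in the low range and the statement id is folded in as a multiple of numBuckets, so concurrent FileSinks writing the same partition within one transaction produce distinct ROW__ID.bucketId values that still map back to the same bucket file via modulo.

public class LogicalBucketIdSketch {
  /** Encode, mirroring OrcRecordUpdater: logicalBucketId = bucketFileId + numBuckets * statementId. */
  static int encodeLogicalBucketId(int bucketFileId, int numBuckets, int statementId) {
    assert numBuckets > 0 && bucketFileId >= 0 && bucketFileId < numBuckets && statementId >= 0;
    return bucketFileId + numBuckets * statementId;
  }
  /** Decode, mirroring the new "% conf.getNumBuckets()" in FileSinkOperator.process(). */
  static int decodeBucketFileId(int logicalBucketId, int numBuckets) {
    return logicalBucketId % numBuckets;
  }
  public static void main(String[] args) {
    int numBuckets = 2;
    // Two FileSinks (statement ids 0 and 1) in the same txn, both writing bucket file 0 of one partition:
    int b0 = encodeLogicalBucketId(0, numBuckets, 0); // 0
    int b1 = encodeLogicalBucketId(0, numBuckets, 1); // 2
    // Distinct ROW__ID.bucketId values, yet both decode to the same physical bucket file:
    System.out.println(b0 + " != " + b1 + ", both decode to bucket file "
        + decodeBucketFileId(b0, numBuckets));
  }
}

This is what lets the assertUniqueID() helper in TestTxnCommands2 verify that no two rows of a multi-insert end up with the same (originalTransaction, bucketId, rowId) triple.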