diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java index 429555f414..abbb1251f0 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/mutate/client/lock/Lock.java @@ -182,9 +182,9 @@ private LockRequest buildLockRequest(Long transactionId) { //todo: DataOperationType is set conservatively here, we'd really want to distinguish update/delete //and insert/select and if resource (that is written to) is ACID or not if (sinks.contains(table)) { - componentBuilder.setSemiShared().setOperationType(DataOperationType.UPDATE).setIsFullAcid(true); + componentBuilder.setSemiShared().setOperationType(DataOperationType.UPDATE).setIsTransactional(true); } else { - componentBuilder.setShared().setOperationType(DataOperationType.INSERT).setIsFullAcid(true); + componentBuilder.setShared().setOperationType(DataOperationType.INSERT).setIsTransactional(true); } LockComponent component = componentBuilder.build(); requestBuilder.addLockComponent(component); diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java index e45494254a..0a46faf90d 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/client/lock/TestLock.java @@ -176,13 +176,13 @@ public void testAcquireReadLockCheckLocks() throws Exception { LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); expected1.setTablename("SOURCE_1"); expected1.setOperationType(DataOperationType.INSERT); - expected1.setIsAcid(true); + expected1.setIsTransactional(true); assertTrue(components.contains(expected1)); LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); expected2.setTablename("SOURCE_2"); expected2.setOperationType(DataOperationType.INSERT); - expected2.setIsAcid(true); + expected2.setIsTransactional(true); assertTrue(components.contains(expected2)); } @@ -203,19 +203,19 @@ public void testAcquireTxnLockCheckLocks() throws Exception { LockComponent expected1 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); expected1.setTablename("SOURCE_1"); expected1.setOperationType(DataOperationType.INSERT); - expected1.setIsAcid(true); + expected1.setIsTransactional(true); assertTrue(components.contains(expected1)); LockComponent expected2 = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "DB"); expected2.setTablename("SOURCE_2"); expected2.setOperationType(DataOperationType.INSERT); - expected2.setIsAcid(true); + expected2.setIsTransactional(true); assertTrue(components.contains(expected2)); LockComponent expected3 = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "DB"); expected3.setTablename("SINK"); expected3.setOperationType(DataOperationType.UPDATE); - expected3.setIsAcid(true); + expected3.setIsTransactional(true); assertTrue(components.contains(expected3)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index f99178dbc7..f38f23772d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2059,6 +2059,14 @@ private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException { } partName = partitions.get(0).getName(); } + + if (AcidUtils.isInsertOnlyTable(tbl.getParameters())) { + if (desc.getProps() == null) { + desc.setProps(new HashMap<>()); + } + desc.getProps().put("isForInsertOnly".toUpperCase(), "true"); + } + CompactionResponse resp = db.compact2(tbl.getDbName(), tbl.getTableName(), partName, desc.getCompactionType(), desc.getProps()); if(resp.isAccepted()) { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 683aa954cf..7c28a20226 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -441,7 +441,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB continue; } if(t != null) { - compBuilder.setIsFullAcid(AcidUtils.isFullAcidTable(t)); + compBuilder.setIsTransactional(AcidUtils.isFullAcidTable(t) || AcidUtils.isInsertOnlyTable(t)); } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -539,7 +539,7 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi output.getWriteType().toString()); } if(t != null) { - compBuilder.setIsFullAcid(AcidUtils.isFullAcidTable(t)); + compBuilder.setIsTransactional(AcidUtils.isFullAcidTable(t) || AcidUtils.isInsertOnlyTable(t)); } compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite()); diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index af0884c2d3..3fd715c2bb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -219,6 +219,8 @@ public void run() { private void clean(CompactionInfo ci) throws MetaException { LOG.info("Starting cleaning for " + ci.getFullPartitionName()); + boolean isInsertOnlyTable = false; + try { Table t = resolveTable(ci); if (t == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index 7eda7fb90a..2dff814628 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -104,6 +104,10 @@ public void run() { continue; } + if (AcidUtils.isInsertOnlyTable(t.getParameters())) { + ci.setIsInsertOnlyTable(true); + } + // check if no compaction set for this table if (noAutoCompactSet(t)) { LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it."); @@ -235,6 +239,12 @@ private CompactionType checkForCompaction(final CompactionInfo ci, "initiating major compaction"); return CompactionType.MAJOR; } + + // If it is for insert-only transactional table, return null. + if (ci.isInsertOnlyTable()) { + return null; + } + if (runJobAsSelf(runAs)) { return determineCompactionType(ci, writeIds, sd, tblproperties); } else { @@ -261,10 +271,6 @@ private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdLi StorageDescriptor sd, Map tblproperties) throws IOException, InterruptedException { - if (AcidUtils.isInsertOnlyTable(tblproperties)) { - return CompactionType.MINOR; - } - boolean noBase = false; Path location = new Path(sd.getLocation()); FileSystem fs = location.getFileSystem(conf); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java index d4f1dd5a86..e386ef7ba6 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java @@ -591,6 +591,87 @@ public void testInsertOverwriteWithUnionAll() throws Exception { Assert.assertEquals(stringifyValues(rExpected), rs); } + @Test + public void testOperationsOnCompletedTxnComponentsForMmTable() throws Exception { + + // Insert two rows into the table. + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); + // There should be 2 delta directories + verifyDirAndResult(2); + + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), + 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + + // Initiate a minor compaction request on the table. + runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'"); + + // Run worker. + runWorker(hiveConf); + // Run Cleaner. + runCleaner(hiveConf); + Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"), + 0, + TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS")); + } + + @Test + public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception { + + // Insert two rows into the table. + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)"); + // There should be 2 delta directories + verifyDirAndResult(2); + + // Initiate a minor compaction request on the table. + runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'"); + + // Run Compaction Worker to do compaction. + runWorker(hiveConf); + verifyDirAndResult(2); + + // Start an INSERT statement transaction and roll back this transaction. + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + TableExtended.MMTBL + " values (5, 6)"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + // There should be 3 delta directories. The new one is the aborted one. + verifyDirAndResult(3); + + // Execute SELECT statement and verify the result set (should be two rows). + int [][] expected = new int[][] {{1,2},{3,4}}; + List rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(expected), rs); + + // Run Cleaner + runCleaner(hiveConf); + verifyDirAndResult(3); + + // Execute SELECT and verify that aborted operation is not counted for MM table. + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(expected), rs); + + // Run initiator to execute CompactionTxnHandler.cleanEmptyAbortedTxns() + Initiator i = new Initiator(); + i.setThreadId((int)i.getId()); + i.setConf(hiveConf); + AtomicBoolean stop = new AtomicBoolean(true); + i.init(stop, new AtomicBoolean()); + i.run(); + verifyDirAndResult(3); + + // Execute SELECT statement and verify that aborted INSERT statement is not counted. + rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(expected), rs); + + // Initiate a minor compaction request on the table. + runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'"); + + // Run worker to delete aborted transaction's delta directory. + runWorker(hiveConf); + verifyDirAndResult(2); + } + private void verifyDirAndResult(int expectedDeltas) throws Exception { FileSystem fs = FileSystem.get(hiveConf); // Verify the content of subdirs diff --git standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index ef138e00bd..e84b399e2d 100644 --- standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -15141,9 +15141,9 @@ void LockComponent::__set_operationType(const DataOperationType::type val) { __isset.operationType = true; } -void LockComponent::__set_isAcid(const bool val) { - this->isAcid = val; -__isset.isAcid = true; +void LockComponent::__set_isTransactional(const bool val) { + this->isTransactional = val; +__isset.isTransactional = true; } void LockComponent::__set_isDynamicPartitionWrite(const bool val) { @@ -15231,8 +15231,8 @@ uint32_t LockComponent::read(::apache::thrift::protocol::TProtocol* iprot) { break; case 7: if (ftype == ::apache::thrift::protocol::T_BOOL) { - xfer += iprot->readBool(this->isAcid); - this->__isset.isAcid = true; + xfer += iprot->readBool(this->isTransactional); + this->__isset.isTransactional = true; } else { xfer += iprot->skip(ftype); } @@ -15295,9 +15295,9 @@ uint32_t LockComponent::write(::apache::thrift::protocol::TProtocol* oprot) cons xfer += oprot->writeI32((int32_t)this->operationType); xfer += oprot->writeFieldEnd(); } - if (this->__isset.isAcid) { - xfer += oprot->writeFieldBegin("isAcid", ::apache::thrift::protocol::T_BOOL, 7); - xfer += oprot->writeBool(this->isAcid); + if (this->__isset.isTransactional) { + xfer += oprot->writeFieldBegin("isTransactional", ::apache::thrift::protocol::T_BOOL, 7); + xfer += oprot->writeBool(this->isTransactional); xfer += oprot->writeFieldEnd(); } if (this->__isset.isDynamicPartitionWrite) { @@ -15318,7 +15318,7 @@ void swap(LockComponent &a, LockComponent &b) { swap(a.tablename, b.tablename); swap(a.partitionname, b.partitionname); swap(a.operationType, b.operationType); - swap(a.isAcid, b.isAcid); + swap(a.isTransactional, b.isTransactional); swap(a.isDynamicPartitionWrite, b.isDynamicPartitionWrite); swap(a.__isset, b.__isset); } @@ -15330,7 +15330,7 @@ LockComponent::LockComponent(const LockComponent& other662) { tablename = other662.tablename; partitionname = other662.partitionname; operationType = other662.operationType; - isAcid = other662.isAcid; + isTransactional = other662.isTransactional; isDynamicPartitionWrite = other662.isDynamicPartitionWrite; __isset = other662.__isset; } @@ -15341,7 +15341,7 @@ LockComponent& LockComponent::operator=(const LockComponent& other663) { tablename = other663.tablename; partitionname = other663.partitionname; operationType = other663.operationType; - isAcid = other663.isAcid; + isTransactional = other663.isTransactional; isDynamicPartitionWrite = other663.isDynamicPartitionWrite; __isset = other663.__isset; return *this; @@ -15355,7 +15355,7 @@ void LockComponent::printTo(std::ostream& out) const { out << ", " << "tablename="; (__isset.tablename ? (out << to_string(tablename)) : (out << "")); out << ", " << "partitionname="; (__isset.partitionname ? (out << to_string(partitionname)) : (out << "")); out << ", " << "operationType="; (__isset.operationType ? (out << to_string(operationType)) : (out << "")); - out << ", " << "isAcid="; (__isset.isAcid ? (out << to_string(isAcid)) : (out << "")); + out << ", " << "isTransactional="; (__isset.isTransactional ? (out << to_string(isTransactional)) : (out << "")); out << ", " << "isDynamicPartitionWrite="; (__isset.isDynamicPartitionWrite ? (out << to_string(isDynamicPartitionWrite)) : (out << "")); out << ")"; } diff --git standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index 835cbb3308..c22ccd7c9e 100644 --- standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ standalone-metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -6255,11 +6255,11 @@ inline std::ostream& operator<<(std::ostream& out, const AllocateTableWriteIdsRe } typedef struct _LockComponent__isset { - _LockComponent__isset() : tablename(false), partitionname(false), operationType(true), isAcid(true), isDynamicPartitionWrite(true) {} + _LockComponent__isset() : tablename(false), partitionname(false), operationType(true), isTransactional(true), isDynamicPartitionWrite(true) {} bool tablename :1; bool partitionname :1; bool operationType :1; - bool isAcid :1; + bool isTransactional :1; bool isDynamicPartitionWrite :1; } _LockComponent__isset; @@ -6268,7 +6268,7 @@ class LockComponent { LockComponent(const LockComponent&); LockComponent& operator=(const LockComponent&); - LockComponent() : type((LockType::type)0), level((LockLevel::type)0), dbname(), tablename(), partitionname(), operationType((DataOperationType::type)5), isAcid(false), isDynamicPartitionWrite(false) { + LockComponent() : type((LockType::type)0), level((LockLevel::type)0), dbname(), tablename(), partitionname(), operationType((DataOperationType::type)5), isTransactional(false), isDynamicPartitionWrite(false) { operationType = (DataOperationType::type)5; } @@ -6280,7 +6280,7 @@ class LockComponent { std::string tablename; std::string partitionname; DataOperationType::type operationType; - bool isAcid; + bool isTransactional; bool isDynamicPartitionWrite; _LockComponent__isset __isset; @@ -6297,7 +6297,7 @@ class LockComponent { void __set_operationType(const DataOperationType::type val); - void __set_isAcid(const bool val); + void __set_isTransactional(const bool val); void __set_isDynamicPartitionWrite(const bool val); @@ -6321,9 +6321,9 @@ class LockComponent { return false; else if (__isset.operationType && !(operationType == rhs.operationType)) return false; - if (__isset.isAcid != rhs.__isset.isAcid) + if (__isset.isTransactional != rhs.__isset.isTransactional) return false; - else if (__isset.isAcid && !(isAcid == rhs.isAcid)) + else if (__isset.isTransactional && !(isTransactional == rhs.isTransactional)) return false; if (__isset.isDynamicPartitionWrite != rhs.__isset.isDynamicPartitionWrite) return false; diff --git standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java index 0307540f50..77de5c9cf8 100644 --- standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java +++ standalone-metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockComponent.java @@ -44,7 +44,7 @@ private static final org.apache.thrift.protocol.TField TABLENAME_FIELD_DESC = new org.apache.thrift.protocol.TField("tablename", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField PARTITIONNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("partitionname", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField OPERATION_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationType", org.apache.thrift.protocol.TType.I32, (short)6); - private static final org.apache.thrift.protocol.TField IS_ACID_FIELD_DESC = new org.apache.thrift.protocol.TField("isAcid", org.apache.thrift.protocol.TType.BOOL, (short)7); + private static final org.apache.thrift.protocol.TField IS_TRANSACTIONAL_FIELD_DESC = new org.apache.thrift.protocol.TField("isTransactional", org.apache.thrift.protocol.TType.BOOL, (short)7); private static final org.apache.thrift.protocol.TField IS_DYNAMIC_PARTITION_WRITE_FIELD_DESC = new org.apache.thrift.protocol.TField("isDynamicPartitionWrite", org.apache.thrift.protocol.TType.BOOL, (short)8); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); @@ -59,7 +59,7 @@ private String tablename; // optional private String partitionname; // optional private DataOperationType operationType; // optional - private boolean isAcid; // optional + private boolean isTransactional; // optional private boolean isDynamicPartitionWrite; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -82,7 +82,7 @@ * @see DataOperationType */ OPERATION_TYPE((short)6, "operationType"), - IS_ACID((short)7, "isAcid"), + IS_TRANSACTIONAL((short)7, "isTransactional"), IS_DYNAMIC_PARTITION_WRITE((short)8, "isDynamicPartitionWrite"); private static final Map byName = new HashMap(); @@ -110,8 +110,8 @@ public static _Fields findByThriftId(int fieldId) { return PARTITIONNAME; case 6: // OPERATION_TYPE return OPERATION_TYPE; - case 7: // IS_ACID - return IS_ACID; + case 7: // IS_TRANSACTIONAL + return IS_TRANSACTIONAL; case 8: // IS_DYNAMIC_PARTITION_WRITE return IS_DYNAMIC_PARTITION_WRITE; default: @@ -154,10 +154,10 @@ public String getFieldName() { } // isset id assignments - private static final int __ISACID_ISSET_ID = 0; + private static final int __ISTRANSACTIONAL_ISSET_ID = 0; private static final int __ISDYNAMICPARTITIONWRITE_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.TABLENAME,_Fields.PARTITIONNAME,_Fields.OPERATION_TYPE,_Fields.IS_ACID,_Fields.IS_DYNAMIC_PARTITION_WRITE}; + private static final _Fields optionals[] = {_Fields.TABLENAME,_Fields.PARTITIONNAME,_Fields.OPERATION_TYPE,_Fields.IS_TRANSACTIONAL,_Fields.IS_DYNAMIC_PARTITION_WRITE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -173,7 +173,7 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.OPERATION_TYPE, new org.apache.thrift.meta_data.FieldMetaData("operationType", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, DataOperationType.class))); - tmpMap.put(_Fields.IS_ACID, new org.apache.thrift.meta_data.FieldMetaData("isAcid", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.IS_TRANSACTIONAL, new org.apache.thrift.meta_data.FieldMetaData("isTransactional", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.IS_DYNAMIC_PARTITION_WRITE, new org.apache.thrift.meta_data.FieldMetaData("isDynamicPartitionWrite", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); @@ -184,7 +184,7 @@ public String getFieldName() { public LockComponent() { this.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.UNSET; - this.isAcid = false; + this.isTransactional = false; this.isDynamicPartitionWrite = false; @@ -224,7 +224,7 @@ public LockComponent(LockComponent other) { if (other.isSetOperationType()) { this.operationType = other.operationType; } - this.isAcid = other.isAcid; + this.isTransactional = other.isTransactional; this.isDynamicPartitionWrite = other.isDynamicPartitionWrite; } @@ -241,7 +241,7 @@ public void clear() { this.partitionname = null; this.operationType = org.apache.hadoop.hive.metastore.api.DataOperationType.UNSET; - this.isAcid = false; + this.isTransactional = false; this.isDynamicPartitionWrite = false; @@ -409,26 +409,26 @@ public void setOperationTypeIsSet(boolean value) { } } - public boolean isIsAcid() { - return this.isAcid; + public boolean isIsTransactional() { + return this.isTransactional; } - public void setIsAcid(boolean isAcid) { - this.isAcid = isAcid; - setIsAcidIsSet(true); + public void setIsTransactional(boolean isTransactional) { + this.isTransactional = isTransactional; + setIsTransactionalIsSet(true); } - public void unsetIsAcid() { - __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ISACID_ISSET_ID); + public void unsetIsTransactional() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ISTRANSACTIONAL_ISSET_ID); } - /** Returns true if field isAcid is set (has been assigned a value) and false otherwise */ - public boolean isSetIsAcid() { - return EncodingUtils.testBit(__isset_bitfield, __ISACID_ISSET_ID); + /** Returns true if field isTransactional is set (has been assigned a value) and false otherwise */ + public boolean isSetIsTransactional() { + return EncodingUtils.testBit(__isset_bitfield, __ISTRANSACTIONAL_ISSET_ID); } - public void setIsAcidIsSet(boolean value) { - __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISACID_ISSET_ID, value); + public void setIsTransactionalIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ISTRANSACTIONAL_ISSET_ID, value); } public boolean isIsDynamicPartitionWrite() { @@ -503,11 +503,11 @@ public void setFieldValue(_Fields field, Object value) { } break; - case IS_ACID: + case IS_TRANSACTIONAL: if (value == null) { - unsetIsAcid(); + unsetIsTransactional(); } else { - setIsAcid((Boolean)value); + setIsTransactional((Boolean)value); } break; @@ -542,8 +542,8 @@ public Object getFieldValue(_Fields field) { case OPERATION_TYPE: return getOperationType(); - case IS_ACID: - return isIsAcid(); + case IS_TRANSACTIONAL: + return isIsTransactional(); case IS_DYNAMIC_PARTITION_WRITE: return isIsDynamicPartitionWrite(); @@ -571,8 +571,8 @@ public boolean isSet(_Fields field) { return isSetPartitionname(); case OPERATION_TYPE: return isSetOperationType(); - case IS_ACID: - return isSetIsAcid(); + case IS_TRANSACTIONAL: + return isSetIsTransactional(); case IS_DYNAMIC_PARTITION_WRITE: return isSetIsDynamicPartitionWrite(); } @@ -646,12 +646,12 @@ public boolean equals(LockComponent that) { return false; } - boolean this_present_isAcid = true && this.isSetIsAcid(); - boolean that_present_isAcid = true && that.isSetIsAcid(); - if (this_present_isAcid || that_present_isAcid) { - if (!(this_present_isAcid && that_present_isAcid)) + boolean this_present_isTransactional = true && this.isSetIsTransactional(); + boolean that_present_isTransactional = true && that.isSetIsTransactional(); + if (this_present_isTransactional || that_present_isTransactional) { + if (!(this_present_isTransactional && that_present_isTransactional)) return false; - if (this.isAcid != that.isAcid) + if (this.isTransactional != that.isTransactional) return false; } @@ -701,10 +701,10 @@ public int hashCode() { if (present_operationType) list.add(operationType.getValue()); - boolean present_isAcid = true && (isSetIsAcid()); - list.add(present_isAcid); - if (present_isAcid) - list.add(isAcid); + boolean present_isTransactional = true && (isSetIsTransactional()); + list.add(present_isTransactional); + if (present_isTransactional) + list.add(isTransactional); boolean present_isDynamicPartitionWrite = true && (isSetIsDynamicPartitionWrite()); list.add(present_isDynamicPartitionWrite); @@ -782,12 +782,12 @@ public int compareTo(LockComponent other) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetIsAcid()).compareTo(other.isSetIsAcid()); + lastComparison = Boolean.valueOf(isSetIsTransactional()).compareTo(other.isSetIsTransactional()); if (lastComparison != 0) { return lastComparison; } - if (isSetIsAcid()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isAcid, other.isAcid); + if (isSetIsTransactional()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isTransactional, other.isTransactional); if (lastComparison != 0) { return lastComparison; } @@ -875,10 +875,10 @@ public String toString() { } first = false; } - if (isSetIsAcid()) { + if (isSetIsTransactional()) { if (!first) sb.append(", "); - sb.append("isAcid:"); - sb.append(this.isAcid); + sb.append("isTransactional:"); + sb.append(this.isTransactional); first = false; } if (isSetIsDynamicPartitionWrite()) { @@ -992,10 +992,10 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, LockComponent struc org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 7: // IS_ACID + case 7: // IS_TRANSACTIONAL if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { - struct.isAcid = iprot.readBool(); - struct.setIsAcidIsSet(true); + struct.isTransactional = iprot.readBool(); + struct.setIsTransactionalIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1057,9 +1057,9 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, LockComponent stru oprot.writeFieldEnd(); } } - if (struct.isSetIsAcid()) { - oprot.writeFieldBegin(IS_ACID_FIELD_DESC); - oprot.writeBool(struct.isAcid); + if (struct.isSetIsTransactional()) { + oprot.writeFieldBegin(IS_TRANSACTIONAL_FIELD_DESC); + oprot.writeBool(struct.isTransactional); oprot.writeFieldEnd(); } if (struct.isSetIsDynamicPartitionWrite()) { @@ -1097,7 +1097,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, LockComponent struc if (struct.isSetOperationType()) { optionals.set(2); } - if (struct.isSetIsAcid()) { + if (struct.isSetIsTransactional()) { optionals.set(3); } if (struct.isSetIsDynamicPartitionWrite()) { @@ -1113,8 +1113,8 @@ public void write(org.apache.thrift.protocol.TProtocol prot, LockComponent struc if (struct.isSetOperationType()) { oprot.writeI32(struct.operationType.getValue()); } - if (struct.isSetIsAcid()) { - oprot.writeBool(struct.isAcid); + if (struct.isSetIsTransactional()) { + oprot.writeBool(struct.isTransactional); } if (struct.isSetIsDynamicPartitionWrite()) { oprot.writeBool(struct.isDynamicPartitionWrite); @@ -1144,8 +1144,8 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockComponent struct struct.setOperationTypeIsSet(true); } if (incoming.get(3)) { - struct.isAcid = iprot.readBool(); - struct.setIsAcidIsSet(true); + struct.isTransactional = iprot.readBool(); + struct.setIsTransactionalIsSet(true); } if (incoming.get(4)) { struct.isDynamicPartitionWrite = iprot.readBool(); diff --git standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php index a6047bf7b3..50e0d87546 100644 --- standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php +++ standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -15134,7 +15134,7 @@ class LockComponent { /** * @var bool */ - public $isAcid = false; + public $isTransactional = false; /** * @var bool */ @@ -15168,7 +15168,7 @@ class LockComponent { 'type' => TType::I32, ), 7 => array( - 'var' => 'isAcid', + 'var' => 'isTransactional', 'type' => TType::BOOL, ), 8 => array( @@ -15196,8 +15196,8 @@ class LockComponent { if (isset($vals['operationType'])) { $this->operationType = $vals['operationType']; } - if (isset($vals['isAcid'])) { - $this->isAcid = $vals['isAcid']; + if (isset($vals['isTransactional'])) { + $this->isTransactional = $vals['isTransactional']; } if (isset($vals['isDynamicPartitionWrite'])) { $this->isDynamicPartitionWrite = $vals['isDynamicPartitionWrite']; @@ -15268,7 +15268,7 @@ class LockComponent { break; case 7: if ($ftype == TType::BOOL) { - $xfer += $input->readBool($this->isAcid); + $xfer += $input->readBool($this->isTransactional); } else { $xfer += $input->skip($ftype); } @@ -15323,9 +15323,9 @@ class LockComponent { $xfer += $output->writeI32($this->operationType); $xfer += $output->writeFieldEnd(); } - if ($this->isAcid !== null) { - $xfer += $output->writeFieldBegin('isAcid', TType::BOOL, 7); - $xfer += $output->writeBool($this->isAcid); + if ($this->isTransactional !== null) { + $xfer += $output->writeFieldBegin('isTransactional', TType::BOOL, 7); + $xfer += $output->writeBool($this->isTransactional); $xfer += $output->writeFieldEnd(); } if ($this->isDynamicPartitionWrite !== null) { diff --git standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 0c60aff5aa..a51759d259 100644 --- standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -10538,7 +10538,7 @@ class LockComponent: - tablename - partitionname - operationType - - isAcid + - isTransactional - isDynamicPartitionWrite """ @@ -10550,18 +10550,18 @@ class LockComponent: (4, TType.STRING, 'tablename', None, None, ), # 4 (5, TType.STRING, 'partitionname', None, None, ), # 5 (6, TType.I32, 'operationType', None, 5, ), # 6 - (7, TType.BOOL, 'isAcid', None, False, ), # 7 + (7, TType.BOOL, 'isTransactional', None, False, ), # 7 (8, TType.BOOL, 'isDynamicPartitionWrite', None, False, ), # 8 ) - def __init__(self, type=None, level=None, dbname=None, tablename=None, partitionname=None, operationType=thrift_spec[6][4], isAcid=thrift_spec[7][4], isDynamicPartitionWrite=thrift_spec[8][4],): + def __init__(self, type=None, level=None, dbname=None, tablename=None, partitionname=None, operationType=thrift_spec[6][4], isTransactional=thrift_spec[7][4], isDynamicPartitionWrite=thrift_spec[8][4],): self.type = type self.level = level self.dbname = dbname self.tablename = tablename self.partitionname = partitionname self.operationType = operationType - self.isAcid = isAcid + self.isTransactional = isTransactional self.isDynamicPartitionWrite = isDynamicPartitionWrite def read(self, iprot): @@ -10605,7 +10605,7 @@ def read(self, iprot): iprot.skip(ftype) elif fid == 7: if ftype == TType.BOOL: - self.isAcid = iprot.readBool() + self.isTransactional = iprot.readBool() else: iprot.skip(ftype) elif fid == 8: @@ -10647,9 +10647,9 @@ def write(self, oprot): oprot.writeFieldBegin('operationType', TType.I32, 6) oprot.writeI32(self.operationType) oprot.writeFieldEnd() - if self.isAcid is not None: - oprot.writeFieldBegin('isAcid', TType.BOOL, 7) - oprot.writeBool(self.isAcid) + if self.isTransactional is not None: + oprot.writeFieldBegin('isTransactional', TType.BOOL, 7) + oprot.writeBool(self.isTransactional) oprot.writeFieldEnd() if self.isDynamicPartitionWrite is not None: oprot.writeFieldBegin('isDynamicPartitionWrite', TType.BOOL, 8) @@ -10676,7 +10676,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.tablename) value = (value * 31) ^ hash(self.partitionname) value = (value * 31) ^ hash(self.operationType) - value = (value * 31) ^ hash(self.isAcid) + value = (value * 31) ^ hash(self.isTransactional) value = (value * 31) ^ hash(self.isDynamicPartitionWrite) return value diff --git standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 625baae566..6efbb87a7f 100644 --- standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2353,7 +2353,7 @@ class LockComponent TABLENAME = 4 PARTITIONNAME = 5 OPERATIONTYPE = 6 - ISACID = 7 + ISTRANSACTIONAL = 7 ISDYNAMICPARTITIONWRITE = 8 FIELDS = { @@ -2363,7 +2363,7 @@ class LockComponent TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename', :optional => true}, PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}, OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :default => 5, :optional => true, :enum_class => ::DataOperationType}, - ISACID => {:type => ::Thrift::Types::BOOL, :name => 'isAcid', :default => false, :optional => true}, + ISTRANSACTIONAL => {:type => ::Thrift::Types::BOOL, :name => 'isTransactional', :default => false, :optional => true}, ISDYNAMICPARTITIONWRITE => {:type => ::Thrift::Types::BOOL, :name => 'isDynamicPartitionWrite', :default => false, :optional => true} } diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java index 6e4518740d..1ad06381cc 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java @@ -77,8 +77,8 @@ public LockComponentBuilder setOperationType(DataOperationType dop) { return this; } - public LockComponentBuilder setIsFullAcid(boolean t) { - component.setIsAcid(t); + public LockComponentBuilder setIsTransactional(boolean t) { + component.setIsTransactional(t); return this; } /** diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index b74f69dffd..967e109b02 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -20,10 +20,13 @@ import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.utils.StringableMap; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.Properties; /** * Information on a possible or running compaction. @@ -40,6 +43,7 @@ public String runAs; public String properties; public boolean tooManyAborts = false; + public boolean isInsertOnlyTable = false; /** * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) * See {@link TxnStore#setCompactionHighestWriteId(CompactionInfo, long)} for precise definition. @@ -89,6 +93,27 @@ public String getFullTableName() { } return fullTableName; } + + public boolean isInsertOnlyTable() { + return isInsertOnlyTable; + } + + public void setIsInsertOnlyTable(String stringProps) { + if (stringProps != null) { + StringableMap sm = new StringableMap(stringProps); + Properties props = sm.toProperties(); + String str = props.getProperty("isForInsertOnly".toUpperCase()); + if (str == null || !str.equalsIgnoreCase("true")) { + return; + } + this.isInsertOnlyTable = true; + } + } + + public void setIsInsertOnlyTable(boolean insertOnlyTable) { + isInsertOnlyTable = insertOnlyTable; + } + public boolean isMajorCompaction() { return CompactionType.MAJOR == type; } @@ -130,6 +155,7 @@ static CompactionInfo loadFullFromCompactionQueue(ResultSet rs) throws SQLExcept fullCi.highestWriteId = rs.getLong(11); fullCi.metaInfo = rs.getBytes(12); fullCi.hadoopJobId = rs.getString(13); + fullCi.setIsInsertOnlyTable(fullCi.properties); return fullCi; } static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException { diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index ba006cf9ec..0d590ffa1c 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -287,7 +287,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " - + "cq_type, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + + "cq_type, cq_tblproperties, cq_run_as, cq_highest_write_id from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); @@ -302,8 +302,11 @@ public void markCompacted(CompactionInfo info) throws MetaException { case MINOR_TYPE: info.type = CompactionType.MINOR; break; default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); } - info.runAs = rs.getString(6); - info.highestWriteId = rs.getLong(7); + info.properties = rs.getString(6); + info.setIsInsertOnlyTable(info.properties); + info.runAs = rs.getString(7); + info.highestWriteId = rs.getLong(8); + rc.add(info); } LOG.debug("Going to rollback"); diff --git standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 6a745948a3..f36eaa02c8 100644 --- standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -123,6 +123,7 @@ import org.apache.hadoop.hive.metastore.metrics.MetricsConstants; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.metastore.utils.StringableMap; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -1318,7 +1319,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc // For each component in this lock request, // add an entry to the txn_components table for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetIsAcid() && !lc.isIsAcid()) { + if(lc.isSetIsTransactional() && !lc.isIsTransactional()) { //we don't prevent using non-acid resources in a txn but we do lock them continue; } @@ -1404,7 +1405,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { //old version of thrift client should have (lc.isSetOperationType() == false) but they do not //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. It works correctly for boolean vars, e.g. LockComponent.isAcid). + //message was created (for object variables. It works correctly for boolean vars, e.g. LockComponent.isTransactional). //in test mode, upgrades are not tested, so client version and server version of thrift always matches so //we see UNSET here it means something didn't set the appropriate value. throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " diff --git standalone-metastore/src/main/thrift/hive_metastore.thrift standalone-metastore/src/main/thrift/hive_metastore.thrift index b11ee380b4..b3606a1922 100644 --- standalone-metastore/src/main/thrift/hive_metastore.thrift +++ standalone-metastore/src/main/thrift/hive_metastore.thrift @@ -775,7 +775,7 @@ struct LockComponent { 4: optional string tablename, 5: optional string partitionname, 6: optional DataOperationType operationType = DataOperationType.UNSET, - 7: optional bool isAcid = false, + 7: optional bool isTransactional = false, 8: optional bool isDynamicPartitionWrite = false }