diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e3ddbf197b..0dd2419e9e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2849,6 +2849,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
         "a major compaction."),
 
+    HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD("hive.compactor.aborted.txn.time.threshold", "12h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Age of table/partition's oldest aborted transaction at which compaction will be triggered. " +
+        "Default time unit is hours. Set to a negative number to disable."),
+
     HIVE_COMPACTOR_WAIT_TIMEOUT("hive.compactor.wait.timeout", 300000L, "Time out in " +
         "milliseconds for blocking compaction. It's value has to be higher than 2000 milliseconds. "),
     /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 37a5862791..a7a4219e2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -89,6 +89,9 @@ public void run() {
 
       int abortedThreshold = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+      long abortedTimeThreshold = HiveConf
+          .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+              TimeUnit.MILLISECONDS);
 
       // Make sure we run through the loop once before checking to stop as this makes testing
       // much easier. The stop value is only for testing anyway and not used when called from
@@ -109,7 +112,8 @@ public void run() {
 
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
+              abortedTimeThreshold, compactionInterval)
               .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet());
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
@@ -271,6 +275,16 @@ private CompactionType checkForCompaction(final CompactionInfo ci,
       return CompactionType.MAJOR;
     }
 
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp =
+          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
+          + " with age older than threshold " + oldAbortedTimeoutProp + ": " + conf
+          .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
" + + "Initiating minor compaction."); + return CompactionType.MINOR; + } + if (runJobAsSelf(runAs)) { return determineCompactionType(ci, writeIds, sd, tblproperties); } else { diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 15fcfc0e35..8828953d01 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -410,7 +410,7 @@ public void testFindPotentialCompactions() throws Exception { txnHandler.commitTxn(new CommitTxnRequest(txnid)); assertEquals(0, txnHandler.numLocksInLockTable()); - Set potentials = txnHandler.findPotentialCompactions(100); + Set potentials = txnHandler.findPotentialCompactions(100, -1L); assertEquals(2, potentials.size()); boolean sawMyTable = false, sawYourTable = false; for (CompactionInfo ci : potentials) { @@ -422,13 +422,13 @@ public void testFindPotentialCompactions() throws Exception { assertTrue(sawMyTable); assertTrue(sawYourTable); - potentials = txnHandler.findPotentialCompactions(100, 1); + potentials = txnHandler.findPotentialCompactions(100, -1, 1); assertEquals(2, potentials.size()); //simulate auto-compaction interval TimeUnit.SECONDS.sleep(2); - potentials = txnHandler.findPotentialCompactions(100, 1); + potentials = txnHandler.findPotentialCompactions(100, -1, 1); assertEquals(0, potentials.size()); //simulate prev failed compaction @@ -437,7 +437,7 @@ public void testFindPotentialCompactions() throws Exception { CompactionInfo ci = txnHandler.findNextToCompact("fred"); txnHandler.markFailed(ci); - potentials = txnHandler.findPotentialCompactions(100, 1); + potentials = txnHandler.findPotentialCompactions(100, -1, 1); assertEquals(1, potentials.size()); } @@ -565,7 +565,7 @@ public void addDynamicPartitions() throws Exception { txnHandler.addDynamicPartitions(adp); txnHandler.commitTxn(new CommitTxnRequest(txnId)); - Set potentials = txnHandler.findPotentialCompactions(1000); + Set potentials = txnHandler.findPotentialCompactions(1000, -1L); assertEquals(2, potentials.size()); SortedSet sorted = new TreeSet(potentials); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index 1151466f8c..bc79ff588d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -50,6 +50,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; + /** * Tests for the compactor Initiator thread. */ @@ -232,6 +233,50 @@ public void cleanEmptyAbortedTxns() throws Exception { Assert.assertEquals(1, openTxns.getOpen_txnsSize()); } + /** + * Test that HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD triggers compaction. 
+   *
+   * @throws Exception
+   */
+  @Test
+  public void compactExpiredAbortedTxns() throws Exception {
+    Table t = newTable("default", "expiredAbortedTxns", false);
+    // abort a txn
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setOperationType(DataOperationType.DELETE);
+    comp.setTablename("expiredAbortedTxns");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    txnHandler.lock(req);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    // before setting the time threshold, check that no compaction is queued
+    initiateAndVerifyCompactionQueueLength(0);
+
+    // a negative number disables the threshold check
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, -1,
+        TimeUnit.MILLISECONDS);
+    Thread.sleep(1L);
+    initiateAndVerifyCompactionQueueLength(0);
+
+    // set to 1 ms, wait 1 ms, and check that a minor compaction is queued
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 1, TimeUnit.MILLISECONDS);
+    Thread.sleep(1L);
+    ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
+    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+  }
+
+  private ShowCompactResponse initiateAndVerifyCompactionQueueLength(int expectedLength)
+      throws Exception {
+    startInitiator();
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(expectedLength, rsp.getCompactsSize());
+    return rsp;
+  }
+
   @Test
   public void noCompactWhenNoCompactSet() throws Exception {
     Map<String, String> parameters = new HashMap<String, String>(1);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
index 31b6ed450b..b338f47592 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
@@ -51,6 +51,7 @@
   private static final org.apache.thrift.protocol.TField START_FIELD_DESC = new org.apache.thrift.protocol.TField("start", org.apache.thrift.protocol.TType.I64, (short)11);
   private static final org.apache.thrift.protocol.TField HIGHEST_WRITE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("highestWriteId", org.apache.thrift.protocol.TType.I64, (short)12);
   private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)13);
+  private static final org.apache.thrift.protocol.TField HASOLDABORT_FIELD_DESC = new org.apache.thrift.protocol.TField("hasoldabort", org.apache.thrift.protocol.TType.BOOL, (short)14);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -71,6 +72,7 @@
   private long start; // optional
   private long highestWriteId; // optional
   private String errorMessage; // optional
+  private boolean hasoldabort; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -90,7 +92,8 @@
     WORKER_ID((short)10, "workerId"),
     START((short)11, "start"),
     HIGHEST_WRITE_ID((short)12, "highestWriteId"),
-    ERROR_MESSAGE((short)13, "errorMessage");
+    ERROR_MESSAGE((short)13, "errorMessage"),
+    HASOLDABORT((short)14, "hasoldabort");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -131,6 +134,8 @@ public static _Fields findByThriftId(int fieldId) {
           return HIGHEST_WRITE_ID;
         case 13: // ERROR_MESSAGE
           return ERROR_MESSAGE;
+        case 14: // HASOLDABORT
+          return HASOLDABORT;
         default:
           return null;
       }
@@ -175,8 +180,9 @@ public String getFieldName() {
   private static final int __TOOMANYABORTS_ISSET_ID = 1;
   private static final int __START_ISSET_ID = 2;
   private static final int __HIGHESTWRITEID_ISSET_ID = 3;
+  private static final int __HASOLDABORT_ISSET_ID = 4;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE};
+  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -206,6 +212,8 @@ public String getFieldName() {
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL,
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.HASOLDABORT, new org.apache.thrift.meta_data.FieldMetaData("hasoldabort", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap);
   }
@@ -263,6 +271,7 @@ public CompactionInfoStruct(CompactionInfoStruct other) {
     if (other.isSetErrorMessage()) {
       this.errorMessage = other.errorMessage;
     }
+    this.hasoldabort = other.hasoldabort;
   }
 
   public CompactionInfoStruct deepCopy() {
@@ -288,6 +297,8 @@ public void clear() {
     setHighestWriteIdIsSet(false);
     this.highestWriteId = 0;
     this.errorMessage = null;
+    setHasoldabortIsSet(false);
+    this.hasoldabort = false;
   }
 
   public long getId() {
@@ -593,6 +604,28 @@ public void setErrorMessageIsSet(boolean value) {
     }
   }
 
+  public boolean isHasoldabort() {
+    return this.hasoldabort;
+  }
+
+  public void setHasoldabort(boolean hasoldabort) {
+    this.hasoldabort = hasoldabort;
+    setHasoldabortIsSet(true);
+  }
+
+  public void unsetHasoldabort() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+  }
+
+  /** Returns true if field hasoldabort is set (has been assigned a value) and false otherwise */
+  public boolean isSetHasoldabort() {
+    return EncodingUtils.testBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+  }
+
+  public void setHasoldabortIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __HASOLDABORT_ISSET_ID, value);
+  }
+
value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case ID: @@ -699,6 +732,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case HASOLDABORT: + if (value == null) { + unsetHasoldabort(); + } else { + setHasoldabort((Boolean)value); + } + break; + } } @@ -743,6 +784,9 @@ public Object getFieldValue(_Fields field) { case ERROR_MESSAGE: return getErrorMessage(); + case HASOLDABORT: + return isHasoldabort(); + } throw new IllegalStateException(); } @@ -780,6 +824,8 @@ public boolean isSet(_Fields field) { return isSetHighestWriteId(); case ERROR_MESSAGE: return isSetErrorMessage(); + case HASOLDABORT: + return isSetHasoldabort(); } throw new IllegalStateException(); } @@ -914,6 +960,15 @@ public boolean equals(CompactionInfoStruct that) { return false; } + boolean this_present_hasoldabort = true && this.isSetHasoldabort(); + boolean that_present_hasoldabort = true && that.isSetHasoldabort(); + if (this_present_hasoldabort || that_present_hasoldabort) { + if (!(this_present_hasoldabort && that_present_hasoldabort)) + return false; + if (this.hasoldabort != that.hasoldabort) + return false; + } + return true; } @@ -986,6 +1041,11 @@ public int hashCode() { if (present_errorMessage) list.add(errorMessage); + boolean present_hasoldabort = true && (isSetHasoldabort()); + list.add(present_hasoldabort); + if (present_hasoldabort) + list.add(hasoldabort); + return list.hashCode(); } @@ -1127,6 +1187,16 @@ public int compareTo(CompactionInfoStruct other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetHasoldabort()).compareTo(other.isSetHasoldabort()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetHasoldabort()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.hasoldabort, other.hasoldabort); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1252,6 +1322,12 @@ public String toString() { } first = false; } + if (isSetHasoldabort()) { + if (!first) sb.append(", "); + sb.append("hasoldabort:"); + sb.append(this.hasoldabort); + first = false; + } sb.append(")"); return sb.toString(); } @@ -1417,6 +1493,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, CompactionInfoStruc org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 14: // HASOLDABORT + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.hasoldabort = iprot.readBool(); + struct.setHasoldabortIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1505,6 +1589,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, CompactionInfoStru oprot.writeFieldEnd(); } } + if (struct.isSetHasoldabort()) { + oprot.writeFieldBegin(HASOLDABORT_FIELD_DESC); + oprot.writeBool(struct.hasoldabort); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1554,7 +1643,10 @@ public void write(org.apache.thrift.protocol.TProtocol prot, CompactionInfoStruc if (struct.isSetErrorMessage()) { optionals.set(8); } - oprot.writeBitSet(optionals, 9); + if (struct.isSetHasoldabort()) { + optionals.set(9); + } + oprot.writeBitSet(optionals, 10); if (struct.isSetPartitionname()) { oprot.writeString(struct.partitionname); } @@ -1582,6 +1674,9 @@ public void write(org.apache.thrift.protocol.TProtocol prot, CompactionInfoStruc if (struct.isSetErrorMessage()) { 
         oprot.writeString(struct.errorMessage);
       }
+      if (struct.isSetHasoldabort()) {
+        oprot.writeBool(struct.hasoldabort);
+      }
     }
 
     @Override
@@ -1595,7 +1690,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, CompactionInfoStruct
       struct.setTablenameIsSet(true);
       struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
       struct.setTypeIsSet(true);
-      BitSet incoming = iprot.readBitSet(9);
+      BitSet incoming = iprot.readBitSet(10);
       if (incoming.get(0)) {
         struct.partitionname = iprot.readString();
         struct.setPartitionnameIsSet(true);
@@ -1632,6 +1727,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, CompactionInfoStruct
         struct.errorMessage = iprot.readString();
         struct.setErrorMessageIsSet(true);
       }
+      if (incoming.get(9)) {
+        struct.hasoldabort = iprot.readBool();
+        struct.setHasoldabortIsSet(true);
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 9fb7ff011a..5dedb1207c 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -22398,6 +22398,10 @@ class CompactionInfoStruct {
    * @var string
    */
   public $errorMessage = null;
+  /**
+   * @var bool
+   */
+  public $hasoldabort = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -22454,6 +22458,10 @@ class CompactionInfoStruct {
           'var' => 'errorMessage',
           'type' => TType::STRING,
           ),
+        14 => array(
+          'var' => 'hasoldabort',
+          'type' => TType::BOOL,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -22496,6 +22504,9 @@ class CompactionInfoStruct {
       if (isset($vals['errorMessage'])) {
        $this->errorMessage = $vals['errorMessage'];
       }
+      if (isset($vals['hasoldabort'])) {
+        $this->hasoldabort = $vals['hasoldabort'];
+      }
     }
   }
 
@@ -22609,6 +22620,13 @@ class CompactionInfoStruct {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 14:
+          if ($ftype == TType::BOOL) {
+            $xfer += $input->readBool($this->hasoldabort);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -22687,6 +22705,11 @@ class CompactionInfoStruct {
       $xfer += $output->writeString($this->errorMessage);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->hasoldabort !== null) {
+      $xfer += $output->writeFieldBegin('hasoldabort', TType::BOOL, 14);
+      $xfer += $output->writeBool($this->hasoldabort);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 4f317b3453..94e7c50cc6 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15568,6 +15568,7 @@ class CompactionInfoStruct:
    - start
    - highestWriteId
    - errorMessage
+   - hasoldabort
   """
 
   thrift_spec = (
@@ -15585,9 +15586,10 @@ class CompactionInfoStruct:
     (11, TType.I64, 'start', None, None, ), # 11
     (12, TType.I64, 'highestWriteId', None, None, ), # 12
     (13, TType.STRING, 'errorMessage', None, None, ), # 13
+    (14, TType.BOOL, 'hasoldabort', None, None, ), # 14
   )
 
-  def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None,):
+  def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None,):
     self.id = id
     self.dbname = dbname
     self.tablename = tablename
@@ -15601,6 +15603,7 @@ def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, typ
     self.start = start
     self.highestWriteId = highestWriteId
     self.errorMessage = errorMessage
+    self.hasoldabort = hasoldabort
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15676,6 +15679,11 @@ def read(self, iprot):
           self.errorMessage = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 14:
+        if ftype == TType.BOOL:
+          self.hasoldabort = iprot.readBool()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -15738,6 +15746,10 @@ def write(self, oprot):
       oprot.writeFieldBegin('errorMessage', TType.STRING, 13)
       oprot.writeString(self.errorMessage)
       oprot.writeFieldEnd()
+    if self.hasoldabort is not None:
+      oprot.writeFieldBegin('hasoldabort', TType.BOOL, 14)
+      oprot.writeBool(self.hasoldabort)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -15768,6 +15780,7 @@ def __hash__(self):
     value = (value * 31) ^ hash(self.start)
     value = (value * 31) ^ hash(self.highestWriteId)
     value = (value * 31) ^ hash(self.errorMessage)
+    value = (value * 31) ^ hash(self.hasoldabort)
     return value
 
   def __repr__(self):
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index e64ae0ead2..cb36f0bb42 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3470,6 +3470,7 @@ class CompactionInfoStruct
     START = 11
     HIGHESTWRITEID = 12
     ERRORMESSAGE = 13
+    HASOLDABORT = 14
 
     FIELDS = {
       ID => {:type => ::Thrift::Types::I64, :name => 'id'},
@@ -3484,7 +3485,8 @@ class CompactionInfoStruct
       WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId', :optional => true},
       START => {:type => ::Thrift::Types::I64, :name => 'start', :optional => true},
      HIGHESTWRITEID => {:type => ::Thrift::Types::I64, :name => 'highestWriteId', :optional => true},
-      ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}
+      ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true},
+      HASOLDABORT => {:type => ::Thrift::Types::BOOL, :name => 'hasoldabort', :optional => true}
     }
 
     def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 1e3d6e9b8b..4052749b47 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1158,6 +1158,7 @@ struct CompactionInfoStruct {
     11: optional i64 start
     12: optional i64 highestWriteId
     13: optional string errorMessage
+    14: optional bool hasoldabort
 }
 
 struct OptionalCompactionInfoStruct {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 70d63ab18b..062a97c290 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -50,6 +50,7 @@
   public String runAs;
   public String properties;
   public boolean tooManyAborts = false;
+  public boolean hasOldAbort = false;
   /**
    * The highest write id that the compaction job will pay attention to.
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
@@ -118,6 +119,7 @@ public String toString() {
       "properties:" + properties + "," +
       "runAs:" + runAs + "," +
       "tooManyAborts:" + tooManyAborts + "," +
+      "hasOldAbort:" + hasOldAbort + "," +
       "highestWriteId:" + highestWriteId + "," +
       "errorMessage:" + errorMessage;
   }
@@ -193,6 +195,9 @@ public static CompactionInfo compactionStructToInfo(CompactionInfoStruct cr) {
     if (cr.isSetToomanyaborts()) {
       ci.tooManyAborts = cr.isToomanyaborts();
     }
+    if (cr.isSetHasoldabort()) {
+      ci.hasOldAbort = cr.isHasoldabort();
+    }
     if (cr.isSetState() && cr.getState().length() != 1) {
       throw new IllegalStateException("State should only be one character but it was set to " + cr.getState());
     } else if (cr.isSetState()) {
@@ -220,6 +225,7 @@ public static CompactionInfoStruct compactionInfoToStruct(CompactionInfo ci) {
     cr.setRunas(ci.runAs);
     cr.setProperties(ci.properties);
     cr.setToomanyaborts(ci.tooManyAborts);
+    cr.setHasoldabort(ci.hasOldAbort);
     cr.setStart(ci.start);
     cr.setState(Character.toString(ci.state));
     cr.setWorkerId(ci.workerId);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 2344c2d5f6..e12e6be7cc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.util.StringUtils;
@@ -32,6 +31,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -59,13 +59,15 @@ public CompactionTxnHandler() {
    */
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
-    return findPotentialCompactions(abortedThreshold, -1);
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold)
+      throws MetaException {
+    return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, -1);
   }
 
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold,
+      long abortedTimeThreshold, long checkInterval) throws MetaException {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<>();
     Statement stmt = null;
@@ -75,7 +77,8 @@ public CompactionTxnHandler() {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         // Check for completed transactions
-        String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " +
+        final String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\"" +
+            ".\"CTC_PARTITION\" " +
           "FROM \"COMPLETED_TXN_COMPONENTS\" \"TC\" " + (checkInterval > 0 ?
           "LEFT JOIN ( " +
           "  SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " +
@@ -101,38 +104,51 @@ public CompactionTxnHandler() {
         }
         rs.close();
 
-        // Check for aborted txns
-        s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
-          "FROM \"TXNS\", \"TXN_COMPONENTS\" " +
-          "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " +
-          "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
-          "HAVING COUNT(*) > " + abortedThreshold;
+        // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
+        // past time threshold
+        boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+        final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
+            "MIN(\"TXN_STARTED\"), COUNT(*) " +
+            "FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+            "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " +
+            "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\"" +
+            (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
 
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
+        LOG.debug("Going to execute query <" + sCheckAborted + ">");
+        rs = stmt.executeQuery(sCheckAborted);
+        long systemTime = System.currentTimeMillis();
         while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.dbname = rs.getString(1);
-          info.tableName = rs.getString(2);
-          info.partName = rs.getString(3);
-          info.tooManyAborts = true;
-          response.add(info);
+          boolean pastTimeThreshold =
+              checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+          int numAbortedTxns = rs.getInt(5);
+          if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+            CompactionInfo info = new CompactionInfo();
+            info.dbname = rs.getString(1);
+            info.tableName = rs.getString(2);
+            info.partName = rs.getString(3);
+            info.tooManyAborts = numAbortedTxns > abortedThreshold;
+            info.hasOldAbort = pastTimeThreshold;
+            response.add(info);
+          }
         }
 
         LOG.debug("Going to rollback");
         dbConn.rollback();
       } catch (SQLException e) {
         LOG.error("Unable to connect to transaction database " + e.getMessage());
-        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ")");
+        checkRetryable(dbConn, e,
+            "findPotentialCompactions(maxAborted:" + abortedThreshold +
+                ", abortedTimeThreshold:" + abortedTimeThreshold + ")");
       } finally {
         close(rs, stmt, dbConn);
       }
       return response;
     } catch (RetryException e) {
       return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, checkInterval);
     }
   }
 
+
   /**
    * This will grab the next compaction request off of
    * the queue, and assign it to the worker.
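For context, the heart of the change above is the per-group age test applied to each row of the aborted-txn query: a table/partition qualifies once the start time of its oldest aborted transaction (MIN("TXN_STARTED")) plus the threshold lies in the past, and a negative threshold disables the check. A minimal, self-contained sketch of that predicate follows; it is not part of the patch, and the class name and sample values are hypothetical.

import java.util.concurrent.TimeUnit;

public class AbortedTxnAgeCheck {
  // Mirrors "rs.getLong(4) + abortedTimeThreshold < systemTime" from
  // findPotentialCompactions: true when the oldest abort is older than the threshold.
  static boolean pastTimeThreshold(long oldestAbortStartedMs, long abortedTimeThresholdMs, long nowMs) {
    boolean checkEnabled = abortedTimeThresholdMs >= 0;  // a negative value disables the check
    return checkEnabled && oldestAbortStartedMs + abortedTimeThresholdMs < nowMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long threshold = TimeUnit.HOURS.toMillis(12);          // the 12h default
    long oldAbort = now - TimeUnit.HOURS.toMillis(13);     // aborted txn started 13h ago
    long recentAbort = now - TimeUnit.HOURS.toMillis(1);   // aborted txn started 1h ago
    System.out.println(pastTimeThreshold(oldAbort, threshold, now));     // true -> minor compaction
    System.out.println(pastTimeThreshold(recentAbort, threshold, now));  // false
    System.out.println(pastTimeThreshold(oldAbort, -1, now));            // false -> check disabled
  }
}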
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 87130a519d..b4fe55e4e8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -23,7 +23,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -31,7 +30,6 @@
 import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -315,14 +313,17 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
    * that may be ready for compaction.  Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
    * @param abortedThreshold number of aborted queries forming a potential compaction request.
+   * @param abortedTimeThreshold age of an aborted txn in milliseconds that will trigger a
+   *                             potential compaction request.
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold) throws MetaException;
 
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold, long checkInterval)
+      throws MetaException;
 
   /**
    * This updates COMPACTION_QUEUE.  Set runAs username for the case where the request was
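For completeness, the new knob can be read and set the same way the Initiator and the test above do it. A minimal usage sketch under that assumption (the class name and chosen values are illustrative only):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

public class AbortedTxnTimeThresholdExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Tighten the age threshold from the 12h default to 6h (illustrative value).
    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 6, TimeUnit.HOURS);

    // The Initiator reads the value in milliseconds, as in the patch above.
    long thresholdMs = HiveConf.getTimeVar(conf,
        HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS);
    System.out.println("aborted txn age threshold (ms): " + thresholdMs);

    // A negative value disables the age-based trigger, per the config description.
    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, -1, TimeUnit.MILLISECONDS);
  }
}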