diff --git common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java index ad79e2c..ac9449f 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java @@ -52,10 +52,13 @@ public ValidCompactorTxnList() { * @param highWatermark highest committed transaction */ public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) { - super(exceptions, highWatermark); + super(exceptions, highWatermark, minOpen); minOpenTxn = minOpen; } - + @Override + public boolean isValidBase(long txnid) { + return true; + } public ValidCompactorTxnList(String value) { super(value); } @@ -70,40 +73,7 @@ public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; } } - - @Override - public String writeToString() { - StringBuilder buf = new StringBuilder(); - buf.append(highWatermark); - buf.append(':'); - buf.append(minOpenTxn); - if (exceptions.length == 0) { - buf.append(':'); - } else { - for(long except: exceptions) { - buf.append(':'); - buf.append(except); - } - } - return buf.toString(); - } - - @Override - public void readFromString(String src) { - if (src == null || src.length() == 0) { - highWatermark = Long.MAX_VALUE; - exceptions = new long[0]; - } else { - String[] values = src.split(":"); - highWatermark = Long.parseLong(values[0]); - minOpenTxn = Long.parseLong(values[1]); - exceptions = new long[values.length - 2]; - for(int i = 2; i < values.length; ++i) { - exceptions[i-2] = Long.parseLong(values[i]); - } - } - } - + @VisibleForTesting public long getMinOpenTxn() { return minOpenTxn; diff --git common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index fda242d..0cbd51d 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -28,18 +28,21 @@ public class ValidReadTxnList implements ValidTxnList { protected long[] exceptions; + private long minOpen = Long.MAX_VALUE; protected long highWatermark; public ValidReadTxnList() { - this(new long[0], Long.MAX_VALUE); + this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE); } - public ValidReadTxnList(long[] exceptions, long highWatermark) { + public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpen) { if (exceptions.length == 0) { this.exceptions = exceptions; + this.minOpen = Long.MAX_VALUE; } else { this.exceptions = exceptions.clone(); Arrays.sort(this.exceptions); + this.minOpen = minOpen; } this.highWatermark = highWatermark; } @@ -56,6 +59,14 @@ public boolean isTxnValid(long txnid) { return Arrays.binarySearch(exceptions, txnid) < 0; } + /** + * We cannot use a base file if its range contains an open txn. + * @param txnid from base_xxxx + */ + @Override + public boolean isValidBase(long txnid) { + return minOpen > txnid; + } @Override public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { // check the easy cases first @@ -92,6 +103,8 @@ public String toString() { public String writeToString() { StringBuilder buf = new StringBuilder(); buf.append(highWatermark); + buf.append(':'); + buf.append(minOpen); if (exceptions.length == 0) { buf.append(':'); } else { @@ -111,9 +124,10 @@ public void readFromString(String src) { } else { String[] values = src.split(":"); highWatermark = Long.parseLong(values[0]); - exceptions = new long[values.length - 1]; - for(int i = 1; i < values.length; ++i) { - exceptions[i-1] = Long.parseLong(values[i]); + minOpen = Long.parseLong(values[1]); + exceptions = new long[values.length - 2]; + for(int i = 2; i < values.length; ++i) { + exceptions[i-2] = Long.parseLong(values[i]); } } } diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index 87e7e30..2e04591 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -45,6 +45,8 @@ * @return true if valid, false otherwise */ public boolean isTxnValid(long txnid); + + public boolean isValidBase(long txnid); /** * Find out if a range of transaction ids are valid. Note that valid may have different meanings diff --git common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java index d3c6803..0aab90e 100644 --- common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java +++ common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java @@ -34,7 +34,7 @@ @Test public void noExceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[0], 1); + ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE); String str = txnList.writeToString(); Assert.assertEquals("1:", str); ValidTxnList newList = new ValidReadTxnList(); @@ -45,7 +45,7 @@ public void noExceptions() throws Exception { @Test public void exceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5); + ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L); String str = txnList.writeToString(); Assert.assertEquals("5:2:4", str); ValidTxnList newList = new ValidReadTxnList(); @@ -62,7 +62,7 @@ public void exceptions() throws Exception { public void longEnoughToCompress() throws Exception { long[] exceptions = new long[1000]; for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900); String str = txnList.writeToString(); ValidTxnList newList = new ValidReadTxnList(); newList.readFromString(str); @@ -76,7 +76,7 @@ public void longEnoughToCompress() throws Exception { public void readWriteConfig() throws Exception { long[] exceptions = new long[1000]; for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900); String str = txnList.writeToString(); Configuration conf = new Configuration(); conf.set(ValidTxnList.VALID_TXNS_KEY, str); diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index ca2a912..731caa8 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1038,6 +1038,10 @@ public long getHighWatermark() { public long[] getInvalidTransactions() { return new long[0]; } + @Override + public boolean isValidBase(long txnid) { + return true; + } }; OrcInputFormat aif = new OrcInputFormat(); diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 4d92b73..a2e35b8 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -635,6 +635,7 @@ struct GetOpenTxnsInfoResponse { struct GetOpenTxnsResponse { 1: required i64 txn_high_water_mark, 2: required set open_txns, + 3: optional i64 min_open_txn, //since 1.3,2.2 } struct OpenTxnRequest { diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 79460a8..174b539 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -11911,6 +11911,11 @@ void GetOpenTxnsResponse::__set_open_txns(const std::set & val) { this->open_txns = val; } +void GetOpenTxnsResponse::__set_min_open_txn(const int64_t val) { + this->min_open_txn = val; +__isset.min_open_txn = true; +} + uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -11963,6 +11968,14 @@ uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) xfer += iprot->skip(ftype); } break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->min_open_txn); + this->__isset.min_open_txn = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -12000,6 +12013,11 @@ uint32_t GetOpenTxnsResponse::write(::apache::thrift::protocol::TProtocol* oprot } xfer += oprot->writeFieldEnd(); + if (this->__isset.min_open_txn) { + xfer += oprot->writeFieldBegin("min_open_txn", ::apache::thrift::protocol::T_I64, 3); + xfer += oprot->writeI64(this->min_open_txn); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -12009,15 +12027,21 @@ void swap(GetOpenTxnsResponse &a, GetOpenTxnsResponse &b) { using ::std::swap; swap(a.txn_high_water_mark, b.txn_high_water_mark); swap(a.open_txns, b.open_txns); + swap(a.min_open_txn, b.min_open_txn); + swap(a.__isset, b.__isset); } GetOpenTxnsResponse::GetOpenTxnsResponse(const GetOpenTxnsResponse& other524) { txn_high_water_mark = other524.txn_high_water_mark; open_txns = other524.open_txns; + min_open_txn = other524.min_open_txn; + __isset = other524.__isset; } GetOpenTxnsResponse& GetOpenTxnsResponse::operator=(const GetOpenTxnsResponse& other525) { txn_high_water_mark = other525.txn_high_water_mark; open_txns = other525.open_txns; + min_open_txn = other525.min_open_txn; + __isset = other525.__isset; return *this; } void GetOpenTxnsResponse::printTo(std::ostream& out) const { @@ -12025,6 +12049,7 @@ void GetOpenTxnsResponse::printTo(std::ostream& out) const { out << "GetOpenTxnsResponse("; out << "txn_high_water_mark=" << to_string(txn_high_water_mark); out << ", " << "open_txns=" << to_string(open_txns); + out << ", " << "min_open_txn="; (__isset.min_open_txn ? (out << to_string(min_open_txn)) : (out << "")); out << ")"; } diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index ec81798..bfec694 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -4881,29 +4881,42 @@ inline std::ostream& operator<<(std::ostream& out, const GetOpenTxnsInfoResponse return out; } +typedef struct _GetOpenTxnsResponse__isset { + _GetOpenTxnsResponse__isset() : min_open_txn(false) {} + bool min_open_txn :1; +} _GetOpenTxnsResponse__isset; class GetOpenTxnsResponse { public: GetOpenTxnsResponse(const GetOpenTxnsResponse&); GetOpenTxnsResponse& operator=(const GetOpenTxnsResponse&); - GetOpenTxnsResponse() : txn_high_water_mark(0) { + GetOpenTxnsResponse() : txn_high_water_mark(0), min_open_txn(0) { } virtual ~GetOpenTxnsResponse() throw(); int64_t txn_high_water_mark; std::set open_txns; + int64_t min_open_txn; + + _GetOpenTxnsResponse__isset __isset; void __set_txn_high_water_mark(const int64_t val); void __set_open_txns(const std::set & val); + void __set_min_open_txn(const int64_t val); + bool operator == (const GetOpenTxnsResponse & rhs) const { if (!(txn_high_water_mark == rhs.txn_high_water_mark)) return false; if (!(open_txns == rhs.open_txns)) return false; + if (__isset.min_open_txn != rhs.__isset.min_open_txn) + return false; + else if (__isset.min_open_txn && !(min_open_txn == rhs.min_open_txn)) + return false; return true; } bool operator != (const GetOpenTxnsResponse &rhs) const { diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java index 6986fc2..8230d38 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java @@ -40,6 +40,7 @@ private static final org.apache.thrift.protocol.TField TXN_HIGH_WATER_MARK_FIELD_DESC = new org.apache.thrift.protocol.TField("txn_high_water_mark", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField OPEN_TXNS_FIELD_DESC = new org.apache.thrift.protocol.TField("open_txns", org.apache.thrift.protocol.TType.SET, (short)2); + private static final org.apache.thrift.protocol.TField MIN_OPEN_TXN_FIELD_DESC = new org.apache.thrift.protocol.TField("min_open_txn", org.apache.thrift.protocol.TType.I64, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -49,11 +50,13 @@ private long txn_high_water_mark; // required private Set open_txns; // required + private long min_open_txn; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { TXN_HIGH_WATER_MARK((short)1, "txn_high_water_mark"), - OPEN_TXNS((short)2, "open_txns"); + OPEN_TXNS((short)2, "open_txns"), + MIN_OPEN_TXN((short)3, "min_open_txn"); private static final Map byName = new HashMap(); @@ -72,6 +75,8 @@ public static _Fields findByThriftId(int fieldId) { return TXN_HIGH_WATER_MARK; case 2: // OPEN_TXNS return OPEN_TXNS; + case 3: // MIN_OPEN_TXN + return MIN_OPEN_TXN; default: return null; } @@ -113,7 +118,9 @@ public String getFieldName() { // isset id assignments private static final int __TXN_HIGH_WATER_MARK_ISSET_ID = 0; + private static final int __MIN_OPEN_TXN_ISSET_ID = 1; private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.MIN_OPEN_TXN}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -122,6 +129,8 @@ public String getFieldName() { tmpMap.put(_Fields.OPEN_TXNS, new org.apache.thrift.meta_data.FieldMetaData("open_txns", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)))); + tmpMap.put(_Fields.MIN_OPEN_TXN, new org.apache.thrift.meta_data.FieldMetaData("min_open_txn", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetOpenTxnsResponse.class, metaDataMap); } @@ -149,6 +158,7 @@ public GetOpenTxnsResponse(GetOpenTxnsResponse other) { Set __this__open_txns = new HashSet(other.open_txns); this.open_txns = __this__open_txns; } + this.min_open_txn = other.min_open_txn; } public GetOpenTxnsResponse deepCopy() { @@ -160,6 +170,8 @@ public void clear() { setTxn_high_water_markIsSet(false); this.txn_high_water_mark = 0; this.open_txns = null; + setMin_open_txnIsSet(false); + this.min_open_txn = 0; } public long getTxn_high_water_mark() { @@ -222,6 +234,28 @@ public void setOpen_txnsIsSet(boolean value) { } } + public long getMin_open_txn() { + return this.min_open_txn; + } + + public void setMin_open_txn(long min_open_txn) { + this.min_open_txn = min_open_txn; + setMin_open_txnIsSet(true); + } + + public void unsetMin_open_txn() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID); + } + + /** Returns true if field min_open_txn is set (has been assigned a value) and false otherwise */ + public boolean isSetMin_open_txn() { + return EncodingUtils.testBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID); + } + + public void setMin_open_txnIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TXN_HIGH_WATER_MARK: @@ -240,6 +274,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case MIN_OPEN_TXN: + if (value == null) { + unsetMin_open_txn(); + } else { + setMin_open_txn((Long)value); + } + break; + } } @@ -251,6 +293,9 @@ public Object getFieldValue(_Fields field) { case OPEN_TXNS: return getOpen_txns(); + case MIN_OPEN_TXN: + return getMin_open_txn(); + } throw new IllegalStateException(); } @@ -266,6 +311,8 @@ public boolean isSet(_Fields field) { return isSetTxn_high_water_mark(); case OPEN_TXNS: return isSetOpen_txns(); + case MIN_OPEN_TXN: + return isSetMin_open_txn(); } throw new IllegalStateException(); } @@ -301,6 +348,15 @@ public boolean equals(GetOpenTxnsResponse that) { return false; } + boolean this_present_min_open_txn = true && this.isSetMin_open_txn(); + boolean that_present_min_open_txn = true && that.isSetMin_open_txn(); + if (this_present_min_open_txn || that_present_min_open_txn) { + if (!(this_present_min_open_txn && that_present_min_open_txn)) + return false; + if (this.min_open_txn != that.min_open_txn) + return false; + } + return true; } @@ -318,6 +374,11 @@ public int hashCode() { if (present_open_txns) list.add(open_txns); + boolean present_min_open_txn = true && (isSetMin_open_txn()); + list.add(present_min_open_txn); + if (present_min_open_txn) + list.add(min_open_txn); + return list.hashCode(); } @@ -349,6 +410,16 @@ public int compareTo(GetOpenTxnsResponse other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetMin_open_txn()).compareTo(other.isSetMin_open_txn()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMin_open_txn()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.min_open_txn, other.min_open_txn); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -380,6 +451,12 @@ public String toString() { sb.append(this.open_txns); } first = false; + if (isSetMin_open_txn()) { + if (!first) sb.append(", "); + sb.append("min_open_txn:"); + sb.append(this.min_open_txn); + first = false; + } sb.append(")"); return sb.toString(); } @@ -459,6 +536,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, GetOpenTxnsResponse org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // MIN_OPEN_TXN + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.min_open_txn = iprot.readI64(); + struct.setMin_open_txnIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -487,6 +572,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, GetOpenTxnsRespons } oprot.writeFieldEnd(); } + if (struct.isSetMin_open_txn()) { + oprot.writeFieldBegin(MIN_OPEN_TXN_FIELD_DESC); + oprot.writeI64(struct.min_open_txn); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -512,6 +602,14 @@ public void write(org.apache.thrift.protocol.TProtocol prot, GetOpenTxnsResponse oprot.writeI64(_iter472); } } + BitSet optionals = new BitSet(); + if (struct.isSetMin_open_txn()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetMin_open_txn()) { + oprot.writeI64(struct.min_open_txn); + } } @Override @@ -530,6 +628,11 @@ public void read(org.apache.thrift.protocol.TProtocol prot, GetOpenTxnsResponse } } struct.setOpen_txnsIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.min_open_txn = iprot.readI64(); + struct.setMin_open_txnIsSet(true); + } } } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..d6f7f49 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -12002,6 +12002,10 @@ class GetOpenTxnsResponse { * @var int[] */ public $open_txns = null; + /** + * @var int + */ + public $min_open_txn = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -12018,6 +12022,10 @@ class GetOpenTxnsResponse { 'type' => TType::I64, ), ), + 3 => array( + 'var' => 'min_open_txn', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -12027,6 +12035,9 @@ class GetOpenTxnsResponse { if (isset($vals['open_txns'])) { $this->open_txns = $vals['open_txns']; } + if (isset($vals['min_open_txn'])) { + $this->min_open_txn = $vals['min_open_txn']; + } } } @@ -12077,6 +12088,13 @@ class GetOpenTxnsResponse { $xfer += $input->skip($ftype); } break; + case 3: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->min_open_txn); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -12116,6 +12134,11 @@ class GetOpenTxnsResponse { } $xfer += $output->writeFieldEnd(); } + if ($this->min_open_txn !== null) { + $xfer += $output->writeFieldBegin('min_open_txn', TType::I64, 3); + $xfer += $output->writeI64($this->min_open_txn); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 8d88cd7..2d308c9 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -8264,17 +8264,20 @@ class GetOpenTxnsResponse: Attributes: - txn_high_water_mark - open_txns + - min_open_txn """ thrift_spec = ( None, # 0 (1, TType.I64, 'txn_high_water_mark', None, None, ), # 1 (2, TType.SET, 'open_txns', (TType.I64,None), None, ), # 2 + (3, TType.I64, 'min_open_txn', None, None, ), # 3 ) - def __init__(self, txn_high_water_mark=None, open_txns=None,): + def __init__(self, txn_high_water_mark=None, open_txns=None, min_open_txn=None,): self.txn_high_water_mark = txn_high_water_mark self.open_txns = open_txns + self.min_open_txn = min_open_txn def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -8300,6 +8303,11 @@ def read(self, iprot): iprot.readSetEnd() else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.min_open_txn = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -8321,6 +8329,10 @@ def write(self, oprot): oprot.writeI64(iter419) oprot.writeSetEnd() oprot.writeFieldEnd() + if self.min_open_txn is not None: + oprot.writeFieldBegin('min_open_txn', TType.I64, 3) + oprot.writeI64(self.min_open_txn) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -8336,6 +8348,7 @@ def __hash__(self): value = 17 value = (value * 31) ^ hash(self.txn_high_water_mark) value = (value * 31) ^ hash(self.open_txns) + value = (value * 31) ^ hash(self.min_open_txn) return value def __repr__(self): diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 0964cd8..bd94e98 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -1843,10 +1843,12 @@ class GetOpenTxnsResponse include ::Thrift::Struct, ::Thrift::Struct_Union TXN_HIGH_WATER_MARK = 1 OPEN_TXNS = 2 + MIN_OPEN_TXN = 3 FIELDS = { TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'}, - OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}} + OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}}, + MIN_OPEN_TXN => {:type => ::Thrift::Types::I64, :name => 'min_open_txn', :optional => true} } def struct_fields; FIELDS; end diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index b121644..e8c5fac 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -361,15 +361,25 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { close(rs); Set openList = new HashSet(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "select txn_id from TXNS where txn_id <= " + hwm; + s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + long minOpenTxn = Long.MAX_VALUE; while (rs.next()) { - openList.add(rs.getLong(1)); + long txnId = rs.getLong(1); + openList.add(txnId); + char c = rs.getString(2).charAt(0); + if(c == TXN_OPEN) { + minOpenTxn = Math.min(minOpenTxn, txnId); + } } LOG.debug("Going to rollback"); dbConn.rollback(); - return new GetOpenTxnsResponse(hwm, openList); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList); + if(minOpenTxn < Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 39b18ac..e2c4b91 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -57,14 +57,15 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long if (currentTxn > 0 && currentTxn == txn) continue; exceptions[i++] = txn; } - return new ValidReadTxnList(exceptions, highWater); + return new ValidReadTxnList(exceptions, highWater, txns.isSetMin_open_txn() ? + txns.getMin_open_txn() : Long.MAX_VALUE); } /** * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to * compact the files, and thus treats only open transactions as invalid. Additionally any - * txnId > highestOpenTxnId is also invalid. This is avoid creating something like + * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like * delta_17_120 where txnId 80, for example, is still open. * @param txns txn list from the metastore * @return a valid txn list. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c150ec5..449d889 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -634,7 +634,7 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { //By definition there are no open txns with id < 1. return true; } - return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId); + return txnList.isValidBase(baseTxnId); } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List working, List originalDirectories, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 63d02fb..8c01e9b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -606,7 +606,7 @@ public boolean validateInput(FileSystem fs, HiveConf conf, } } String value = conf.get(ValidTxnList.VALID_TXNS_KEY, - Long.MAX_VALUE + ":"); + Long.MAX_VALUE + ":" + Long.MAX_VALUE); transactionList = new ValidReadTxnList(value); } diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 23b1b7f..0e87d9e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -257,7 +257,7 @@ private void clean(CompactionInfo ci) throws MetaException { * unless ValidTxnList is "capped" at highestTxnId. */ final ValidTxnList txnList = ci.highestTxnId > 0 ? - new ValidReadTxnList(new long[0], ci.highestTxnId) : new ValidReadTxnList(); + new ValidReadTxnList(new long[0], ci.highestTxnId, Long.MAX_VALUE) : new ValidReadTxnList(); if (runJobAsSelf(ci.runAs)) { removeFiles(location, txnList); diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d48e441..80b083e 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1177,6 +1177,19 @@ public void testCompactWithDelete() throws Exception { Assert.assertEquals("Unexpected 1 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState()); } + @Test + public void testNoHistory() throws Exception { + int[][] tableData = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + runCleaner(hiveConf); + runStatementOnDriver("select count(*) from " + Table.ACIDTBL); + } /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order