diff --git common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java index ad79e2c..334b93e 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidCompactorTxnList.java @@ -18,94 +18,61 @@ package org.apache.hadoop.hive.common; -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hive.common.ValidReadTxnList; - import java.util.Arrays; /** - * And implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. - * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it - * is committed or aborted. Additionally it will return none if there are any open transactions - * below the max transaction given, since we don't want to compact above open transactions. For - * {@link #isTxnValid} it will still view a transaction as valid only if it is committed. These - * produce the logic we need to assure that the compactor only sees records less than the lowest + * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor. + * + * Compaction should only include txns up to the smallest open txn (exclusive). + * There may be aborted txns in the snapshot represented by this ValidCompactorTxnList. + * Thus {@link #isTxnRangeValid(long, long)} returns NONE for any range that includes any unresolved + * transactions. Any txn above {@code highWatermark} is unresolved. + * These produce the logic we need to ensure that the compactor only sees records less than the lowest * open transaction when choosing which files to compact, but that it still ignores aborted * records when compacting. + * + * See {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#createValidCompactTxnList()} for the proper + * way to construct this. */ public class ValidCompactorTxnList extends ValidReadTxnList { - //TODO: refactor this - minOpenTxn is not needed if we set - // highWatermark = Math.min(highWaterMark, minOpenTxn) (assuming there are open txns) - - // The minimum open transaction id - private long minOpenTxn; - public ValidCompactorTxnList() { super(); - minOpenTxn = -1; } - /** - * - * @param exceptions list of all open and aborted transactions - * @param minOpen lowest open transaction - * @param highWatermark highest committed transaction + * @param abortedTxnList list of all aborted transactions + * @param highWatermark highest committed transaction to be considered for compaction, + * equivalently (lowest_open_txn - 1).
*/ - public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) { - super(exceptions, highWatermark); - minOpenTxn = minOpen; + public ValidCompactorTxnList(long[] abortedTxnList, long highWatermark) { + super(abortedTxnList, highWatermark); + if(this.exceptions.length <= 0) { + return; + } + //now that exceptions (aka abortedTxnList) is sorted + int idx = Arrays.binarySearch(this.exceptions, highWatermark); + int lastElementPos; + if(idx < 0) { + int insertionPoint = -idx - 1; //see Arrays.binarySearch() JavaDoc + lastElementPos = insertionPoint - 1; + } + else { + lastElementPos = idx; + } + /** + * ensure that we throw out any exceptions above highWatermark to make + * {@link #isTxnValid(long)} faster + */ + this.exceptions = Arrays.copyOf(this.exceptions, lastElementPos + 1); } - public ValidCompactorTxnList(String value) { super(value); } - + /** + * Returns {@link org.apache.hadoop.hive.common.ValidTxnList.RangeResponse#ALL} if all txns in + * the range are resolved and RangeResponse.NONE otherwise + */ @Override public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { - if (highWatermark < minTxnId) { - return RangeResponse.NONE; - } else if (minOpenTxn < 0) { - return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; - } else { - return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; - } - } - - @Override - public String writeToString() { - StringBuilder buf = new StringBuilder(); - buf.append(highWatermark); - buf.append(':'); - buf.append(minOpenTxn); - if (exceptions.length == 0) { - buf.append(':'); - } else { - for(long except: exceptions) { - buf.append(':'); - buf.append(except); - } - } - return buf.toString(); - } - - @Override - public void readFromString(String src) { - if (src == null || src.length() == 0) { - highWatermark = Long.MAX_VALUE; - exceptions = new long[0]; - } else { - String[] values = src.split(":"); - highWatermark = Long.parseLong(values[0]); - minOpenTxn = Long.parseLong(values[1]); - exceptions = new long[values.length - 2]; - for(int i = 2; i < values.length; ++i) { - exceptions[i-2] = Long.parseLong(values[i]); - } - } - } - - @VisibleForTesting - public long getMinOpenTxn() { - return minOpenTxn; + return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE; } } diff --git common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java index fda242d..2f35917 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java @@ -18,28 +18,43 @@ package org.apache.hadoop.hive.common; +import com.google.common.annotations.VisibleForTesting; + import java.util.Arrays; /** - * An implmentation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers. + * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers. * This class will view a transaction as valid only if it is committed. Both open and aborted * transactions will be seen as invalid.
*/ public class ValidReadTxnList implements ValidTxnList { protected long[] exceptions; + //default value means there are no open txns in the snapshot + private long minOpenTxn = Long.MAX_VALUE; protected long highWatermark; public ValidReadTxnList() { - this(new long[0], Long.MAX_VALUE); + this(new long[0], Long.MAX_VALUE, Long.MAX_VALUE); } + /** + * Used if there are no open transactions in the snapshot + */ public ValidReadTxnList(long[] exceptions, long highWatermark) { + this(exceptions, highWatermark, Long.MAX_VALUE); + } + public ValidReadTxnList(long[] exceptions, long highWatermark, long minOpenTxn) { if (exceptions.length == 0) { this.exceptions = exceptions; } else { this.exceptions = exceptions.clone(); Arrays.sort(this.exceptions); + this.minOpenTxn = minOpenTxn; + if(this.exceptions[0] <= 0) { + //should never happen of course + throw new IllegalArgumentException("Invalid txnid: " + this.exceptions[0] + " found"); + } } this.highWatermark = highWatermark; } @@ -56,6 +71,14 @@ public boolean isTxnValid(long txnid) { return Arrays.binarySearch(exceptions, txnid) < 0; } + /** + * We cannot use a base file if its range contains an open txn. + * @param txnid from base_xxxx + */ + @Override + public boolean isValidBase(long txnid) { + return minOpenTxn > txnid && txnid <= highWatermark; + } @Override public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { // check the easy cases first @@ -92,6 +115,8 @@ public String toString() { public String writeToString() { StringBuilder buf = new StringBuilder(); buf.append(highWatermark); + buf.append(':'); + buf.append(minOpenTxn); if (exceptions.length == 0) { buf.append(':'); } else { @@ -111,9 +136,10 @@ public void readFromString(String src) { } else { String[] values = src.split(":"); highWatermark = Long.parseLong(values[0]); - exceptions = new long[values.length - 1]; - for(int i = 1; i < values.length; ++i) { - exceptions[i-1] = Long.parseLong(values[i]); + minOpenTxn = Long.parseLong(values[1]); + exceptions = new long[values.length - 2]; + for(int i = 2; i < values.length; ++i) { + exceptions[i-2] = Long.parseLong(values[i]); } } } @@ -127,5 +153,9 @@ public long getHighWatermark() { public long[] getInvalidTransactions() { return exceptions; } + @VisibleForTesting + public long getMinOpenTxn() { + return minOpenTxn; + } } diff --git common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java index 87e7e30..5e1e4ee 100644 --- common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java +++ common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java @@ -47,6 +47,13 @@ public boolean isTxnValid(long txnid); /** + * Returns {@code true} if such a base file can be used to materialize the snapshot represented by + * this {@code ValidTxnList}. + * @param txnid highest txn in a given base_xxxx file + */ + public boolean isValidBase(long txnid); + + /** * Find out if a range of transaction ids are valid. Note that valid may have different meanings * for different implementations, as some will only want to see committed transactions and some * both committed and aborted.
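To make the new ValidReadTxnList contract above concrete: writeToString() now emits "<highWatermark>:<minOpenTxn>:<exception>..." rather than the old "<highWatermark>:<exception>...", and isValidBase() accepts a base only if it sits below both the lowest open txn and the high watermark. A minimal sketch of the round trip under this patch (the class name ValidTxnListExample and the literal txn ids are illustrative, not part of the patch):

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

public class ValidTxnListExample {
  public static void main(String[] args) {
    // Snapshot: highWatermark = 10; txns 5 and 7 are invalid; 5 is the lowest open txn.
    ValidTxnList snapshot = new ValidReadTxnList(new long[]{5, 7}, 10, 5);

    // New format: "10:5:5:7", i.e. highWatermark : minOpenTxn : exceptions...
    String serialized = snapshot.writeToString();

    ValidReadTxnList restored = new ValidReadTxnList();
    restored.readFromString(serialized);

    System.out.println(restored.isValidBase(4));  // true: no open txn <= 4, and 4 <= highWatermark
    System.out.println(restored.isValidBase(6));  // false: minOpenTxn (5) <= 6, so base_6 could mask an open txn
    System.out.println(restored.isValidBase(11)); // false: 11 is past this snapshot's highWatermark
  }
}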
diff --git common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java index d3c6803..0aab90e 100644 --- common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java +++ common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java @@ -34,7 +34,7 @@ @Test public void noExceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[0], 1); + ValidTxnList txnList = new ValidReadTxnList(new long[0], 1, Long.MAX_VALUE); String str = txnList.writeToString(); Assert.assertEquals("1:", str); ValidTxnList newList = new ValidReadTxnList(); @@ -45,7 +45,7 @@ public void noExceptions() throws Exception { @Test public void exceptions() throws Exception { - ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5); + ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5, 4L); String str = txnList.writeToString(); Assert.assertEquals("5:2:4", str); ValidTxnList newList = new ValidReadTxnList(); @@ -62,7 +62,7 @@ public void exceptions() throws Exception { public void longEnoughToCompress() throws Exception { long[] exceptions = new long[1000]; for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900); String str = txnList.writeToString(); ValidTxnList newList = new ValidReadTxnList(); newList.readFromString(str); @@ -76,7 +76,7 @@ public void readWriteConfig() throws Exception { long[] exceptions = new long[1000]; for (int i = 0; i < 1000; i++) exceptions[i] = i + 100; - ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000); + ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000, 900); String str = txnList.writeToString(); Configuration conf = new Configuration(); conf.set(ValidTxnList.VALID_TXNS_KEY, str); diff --git itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index ca2a912..731caa8 100644 --- itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1038,6 +1038,10 @@ public long getHighWatermark() { public long[] getInvalidTransactions() { return new long[0]; } + @Override + public boolean isValidBase(long txnid) { + return true; + } }; OrcInputFormat aif = new OrcInputFormat(); diff --git metastore/if/hive_metastore.thrift metastore/if/hive_metastore.thrift index 4d92b73..a2e35b8 100755 --- metastore/if/hive_metastore.thrift +++ metastore/if/hive_metastore.thrift @@ -635,6 +635,7 @@ struct GetOpenTxnsInfoResponse { struct GetOpenTxnsResponse { 1: required i64 txn_high_water_mark, 2: required set<i64> open_txns, + 3: optional i64 min_open_txn, //since 1.3,2.2 } struct OpenTxnRequest { diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 79460a8..174b539 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -11911,6 +11911,11 @@ void GetOpenTxnsResponse::__set_open_txns(const std::set<int64_t> & val) { this->open_txns = val; } +void GetOpenTxnsResponse::__set_min_open_txn(const int64_t val) { + this->min_open_txn = val; +__isset.min_open_txn = true; +}
uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -11963,6 +11968,14 @@ uint32_t GetOpenTxnsResponse::read(::apache::thrift::protocol::TProtocol* iprot) xfer += iprot->skip(ftype); } break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->min_open_txn); + this->__isset.min_open_txn = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -12000,6 +12013,11 @@ uint32_t GetOpenTxnsResponse::write(::apache::thrift::protocol::TProtocol* oprot } xfer += oprot->writeFieldEnd(); + if (this->__isset.min_open_txn) { + xfer += oprot->writeFieldBegin("min_open_txn", ::apache::thrift::protocol::T_I64, 3); + xfer += oprot->writeI64(this->min_open_txn); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -12009,15 +12027,21 @@ void swap(GetOpenTxnsResponse &a, GetOpenTxnsResponse &b) { using ::std::swap; swap(a.txn_high_water_mark, b.txn_high_water_mark); swap(a.open_txns, b.open_txns); + swap(a.min_open_txn, b.min_open_txn); + swap(a.__isset, b.__isset); } GetOpenTxnsResponse::GetOpenTxnsResponse(const GetOpenTxnsResponse& other524) { txn_high_water_mark = other524.txn_high_water_mark; open_txns = other524.open_txns; + min_open_txn = other524.min_open_txn; + __isset = other524.__isset; } GetOpenTxnsResponse& GetOpenTxnsResponse::operator=(const GetOpenTxnsResponse& other525) { txn_high_water_mark = other525.txn_high_water_mark; open_txns = other525.open_txns; + min_open_txn = other525.min_open_txn; + __isset = other525.__isset; return *this; } void GetOpenTxnsResponse::printTo(std::ostream& out) const { @@ -12025,6 +12049,7 @@ void GetOpenTxnsResponse::printTo(std::ostream& out) const { out << "GetOpenTxnsResponse("; out << "txn_high_water_mark=" << to_string(txn_high_water_mark); out << ", " << "open_txns=" << to_string(open_txns); + out << ", " << "min_open_txn="; (__isset.min_open_txn ? 
(out << to_string(min_open_txn)) : (out << "")); out << ")"; } diff --git metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h index ec81798..bfec694 100644 --- metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ metastore/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -4881,29 +4881,42 @@ inline std::ostream& operator<<(std::ostream& out, const GetOpenTxnsInfoResponse return out; } +typedef struct _GetOpenTxnsResponse__isset { + _GetOpenTxnsResponse__isset() : min_open_txn(false) {} + bool min_open_txn :1; +} _GetOpenTxnsResponse__isset; class GetOpenTxnsResponse { public: GetOpenTxnsResponse(const GetOpenTxnsResponse&); GetOpenTxnsResponse& operator=(const GetOpenTxnsResponse&); - GetOpenTxnsResponse() : txn_high_water_mark(0) { + GetOpenTxnsResponse() : txn_high_water_mark(0), min_open_txn(0) { } virtual ~GetOpenTxnsResponse() throw(); int64_t txn_high_water_mark; std::set<int64_t> open_txns; + int64_t min_open_txn; + + _GetOpenTxnsResponse__isset __isset; void __set_txn_high_water_mark(const int64_t val); void __set_open_txns(const std::set<int64_t> & val); + void __set_min_open_txn(const int64_t val); + bool operator == (const GetOpenTxnsResponse & rhs) const { if (!(txn_high_water_mark == rhs.txn_high_water_mark)) return false; if (!(open_txns == rhs.open_txns)) return false; + if (__isset.min_open_txn != rhs.__isset.min_open_txn) + return false; + else if (__isset.min_open_txn && !(min_open_txn == rhs.min_open_txn)) + return false; return true; } bool operator != (const GetOpenTxnsResponse &rhs) const { diff --git metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java index 6986fc2..8230d38 100644 --- metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java +++ metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/GetOpenTxnsResponse.java @@ -40,6 +40,7 @@ private static final org.apache.thrift.protocol.TField TXN_HIGH_WATER_MARK_FIELD_DESC = new org.apache.thrift.protocol.TField("txn_high_water_mark", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField OPEN_TXNS_FIELD_DESC = new org.apache.thrift.protocol.TField("open_txns", org.apache.thrift.protocol.TType.SET, (short)2); + private static final org.apache.thrift.protocol.TField MIN_OPEN_TXN_FIELD_DESC = new org.apache.thrift.protocol.TField("min_open_txn", org.apache.thrift.protocol.TType.I64, (short)3); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -49,11 +50,13 @@ private long txn_high_water_mark; // required private Set<Long> open_txns; // required + private long min_open_txn; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them.
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { TXN_HIGH_WATER_MARK((short)1, "txn_high_water_mark"), - OPEN_TXNS((short)2, "open_txns"); + OPEN_TXNS((short)2, "open_txns"), + MIN_OPEN_TXN((short)3, "min_open_txn"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -72,6 +75,8 @@ public static _Fields findByThriftId(int fieldId) { return TXN_HIGH_WATER_MARK; case 2: // OPEN_TXNS return OPEN_TXNS; + case 3: // MIN_OPEN_TXN + return MIN_OPEN_TXN; default: return null; } @@ -113,7 +118,9 @@ public String getFieldName() { // isset id assignments private static final int __TXN_HIGH_WATER_MARK_ISSET_ID = 0; + private static final int __MIN_OPEN_TXN_ISSET_ID = 1; private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.MIN_OPEN_TXN}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -122,6 +129,8 @@ public String getFieldName() { tmpMap.put(_Fields.OPEN_TXNS, new org.apache.thrift.meta_data.FieldMetaData("open_txns", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.SetMetaData(org.apache.thrift.protocol.TType.SET, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)))); + tmpMap.put(_Fields.MIN_OPEN_TXN, new org.apache.thrift.meta_data.FieldMetaData("min_open_txn", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GetOpenTxnsResponse.class, metaDataMap); } @@ -149,6 +158,7 @@ public GetOpenTxnsResponse(GetOpenTxnsResponse other) { Set<Long> __this__open_txns = new HashSet<Long>(other.open_txns); this.open_txns = __this__open_txns; } + this.min_open_txn = other.min_open_txn; } public GetOpenTxnsResponse deepCopy() { @@ -160,6 +170,8 @@ public void clear() { setTxn_high_water_markIsSet(false); this.txn_high_water_mark = 0; this.open_txns = null; + setMin_open_txnIsSet(false); + this.min_open_txn = 0; } public long getTxn_high_water_mark() { @@ -222,6 +234,28 @@ public void setOpen_txnsIsSet(boolean value) { } } + public long getMin_open_txn() { + return this.min_open_txn; + } + + public void setMin_open_txn(long min_open_txn) { + this.min_open_txn = min_open_txn; + setMin_open_txnIsSet(true); + } + + public void unsetMin_open_txn() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID); + } + + /** Returns true if field min_open_txn is set (has been assigned a value) and false otherwise */ + public boolean isSetMin_open_txn() { + return EncodingUtils.testBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID); + } + + public void setMin_open_txnIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MIN_OPEN_TXN_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TXN_HIGH_WATER_MARK: @@ -240,6 +274,14 @@ public String getFieldName() { } break; + case MIN_OPEN_TXN: + if (value == null) { + unsetMin_open_txn(); + } else { + setMin_open_txn((Long)value); + } + break; + } } @@ -251,6 +293,9 @@ public Object getFieldValue(_Fields field) { case OPEN_TXNS: return getOpen_txns(); + case MIN_OPEN_TXN: + return getMin_open_txn(); + } throw new IllegalStateException(); } @@
-266,6 +311,8 @@ public boolean isSet(_Fields field) { return isSetTxn_high_water_mark(); case OPEN_TXNS: return isSetOpen_txns(); + case MIN_OPEN_TXN: + return isSetMin_open_txn(); } throw new IllegalStateException(); } @@ -301,6 +348,15 @@ public boolean equals(GetOpenTxnsResponse that) { return false; } + boolean this_present_min_open_txn = true && this.isSetMin_open_txn(); + boolean that_present_min_open_txn = true && that.isSetMin_open_txn(); + if (this_present_min_open_txn || that_present_min_open_txn) { + if (!(this_present_min_open_txn && that_present_min_open_txn)) + return false; + if (this.min_open_txn != that.min_open_txn) + return false; + } + return true; } @@ -318,6 +374,11 @@ public int hashCode() { if (present_open_txns) list.add(open_txns); + boolean present_min_open_txn = true && (isSetMin_open_txn()); + list.add(present_min_open_txn); + if (present_min_open_txn) + list.add(min_open_txn); + return list.hashCode(); } @@ -349,6 +410,16 @@ public int compareTo(GetOpenTxnsResponse other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetMin_open_txn()).compareTo(other.isSetMin_open_txn()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMin_open_txn()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.min_open_txn, other.min_open_txn); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -380,6 +451,12 @@ public String toString() { sb.append(this.open_txns); } first = false; + if (isSetMin_open_txn()) { + if (!first) sb.append(", "); + sb.append("min_open_txn:"); + sb.append(this.min_open_txn); + first = false; + } sb.append(")"); return sb.toString(); } @@ -459,6 +536,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, GetOpenTxnsResponse org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // MIN_OPEN_TXN + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.min_open_txn = iprot.readI64(); + struct.setMin_open_txnIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -487,6 +572,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, GetOpenTxnsRespons } oprot.writeFieldEnd(); } + if (struct.isSetMin_open_txn()) { + oprot.writeFieldBegin(MIN_OPEN_TXN_FIELD_DESC); + oprot.writeI64(struct.min_open_txn); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -512,6 +602,14 @@ public void write(org.apache.thrift.protocol.TProtocol prot, GetOpenTxnsResponse oprot.writeI64(_iter472); } } + BitSet optionals = new BitSet(); + if (struct.isSetMin_open_txn()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetMin_open_txn()) { + oprot.writeI64(struct.min_open_txn); + } } @Override @@ -530,6 +628,11 @@ public void read(org.apache.thrift.protocol.TProtocol prot, GetOpenTxnsResponse } } struct.setOpen_txnsIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.min_open_txn = iprot.readI64(); + struct.setMin_open_txnIsSet(true); + } } } diff --git metastore/src/gen/thrift/gen-php/metastore/Types.php metastore/src/gen/thrift/gen-php/metastore/Types.php index f505208..d6f7f49 100644 --- metastore/src/gen/thrift/gen-php/metastore/Types.php +++ metastore/src/gen/thrift/gen-php/metastore/Types.php @@ -12002,6 +12002,10 @@ class GetOpenTxnsResponse { * @var int[] */ public $open_txns = null; + /** + * 
@var int + */ + public $min_open_txn = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -12018,6 +12022,10 @@ class GetOpenTxnsResponse { 'type' => TType::I64, ), ), + 3 => array( + 'var' => 'min_open_txn', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -12027,6 +12035,9 @@ class GetOpenTxnsResponse { if (isset($vals['open_txns'])) { $this->open_txns = $vals['open_txns']; } + if (isset($vals['min_open_txn'])) { + $this->min_open_txn = $vals['min_open_txn']; + } } } @@ -12077,6 +12088,13 @@ class GetOpenTxnsResponse { $xfer += $input->skip($ftype); } break; + case 3: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->min_open_txn); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -12116,6 +12134,11 @@ class GetOpenTxnsResponse { } $xfer += $output->writeFieldEnd(); } + if ($this->min_open_txn !== null) { + $xfer += $output->writeFieldBegin('min_open_txn', TType::I64, 3); + $xfer += $output->writeI64($this->min_open_txn); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 8d88cd7..2d308c9 100644 --- metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -8264,17 +8264,20 @@ class GetOpenTxnsResponse: Attributes: - txn_high_water_mark - open_txns + - min_open_txn """ thrift_spec = ( None, # 0 (1, TType.I64, 'txn_high_water_mark', None, None, ), # 1 (2, TType.SET, 'open_txns', (TType.I64,None), None, ), # 2 + (3, TType.I64, 'min_open_txn', None, None, ), # 3 ) - def __init__(self, txn_high_water_mark=None, open_txns=None,): + def __init__(self, txn_high_water_mark=None, open_txns=None, min_open_txn=None,): self.txn_high_water_mark = txn_high_water_mark self.open_txns = open_txns + self.min_open_txn = min_open_txn def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -8300,6 +8303,11 @@ def read(self, iprot): iprot.readSetEnd() else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.I64: + self.min_open_txn = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -8321,6 +8329,10 @@ def write(self, oprot): oprot.writeI64(iter419) oprot.writeSetEnd() oprot.writeFieldEnd() + if self.min_open_txn is not None: + oprot.writeFieldBegin('min_open_txn', TType.I64, 3) + oprot.writeI64(self.min_open_txn) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -8336,6 +8348,7 @@ def __hash__(self): value = 17 value = (value * 31) ^ hash(self.txn_high_water_mark) value = (value * 31) ^ hash(self.open_txns) + value = (value * 31) ^ hash(self.min_open_txn) return value def __repr__(self): diff --git metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 0964cd8..bd94e98 100644 --- metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -1843,10 +1843,12 @@ class GetOpenTxnsResponse include ::Thrift::Struct, ::Thrift::Struct_Union TXN_HIGH_WATER_MARK = 1 OPEN_TXNS = 2 + MIN_OPEN_TXN = 3 FIELDS = { TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'}, - 
OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}} + OPEN_TXNS => {:type => ::Thrift::Types::SET, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}}, + MIN_OPEN_TXN => {:type => ::Thrift::Types::I64, :name => 'min_open_txn', :optional => true} } def struct_fields; FIELDS; end diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index b121644..e8c5fac 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -361,15 +361,25 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { close(rs); Set<Long> openList = new HashSet<Long>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "select txn_id from TXNS where txn_id <= " + hwm; + s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); + long minOpenTxn = Long.MAX_VALUE; while (rs.next()) { - openList.add(rs.getLong(1)); + long txnId = rs.getLong(1); + openList.add(txnId); + char c = rs.getString(2).charAt(0); + if(c == TXN_OPEN) { + minOpenTxn = Math.min(minOpenTxn, txnId); + } } LOG.debug("Going to rollback"); dbConn.rollback(); - return new GetOpenTxnsResponse(hwm, openList); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList); + if(minOpenTxn < Long.MAX_VALUE) { + otr.setMin_open_txn(minOpenTxn); + } + return otr; } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 39b18ac..2ffa1da 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -57,31 +58,42 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long if (currentTxn > 0 && currentTxn == txn) continue; exceptions[i++] = txn; } - return new ValidReadTxnList(exceptions, highWater); + if(txns.isSetMin_open_txn()) { + return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn()); + } + else { + return new ValidReadTxnList(exceptions, highWater); + } } /** * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to * compact the files, and thus treats only open transactions as invalid. Additionally any - * txnId > highestOpenTxnId is also invalid. This is avoid creating something like + * txnId > highestOpenTxnId is also invalid. This is to avoid creating something like * delta_17_120 where txnId 80, for example, is still open. * @param txns txn list from the metastore * @return a valid txn list. */ public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) { - //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" + - // quoteChar(TXN_OPEN) to compute compute HWM...
long highWater = txns.getTxn_high_water_mark(); long minOpenTxn = Long.MAX_VALUE; long[] exceptions = new long[txns.getOpen_txnsSize()]; int i = 0; for (TxnInfo txn : txns.getOpen_txns()) { - if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId()); - exceptions[i++] = txn.getId();//todo: only add Aborted - }//remove all exceptions < minOpenTxn + if (txn.getState() == TxnState.OPEN) { + minOpenTxn = Math.min(minOpenTxn, txn.getId()); + } + else { + //only need aborted since we don't consider anything above minOpenTxn + exceptions[i++] = txn.getId(); + } + } + if(i < exceptions.length) { + exceptions = Arrays.copyOf(exceptions, i); + } highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1; - return new ValidCompactorTxnList(exceptions, -1, highWater); + return new ValidCompactorTxnList(exceptions, highWater); } /** diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java index c249854..79ccc6b 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java @@ -26,66 +26,74 @@ @Test public void minTxnHigh() { - ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5); + ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 2); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); } @Test public void maxTxnLow() { - ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15); + ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 12); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp); } @Test public void minTxnHighNoExceptions() { - ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5); + ValidTxnList txns = new ValidCompactorTxnList(new long[0], 5); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); } @Test public void maxTxnLowNoExceptions() { - ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15); + ValidTxnList txns = new ValidCompactorTxnList(new long[0], 15); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp); } @Test public void exceptionsAllBelow() { - ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15); + ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); } @Test public void exceptionsInMidst() { - ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15); + ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 7); ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); } + @Test + public void exceptionsAboveHighWaterMark() { + ValidTxnList txns = new ValidCompactorTxnList(new long[]{8, 11, 17, 29}, 15); + Assert.assertArrayEquals("", new long[]{8, 11}, txns.getInvalidTransactions()); + ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9); + Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp); + rsp = txns.isTxnRangeValid(12, 16); + Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp); + } @Test public
void writeToString() { - ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37); - Assert.assertEquals("37:9:7:9:10", txns.writeToString()); + ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10, Long.MAX_VALUE}, 8); + Assert.assertEquals("8:" + Long.MAX_VALUE + ":7", txns.writeToString()); txns = new ValidCompactorTxnList(); - Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString()); - txns = new ValidCompactorTxnList(new long[0], -1, 23); - Assert.assertEquals("23:-1:", txns.writeToString()); + Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":" + Long.MAX_VALUE + ":", txns.writeToString()); + txns = new ValidCompactorTxnList(new long[0], 23); + Assert.assertEquals("23:" + Long.MAX_VALUE + ":", txns.writeToString()); } @Test public void readFromString() { - ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10"); + ValidCompactorTxnList txns = new ValidCompactorTxnList("37:" + Long.MAX_VALUE + ":7:9:10"); Assert.assertEquals(37L, txns.getHighWatermark()); - Assert.assertEquals(9L, txns.getMinOpenTxn()); + Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn()); Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions()); - txns = new ValidCompactorTxnList("21:-1:"); + txns = new ValidCompactorTxnList("21:" + Long.MAX_VALUE + ":"); Assert.assertEquals(21L, txns.getHighWatermark()); - Assert.assertEquals(-1L, txns.getMinOpenTxn()); + Assert.assertEquals(Long.MAX_VALUE, txns.getMinOpenTxn()); Assert.assertEquals(0, txns.getInvalidTransactions().length); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c150ec5..449d889 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -634,7 +634,7 @@ private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) { //By definition there are no open txns with id < 1. return true; } - return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId); + return txnList.isValidBase(baseTxnId); } private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories, diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 63d02fb..0d6f177 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -605,9 +605,8 @@ public boolean validateInput(FileSystem fs, HiveConf conf, footerCache = useExternalCache ? metaCache : localCache; } } - String value = conf.get(ValidTxnList.VALID_TXNS_KEY, - Long.MAX_VALUE + ":"); - transactionList = new ValidReadTxnList(value); + String value = conf.get(ValidTxnList.VALID_TXNS_KEY); + transactionList = value == null ?
new ValidReadTxnList() : new ValidReadTxnList(value); } @VisibleForTesting diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index d48e441..e377f52 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -1178,6 +1178,22 @@ public void testCompactWithDelete() throws Exception { } /** + * Make sure aborted txns don't red-flag a base_xxxx (HIVE-14350) + */ + @Test + public void testNoHistory() throws Exception { + int[][] tableData = {{1,2},{3,4}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + runCleaner(hiveConf); + runStatementOnDriver("select count(*) from " + Table.ACIDTBL); + } + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order */ diff --git ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java index b83cea4..556df18 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java @@ -121,7 +121,7 @@ public void testOriginal() throws Exception { new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0])); AcidUtils.Directory dir = AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf, - new ValidReadTxnList("100:")); + new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); assertEquals(null, dir.getBaseDirectory()); assertEquals(0, dir.getCurrentDirectories().size()); assertEquals(0, dir.getObsolete().size()); @@ -152,7 +152,7 @@ public void testOriginalDeltas() throws Exception { new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0])); AcidUtils.Directory dir = AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs, - "mock:/tbl/part1"), conf, new ValidReadTxnList("100:")); + "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); assertEquals(null, dir.getBaseDirectory()); List<FileStatus> obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -194,7 +194,7 @@ public void testBaseDeltas() throws Exception { new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0])); AcidUtils.Directory dir = AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs, - "mock:/tbl/part1"), conf, new ValidReadTxnList("100:")); + "mock:/tbl/part1"), conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString()); List<FileStatus> obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); @@ -225,7 +225,7 @@ public void testBestBase() throws Exception { new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); assertEquals(1, dir.getCurrentDirectories().size()); assertEquals("mock:/tbl/part1/delta_120_130", @@ -238,7
+238,7 @@ public void testBestBase() throws Exception { assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString()); assertEquals(0, dir.getOriginalFiles().size()); - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString()); assertEquals(0, dir.getCurrentDirectories().size()); obsoletes = dir.getObsolete(); @@ -250,8 +250,9 @@ the existence of delta_120_130 implies that 121 in the exception list is aborted unless delta_120_130 is from streaming ingest in which case 121 can be open (and thus 122-130 are open too) + 99 here would be Aborted since 121 is minOpenTxn, base_100 is still good For multi-statment txns, see HIVE-13369*/ - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121:99:121")); assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString()); assertEquals(1, dir.getCurrentDirectories().size()); assertEquals("mock:/tbl/part1/delta_120_130", @@ -265,7 +266,7 @@ boolean gotException = false; try { - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5:5")); } catch(IOException e) { gotException = true; @@ -282,7 +283,7 @@ public void testBestBase() throws Exception { part = new MockPath(fs, "/tbl/part1"); try { gotException = false; - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7")); } catch(IOException e) { gotException = true; @@ -298,7 +299,7 @@ public void testBestBase() throws Exception { part = new MockPath(fs, "/tbl/part1"); try { gotException = false; - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7:7")); } catch(IOException e) { gotException = true; @@ -314,7 +315,7 @@ new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0])); part = new MockPath(fs, "/tbl/part1"); //note that we don't include current txn of the client in exception list to read-you-writes - dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:")); + dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString()); assertEquals(1, dir.getCurrentDirectories().size()); assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString()); @@ -330,7 +331,7 @@ public void testObsoleteOriginals() throws Exception { new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0])); Path part = new MockPath(fs, "/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:")); + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:" + Long.MAX_VALUE + ":")); // Obsolete list should include the two original bucket files, and the old base dir List<FileStatus> obsolete = dir.getObsolete(); assertEquals(3, obsolete.size()); @@ -351,7 +352,7 @@ public void testOverlapingDelta() throws Exception { new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); Path part = new
MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:")); + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(2, obsolete.size()); @@ -386,7 +387,7 @@ public void testOverlapingDelta2() throws Exception { new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:")); + AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:" + Long.MAX_VALUE + ":")); assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString()); List obsolete = dir.getObsolete(); assertEquals(5, obsolete.size()); @@ -411,7 +412,7 @@ public void deltasWithOpenTxnInRead() throws Exception { new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); - AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4")); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4")); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -432,7 +433,7 @@ public void deltasWithOpenTxnInRead2() throws Exception { new MockFile("mock:/tbl/part1/delta_4_4_3/bucket_0", 500, new byte[0]), new MockFile("mock:/tbl/part1/delta_101_101_1/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); - AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4")); + AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:4:4")); List delts = dir.getCurrentDirectories(); assertEquals(2, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -447,7 +448,7 @@ public void deltasWithOpenTxnsNotInCompact() throws Exception { new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4")); + AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("4:" + Long.MAX_VALUE)); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString()); @@ -464,7 +465,7 @@ public void deltasWithOpenTxnsNotInCompact2() throws Exception { new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0])); Path part = new MockPath(fs, "mock:/tbl/part1"); AcidUtils.Directory dir = - AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4")); + AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("3:" + Long.MAX_VALUE)); List delts = dir.getCurrentDirectories(); assertEquals(1, delts.size()); assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());