diff --git a/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java b/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java new file mode 100644 index 0000000000..3bb7afbbfa --- /dev/null +++ b/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.common.util; + +import org.apache.hadoop.hive.common.ValidWriteIdList; + +public class TxnIdUtils { + + /** + * Check if 2 ValidWriteIdLists are at an equivalent commit point. + */ + public static boolean checkEquivalentWriteIds(ValidWriteIdList a, ValidWriteIdList b) { + if (!a.getTableName().equalsIgnoreCase(b.getTableName())) { + return false; + } + ValidWriteIdList newer = a; + ValidWriteIdList older = b; + if (a.getHighWatermark() < b.getHighWatermark()) { + newer = b; + older = a; + } + + Long newMinOpenId = newer.getMinOpenWriteId(); + Long oldMinOpenId = older.getMinOpenWriteId(); + + if (newMinOpenId == null || oldMinOpenId == null) { + return false; + } + + return checkEquivalentCommittedIds( + oldMinOpenId.longValue(), older.getHighWatermark(), older.getInvalidWriteIds(), + newMinOpenId.longValue(), newer.getHighWatermark(), newer.getInvalidWriteIds()); + } + + /** + * Check the min open ID/highwater mark/exceptions list to see if 2 ID lists are at the same commit point. + * This can also be used for ValidTxnList as well as ValidWriteIdList. + */ + private static boolean checkEquivalentCommittedIds( + long oldMinOpenId, long oldHWM, long[] oldInvalidIds, + long newMinOpenId, long newHWM, long[] newInvalidIds) { + + // There should be no valid txns in newer list that are not also in older. + // Per ekoifman if the min open IDs are not the same this implies a commit has occurred. + // Based on the requirement that newMinOpenId == oldMinOpenId, the following checks needed: + // - All values in oldInvalidIds should also be in newInvalidIds. + // - if oldHWM < newHWM, then all IDs between oldHWM .. newHWM should exist in newInvalidTxns. + // A Gap in the sequence means a committed txn in newer list (lists are not equivalent) + if (newMinOpenId != oldMinOpenId) { + return false; + } + + if (newInvalidIds.length < oldInvalidIds.length) { + return false; + } + + for (int idx = 0; idx < oldInvalidIds.length; ++idx) { + if (oldInvalidIds[idx] != newInvalidIds[idx]) { + return false; + } + } + + // If older committed state is equivalent to newer state, then there should be no committed IDs + // between oldHWM and newHWM, and newInvalidIds should have exactly (newHWM - oldHWM) + // more entries than oldInvalidIds. + long oldNewListSizeDifference = newInvalidIds.length - oldInvalidIds.length; + long oldNewHWMDifference = newHWM - oldHWM; + if (oldNewHWMDifference != oldNewListSizeDifference) { + return false; + } + + return true; + } +} diff --git a/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java b/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java new file mode 100644 index 0000000000..1274d4a8d9 --- /dev/null +++ b/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.common.util; + +import java.util.BitSet; + +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.junit.Test; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestTxnIdUtils { + + @Test + public void testCheckEquivalentWriteIds() throws Exception { + ValidReaderWriteIdList id1 = new ValidReaderWriteIdList("default.table1", + new long[] {1,2,3,4,5}, new BitSet(), 5, 1); + assertTrue(TxnIdUtils.checkEquivalentWriteIds(id1, id1)); + + // write ID with additional uncommitted IDs. Should match. + ValidReaderWriteIdList id2 = new ValidReaderWriteIdList("default.table1", + new long[] {1,2,3,4,5,6,7,8}, new BitSet(), 8, 1); + assertTrue(TxnIdUtils.checkEquivalentWriteIds(id1, id2)); + assertTrue(TxnIdUtils.checkEquivalentWriteIds(id2, id1)); + + // ID 1 has been committed, all others open + ValidReaderWriteIdList id3 = new ValidReaderWriteIdList("default.table1", + new long[] {2,3,4,5,6,7,8}, new BitSet(), 8, 2); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id1, id3)); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id3, id2)); + + // ID 5 has been committed, all others open + ValidReaderWriteIdList id4 = new ValidReaderWriteIdList("default.table1", + new long[] {1,2,3,4,6,7,8}, new BitSet(), 8, 1); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id1, id4)); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id4, id2)); + + // ID 8 was committed, all others open + ValidReaderWriteIdList id5 = new ValidReaderWriteIdList("default.table1", + new long[] {1,2,3,4,6,7}, new BitSet(), 8, 1); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id1, id5)); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id5, id2)); + + // Different table name + ValidReaderWriteIdList id6 = new ValidReaderWriteIdList("default.tab2", + new long[] {1,2,3,4,5}, new BitSet(), 5, 1); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id1, id6)); + + // WriteID for table1, way in the future + ValidReaderWriteIdList id7 = new ValidReaderWriteIdList("default.table1", + new long[] {100, 101, 105}, new BitSet(), 105, 100); + assertFalse(TxnIdUtils.checkEquivalentWriteIds(id1, id7)); + } +}