From 4c4688e5e48253ab5be78ab0937136d407250904 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 28 Apr 2020 18:12:31 +0200 Subject: [PATCH] HIVE-23293: Locks: Implement zero-wait readers --- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +- .../hadoop/hive/ql/lockmgr/DbLockManager.java | 6 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 5 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 96 +++++++++++---- .../hive/metastore/api/LockRequest.java | 109 ++++++++++++++++- .../hive/metastore/api/LockResponse.java | 112 +++++++++++++++++- .../gen/thrift/gen-php/metastore/Types.php | 46 +++++++ .../thrift/gen-py/hive_metastore/ttypes.py | 30 ++++- .../gen/thrift/gen-rb/hive_metastore_types.rb | 8 +- .../hive/metastore/LockRequestBuilder.java | 5 + .../src/main/thrift/hive_metastore.thrift | 2 + .../hadoop/hive/metastore/txn/TxnHandler.java | 58 ++++++--- 12 files changed, 421 insertions(+), 60 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 8e643fe844..d732004e51 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -176,8 +176,8 @@ LOAD_INTO_NON_NATIVE(10101, "A non-native table cannot be used as target for LOAD"), LOCKMGR_NOT_SPECIFIED(10102, "Lock manager not specified correctly, set hive.lock.manager"), LOCKMGR_NOT_INITIALIZED(10103, "Lock manager could not be initialized, check hive.lock.manager "), - LOCK_CANNOT_BE_ACQUIRED(10104, "Locks on the underlying objects cannot be acquired. " - + "retry after some time"), + LOCK_CANNOT_BE_ACQUIRED(10104, "Locks on the underlying objects cannot be acquired, " + + "retry after some time."), ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED(10105, "Check hive.zookeeper.quorum " + "and hive.zookeeper.client.port"), OVERWRITE_ARCHIVED_PART(10106, "Cannot overwrite an archived partition. " + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index fb5a306ac4..cdccbdffa2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -140,9 +140,9 @@ else if(l.txnId == 0) { } locks.add(hl); if (res.getState() != LockState.ACQUIRED) { - if(res.getState() == LockState.WAITING) { - LOG.error("Unable to acquire locks for lockId={} after {} retries (retries took {} ms). QueryId={}", - res.getLockid(), numRetries, retryDuration, queryId); + LOG.error("Unable to acquire locks for lockId={} after {} retries (retries took {} ms). QueryId={}", + res.getLockid(), numRetries, retryDuration, queryId); + if (res.getState() == LockState.WAITING) { /** * the {@link #unlock(HiveLock)} here is more about future proofing when support for * multi-statement txns is added. In that case it's reasonable for the client diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index b4dac4346e..a08af7cf02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -412,6 +412,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB rqstBuilder.setTransactionId(txnId) .setUser(username); + rqstBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) || + !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); // Make sure we need locks. It's possible there's nothing to lock in // this operation. @@ -420,11 +422,10 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB return null; } List lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf); - lockComponents.addAll(getGlobalLocks(ctx.getConf())); //It's possible there's nothing to lock even if we have w/r entities. - if(lockComponents.isEmpty()) { + if (lockComponents.isEmpty()) { LOG.debug("No locks needed for queryId=" + queryId); return null; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index f90396b2a3..f0ab8afb59 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -2529,7 +2529,17 @@ public void testShowTablesLock() throws Exception { @Test public void testFairness() throws Exception { - dropTable(new String[] {"T6"}); + testFairness(false); + } + + @Test + public void testFairnessZeroWaitRead() throws Exception { + testFairness(true); + } + + private void testFairness(boolean zeroWaitRead) throws Exception { + dropTable(new String[]{"T6"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead); driver.run("create table if not exists T6(a int)"); driver.compileAndRespond("select a from T6", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets S lock on T6 @@ -2541,18 +2551,30 @@ public void testFairness() throws Exception { List locks = getLocks(); Assert.assertEquals("Unexpected lock count", 2, locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); - checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); + long extLockId = checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks).getLockid(); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); //this should block behind the X lock on T6 //this is a contrived example, in practice this query would of course fail after drop table driver.compileAndRespond("select a from T6", true); - ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6 + try { + ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6 + } catch (LockException ex) { + Assert.assertTrue(zeroWaitRead); + Assert.assertEquals("Exception msg didn't match", + ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg() + " LockResponse(lockid:" + (extLockId + 1) + + ", state:NOT_ACQUIRED, errorMessage:Unable to acquire read lock due to an exclusive lock" + + " {lockid:" + extLockId + " intLockId:1 txnid:" + txnMgr2.getCurrentTxnId() + + " db:default table:t6 partition:null state:WAITING type:EXCLUSIVE})", + ex.getMessage()); + } locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 2 : 3), locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks); + } checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); } @@ -2568,7 +2590,17 @@ public void testFairness() throws Exception { */ @Test public void testFairness2() throws Exception { + testFairness2(false); + } + + @Test + public void testFairness2ZeroWaitRead() throws Exception { + testFairness2(true); + } + + private void testFairness2(boolean zeroWaitRead) throws Exception { dropTable(new String[]{"T7"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead); driver.run("create table if not exists T7 (a int) " + "partitioned by (p int) stored as orc TBLPROPERTIES ('transactional'='true')"); driver.run("insert into T7 partition(p) values(1,1),(1,2)"); //create 2 partitions @@ -2584,42 +2616,56 @@ public void testFairness2() throws Exception { checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); - checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks); + long extLockId = checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks).getLockid(); HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); swapTxnManager(txnMgr3); //this should block behind the X lock on T7.p=1 driver.compileAndRespond("select a from T7", true); //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2 - ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); + try { + ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); + } catch (LockException ex) { + Assert.assertTrue(zeroWaitRead); + Assert.assertEquals("Exception msg didn't match", + ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg() + " LockResponse(lockid:" + (extLockId + 1) + + ", state:NOT_ACQUIRED, errorMessage:Unable to acquire read lock due to an exclusive lock" + + " {lockid:" + extLockId + " intLockId:1 txnid:" + txnMgr2.getCurrentTxnId() + + " db:default table:t7 partition:p=1 state:WAITING type:EXCLUSIVE})", + ex.getMessage()); + } locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 7, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 4 : 7), locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + } checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks); txnMgr.commitTxn(); //release locks from "select a from T7" - to unblock hte drop partition //retest the the "drop partiton" X lock - ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid()); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(zeroWaitRead ? 3 : 6).getLockid()); locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 4, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 1 : 4), locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); - - txnMgr2.rollbackTxn(); //release the X lock on T7.p=1 - //re-test the locks - ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 - locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + + txnMgr2.rollbackTxn(); //release the X lock on T7.p=1 + //re-test the locks + ((DbLockManager) txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + } } @Test diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java index 7402fb30eb..e382c9d377 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java @@ -43,6 +43,7 @@ private static final org.apache.thrift.protocol.TField USER_FIELD_DESC = new org.apache.thrift.protocol.TField("user", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField AGENT_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("agentInfo", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField ZERO_WAIT_READ_ENABLED_FIELD_DESC = new org.apache.thrift.protocol.TField("zeroWaitReadEnabled", org.apache.thrift.protocol.TType.BOOL, (short)6); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -55,6 +56,7 @@ private String user; // required private String hostname; // required private String agentInfo; // optional + private boolean zeroWaitReadEnabled; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -62,7 +64,8 @@ TXNID((short)2, "txnid"), USER((short)3, "user"), HOSTNAME((short)4, "hostname"), - AGENT_INFO((short)5, "agentInfo"); + AGENT_INFO((short)5, "agentInfo"), + ZERO_WAIT_READ_ENABLED((short)6, "zeroWaitReadEnabled"); private static final Map byName = new HashMap(); @@ -87,6 +90,8 @@ public static _Fields findByThriftId(int fieldId) { return HOSTNAME; case 5: // AGENT_INFO return AGENT_INFO; + case 6: // ZERO_WAIT_READ_ENABLED + return ZERO_WAIT_READ_ENABLED; default: return null; } @@ -128,8 +133,9 @@ public String getFieldName() { // isset id assignments private static final int __TXNID_ISSET_ID = 0; + private static final int __ZEROWAITREADENABLED_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO}; + private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO,_Fields.ZERO_WAIT_READ_ENABLED}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -144,6 +150,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.AGENT_INFO, new org.apache.thrift.meta_data.FieldMetaData("agentInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.ZERO_WAIT_READ_ENABLED, new org.apache.thrift.meta_data.FieldMetaData("zeroWaitReadEnabled", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockRequest.class, metaDataMap); } @@ -151,6 +159,8 @@ public String getFieldName() { public LockRequest() { this.agentInfo = "Unknown"; + this.zeroWaitReadEnabled = false; + } public LockRequest( @@ -186,6 +196,7 @@ public LockRequest(LockRequest other) { if (other.isSetAgentInfo()) { this.agentInfo = other.agentInfo; } + this.zeroWaitReadEnabled = other.zeroWaitReadEnabled; } public LockRequest deepCopy() { @@ -201,6 +212,8 @@ public void clear() { this.hostname = null; this.agentInfo = "Unknown"; + this.zeroWaitReadEnabled = false; + } public int getComponentSize() { @@ -332,6 +345,28 @@ public void setAgentInfoIsSet(boolean value) { } } + public boolean isZeroWaitReadEnabled() { + return this.zeroWaitReadEnabled; + } + + public void setZeroWaitReadEnabled(boolean zeroWaitReadEnabled) { + this.zeroWaitReadEnabled = zeroWaitReadEnabled; + setZeroWaitReadEnabledIsSet(true); + } + + public void unsetZeroWaitReadEnabled() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID); + } + + /** Returns true if field zeroWaitReadEnabled is set (has been assigned a value) and false otherwise */ + public boolean isSetZeroWaitReadEnabled() { + return EncodingUtils.testBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID); + } + + public void setZeroWaitReadEnabledIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case COMPONENT: @@ -374,6 +409,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case ZERO_WAIT_READ_ENABLED: + if (value == null) { + unsetZeroWaitReadEnabled(); + } else { + setZeroWaitReadEnabled((Boolean)value); + } + break; + } } @@ -394,6 +437,9 @@ public Object getFieldValue(_Fields field) { case AGENT_INFO: return getAgentInfo(); + case ZERO_WAIT_READ_ENABLED: + return isZeroWaitReadEnabled(); + } throw new IllegalStateException(); } @@ -415,6 +461,8 @@ public boolean isSet(_Fields field) { return isSetHostname(); case AGENT_INFO: return isSetAgentInfo(); + case ZERO_WAIT_READ_ENABLED: + return isSetZeroWaitReadEnabled(); } throw new IllegalStateException(); } @@ -477,6 +525,15 @@ public boolean equals(LockRequest that) { return false; } + boolean this_present_zeroWaitReadEnabled = true && this.isSetZeroWaitReadEnabled(); + boolean that_present_zeroWaitReadEnabled = true && that.isSetZeroWaitReadEnabled(); + if (this_present_zeroWaitReadEnabled || that_present_zeroWaitReadEnabled) { + if (!(this_present_zeroWaitReadEnabled && that_present_zeroWaitReadEnabled)) + return false; + if (this.zeroWaitReadEnabled != that.zeroWaitReadEnabled) + return false; + } + return true; } @@ -509,6 +566,11 @@ public int hashCode() { if (present_agentInfo) list.add(agentInfo); + boolean present_zeroWaitReadEnabled = true && (isSetZeroWaitReadEnabled()); + list.add(present_zeroWaitReadEnabled); + if (present_zeroWaitReadEnabled) + list.add(zeroWaitReadEnabled); + return list.hashCode(); } @@ -570,6 +632,16 @@ public int compareTo(LockRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetZeroWaitReadEnabled()).compareTo(other.isSetZeroWaitReadEnabled()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetZeroWaitReadEnabled()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.zeroWaitReadEnabled, other.zeroWaitReadEnabled); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -629,6 +701,12 @@ public String toString() { } first = false; } + if (isSetZeroWaitReadEnabled()) { + if (!first) sb.append(", "); + sb.append("zeroWaitReadEnabled:"); + sb.append(this.zeroWaitReadEnabled); + first = false; + } sb.append(")"); return sb.toString(); } @@ -737,6 +815,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, LockRequest struct) org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // ZERO_WAIT_READ_ENABLED + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.zeroWaitReadEnabled = iprot.readBool(); + struct.setZeroWaitReadEnabledIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -784,6 +870,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, LockRequest struct oprot.writeFieldEnd(); } } + if (struct.isSetZeroWaitReadEnabled()) { + oprot.writeFieldBegin(ZERO_WAIT_READ_ENABLED_FIELD_DESC); + oprot.writeBool(struct.zeroWaitReadEnabled); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -817,13 +908,19 @@ public void write(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) if (struct.isSetAgentInfo()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetZeroWaitReadEnabled()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetTxnid()) { oprot.writeI64(struct.txnid); } if (struct.isSetAgentInfo()) { oprot.writeString(struct.agentInfo); } + if (struct.isSetZeroWaitReadEnabled()) { + oprot.writeBool(struct.zeroWaitReadEnabled); + } } @Override @@ -845,7 +942,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) struct.setUserIsSet(true); struct.hostname = iprot.readString(); struct.setHostnameIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.txnid = iprot.readI64(); struct.setTxnidIsSet(true); @@ -854,6 +951,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) struct.agentInfo = iprot.readString(); struct.setAgentInfoIsSet(true); } + if (incoming.get(2)) { + struct.zeroWaitReadEnabled = iprot.readBool(); + struct.setZeroWaitReadEnabledIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java index fdaab4b394..e0f88f897c 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java @@ -40,6 +40,7 @@ private static final org.apache.thrift.protocol.TField LOCKID_FIELD_DESC = new org.apache.thrift.protocol.TField("lockid", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("state", org.apache.thrift.protocol.TType.I32, (short)2); + private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)3); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -49,6 +50,7 @@ private long lockid; // required private LockState state; // required + private String errorMessage; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -57,7 +59,8 @@ * * @see LockState */ - STATE((short)2, "state"); + STATE((short)2, "state"), + ERROR_MESSAGE((short)3, "errorMessage"); private static final Map byName = new HashMap(); @@ -76,6 +79,8 @@ public static _Fields findByThriftId(int fieldId) { return LOCKID; case 2: // STATE return STATE; + case 3: // ERROR_MESSAGE + return ERROR_MESSAGE; default: return null; } @@ -118,6 +123,7 @@ public String getFieldName() { // isset id assignments private static final int __LOCKID_ISSET_ID = 0; private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.ERROR_MESSAGE}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -125,6 +131,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.STATE, new org.apache.thrift.meta_data.FieldMetaData("state", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, LockState.class))); + tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockResponse.class, metaDataMap); } @@ -151,6 +159,9 @@ public LockResponse(LockResponse other) { if (other.isSetState()) { this.state = other.state; } + if (other.isSetErrorMessage()) { + this.errorMessage = other.errorMessage; + } } public LockResponse deepCopy() { @@ -162,6 +173,7 @@ public void clear() { setLockidIsSet(false); this.lockid = 0; this.state = null; + this.errorMessage = null; } public long getLockid() { @@ -217,6 +229,29 @@ public void setStateIsSet(boolean value) { } } + public String getErrorMessage() { + return this.errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public void unsetErrorMessage() { + this.errorMessage = null; + } + + /** Returns true if field errorMessage is set (has been assigned a value) and false otherwise */ + public boolean isSetErrorMessage() { + return this.errorMessage != null; + } + + public void setErrorMessageIsSet(boolean value) { + if (!value) { + this.errorMessage = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case LOCKID: @@ -235,6 +270,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case ERROR_MESSAGE: + if (value == null) { + unsetErrorMessage(); + } else { + setErrorMessage((String)value); + } + break; + } } @@ -246,6 +289,9 @@ public Object getFieldValue(_Fields field) { case STATE: return getState(); + case ERROR_MESSAGE: + return getErrorMessage(); + } throw new IllegalStateException(); } @@ -261,6 +307,8 @@ public boolean isSet(_Fields field) { return isSetLockid(); case STATE: return isSetState(); + case ERROR_MESSAGE: + return isSetErrorMessage(); } throw new IllegalStateException(); } @@ -296,6 +344,15 @@ public boolean equals(LockResponse that) { return false; } + boolean this_present_errorMessage = true && this.isSetErrorMessage(); + boolean that_present_errorMessage = true && that.isSetErrorMessage(); + if (this_present_errorMessage || that_present_errorMessage) { + if (!(this_present_errorMessage && that_present_errorMessage)) + return false; + if (!this.errorMessage.equals(that.errorMessage)) + return false; + } + return true; } @@ -313,6 +370,11 @@ public int hashCode() { if (present_state) list.add(state.getValue()); + boolean present_errorMessage = true && (isSetErrorMessage()); + list.add(present_errorMessage); + if (present_errorMessage) + list.add(errorMessage); + return list.hashCode(); } @@ -344,6 +406,16 @@ public int compareTo(LockResponse other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetErrorMessage()).compareTo(other.isSetErrorMessage()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetErrorMessage()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.errorMessage, other.errorMessage); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -375,6 +447,16 @@ public String toString() { sb.append(this.state); } first = false; + if (isSetErrorMessage()) { + if (!first) sb.append(", "); + sb.append("errorMessage:"); + if (this.errorMessage == null) { + sb.append("null"); + } else { + sb.append(this.errorMessage); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -444,6 +526,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, LockResponse struct org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 3: // ERROR_MESSAGE + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.errorMessage = iprot.readString(); + struct.setErrorMessageIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -465,6 +555,13 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, LockResponse struc oprot.writeI32(struct.state.getValue()); oprot.writeFieldEnd(); } + if (struct.errorMessage != null) { + if (struct.isSetErrorMessage()) { + oprot.writeFieldBegin(ERROR_MESSAGE_FIELD_DESC); + oprot.writeString(struct.errorMessage); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -484,6 +581,14 @@ public void write(org.apache.thrift.protocol.TProtocol prot, LockResponse struct TTupleProtocol oprot = (TTupleProtocol) prot; oprot.writeI64(struct.lockid); oprot.writeI32(struct.state.getValue()); + BitSet optionals = new BitSet(); + if (struct.isSetErrorMessage()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.isSetErrorMessage()) { + oprot.writeString(struct.errorMessage); + } } @Override @@ -493,6 +598,11 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockResponse struct) struct.setLockidIsSet(true); struct.state = org.apache.hadoop.hive.metastore.api.LockState.findByValue(iprot.readI32()); struct.setStateIsSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + struct.errorMessage = iprot.readString(); + struct.setErrorMessageIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 9fb7ff011a..cf29f22d2e 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -20624,6 +20624,10 @@ class LockRequest { * @var string */ public $agentInfo = "Unknown"; + /** + * @var bool + */ + public $zeroWaitReadEnabled = false; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -20653,6 +20657,10 @@ class LockRequest { 'var' => 'agentInfo', 'type' => TType::STRING, ), + 6 => array( + 'var' => 'zeroWaitReadEnabled', + 'type' => TType::BOOL, + ), ); } if (is_array($vals)) { @@ -20671,6 +20679,9 @@ class LockRequest { if (isset($vals['agentInfo'])) { $this->agentInfo = $vals['agentInfo']; } + if (isset($vals['zeroWaitReadEnabled'])) { + $this->zeroWaitReadEnabled = $vals['zeroWaitReadEnabled']; + } } } @@ -20739,6 +20750,13 @@ class LockRequest { $xfer += $input->skip($ftype); } break; + case 6: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->zeroWaitReadEnabled); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -20789,6 +20807,11 @@ class LockRequest { $xfer += $output->writeString($this->agentInfo); $xfer += $output->writeFieldEnd(); } + if ($this->zeroWaitReadEnabled !== null) { + $xfer += $output->writeFieldBegin('zeroWaitReadEnabled', TType::BOOL, 6); + $xfer += $output->writeBool($this->zeroWaitReadEnabled); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; @@ -20807,6 +20830,10 @@ class LockResponse { * @var int */ public $state = null; + /** + * @var string + */ + public $errorMessage = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -20819,6 +20846,10 @@ class LockResponse { 'var' => 'state', 'type' => TType::I32, ), + 3 => array( + 'var' => 'errorMessage', + 'type' => TType::STRING, + ), ); } if (is_array($vals)) { @@ -20828,6 +20859,9 @@ class LockResponse { if (isset($vals['state'])) { $this->state = $vals['state']; } + if (isset($vals['errorMessage'])) { + $this->errorMessage = $vals['errorMessage']; + } } } @@ -20864,6 +20898,13 @@ class LockResponse { $xfer += $input->skip($ftype); } break; + case 3: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->errorMessage); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -20887,6 +20928,11 @@ class LockResponse { $xfer += $output->writeI32($this->state); $xfer += $output->writeFieldEnd(); } + if ($this->errorMessage !== null) { + $xfer += $output->writeFieldBegin('errorMessage', TType::STRING, 3); + $xfer += $output->writeString($this->errorMessage); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 4f317b3453..4b7d1d0b5f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -14330,6 +14330,7 @@ class LockRequest: - user - hostname - agentInfo + - zeroWaitReadEnabled """ thrift_spec = ( @@ -14339,14 +14340,16 @@ class LockRequest: (3, TType.STRING, 'user', None, None, ), # 3 (4, TType.STRING, 'hostname', None, None, ), # 4 (5, TType.STRING, 'agentInfo', None, "Unknown", ), # 5 + (6, TType.BOOL, 'zeroWaitReadEnabled', None, False, ), # 6 ) - def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4],): + def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4], zeroWaitReadEnabled=thrift_spec[6][4],): self.component = component self.txnid = txnid self.user = user self.hostname = hostname self.agentInfo = agentInfo + self.zeroWaitReadEnabled = zeroWaitReadEnabled def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -14388,6 +14391,11 @@ def read(self, iprot): self.agentInfo = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.BOOL: + self.zeroWaitReadEnabled = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -14421,6 +14429,10 @@ def write(self, oprot): oprot.writeFieldBegin('agentInfo', TType.STRING, 5) oprot.writeString(self.agentInfo) oprot.writeFieldEnd() + if self.zeroWaitReadEnabled is not None: + oprot.writeFieldBegin('zeroWaitReadEnabled', TType.BOOL, 6) + oprot.writeBool(self.zeroWaitReadEnabled) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -14441,6 +14453,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.user) value = (value * 31) ^ hash(self.hostname) value = (value * 31) ^ hash(self.agentInfo) + value = (value * 31) ^ hash(self.zeroWaitReadEnabled) return value def __repr__(self): @@ -14459,17 +14472,20 @@ class LockResponse: Attributes: - lockid - state + - errorMessage """ thrift_spec = ( None, # 0 (1, TType.I64, 'lockid', None, None, ), # 1 (2, TType.I32, 'state', None, None, ), # 2 + (3, TType.STRING, 'errorMessage', None, None, ), # 3 ) - def __init__(self, lockid=None, state=None,): + def __init__(self, lockid=None, state=None, errorMessage=None,): self.lockid = lockid self.state = state + self.errorMessage = errorMessage def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -14490,6 +14506,11 @@ def read(self, iprot): self.state = iprot.readI32() else: iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.errorMessage = iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -14508,6 +14529,10 @@ def write(self, oprot): oprot.writeFieldBegin('state', TType.I32, 2) oprot.writeI32(self.state) oprot.writeFieldEnd() + if self.errorMessage is not None: + oprot.writeFieldBegin('errorMessage', TType.STRING, 3) + oprot.writeString(self.errorMessage) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -14523,6 +14548,7 @@ def __hash__(self): value = 17 value = (value * 31) ^ hash(self.lockid) value = (value * 31) ^ hash(self.state) + value = (value * 31) ^ hash(self.errorMessage) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index e64ae0ead2..9af30b53a3 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3187,13 +3187,15 @@ class LockRequest USER = 3 HOSTNAME = 4 AGENTINFO = 5 + ZEROWAITREADENABLED = 6 FIELDS = { COMPONENT => {:type => ::Thrift::Types::LIST, :name => 'component', :element => {:type => ::Thrift::Types::STRUCT, :class => ::LockComponent}}, TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true}, USER => {:type => ::Thrift::Types::STRING, :name => 'user'}, HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostname'}, - AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true} + AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true}, + ZEROWAITREADENABLED => {:type => ::Thrift::Types::BOOL, :name => 'zeroWaitReadEnabled', :default => false, :optional => true} } def struct_fields; FIELDS; end @@ -3211,10 +3213,12 @@ class LockResponse include ::Thrift::Struct, ::Thrift::Struct_Union LOCKID = 1 STATE = 2 + ERRORMESSAGE = 3 FIELDS = { LOCKID => {:type => ::Thrift::Types::I64, :name => 'lockid'}, - STATE => {:type => ::Thrift::Types::I32, :name => 'state', :enum_class => ::LockState} + STATE => {:type => ::Thrift::Types::I32, :name => 'state', :enum_class => ::LockState}, + ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java index 93da0f60ec..b43410d44c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java @@ -85,6 +85,11 @@ public LockRequestBuilder setUser(String user) { return this; } + public LockRequestBuilder setZeroWaitReadEnabled(boolean zeroWaitRead) { + req.setZeroWaitReadEnabled(zeroWaitRead); + return this; + } + /** * Add a lock component to the lock request * @param component to add diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 1e3d6e9b8b..5f8ffe435e 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1073,11 +1073,13 @@ struct LockRequest { 3: required string user, // used in 'show locks' to help admins find who has open locks 4: required string hostname, // used in 'show locks' to help admins find who has open locks 5: optional string agentInfo = "Unknown", + 6: optional bool zeroWaitReadEnabled = false } struct LockResponse { 1: required i64 lockid, 2: required LockState state, + 3: optional string errorMessage } struct CheckLockRequest { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index fe39b0b36e..b1de87f934 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2328,7 +2328,8 @@ public long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long t public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); try { - return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid(), + rqst.isZeroWaitReadEnabled()); } catch(NoSuchLockException e) { // This should never happen, as we just added the lock id @@ -2634,7 +2635,7 @@ private static String normalizeCase(String s) { return s == null ? null : s.toLowerCase(); } - private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled) throws NoSuchLockException, TxnAbortedException, MetaException { try { try { @@ -2643,7 +2644,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long //should only get here if retrying this op dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); } - return checkLock(dbConn, extLockId, txnId); + return checkLock(dbConn, extLockId, txnId, zeroWaitReadEnabled); } catch (SQLException e) { LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: {}", extLockId, txnId, getMessage(e)); rollbackDBConn(dbConn); @@ -2658,7 +2659,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long catch(RetryException e) { LOG.debug("Going to retry checkLock for extLockId={}/txnId={} after catching RetryException with message: {}", extLockId, txnId, e.getMessage()); - return checkLockWithRetry(dbConn, extLockId, txnId); + return checkLockWithRetry(dbConn, extLockId, txnId, zeroWaitReadEnabled); } } /** @@ -2703,7 +2704,7 @@ public LockResponse checkLock(CheckLockRequest rqst) //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... - return checkLock(dbConn, extLockId, lockInfo.txnId); + return checkLock(dbConn, extLockId, lockInfo.txnId, false); } catch (SQLException e) { LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, getMessage(e)); rollbackDBConn(dbConn); @@ -4250,7 +4251,7 @@ private static boolean isValidTxn(long txnId) { * checkLock() will in the worst case keep locks in Waiting state a little longer. */ @RetrySemantics.SafeToRetry("See @SafeToRetry") - private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) + private LockResponse checkLock(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled) throws NoSuchLockException, TxnAbortedException, MetaException, SQLException { Statement stmt = null; ResultSet rs = null; @@ -4332,7 +4333,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) } String queryStr = - " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" AS \"REQ_LOCK_INT_ID\" FROM (" + + " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" \"LOCK_INT_ID\", \"REQ\".\"HL_LOCK_TYPE\" \"LOCK_TYPE\" FROM (" + " SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + " WHERE \"HL_LOCK_EXT_ID\" < " + extLockId + ") \"EX\"" + @@ -4357,6 +4358,9 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) is performed on that db (e.g. show tables, created table, etc). EXCLUSIVE on an object may mean it's being dropped or overwritten.*/ String[] whereStr = { + // shared-read + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" + + LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)", // exclusive " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"=" + @@ -4366,10 +4370,7 @@ is performed on that db (e.g. show tables, created table, etc). LockTypeUtil.exclWrite() + "," + LockTypeUtil.exclusive() + ")", // excl-write " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclWrite() + " AND \"EX\".\"HL_LOCK_TYPE\"!=" + - LockTypeUtil.sharedRead(), - // shared-read - " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" + - LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)" + LockTypeUtil.sharedRead() }; List subQuery = new ArrayList<>(); @@ -4385,21 +4386,40 @@ is performed on that db (e.g. show tables, created table, etc). if (rs.next()) { // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. LockInfo blockedBy = new LockInfo(rs); - long intLockId = rs.getLong("REQ_LOCK_INT_ID"); - LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({}})", JavaUtils.lockIdToString(extLockId), - intLockId, JavaUtils.txnIdToString(txnId), blockedBy); + long intLockId = rs.getLong("LOCK_INT_ID"); + char lockChar = rs.getString("LOCK_TYPE").charAt(0); + LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({})", JavaUtils.lockIdToString(extLockId), + intLockId, JavaUtils.txnIdToString(txnId), blockedBy); + + if (zeroWaitReadEnabled && isValidTxn(txnId)) { + LockType lockType = LockTypeUtil.getLockTypeFromEncoding(lockChar) + .orElseThrow(() -> new MetaException("Unknown lock type: " + lockChar)); + + if (lockType == LockType.SHARED_READ) { + String cleanupQuery = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; + + LOG.debug("Going to execute query: <" + cleanupQuery + ">"); + stmt.executeUpdate(cleanupQuery); + dbConn.commit(); + + response.setErrorMessage(String.format( + "Unable to acquire read lock due to an exclusive lock {%s}", blockedBy)); + response.setState(LockState.NOT_ACQUIRED); + return response; + } + } String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" + - " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + - ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + - " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">"); int updCnt = stmt.executeUpdate(updateBlockedByQuery); if (updCnt != 1) { - LOG.error("Failure to update blocked lock (extLockId={}, intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})", - extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId); + LOG.error("Failure to update requested lock (extLockId={}, intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})", + extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId); shouldNeverHappen(txnId, extLockId, intLockId); } dbConn.commit(); -- 2.23.0