From 36f2149a7802b64409f44cb145672ce224b4da0c Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Mon, 27 Apr 2020 09:50:55 +0200 Subject: [PATCH] HIVE-23293: Locks: Implement zero-wait readers --- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 5 +- .../hive/ql/lockmgr/TestDbTxnManager2.java | 88 ++++++++++---- .../hive/metastore/api/LockRequest.java | 109 +++++++++++++++++- .../gen/thrift/gen-php/metastore/Types.php | 23 ++++ .../thrift/gen-py/hive_metastore/ttypes.py | 15 ++- .../gen/thrift/gen-rb/hive_metastore_types.rb | 4 +- .../hive/metastore/LockRequestBuilder.java | 5 + .../src/main/thrift/hive_metastore.thrift | 1 + .../hadoop/hive/metastore/txn/TxnHandler.java | 36 ++++-- 9 files changed, 243 insertions(+), 43 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index b4dac4346e..a08af7cf02 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -412,6 +412,8 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB rqstBuilder.setTransactionId(txnId) .setUser(username); + rqstBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) || + !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); // Make sure we need locks. It's possible there's nothing to lock in // this operation. @@ -420,11 +422,10 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB return null; } List lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf); - lockComponents.addAll(getGlobalLocks(ctx.getConf())); //It's possible there's nothing to lock even if we have w/r entities. - if(lockComponents.isEmpty()) { + if (lockComponents.isEmpty()) { LOG.debug("No locks needed for queryId=" + queryId); return null; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 497cedd61f..1373dbc696 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -2517,7 +2517,17 @@ public void testShowTablesLock() throws Exception { @Test public void testFairness() throws Exception { - dropTable(new String[] {"T6"}); + testFairness(false); + } + + @Test + public void testFairnessZeroWaitRead() throws Exception { + testFairness(true); + } + + private void testFairness(boolean zeroWaitRead) throws Exception { + dropTable(new String[]{"T6"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead); driver.run("create table if not exists T6(a int)"); driver.compileAndRespond("select a from T6", true); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets S lock on T6 @@ -2536,11 +2546,21 @@ public void testFairness() throws Exception { //this should block behind the X lock on T6 //this is a contrived example, in practice this query would of course fail after drop table driver.compileAndRespond("select a from T6", true); - ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6 + try { + ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6 + } catch (LockException ex) { + Assert.assertTrue(zeroWaitRead); + Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, ex.getCanonicalErrorMsg()); + Assert.assertEquals("Exception msg didn't match", + ErrorMsg.TXN_ABORTED.format(JavaUtils.txnIdToString(txnMgr3.getCurrentTxnId()), + "Failed to acquire shared-read lock due to an existing exclusive lock"), ex.getMessage()); + } locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 2 : 3), locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks); + } checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks); } @@ -2556,7 +2576,17 @@ public void testFairness() throws Exception { */ @Test public void testFairness2() throws Exception { + testFairness2(false); + } + + @Test + public void testFairness2ZeroWaitRead() throws Exception { + testFairness2(true); + } + + private void testFairness2(boolean zeroWaitRead) throws Exception { dropTable(new String[]{"T7"}); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead); driver.run("create table if not exists T7 (a int) " + "partitioned by (p int) stored as orc TBLPROPERTIES ('transactional'='true')"); driver.run("insert into T7 partition(p) values(1,1),(1,2)"); //create 2 partitions @@ -2579,35 +2609,47 @@ public void testFairness2() throws Exception { //this should block behind the X lock on T7.p=1 driver.compileAndRespond("select a from T7", true); //tries to get S lock on T7, S on T7.p=1 and S on T7.p=2 - ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); + try { + ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); + } catch (LockException ex) { + Assert.assertTrue(zeroWaitRead); + Assert.assertEquals( "Got wrong message code", ErrorMsg.TXN_ABORTED, ex.getCanonicalErrorMsg()); + Assert.assertEquals("Exception msg didn't match", + ErrorMsg.TXN_ABORTED.format(JavaUtils.txnIdToString(txnMgr3.getCurrentTxnId()), + "Failed to acquire shared-read lock due to an existing exclusive lock"), ex.getMessage()); + } locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 7, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 4 : 7), locks.size()); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + } checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks); txnMgr.commitTxn(); //release locks from "select a from T7" - to unblock hte drop partition //retest the the "drop partiton" X lock - ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid()); + ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(zeroWaitRead ? 3 : 6).getLockid()); locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 4, locks.size()); + Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 1 : 4), locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); - - txnMgr2.rollbackTxn(); //release the X lock on T7.p=1 - //re-test the locks - ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 - locks = getLocks(); - Assert.assertEquals("Unexpected lock count", 3, locks.size()); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); - checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + if (!zeroWaitRead) { + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks); + + txnMgr2.rollbackTxn(); //release the X lock on T7.p=1 + //re-test the locks + ((DbLockManager) txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7 + locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 3, locks.size()); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks); + checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks); + } } @Test diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java index 7402fb30eb..e382c9d377 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java @@ -43,6 +43,7 @@ private static final org.apache.thrift.protocol.TField USER_FIELD_DESC = new org.apache.thrift.protocol.TField("user", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)4); private static final org.apache.thrift.protocol.TField AGENT_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("agentInfo", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField ZERO_WAIT_READ_ENABLED_FIELD_DESC = new org.apache.thrift.protocol.TField("zeroWaitReadEnabled", org.apache.thrift.protocol.TType.BOOL, (short)6); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -55,6 +56,7 @@ private String user; // required private String hostname; // required private String agentInfo; // optional + private boolean zeroWaitReadEnabled; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -62,7 +64,8 @@ TXNID((short)2, "txnid"), USER((short)3, "user"), HOSTNAME((short)4, "hostname"), - AGENT_INFO((short)5, "agentInfo"); + AGENT_INFO((short)5, "agentInfo"), + ZERO_WAIT_READ_ENABLED((short)6, "zeroWaitReadEnabled"); private static final Map byName = new HashMap(); @@ -87,6 +90,8 @@ public static _Fields findByThriftId(int fieldId) { return HOSTNAME; case 5: // AGENT_INFO return AGENT_INFO; + case 6: // ZERO_WAIT_READ_ENABLED + return ZERO_WAIT_READ_ENABLED; default: return null; } @@ -128,8 +133,9 @@ public String getFieldName() { // isset id assignments private static final int __TXNID_ISSET_ID = 0; + private static final int __ZEROWAITREADENABLED_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO}; + private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO,_Fields.ZERO_WAIT_READ_ENABLED}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -144,6 +150,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.AGENT_INFO, new org.apache.thrift.meta_data.FieldMetaData("agentInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.ZERO_WAIT_READ_ENABLED, new org.apache.thrift.meta_data.FieldMetaData("zeroWaitReadEnabled", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockRequest.class, metaDataMap); } @@ -151,6 +159,8 @@ public String getFieldName() { public LockRequest() { this.agentInfo = "Unknown"; + this.zeroWaitReadEnabled = false; + } public LockRequest( @@ -186,6 +196,7 @@ public LockRequest(LockRequest other) { if (other.isSetAgentInfo()) { this.agentInfo = other.agentInfo; } + this.zeroWaitReadEnabled = other.zeroWaitReadEnabled; } public LockRequest deepCopy() { @@ -201,6 +212,8 @@ public void clear() { this.hostname = null; this.agentInfo = "Unknown"; + this.zeroWaitReadEnabled = false; + } public int getComponentSize() { @@ -332,6 +345,28 @@ public void setAgentInfoIsSet(boolean value) { } } + public boolean isZeroWaitReadEnabled() { + return this.zeroWaitReadEnabled; + } + + public void setZeroWaitReadEnabled(boolean zeroWaitReadEnabled) { + this.zeroWaitReadEnabled = zeroWaitReadEnabled; + setZeroWaitReadEnabledIsSet(true); + } + + public void unsetZeroWaitReadEnabled() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID); + } + + /** Returns true if field zeroWaitReadEnabled is set (has been assigned a value) and false otherwise */ + public boolean isSetZeroWaitReadEnabled() { + return EncodingUtils.testBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID); + } + + public void setZeroWaitReadEnabledIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case COMPONENT: @@ -374,6 +409,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case ZERO_WAIT_READ_ENABLED: + if (value == null) { + unsetZeroWaitReadEnabled(); + } else { + setZeroWaitReadEnabled((Boolean)value); + } + break; + } } @@ -394,6 +437,9 @@ public Object getFieldValue(_Fields field) { case AGENT_INFO: return getAgentInfo(); + case ZERO_WAIT_READ_ENABLED: + return isZeroWaitReadEnabled(); + } throw new IllegalStateException(); } @@ -415,6 +461,8 @@ public boolean isSet(_Fields field) { return isSetHostname(); case AGENT_INFO: return isSetAgentInfo(); + case ZERO_WAIT_READ_ENABLED: + return isSetZeroWaitReadEnabled(); } throw new IllegalStateException(); } @@ -477,6 +525,15 @@ public boolean equals(LockRequest that) { return false; } + boolean this_present_zeroWaitReadEnabled = true && this.isSetZeroWaitReadEnabled(); + boolean that_present_zeroWaitReadEnabled = true && that.isSetZeroWaitReadEnabled(); + if (this_present_zeroWaitReadEnabled || that_present_zeroWaitReadEnabled) { + if (!(this_present_zeroWaitReadEnabled && that_present_zeroWaitReadEnabled)) + return false; + if (this.zeroWaitReadEnabled != that.zeroWaitReadEnabled) + return false; + } + return true; } @@ -509,6 +566,11 @@ public int hashCode() { if (present_agentInfo) list.add(agentInfo); + boolean present_zeroWaitReadEnabled = true && (isSetZeroWaitReadEnabled()); + list.add(present_zeroWaitReadEnabled); + if (present_zeroWaitReadEnabled) + list.add(zeroWaitReadEnabled); + return list.hashCode(); } @@ -570,6 +632,16 @@ public int compareTo(LockRequest other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetZeroWaitReadEnabled()).compareTo(other.isSetZeroWaitReadEnabled()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetZeroWaitReadEnabled()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.zeroWaitReadEnabled, other.zeroWaitReadEnabled); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -629,6 +701,12 @@ public String toString() { } first = false; } + if (isSetZeroWaitReadEnabled()) { + if (!first) sb.append(", "); + sb.append("zeroWaitReadEnabled:"); + sb.append(this.zeroWaitReadEnabled); + first = false; + } sb.append(")"); return sb.toString(); } @@ -737,6 +815,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, LockRequest struct) org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // ZERO_WAIT_READ_ENABLED + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.zeroWaitReadEnabled = iprot.readBool(); + struct.setZeroWaitReadEnabledIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -784,6 +870,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, LockRequest struct oprot.writeFieldEnd(); } } + if (struct.isSetZeroWaitReadEnabled()) { + oprot.writeFieldBegin(ZERO_WAIT_READ_ENABLED_FIELD_DESC); + oprot.writeBool(struct.zeroWaitReadEnabled); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -817,13 +908,19 @@ public void write(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) if (struct.isSetAgentInfo()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetZeroWaitReadEnabled()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetTxnid()) { oprot.writeI64(struct.txnid); } if (struct.isSetAgentInfo()) { oprot.writeString(struct.agentInfo); } + if (struct.isSetZeroWaitReadEnabled()) { + oprot.writeBool(struct.zeroWaitReadEnabled); + } } @Override @@ -845,7 +942,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) struct.setUserIsSet(true); struct.hostname = iprot.readString(); struct.setHostnameIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.txnid = iprot.readI64(); struct.setTxnidIsSet(true); @@ -854,6 +951,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, LockRequest struct) struct.agentInfo = iprot.readString(); struct.setAgentInfoIsSet(true); } + if (incoming.get(2)) { + struct.zeroWaitReadEnabled = iprot.readBool(); + struct.setZeroWaitReadEnabledIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php index 9fb7ff011a..5bb3035961 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php @@ -20624,6 +20624,10 @@ class LockRequest { * @var string */ public $agentInfo = "Unknown"; + /** + * @var bool + */ + public $zeroWaitReadEnabled = false; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -20653,6 +20657,10 @@ class LockRequest { 'var' => 'agentInfo', 'type' => TType::STRING, ), + 6 => array( + 'var' => 'zeroWaitReadEnabled', + 'type' => TType::BOOL, + ), ); } if (is_array($vals)) { @@ -20671,6 +20679,9 @@ class LockRequest { if (isset($vals['agentInfo'])) { $this->agentInfo = $vals['agentInfo']; } + if (isset($vals['zeroWaitReadEnabled'])) { + $this->zeroWaitReadEnabled = $vals['zeroWaitReadEnabled']; + } } } @@ -20739,6 +20750,13 @@ class LockRequest { $xfer += $input->skip($ftype); } break; + case 6: + if ($ftype == TType::BOOL) { + $xfer += $input->readBool($this->zeroWaitReadEnabled); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -20789,6 +20807,11 @@ class LockRequest { $xfer += $output->writeString($this->agentInfo); $xfer += $output->writeFieldEnd(); } + if ($this->zeroWaitReadEnabled !== null) { + $xfer += $output->writeFieldBegin('zeroWaitReadEnabled', TType::BOOL, 6); + $xfer += $output->writeBool($this->zeroWaitReadEnabled); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 4f317b3453..3b22d46836 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -14330,6 +14330,7 @@ class LockRequest: - user - hostname - agentInfo + - zeroWaitReadEnabled """ thrift_spec = ( @@ -14339,14 +14340,16 @@ class LockRequest: (3, TType.STRING, 'user', None, None, ), # 3 (4, TType.STRING, 'hostname', None, None, ), # 4 (5, TType.STRING, 'agentInfo', None, "Unknown", ), # 5 + (6, TType.BOOL, 'zeroWaitReadEnabled', None, False, ), # 6 ) - def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4],): + def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4], zeroWaitReadEnabled=thrift_spec[6][4],): self.component = component self.txnid = txnid self.user = user self.hostname = hostname self.agentInfo = agentInfo + self.zeroWaitReadEnabled = zeroWaitReadEnabled def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -14388,6 +14391,11 @@ def read(self, iprot): self.agentInfo = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.BOOL: + self.zeroWaitReadEnabled = iprot.readBool() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -14421,6 +14429,10 @@ def write(self, oprot): oprot.writeFieldBegin('agentInfo', TType.STRING, 5) oprot.writeString(self.agentInfo) oprot.writeFieldEnd() + if self.zeroWaitReadEnabled is not None: + oprot.writeFieldBegin('zeroWaitReadEnabled', TType.BOOL, 6) + oprot.writeBool(self.zeroWaitReadEnabled) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -14441,6 +14453,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.user) value = (value * 31) ^ hash(self.hostname) value = (value * 31) ^ hash(self.agentInfo) + value = (value * 31) ^ hash(self.zeroWaitReadEnabled) return value def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index e64ae0ead2..5e053296fb 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -3187,13 +3187,15 @@ class LockRequest USER = 3 HOSTNAME = 4 AGENTINFO = 5 + ZEROWAITREADENABLED = 6 FIELDS = { COMPONENT => {:type => ::Thrift::Types::LIST, :name => 'component', :element => {:type => ::Thrift::Types::STRUCT, :class => ::LockComponent}}, TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true}, USER => {:type => ::Thrift::Types::STRING, :name => 'user'}, HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostname'}, - AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true} + AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true}, + ZEROWAITREADENABLED => {:type => ::Thrift::Types::BOOL, :name => 'zeroWaitReadEnabled', :default => false, :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java index 93da0f60ec..b43410d44c 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java @@ -85,6 +85,11 @@ public LockRequestBuilder setUser(String user) { return this; } + public LockRequestBuilder setZeroWaitReadEnabled(boolean zeroWaitRead) { + req.setZeroWaitReadEnabled(zeroWaitRead); + return this; + } + /** * Add a lock component to the lock request * @param component to add diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 1e3d6e9b8b..d6291cc3f5 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1073,6 +1073,7 @@ struct LockRequest { 3: required string user, // used in 'show locks' to help admins find who has open locks 4: required string hostname, // used in 'show locks' to help admins find who has open locks 5: optional string agentInfo = "Unknown", + 6: optional bool zeroWaitReadEnabled = false, } struct LockResponse { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 580786832e..7a9bfa82cc 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2320,7 +2320,8 @@ public long cleanupMaterializationRebuildLocks(ValidTxnList validTxnList, long t public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); try { - return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid(), + rqst.isZeroWaitReadEnabled()); } catch(NoSuchLockException e) { // This should never happen, as we just added the lock id @@ -2624,7 +2625,7 @@ private static String normalizeCase(String s) { return s == null ? null : s.toLowerCase(); } - private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled) throws NoSuchLockException, TxnAbortedException, MetaException { try { try { @@ -2633,7 +2634,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long //should only get here if retrying this op dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); } - return checkLock(dbConn, extLockId, txnId); + return checkLock(dbConn, extLockId, txnId, zeroWaitReadEnabled); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -2646,7 +2647,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long } } catch(RetryException e) { - return checkLockWithRetry(dbConn, extLockId, txnId); + return checkLockWithRetry(dbConn, extLockId, txnId, zeroWaitReadEnabled); } } /** @@ -2693,7 +2694,7 @@ public LockResponse checkLock(CheckLockRequest rqst) //todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... - return checkLock(dbConn, extLockId, info.txnId); + return checkLock(dbConn, extLockId, info.txnId, false); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -4242,7 +4243,7 @@ private static boolean isValidTxn(long txnId) { * checkLock() will in the worst case keep locks in Waiting state a little longer. */ @RetrySemantics.SafeToRetry("See @SafeToRetry") - private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) + private LockResponse checkLock(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled) throws NoSuchLockException, TxnAbortedException, MetaException, SQLException { Statement stmt = null; ResultSet rs = null; @@ -4324,7 +4325,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) } String queryStr = - " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" AS \"REQ_LOCK_INT_ID\" FROM (" + + " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" \"LOCK_INT_ID\", \"REQ\".\"HL_LOCK_TYPE\" \"LOCK_TYPE\" FROM (" + " SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + " WHERE \"HL_LOCK_EXT_ID\" < " + extLockId + ") \"EX\"" + @@ -4349,6 +4350,9 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) is performed on that db (e.g. show tables, created table, etc). EXCLUSIVE on an object may mean it's being dropped or overwritten.*/ String[] whereStr = { + // shared-read + " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" + + LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)", // exclusive " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"=" + @@ -4358,10 +4362,7 @@ is performed on that db (e.g. show tables, created table, etc). LockTypeUtil.exclWrite() + "," + LockTypeUtil.exclusive() + ")", // excl-write " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclWrite() + " AND \"EX\".\"HL_LOCK_TYPE\"!=" + - LockTypeUtil.sharedRead(), - // shared-read - " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" + - LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)" + LockTypeUtil.sharedRead() }; List subQuery = new ArrayList<>(); @@ -4377,8 +4378,19 @@ is performed on that db (e.g. show tables, created table, etc). if (rs.next()) { // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. LockInfo blockedBy = new LockInfo(rs); - long intLockId = rs.getLong("REQ_LOCK_INT_ID"); + long intLockId = rs.getLong("LOCK_INT_ID"); + char lockChar = rs.getString("LOCK_TYPE").charAt(0); + if (zeroWaitReadEnabled && isValidTxn(txnId)) { + LockType lockType = LockTypeUtil.getLockTypeFromEncoding(lockChar) + .orElseThrow(() -> new MetaException("Unknown lock type: " + lockChar)); + + if (lockType == LockType.SHARED_READ) { + stmt.executeUpdate("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId); + dbConn.commit(); + throw new TxnAbortedException("Failed to acquire shared-read lock due to an existing exclusive lock"); + } + } String sqlText = "UPDATE \"HIVE_LOCKS\"" + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + -- 2.23.0