diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java index c7fa5da..b5f6dfc 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java @@ -18,6 +18,7 @@ package org.apache.hive.service.cli.session; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; @@ -27,6 +28,7 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; + import static org.mockito.Matchers.*; import java.util.HashMap; @@ -39,9 +41,10 @@ * Verifying OperationManager.closeOperation(opHandle) is invoked when * get HiveSQLException during sync query * @throws HiveSQLException + * @throws IllegalAccessException */ @Test - public void testLeakOperationHandle() throws HiveSQLException { + public void testLeakOperationHandle() throws HiveSQLException, IllegalAccessException { //create HiveSessionImpl object TProtocolVersion protocol = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2; String username = ""; @@ -61,7 +64,7 @@ protected synchronized void release(boolean userAccess, boolean isOperation) { //mock operationManager for session OperationManager operationManager = Mockito.mock(OperationManager.class); - session.setOperationManager(operationManager); + FieldUtils.writeField(session, "operationManager", operationManager, true); //mock operation and opHandle for operationManager ExecuteStatementOperation operation = Mockito.mock(ExecuteStatementOperation.class); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java index cc18ce7..d2b6520 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.SessionHandle; +import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.cli.operation.SQLOperationDisplay; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.HiveServer2; @@ -70,24 +71,24 @@ public void testQueryDisplay() throws Exception { List liveSqlOperations, historicSqlOperations; - liveSqlOperations = sessionManager.getOperationManager().getLiveSqlOperations(); - historicSqlOperations = sessionManager.getOperationManager().getHistoricalSQLOperations(); + liveSqlOperations = OperationManager.getLiveSqlOperations(); + historicSqlOperations = OperationManager.getHistoricalSQLOperations(); Assert.assertEquals(liveSqlOperations.size(), 2); Assert.assertEquals(historicSqlOperations.size(), 0); verifyDDL(liveSqlOperations.get(0), "show databases", opHandle1.getHandleIdentifier().toString(), false); verifyDDL(liveSqlOperations.get(1),"show tables", opHandle2.getHandleIdentifier().toString(), false); session.closeOperation(opHandle1); - liveSqlOperations = sessionManager.getOperationManager().getLiveSqlOperations(); - historicSqlOperations = sessionManager.getOperationManager().getHistoricalSQLOperations(); + liveSqlOperations = OperationManager.getLiveSqlOperations(); + historicSqlOperations = OperationManager.getHistoricalSQLOperations(); Assert.assertEquals(liveSqlOperations.size(), 1); Assert.assertEquals(historicSqlOperations.size(), 1); verifyDDL(historicSqlOperations.get(0),"show databases", opHandle1.getHandleIdentifier().toString(), true); verifyDDL(liveSqlOperations.get(0),"show tables", opHandle2.getHandleIdentifier().toString(), false); session.closeOperation(opHandle2); - liveSqlOperations = sessionManager.getOperationManager().getLiveSqlOperations(); - historicSqlOperations = sessionManager.getOperationManager().getHistoricalSQLOperations(); + liveSqlOperations = OperationManager.getLiveSqlOperations(); + historicSqlOperations = OperationManager.getHistoricalSQLOperations(); Assert.assertEquals(liveSqlOperations.size(), 0); Assert.assertEquals(historicSqlOperations.size(), 2); verifyDDL(historicSqlOperations.get(0),"show databases", opHandle1.getHandleIdentifier().toString(), true); @@ -170,7 +171,7 @@ private void verifyDDL(SQLOperationDisplay display, String stmt, String handle, */ private void verifyDDLHtml(String stmt, String opHandle) throws Exception { StringWriter sw = new StringWriter(); - SQLOperationDisplay sod = sessionManager.getOperationManager().getSQLOperationDisplay( + SQLOperationDisplay sod = OperationManager.getSQLOperationDisplay( opHandle); new QueryProfileTmpl().render(sw, sod); String html = sw.toString(); diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index d17930a..be8f273 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -550,6 +550,8 @@ struct TOperationHandle { // modifiedRowCount is unset if the operation generates // a result set. 4: optional double modifiedRowCount + + 5: required TSessionHandle sessionHandle } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 2f460e8..dafc063 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -4478,6 +4478,10 @@ void TOperationHandle::__set_modifiedRowCount(const double val) { __isset.modifiedRowCount = true; } +void TOperationHandle::__set_sessionHandle(const TSessionHandle& val) { + this->sessionHandle = val; +} + uint32_t TOperationHandle::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -4493,6 +4497,7 @@ uint32_t TOperationHandle::read(::apache::thrift::protocol::TProtocol* iprot) { bool isset_operationId = false; bool isset_operationType = false; bool isset_hasResultSet = false; + bool isset_sessionHandle = false; while (true) { @@ -4536,6 +4541,14 @@ uint32_t TOperationHandle::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->sessionHandle.read(iprot); + isset_sessionHandle = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -4551,6 +4564,8 @@ uint32_t TOperationHandle::read(::apache::thrift::protocol::TProtocol* iprot) { throw TProtocolException(TProtocolException::INVALID_DATA); if (!isset_hasResultSet) throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_sessionHandle) + throw TProtocolException(TProtocolException::INVALID_DATA); return xfer; } @@ -4576,6 +4591,10 @@ uint32_t TOperationHandle::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += oprot->writeDouble(this->modifiedRowCount); xfer += oprot->writeFieldEnd(); } + xfer += oprot->writeFieldBegin("sessionHandle", ::apache::thrift::protocol::T_STRUCT, 5); + xfer += this->sessionHandle.write(oprot); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4587,6 +4606,7 @@ void swap(TOperationHandle &a, TOperationHandle &b) { swap(a.operationType, b.operationType); swap(a.hasResultSet, b.hasResultSet); swap(a.modifiedRowCount, b.modifiedRowCount); + swap(a.sessionHandle, b.sessionHandle); swap(a.__isset, b.__isset); } @@ -4595,6 +4615,7 @@ TOperationHandle::TOperationHandle(const TOperationHandle& other179) { operationType = other179.operationType; hasResultSet = other179.hasResultSet; modifiedRowCount = other179.modifiedRowCount; + sessionHandle = other179.sessionHandle; __isset = other179.__isset; } TOperationHandle& TOperationHandle::operator=(const TOperationHandle& other180) { @@ -4602,6 +4623,7 @@ TOperationHandle& TOperationHandle::operator=(const TOperationHandle& other180) operationType = other180.operationType; hasResultSet = other180.hasResultSet; modifiedRowCount = other180.modifiedRowCount; + sessionHandle = other180.sessionHandle; __isset = other180.__isset; return *this; } @@ -4612,6 +4634,7 @@ void TOperationHandle::printTo(std::ostream& out) const { out << ", " << "operationType=" << to_string(operationType); out << ", " << "hasResultSet=" << to_string(hasResultSet); out << ", " << "modifiedRowCount="; (__isset.modifiedRowCount ? (out << to_string(modifiedRowCount)) : (out << "")); + out << ", " << "sessionHandle=" << to_string(sessionHandle); out << ")"; } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index fa02090..ae19b85 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -2072,6 +2072,7 @@ class TOperationHandle { TOperationType::type operationType; bool hasResultSet; double modifiedRowCount; + TSessionHandle sessionHandle; _TOperationHandle__isset __isset; @@ -2083,6 +2084,8 @@ class TOperationHandle { void __set_modifiedRowCount(const double val); + void __set_sessionHandle(const TSessionHandle& val); + bool operator == (const TOperationHandle & rhs) const { if (!(operationId == rhs.operationId)) @@ -2095,6 +2098,8 @@ class TOperationHandle { return false; else if (__isset.modifiedRowCount && !(modifiedRowCount == rhs.modifiedRowCount)) return false; + if (!(sessionHandle == rhs.sessionHandle)) + return false; return true; } bool operator != (const TOperationHandle &rhs) const { diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationHandle.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationHandle.java index 9eaf2be..d3e4577 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationHandle.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationHandle.java @@ -42,6 +42,7 @@ private static final org.apache.thrift.protocol.TField OPERATION_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("operationType", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField HAS_RESULT_SET_FIELD_DESC = new org.apache.thrift.protocol.TField("hasResultSet", org.apache.thrift.protocol.TType.BOOL, (short)3); private static final org.apache.thrift.protocol.TField MODIFIED_ROW_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("modifiedRowCount", org.apache.thrift.protocol.TType.DOUBLE, (short)4); + private static final org.apache.thrift.protocol.TField SESSION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("sessionHandle", org.apache.thrift.protocol.TType.STRUCT, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -53,6 +54,7 @@ private TOperationType operationType; // required private boolean hasResultSet; // required private double modifiedRowCount; // optional + private TSessionHandle sessionHandle; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -63,7 +65,8 @@ */ OPERATION_TYPE((short)2, "operationType"), HAS_RESULT_SET((short)3, "hasResultSet"), - MODIFIED_ROW_COUNT((short)4, "modifiedRowCount"); + MODIFIED_ROW_COUNT((short)4, "modifiedRowCount"), + SESSION_HANDLE((short)5, "sessionHandle"); private static final Map byName = new HashMap(); @@ -86,6 +89,8 @@ public static _Fields findByThriftId(int fieldId) { return HAS_RESULT_SET; case 4: // MODIFIED_ROW_COUNT return MODIFIED_ROW_COUNT; + case 5: // SESSION_HANDLE + return SESSION_HANDLE; default: return null; } @@ -141,6 +146,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.MODIFIED_ROW_COUNT, new org.apache.thrift.meta_data.FieldMetaData("modifiedRowCount", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); + tmpMap.put(_Fields.SESSION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("sessionHandle", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TSessionHandle.class))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TOperationHandle.class, metaDataMap); } @@ -151,13 +158,15 @@ public TOperationHandle() { public TOperationHandle( THandleIdentifier operationId, TOperationType operationType, - boolean hasResultSet) + boolean hasResultSet, + TSessionHandle sessionHandle) { this(); this.operationId = operationId; this.operationType = operationType; this.hasResultSet = hasResultSet; setHasResultSetIsSet(true); + this.sessionHandle = sessionHandle; } /** @@ -173,6 +182,9 @@ public TOperationHandle(TOperationHandle other) { } this.hasResultSet = other.hasResultSet; this.modifiedRowCount = other.modifiedRowCount; + if (other.isSetSessionHandle()) { + this.sessionHandle = new TSessionHandle(other.sessionHandle); + } } public TOperationHandle deepCopy() { @@ -187,6 +199,7 @@ public void clear() { this.hasResultSet = false; setModifiedRowCountIsSet(false); this.modifiedRowCount = 0.0; + this.sessionHandle = null; } public THandleIdentifier getOperationId() { @@ -287,6 +300,29 @@ public void setModifiedRowCountIsSet(boolean value) { __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __MODIFIEDROWCOUNT_ISSET_ID, value); } + public TSessionHandle getSessionHandle() { + return this.sessionHandle; + } + + public void setSessionHandle(TSessionHandle sessionHandle) { + this.sessionHandle = sessionHandle; + } + + public void unsetSessionHandle() { + this.sessionHandle = null; + } + + /** Returns true if field sessionHandle is set (has been assigned a value) and false otherwise */ + public boolean isSetSessionHandle() { + return this.sessionHandle != null; + } + + public void setSessionHandleIsSet(boolean value) { + if (!value) { + this.sessionHandle = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case OPERATION_ID: @@ -321,6 +357,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case SESSION_HANDLE: + if (value == null) { + unsetSessionHandle(); + } else { + setSessionHandle((TSessionHandle)value); + } + break; + } } @@ -338,6 +382,9 @@ public Object getFieldValue(_Fields field) { case MODIFIED_ROW_COUNT: return getModifiedRowCount(); + case SESSION_HANDLE: + return getSessionHandle(); + } throw new IllegalStateException(); } @@ -357,6 +404,8 @@ public boolean isSet(_Fields field) { return isSetHasResultSet(); case MODIFIED_ROW_COUNT: return isSetModifiedRowCount(); + case SESSION_HANDLE: + return isSetSessionHandle(); } throw new IllegalStateException(); } @@ -410,6 +459,15 @@ public boolean equals(TOperationHandle that) { return false; } + boolean this_present_sessionHandle = true && this.isSetSessionHandle(); + boolean that_present_sessionHandle = true && that.isSetSessionHandle(); + if (this_present_sessionHandle || that_present_sessionHandle) { + if (!(this_present_sessionHandle && that_present_sessionHandle)) + return false; + if (!this.sessionHandle.equals(that.sessionHandle)) + return false; + } + return true; } @@ -437,6 +495,11 @@ public int hashCode() { if (present_modifiedRowCount) list.add(modifiedRowCount); + boolean present_sessionHandle = true && (isSetSessionHandle()); + list.add(present_sessionHandle); + if (present_sessionHandle) + list.add(sessionHandle); + return list.hashCode(); } @@ -488,6 +551,16 @@ public int compareTo(TOperationHandle other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetSessionHandle()).compareTo(other.isSetSessionHandle()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSessionHandle()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.sessionHandle, other.sessionHandle); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -533,6 +606,14 @@ public String toString() { sb.append(this.modifiedRowCount); first = false; } + if (!first) sb.append(", "); + sb.append("sessionHandle:"); + if (this.sessionHandle == null) { + sb.append("null"); + } else { + sb.append(this.sessionHandle); + } + first = false; sb.append(")"); return sb.toString(); } @@ -551,10 +632,17 @@ public void validate() throws org.apache.thrift.TException { throw new org.apache.thrift.protocol.TProtocolException("Required field 'hasResultSet' is unset! Struct:" + toString()); } + if (!isSetSessionHandle()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'sessionHandle' is unset! Struct:" + toString()); + } + // check for sub-struct validity if (operationId != null) { operationId.validate(); } + if (sessionHandle != null) { + sessionHandle.validate(); + } } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -626,6 +714,15 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TOperationHandle st org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // SESSION_HANDLE + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.sessionHandle = new TSessionHandle(); + struct.sessionHandle.read(iprot); + struct.setSessionHandleIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -657,6 +754,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TOperationHandle s oprot.writeDouble(struct.modifiedRowCount); oprot.writeFieldEnd(); } + if (struct.sessionHandle != null) { + oprot.writeFieldBegin(SESSION_HANDLE_FIELD_DESC); + struct.sessionHandle.write(oprot); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -677,6 +779,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TOperationHandle st struct.operationId.write(oprot); oprot.writeI32(struct.operationType.getValue()); oprot.writeBool(struct.hasResultSet); + struct.sessionHandle.write(oprot); BitSet optionals = new BitSet(); if (struct.isSetModifiedRowCount()) { optionals.set(0); @@ -697,6 +800,9 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TOperationHandle str struct.setOperationTypeIsSet(true); struct.hasResultSet = iprot.readBool(); struct.setHasResultSetIsSet(true); + struct.sessionHandle = new TSessionHandle(); + struct.sessionHandle.read(iprot); + struct.setSessionHandleIsSet(true); BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { struct.modifiedRowCount = iprot.readDouble(); diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php index 9dcfd67..ad93644 100644 --- a/service-rpc/src/gen/thrift/gen-php/Types.php +++ b/service-rpc/src/gen/thrift/gen-php/Types.php @@ -4374,6 +4374,10 @@ class TOperationHandle { * @var double */ public $modifiedRowCount = null; + /** + * @var \TSessionHandle + */ + public $sessionHandle = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -4395,6 +4399,11 @@ class TOperationHandle { 'var' => 'modifiedRowCount', 'type' => TType::DOUBLE, ), + 5 => array( + 'var' => 'sessionHandle', + 'type' => TType::STRUCT, + 'class' => '\TSessionHandle', + ), ); } if (is_array($vals)) { @@ -4410,6 +4419,9 @@ class TOperationHandle { if (isset($vals['modifiedRowCount'])) { $this->modifiedRowCount = $vals['modifiedRowCount']; } + if (isset($vals['sessionHandle'])) { + $this->sessionHandle = $vals['sessionHandle']; + } } } @@ -4461,6 +4473,14 @@ class TOperationHandle { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::STRUCT) { + $this->sessionHandle = new \TSessionHandle(); + $xfer += $this->sessionHandle->read($input); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -4497,6 +4517,14 @@ class TOperationHandle { $xfer += $output->writeDouble($this->modifiedRowCount); $xfer += $output->writeFieldEnd(); } + if ($this->sessionHandle !== null) { + if (!is_object($this->sessionHandle)) { + throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); + } + $xfer += $output->writeFieldBegin('sessionHandle', TType::STRUCT, 5); + $xfer += $this->sessionHandle->write($output); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index ebb7e5f..7b31975 100644 --- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -3388,6 +3388,7 @@ class TOperationHandle: - operationType - hasResultSet - modifiedRowCount + - sessionHandle """ thrift_spec = ( @@ -3396,13 +3397,15 @@ class TOperationHandle: (2, TType.I32, 'operationType', None, None, ), # 2 (3, TType.BOOL, 'hasResultSet', None, None, ), # 3 (4, TType.DOUBLE, 'modifiedRowCount', None, None, ), # 4 + (5, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 5 ) - def __init__(self, operationId=None, operationType=None, hasResultSet=None, modifiedRowCount=None,): + def __init__(self, operationId=None, operationType=None, hasResultSet=None, modifiedRowCount=None, sessionHandle=None,): self.operationId = operationId self.operationType = operationType self.hasResultSet = hasResultSet self.modifiedRowCount = modifiedRowCount + self.sessionHandle = sessionHandle def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3434,6 +3437,12 @@ def read(self, iprot): self.modifiedRowCount = iprot.readDouble() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRUCT: + self.sessionHandle = TSessionHandle() + self.sessionHandle.read(iprot) + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3460,6 +3469,10 @@ def write(self, oprot): oprot.writeFieldBegin('modifiedRowCount', TType.DOUBLE, 4) oprot.writeDouble(self.modifiedRowCount) oprot.writeFieldEnd() + if self.sessionHandle is not None: + oprot.writeFieldBegin('sessionHandle', TType.STRUCT, 5) + self.sessionHandle.write(oprot) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -3470,6 +3483,8 @@ def validate(self): raise TProtocol.TProtocolException(message='Required field operationType is unset!') if self.hasResultSet is None: raise TProtocol.TProtocolException(message='Required field hasResultSet is unset!') + if self.sessionHandle is None: + raise TProtocol.TProtocolException(message='Required field sessionHandle is unset!') return @@ -3479,6 +3494,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.operationType) value = (value * 31) ^ hash(self.hasResultSet) value = (value * 31) ^ hash(self.modifiedRowCount) + value = (value * 31) ^ hash(self.sessionHandle) return value def __repr__(self): diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 6269a2d..ac54eb5 100644 --- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -926,12 +926,14 @@ class TOperationHandle OPERATIONTYPE = 2 HASRESULTSET = 3 MODIFIEDROWCOUNT = 4 + SESSIONHANDLE = 5 FIELDS = { OPERATIONID => {:type => ::Thrift::Types::STRUCT, :name => 'operationId', :class => ::THandleIdentifier}, OPERATIONTYPE => {:type => ::Thrift::Types::I32, :name => 'operationType', :enum_class => ::TOperationType}, HASRESULTSET => {:type => ::Thrift::Types::BOOL, :name => 'hasResultSet'}, - MODIFIEDROWCOUNT => {:type => ::Thrift::Types::DOUBLE, :name => 'modifiedRowCount', :optional => true} + MODIFIEDROWCOUNT => {:type => ::Thrift::Types::DOUBLE, :name => 'modifiedRowCount', :optional => true}, + SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle} } def struct_fields; FIELDS; end @@ -940,6 +942,7 @@ class TOperationHandle raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field operationId is unset!') unless @operationId raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field operationType is unset!') unless @operationType raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field hasResultSet is unset!') if @hasResultSet.nil? + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field sessionHandle is unset!') unless @sessionHandle unless @operationType.nil? || ::TOperationType::VALID_VALUES.include?(@operationType) raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field operationType!') end diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index ed52b4a..5608235 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -41,6 +41,7 @@ import org.apache.hive.service.ServiceException; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.cli.operation.Operation; +import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.HiveServer2; @@ -415,14 +416,14 @@ public OperationHandle getCrossReference(SessionHandle sessionHandle, LOG.debug(sessionHandle + ": getCrossReference()"); return opHandle; } - + /* (non-Javadoc) * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) */ @Override public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { - Operation operation = sessionManager.getOperationManager().getOperation(opHandle); + Operation operation = sessionManager.getSession(opHandle.getSessionHandle()).getOperationManager().getOperation(opHandle); /** * If this is a background operation run asynchronously, * we block for a configured duration, before we return @@ -460,8 +461,7 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { - sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().cancelOperation(opHandle); + sessionManager.getSession(opHandle.getSessionHandle()).cancelOperation(opHandle); LOG.debug(opHandle + ": cancelOperation()"); } @@ -471,8 +471,7 @@ public void cancelOperation(OperationHandle opHandle) @Override public void closeOperation(OperationHandle opHandle) throws HiveSQLException { - sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().closeOperation(opHandle); + sessionManager.getSession(opHandle.getSessionHandle()).closeOperation(opHandle); LOG.debug(opHandle + ": closeOperation"); } @@ -482,8 +481,7 @@ public void closeOperation(OperationHandle opHandle) @Override public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { - TableSchema tableSchema = sessionManager.getOperationManager() - .getOperation(opHandle).getParentSession().getResultSetMetadata(opHandle); + TableSchema tableSchema = sessionManager.getSession(opHandle.getSessionHandle()).getResultSetMetadata(opHandle); LOG.debug(opHandle + ": getResultSetMetadata()"); return tableSchema; } @@ -501,8 +499,7 @@ public RowSet fetchResults(OperationHandle opHandle) @Override public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows, FetchType fetchType) throws HiveSQLException { - RowSet rowSet = sessionManager.getOperationManager().getOperation(opHandle) - .getParentSession().fetchResults(opHandle, orientation, maxRows, fetchType); + RowSet rowSet = sessionManager.getSession(opHandle.getSessionHandle()).fetchResults(opHandle, orientation, maxRows, fetchType); LOG.debug(opHandle + ": fetchResults()"); return rowSet; } diff --git a/service/src/java/org/apache/hive/service/cli/OperationHandle.java b/service/src/java/org/apache/hive/service/cli/OperationHandle.java index 267a6f8..ceabea3 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationHandle.java +++ b/service/src/java/org/apache/hive/service/cli/OperationHandle.java @@ -22,13 +22,15 @@ public class OperationHandle extends Handle { + private SessionHandle sessionHandle; private final OperationType opType; private final TProtocolVersion protocol; private boolean hasResultSet = false; - public OperationHandle(OperationType opType, TProtocolVersion protocol) { + public OperationHandle(OperationType opType, SessionHandle sessionHandle, TProtocolVersion protocol) { super(); this.opType = opType; + this.sessionHandle = sessionHandle; this.protocol = protocol; } @@ -39,11 +41,18 @@ public OperationHandle(TOperationHandle tOperationHandle) { public OperationHandle(TOperationHandle tOperationHandle, TProtocolVersion protocol) { super(tOperationHandle.getOperationId()); + if (tOperationHandle.getSessionHandle() != null) { + this.sessionHandle = new SessionHandle(tOperationHandle.getSessionHandle(), protocol); + } this.opType = OperationType.getOperationType(tOperationHandle.getOperationType()); this.hasResultSet = tOperationHandle.isHasResultSet(); this.protocol = protocol; } + public SessionHandle getSessionHandle() { + return sessionHandle; + } + public OperationType getOperationType() { return opType; } @@ -59,6 +68,7 @@ public boolean hasResultSet() { public TOperationHandle toTOperationHandle() { TOperationHandle tOperationHandle = new TOperationHandle(); tOperationHandle.setOperationId(getHandleIdentifier().toTHandleIdentifier()); + tOperationHandle.setSessionHandle(sessionHandle.toTSessionHandle()); tOperationHandle.setOperationType(opType.toTOperationType()); tOperationHandle.setHasResultSet(hasResultSet); return tOperationHandle; diff --git a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java b/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java index eaf1acb..3a1ef72 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java +++ b/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java @@ -57,7 +57,6 @@ public static final Layout nonVerboseLayout = PatternLayout.createLayout( "%-5p : %m%n", null, configuration, null, null, true, false, null, null); - private final OperationManager operationManager; private final StringOutputStreamManager manager; private boolean isVerbose; private final Layout layout; @@ -73,10 +72,9 @@ * @param operationManager Operation manager */ protected LogDivertAppender(String name, Filter filter, - StringOutputStreamManager manager, OperationManager operationManager, + StringOutputStreamManager manager, OperationLog.LoggingLevel loggingMode) { super(name, null, filter, false, true, manager); - this.operationManager = operationManager; this.manager = manager; this.isVerbose = (loggingMode == OperationLog.LoggingLevel.VERBOSE); this.layout = getDefaultLayout(); @@ -107,7 +105,6 @@ protected LogDivertAppender(String name, Filter filter, private static class NameFilter extends AbstractFilter { private Pattern namePattern; private OperationLog.LoggingLevel loggingMode; - private final OperationManager operationManager; /* Patterns that are excluded in verbose logging level. * Filter out messages coming from log processing classes, or we'll run an infinite loop. @@ -140,15 +137,14 @@ private void setCurrentNamePattern(OperationLog.LoggingLevel mode) { } } - public NameFilter(OperationLog.LoggingLevel loggingMode, OperationManager op) { - this.operationManager = op; + public NameFilter(OperationLog.LoggingLevel loggingMode) { this.loggingMode = loggingMode; setCurrentNamePattern(loggingMode); } @Override public Result filter(LogEvent event) { - OperationLog log = operationManager.getOperationLogByThread(); + OperationLog log = OperationManager.getOperationLogByThread(); boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); if (log == null) { @@ -179,11 +175,10 @@ public Result filter(LogEvent event) { } } - public static LogDivertAppender createInstance(OperationManager operationManager, - OperationLog.LoggingLevel loggingMode) { - return new LogDivertAppender("LogDivertAppender", new NameFilter(loggingMode, operationManager), + public static LogDivertAppender createInstance(OperationLog.LoggingLevel loggingMode) { + return new LogDivertAppender("LogDivertAppender", new NameFilter(loggingMode), new StringOutputStreamManager(new ByteArrayOutputStream(), "StringStream", null), - operationManager, loggingMode); + loggingMode); } public String getOutput() { @@ -200,7 +195,7 @@ public void start() { // If there is a logging level change from verbose->non-verbose or vice-versa since // the last subAppend call, change the layout to preserve consistency. - OperationLog log = operationManager.getOperationLogByThread(); + OperationLog log = OperationManager.getOperationLogByThread(); if (log != null) { isVerbose = (log.getOpLoggingLevel() == OperationLog.LoggingLevel.VERBOSE); } @@ -221,7 +216,7 @@ public void append(LogEvent event) { String logOutput = getOutput(); manager.reset(); - OperationLog log = operationManager.getOperationLogByThread(); + OperationLog log = OperationManager.getOperationLogByThread(); if (log == null) { LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); return; diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 90fe76d..a6afa6f 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -100,7 +100,7 @@ protected Operation(HiveSession parentSession, if (confOverlay != null) { this.confOverlay = confOverlay; } - this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + this.opHandle = new OperationHandle(opType, parentSession.getSessionHandle(), parentSession.getProtocolVersion()); beginTime = System.currentTimeMillis(); lastAccessTime = beginTime; operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index c26a611..72e3476 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; @@ -55,25 +54,24 @@ import org.slf4j.LoggerFactory; /** + * OperationManager is a class to manage the operations. Each session has its own * OperationManager. * */ -public class OperationManager extends AbstractService { - private final Logger LOG = LoggerFactory.getLogger(OperationManager.class.getName()); +public class OperationManager { + private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class.getName()); private final ConcurrentHashMap handleToOperation = new ConcurrentHashMap(); //Following fields for displaying queries on WebUI - private Object webuiLock = new Object(); - private SQLOperationDisplayCache historicSqlOperations; - private Map liveSqlOperations = new LinkedHashMap(); + private static Object webuiLock = new Object(); + private static SQLOperationDisplayCache historicSqlOperations; + private static Map liveSqlOperations = new LinkedHashMap(); - public OperationManager() { - super(OperationManager.class.getSimpleName()); - } - - @Override - public synchronized void init(HiveConf hiveConf) { + /** + * Initialization once for logging and global WebUI operations + */ + public static void initializeOnce(HiveConf hiveConf) { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { initOperationLogCapture(hiveConf.getVar( HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); @@ -84,25 +82,14 @@ public synchronized void init(HiveConf hiveConf) { historicSqlOperations = new SQLOperationDisplayCache( hiveConf.getIntVar(ConfVars.HIVE_SERVER2_WEBUI_MAX_HISTORIC_QUERIES)); } - super.init(hiveConf); - } - - @Override - public synchronized void start() { - super.start(); } - @Override - public synchronized void stop() { - super.stop(); - } - - private void initOperationLogCapture(String loggingMode) { + private static void initOperationLogCapture(String loggingMode) { // Register another Appender (with the same layout) that talks to us. - Appender ap = LogDivertAppender.createInstance(this, OperationLog.getLoggingLevel(loggingMode)); + Appender ap = LogDivertAppender.createInstance(OperationLog.getLoggingLevel(loggingMode)); LoggerContext context = (LoggerContext) LogManager.getContext(false); Configuration configuration = context.getConfiguration(); - LoggerConfig loggerConfig = configuration.getLoggerConfig(LoggerFactory.getLogger(getClass()).getName()); + LoggerConfig loggerConfig = configuration.getLoggerConfig(LoggerFactory.getLogger(OperationManager.class).getName()); loggerConfig.addAppender(ap, null, null); context.updateLoggers(); ap.start(); @@ -355,13 +342,13 @@ private Schema getLogSchema() { } - public OperationLog getOperationLogByThread() { + public static OperationLog getOperationLogByThread() { return OperationLog.getCurrentOperationLog(); } - public List removeExpiredOperations(OperationHandle[] handles) { + private List removeExpiredOperations() { List removed = new ArrayList(); - for (OperationHandle handle : handles) { + for (OperationHandle handle : handleToOperation.keySet()) { Operation operation = removeTimedOutOperation(handle); if (operation != null) { LOG.warn("Operation " + handle + " is timed-out and will be closed"); @@ -371,11 +358,46 @@ public OperationLog getOperationLogByThread() { return removed; } + public void closeExpiredOperations() { + synchronized (handleToOperation) { + List operations = removeExpiredOperations(); + if (!operations.isEmpty()) { + for (Operation operation : operations) { + handleToOperation.remove(operation.getHandle()); + try { + operation.close(); + } catch (Exception e) { + LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e); + } + } + } + } + } + + public boolean isEmpty() { + return handleToOperation.isEmpty(); + } + + public int getOpenOperationCount() { + return handleToOperation.size(); + } + /** + * Close all the operations for the current session + * @throws HiveSQLException + */ + public void close() throws HiveSQLException { + synchronized (handleToOperation) { + for(OperationHandle handle : handleToOperation.keySet()) { + closeOperation(handle); + } + } + } + /** * @return displays representing a number of historical SQLOperations, at max number of * hive.server2.webui.max.historic.queries */ - public List getHistoricalSQLOperations() { + public static List getHistoricalSQLOperations() { List result = new LinkedList<>(); synchronized (webuiLock) { if (historicSqlOperations != null) { @@ -388,7 +410,7 @@ public OperationLog getOperationLogByThread() { /** * @return displays representing live SQLOperations */ - public List getLiveSqlOperations() { + public static List getLiveSqlOperations() { List result = new LinkedList<>(); synchronized (webuiLock) { result.addAll(liveSqlOperations.values()); @@ -400,7 +422,7 @@ public OperationLog getOperationLogByThread() { * @param handle handle of SQLOperation. * @return display representing a particular SQLOperation. */ - public SQLOperationDisplay getSQLOperationDisplay(String handle) { + public static SQLOperationDisplay getSQLOperationDisplay(String handle) { synchronized (webuiLock) { if (historicSqlOperations == null) { return null; diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index 9436a25..1a1cd33 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -46,10 +46,9 @@ SessionManager getSessionManager(); /** - * Set operation manager for the session - * @param operationManager + * Get operation manager for the session */ - void setOperationManager(OperationManager operationManager); + OperationManager getOperationManager(); /** * Check whether operation logging is enabled and session dir is created successfully diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index eca02d9..c7eff6f 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -23,11 +23,8 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.Semaphore; @@ -104,8 +101,6 @@ private SessionManager sessionManager; private OperationManager operationManager; - // Synchronized by locking on itself. - private final Set opHandleSet = new HashSet(); private boolean isOperationLogEnabled; private File sessionLogDir; // TODO: the control flow for this needs to be defined. Hive is supposed to be thread-local. @@ -113,7 +108,6 @@ private volatile long lastAccessTime; private volatile long lastIdleTime; - private volatile int pendingCount = 0; private final Semaphore operationLock; @@ -143,6 +137,7 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, // Use thrift transportable formatter sessionConf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName()); sessionConf.setInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, protocol.getValue()); + operationManager = new OperationManager(); } public HiveSessionImpl(TProtocolVersion protocol, String username, String password, @@ -320,13 +315,9 @@ public void setSessionManager(SessionManager sessionManager) { this.sessionManager = sessionManager; } - private OperationManager getOperationManager() { - return operationManager; - } - @Override - public void setOperationManager(OperationManager operationManager) { - this.operationManager = operationManager; + public OperationManager getOperationManager() { + return operationManager; } protected void acquire(boolean userAccess, boolean isOperation) { @@ -360,7 +351,6 @@ private synchronized void acquireAfterOpLock(boolean userAccess) { // set the thread name with the logging prefix. sessionState.updateThreadName(); Hive.set(sessionHive); - pendingCount++; lastIdleTime = 0; } @@ -397,10 +387,9 @@ private synchronized void releaseBeforeOpLock(boolean userAccess) { if (userAccess) { lastAccessTime = System.currentTimeMillis(); } - pendingCount--; // lastIdleTime is only set by the last one // who calls release with empty opHandleSet. - if (pendingCount == 0 && opHandleSet.isEmpty()) { + if (operationManager.isEmpty()) { lastIdleTime = System.currentTimeMillis(); } } @@ -484,17 +473,12 @@ public OperationHandle executeStatementAsync(String statement, Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { + private OperationHandle runOperation(Operation operation) throws HiveSQLException { acquire(true, true); - OperationManager operationManager = getOperationManager(); - ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation( - getSession(), statement, confOverlay, runAsync, queryTimeout); OperationHandle opHandle = operation.getHandle(); try { operation.run(); - addOpHandle(opHandle); return opHandle; } catch (HiveSQLException e) { // Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the @@ -511,6 +495,14 @@ private OperationHandle executeStatementInternal(String statement, } } + private OperationHandle executeStatementInternal(String statement, + Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { + ExecuteStatementOperation operation = + operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay, + runAsync, queryTimeout); + return runOperation(operation); + } + @Override public Future submitBackgroundOperation(Runnable work) { return getSessionManager().submitBackgroundOperation( @@ -525,171 +517,69 @@ protected void done() { @Override public OperationHandle getTypeInfo() throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession()); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getCatalogs() throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession()); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getSchemas(String catalogName, String schemaName) throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetSchemasOperation operation = operationManager.newGetSchemasOperation(getSession(), catalogName, schemaName); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getTables(String catalogName, String schemaName, String tableName, List tableTypes) throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); MetadataOperation operation = operationManager.newGetTablesOperation(getSession(), catalogName, schemaName, tableName, tableTypes); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getTableTypes() throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession()); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getColumns(String catalogName, String schemaName, String tableName, String columnName) throws HiveSQLException { - acquire(true, true); String addedJars = Utilities.getResourceFiles(sessionConf, SessionState.ResourceType.JAR); if (StringUtils.isNotBlank(addedJars)) { IMetaStoreClient metastoreClient = getSession().getMetaStoreClient(); metastoreClient.setHiveAddedJars(addedJars); } - OperationManager operationManager = getOperationManager(); + GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(), catalogName, schemaName, tableName, columnName); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } - } - - private void addOpHandle(OperationHandle opHandle) { - synchronized (opHandleSet) { - opHandleSet.add(opHandle); - } + return runOperation(operation); } @Override public OperationHandle getFunctions(String catalogName, String schemaName, String functionName) throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetFunctionsOperation operation = operationManager .newGetFunctionsOperation(getSession(), catalogName, schemaName, functionName); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public void close() throws HiveSQLException { try { acquire(true, false); - // Iterate through the opHandles and close their operations - List ops = null; - synchronized (opHandleSet) { - ops = new ArrayList<>(opHandleSet); - opHandleSet.clear(); - } - for (OperationHandle opHandle : ops) { - operationManager.closeOperation(opHandle); - } + operationManager.close(); + // Cleanup session log directory. cleanupSessionLogDir(); HiveHistory hiveHist = sessionState.getHiveHistory(); @@ -765,15 +655,11 @@ public long getCreationTime() { @Override public void closeExpiredOperations() { - OperationHandle[] handles; - synchronized (opHandleSet) { - handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]); - } - if (handles.length > 0) { - List operations = operationManager.removeExpiredOperations(handles); - if (!operations.isEmpty()) { - closeTimedOutOperations(operations); - } + acquire(false, false); + try { + operationManager.closeExpiredOperations(); + } finally { + release(false, false); } } @@ -782,29 +668,11 @@ public long getNoOperationTime() { return lastIdleTime > 0 ? System.currentTimeMillis() - lastIdleTime : 0; } - private void closeTimedOutOperations(List operations) { - acquire(false, false); - try { - for (Operation operation : operations) { - synchronized (opHandleSet) { - opHandleSet.remove(operation.getHandle()); - } - try { - operation.close(); - } catch (Exception e) { - LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e); - } - } - } finally { - release(false, false); - } - } - @Override public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { acquire(true, false); try { - sessionManager.getOperationManager().cancelOperation(opHandle); + operationManager.cancelOperation(opHandle); } finally { release(true, false); } @@ -815,9 +683,6 @@ public void closeOperation(OperationHandle opHandle) throws HiveSQLException { acquire(true, false); try { operationManager.closeOperation(opHandle); - synchronized (opHandleSet) { - opHandleSet.remove(opHandle); - } } finally { release(true, false); } @@ -827,7 +692,7 @@ public void closeOperation(OperationHandle opHandle) throws HiveSQLException { public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException { acquire(true, true); try { - return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle); + return operationManager.getOperationResultSetSchema(opHandle); } finally { release(true, true); } @@ -853,7 +718,7 @@ protected HiveSession getSession() { @Override public int getOpenOperationCount() { - return opHandleSet.size(); + return operationManager.getOpenOperationCount(); } @Override @@ -897,45 +762,19 @@ private String getUserFromToken(HiveAuthFactory authFactory, String tokenStr) th @Override public OperationHandle getPrimaryKeys(String catalog, String schema, String table) throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetPrimaryKeysOperation operation = operationManager .newGetPrimaryKeysOperation(getSession(), catalog, schema, table); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } @Override public OperationHandle getCrossReference(String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws HiveSQLException { - acquire(true, true); - - OperationManager operationManager = getOperationManager(); GetCrossReferenceOperation operation = operationManager .newGetCrossReferenceOperation(getSession(), primaryCatalog, primarySchema, primaryTable, foreignCatalog, foreignSchema, foreignTable); - OperationHandle opHandle = operation.getHandle(); - try { - operation.run(); - addOpHandle(opHandle); - return opHandle; - } catch (HiveSQLException e) { - operationManager.closeOperation(opHandle); - throw e; - } finally { - release(true, true); - } + return runOperation(operation); } } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 4f5a852..9c023a8 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -45,7 +45,6 @@ import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.operation.Operation; import org.apache.hive.service.cli.operation.OperationManager; import org.apache.hive.service.rpc.thrift.TOpenSessionReq; import org.apache.hive.service.rpc.thrift.TProtocolVersion; @@ -65,7 +64,6 @@ private HiveConf hiveConf; private final Map handleToSession = new ConcurrentHashMap(); - private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; private boolean isOperationLogEnabled; private File operationLogRootDir; @@ -92,8 +90,9 @@ public synchronized void init(HiveConf hiveConf) { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { initOperationLogRootDir(); } + OperationManager.initializeOnce(hiveConf); + createBackgroundOperationPool(); - addService(operationManager); initSessionImplClassName(); super.init(hiveConf); } @@ -322,7 +321,6 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p } } session.setSessionManager(this); - session.setOperationManager(operationManager); try { session.open(sessionConf); } catch (Exception e) { @@ -391,10 +389,6 @@ public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLExcepti return session; } - public OperationManager getOperationManager() { - return operationManager; - } - private static ThreadLocal threadLocalIpAddress = new ThreadLocal(); public static void setIpAddress(String ipAddress) { @@ -475,10 +469,6 @@ private void executeSessionHooks(HiveSession session) throws Exception { return backgroundOperationPool.submit(r); } - public Collection getOperations() { - return operationManager.getOperations(); - } - public Collection getSessions() { return Collections.unmodifiableCollection(handleToSession.values()); } diff --git a/service/src/java/org/apache/hive/service/servlet/QueryProfileServlet.java b/service/src/java/org/apache/hive/service/servlet/QueryProfileServlet.java index 8fa447a..245d3aa 100644 --- a/service/src/java/org/apache/hive/service/servlet/QueryProfileServlet.java +++ b/service/src/java/org/apache/hive/service/servlet/QueryProfileServlet.java @@ -45,8 +45,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response) ServletContext ctx = getServletContext(); SessionManager sessionManager = (SessionManager)ctx.getAttribute("hive.sm"); - OperationManager opManager = sessionManager.getOperationManager(); - SQLOperationDisplay sod = opManager.getSQLOperationDisplay(opId); + SQLOperationDisplay sod = OperationManager.getSQLOperationDisplay(opId); if (sod == null) { LOG.debug("No display object found for operation {} ", opId); return; diff --git a/service/src/resources/hive-webapps/hiveserver2/hiveserver2.jsp b/service/src/resources/hive-webapps/hiveserver2/hiveserver2.jsp index 3c187b6..bc7ae35 100644 --- a/service/src/resources/hive-webapps/hiveserver2/hiveserver2.jsp +++ b/service/src/resources/hive-webapps/hiveserver2/hiveserver2.jsp @@ -23,6 +23,7 @@ import="org.apache.hadoop.hive.conf.HiveConf.ConfVars" import="org.apache.hive.common.util.HiveVersionInfo" import="org.apache.hive.service.cli.operation.Operation" + import="org.apache.hive.service.cli.operation.OperationManager" import="org.apache.hive.service.cli.operation.SQLOperation" import="org.apache.hive.service.cli.operation.SQLOperationDisplay" import="org.apache.hive.service.cli.session.SessionManager" @@ -140,7 +141,7 @@ for (HiveSession hiveSession: hiveSessions) { <% int queries = 0; - Collection operations = sessionManager.getOperationManager().getLiveSqlOperations(); + Collection operations = OperationManager.getLiveSqlOperations(); for (SQLOperationDisplay operation : operations) { queries++; %> @@ -181,7 +182,7 @@ for (HiveSession hiveSession: hiveSessions) { <% queries = 0; - operations = sessionManager.getOperationManager().getHistoricalSQLOperations(); + operations = OperationManager.getHistoricalSQLOperations(); for (SQLOperationDisplay operation : operations) { queries++; %> diff --git a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java index 55a325d..0f4cd55 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestSessionGlobalInitFile.java @@ -124,9 +124,9 @@ public void testSessionGlobalInitDir() throws Exception { */ private void doTestSessionGlobalInitFile() throws Exception { - OperationManager operationManager = service.getService().getSessionManager() - .getOperationManager(); SessionHandle sessionHandle = client.openSession(null, null, null); + OperationManager operationManager = service.getService().getSessionManager().getSession(sessionHandle) + .getOperationManager(); // ensure there is no operation related object leak Assert.assertEquals("Verifying all operations used for init file are closed",