Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java =================================================================== --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -734,6 +734,10 @@ HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS("hive.server2.thrift.min.worker.threads", 5), HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 100), + // Configuration for async thread pool in SessionManagers + HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 10), + HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10000), + HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000), HIVE_SERVER2_THRIFT_BIND_HOST("hive.server2.thrift.bind.host", ""), HIVE_SERVER2_THRIFT_SASL_QOP("hive.server2.thrift.sasl.qop", "auth"), Index: service/if/TCLIService.thrift =================================================================== --- service/if/TCLIService.thrift +++ service/if/TCLIService.thrift @@ -582,7 +582,7 @@ // status of the statement, and to fetch results once the // statement has finished executing. struct TExecuteStatementReq { - // The session to exexcute the statement against + // The session to execute the statement against 1: required TSessionHandle sessionHandle // The statement to be executed (DML, DDL, SET, etc) @@ -593,14 +593,16 @@ // is executed. These properties apply to this statement // only and will not affect the subsequent state of the Session. 3: optional map confOverlay + + // Execute asynchronously when runAsync is true + 4: optional bool runAsync = false } struct TExecuteStatementResp { 1: required TStatus status 2: optional TOperationHandle operationHandle } - // GetTypeInfo() // // Get information about types supported by the HiveServer instance. Index: service/src/gen/thrift/gen-cpp/TCLIService_types.cpp =================================================================== --- service/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ service/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -3259,8 +3259,8 @@ swap(a.infoValue, b.infoValue); } -const char* TExecuteStatementReq::ascii_fingerprint = "4CDA19909D21B7D9907F85E3387EAB27"; -const uint8_t TExecuteStatementReq::binary_fingerprint[16] = {0x4C,0xDA,0x19,0x90,0x9D,0x21,0xB7,0xD9,0x90,0x7F,0x85,0xE3,0x38,0x7E,0xAB,0x27}; +const char* TExecuteStatementReq::ascii_fingerprint = "FED75DB77E66D76EC1939A51FB0D96FA"; +const uint8_t TExecuteStatementReq::binary_fingerprint[16] = {0xFE,0xD7,0x5D,0xB7,0x7E,0x66,0xD7,0x6E,0xC1,0x93,0x9A,0x51,0xFB,0x0D,0x96,0xFA}; uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -3323,6 +3323,14 @@ xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->runAsync); + this->__isset.runAsync = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -3365,6 +3373,11 @@ } xfer += oprot->writeFieldEnd(); } + if (this->__isset.runAsync) { + xfer += oprot->writeFieldBegin("runAsync", ::apache::thrift::protocol::T_BOOL, 4); + xfer += oprot->writeBool(this->runAsync); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -3375,6 +3388,7 @@ swap(a.sessionHandle, b.sessionHandle); swap(a.statement, b.statement); swap(a.confOverlay, b.confOverlay); + swap(a.runAsync, b.runAsync); swap(a.__isset, b.__isset); } Index: service/src/gen/thrift/gen-cpp/TCLIService_types.h =================================================================== --- service/src/gen/thrift/gen-cpp/TCLIService_types.h +++ service/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -1850,24 +1850,26 @@ void swap(TGetInfoResp &a, TGetInfoResp &b); typedef struct _TExecuteStatementReq__isset { - _TExecuteStatementReq__isset() : confOverlay(false) {} + _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true) {} bool confOverlay; + bool runAsync; } _TExecuteStatementReq__isset; class TExecuteStatementReq { public: - static const char* ascii_fingerprint; // = "4CDA19909D21B7D9907F85E3387EAB27"; - static const uint8_t binary_fingerprint[16]; // = {0x4C,0xDA,0x19,0x90,0x9D,0x21,0xB7,0xD9,0x90,0x7F,0x85,0xE3,0x38,0x7E,0xAB,0x27}; + static const char* ascii_fingerprint; // = "FED75DB77E66D76EC1939A51FB0D96FA"; + static const uint8_t binary_fingerprint[16]; // = {0xFE,0xD7,0x5D,0xB7,0x7E,0x66,0xD7,0x6E,0xC1,0x93,0x9A,0x51,0xFB,0x0D,0x96,0xFA}; - TExecuteStatementReq() : statement() { + TExecuteStatementReq() : statement(), runAsync(false) { } virtual ~TExecuteStatementReq() throw() {} TSessionHandle sessionHandle; std::string statement; std::map confOverlay; + bool runAsync; _TExecuteStatementReq__isset __isset; @@ -1884,6 +1886,11 @@ __isset.confOverlay = true; } + void __set_runAsync(const bool val) { + runAsync = val; + __isset.runAsync = true; + } + bool operator == (const TExecuteStatementReq & rhs) const { if (!(sessionHandle == rhs.sessionHandle)) @@ -1894,6 +1901,10 @@ return false; else if (__isset.confOverlay && !(confOverlay == rhs.confOverlay)) return false; + if (__isset.runAsync != rhs.__isset.runAsync) + return false; + else if (__isset.runAsync && !(runAsync == rhs.runAsync)) + return false; return true; } bool operator != (const TExecuteStatementReq &rhs) const { Index: service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java =================================================================== --- service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java +++ service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TExecuteStatementReq.java @@ -37,6 +37,7 @@ private static final org.apache.thrift.protocol.TField SESSION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("sessionHandle", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField STATEMENT_FIELD_DESC = new org.apache.thrift.protocol.TField("statement", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField CONF_OVERLAY_FIELD_DESC = new org.apache.thrift.protocol.TField("confOverlay", org.apache.thrift.protocol.TType.MAP, (short)3); + private static final org.apache.thrift.protocol.TField RUN_ASYNC_FIELD_DESC = new org.apache.thrift.protocol.TField("runAsync", org.apache.thrift.protocol.TType.BOOL, (short)4); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -47,12 +48,14 @@ private TSessionHandle sessionHandle; // required private String statement; // required private Map confOverlay; // optional + private boolean runAsync; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { SESSION_HANDLE((short)1, "sessionHandle"), STATEMENT((short)2, "statement"), - CONF_OVERLAY((short)3, "confOverlay"); + CONF_OVERLAY((short)3, "confOverlay"), + RUN_ASYNC((short)4, "runAsync"); private static final Map byName = new HashMap(); @@ -73,6 +76,8 @@ return STATEMENT; case 3: // CONF_OVERLAY return CONF_OVERLAY; + case 4: // RUN_ASYNC + return RUN_ASYNC; default: return null; } @@ -113,7 +118,9 @@ } // isset id assignments - private _Fields optionals[] = {_Fields.CONF_OVERLAY}; + private static final int __RUNASYNC_ISSET_ID = 0; + private byte __isset_bitfield = 0; + private _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -125,11 +132,15 @@ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + tmpMap.put(_Fields.RUN_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("runAsync", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExecuteStatementReq.class, metaDataMap); } public TExecuteStatementReq() { + this.runAsync = false; + } public TExecuteStatementReq( @@ -145,6 +156,7 @@ * Performs a deep copy on other. */ public TExecuteStatementReq(TExecuteStatementReq other) { + __isset_bitfield = other.__isset_bitfield; if (other.isSetSessionHandle()) { this.sessionHandle = new TSessionHandle(other.sessionHandle); } @@ -166,6 +178,7 @@ } this.confOverlay = __this__confOverlay; } + this.runAsync = other.runAsync; } public TExecuteStatementReq deepCopy() { @@ -177,6 +190,8 @@ this.sessionHandle = null; this.statement = null; this.confOverlay = null; + this.runAsync = false; + } public TSessionHandle getSessionHandle() { @@ -259,6 +274,28 @@ } } + public boolean isRunAsync() { + return this.runAsync; + } + + public void setRunAsync(boolean runAsync) { + this.runAsync = runAsync; + setRunAsyncIsSet(true); + } + + public void unsetRunAsync() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __RUNASYNC_ISSET_ID); + } + + /** Returns true if field runAsync is set (has been assigned a value) and false otherwise */ + public boolean isSetRunAsync() { + return EncodingUtils.testBit(__isset_bitfield, __RUNASYNC_ISSET_ID); + } + + public void setRunAsyncIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RUNASYNC_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case SESSION_HANDLE: @@ -285,6 +322,14 @@ } break; + case RUN_ASYNC: + if (value == null) { + unsetRunAsync(); + } else { + setRunAsync((Boolean)value); + } + break; + } } @@ -299,6 +344,9 @@ case CONF_OVERLAY: return getConfOverlay(); + case RUN_ASYNC: + return Boolean.valueOf(isRunAsync()); + } throw new IllegalStateException(); } @@ -316,6 +364,8 @@ return isSetStatement(); case CONF_OVERLAY: return isSetConfOverlay(); + case RUN_ASYNC: + return isSetRunAsync(); } throw new IllegalStateException(); } @@ -360,6 +410,15 @@ return false; } + boolean this_present_runAsync = true && this.isSetRunAsync(); + boolean that_present_runAsync = true && that.isSetRunAsync(); + if (this_present_runAsync || that_present_runAsync) { + if (!(this_present_runAsync && that_present_runAsync)) + return false; + if (this.runAsync != that.runAsync) + return false; + } + return true; } @@ -382,6 +441,11 @@ if (present_confOverlay) builder.append(confOverlay); + boolean present_runAsync = true && (isSetRunAsync()); + builder.append(present_runAsync); + if (present_runAsync) + builder.append(runAsync); + return builder.toHashCode(); } @@ -423,6 +487,16 @@ return lastComparison; } } + lastComparison = Boolean.valueOf(isSetRunAsync()).compareTo(typedOther.isSetRunAsync()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRunAsync()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runAsync, typedOther.runAsync); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -468,6 +542,12 @@ } first = false; } + if (isSetRunAsync()) { + if (!first) sb.append(", "); + sb.append("runAsync:"); + sb.append(this.runAsync); + first = false; + } sb.append(")"); return sb.toString(); } @@ -498,6 +578,8 @@ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bitfield = 0; read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.thrift.TException te) { throw new java.io.IOException(te); @@ -559,6 +641,14 @@ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // RUN_ASYNC + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.runAsync = iprot.readBool(); + struct.setRunAsyncIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -597,6 +687,11 @@ oprot.writeFieldEnd(); } } + if (struct.isSetRunAsync()) { + oprot.writeFieldBegin(RUN_ASYNC_FIELD_DESC); + oprot.writeBool(struct.runAsync); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -620,7 +715,10 @@ if (struct.isSetConfOverlay()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetRunAsync()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); if (struct.isSetConfOverlay()) { { oprot.writeI32(struct.confOverlay.size()); @@ -631,6 +729,9 @@ } } } + if (struct.isSetRunAsync()) { + oprot.writeBool(struct.runAsync); + } } @Override @@ -641,7 +742,7 @@ struct.setSessionHandleIsSet(true); struct.statement = iprot.readString(); struct.setStatementIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(2); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map150 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); @@ -657,6 +758,10 @@ } struct.setConfOverlayIsSet(true); } + if (incoming.get(1)) { + struct.runAsync = iprot.readBool(); + struct.setRunAsyncIsSet(true); + } } } Index: service/src/gen/thrift/gen-py/TCLIService/ttypes.py =================================================================== --- service/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ service/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -3047,19 +3047,22 @@ - sessionHandle - statement - confOverlay + - runAsync """ thrift_spec = ( None, # 0 (1, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 1 (2, TType.STRING, 'statement', None, None, ), # 2 (3, TType.MAP, 'confOverlay', (TType.STRING,None,TType.STRING,None), None, ), # 3 + (4, TType.BOOL, 'runAsync', None, False, ), # 4 ) - def __init__(self, sessionHandle=None, statement=None, confOverlay=None,): + def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4],): self.sessionHandle = sessionHandle self.statement = statement self.confOverlay = confOverlay + self.runAsync = runAsync def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3092,6 +3095,11 @@ iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.BOOL: + self.runAsync = iprot.readBool(); + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3118,6 +3126,10 @@ oprot.writeString(viter135) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.runAsync is not None: + oprot.writeFieldBegin('runAsync', TType.BOOL, 4) + oprot.writeBool(self.runAsync) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() Index: service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb =================================================================== --- service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -904,11 +904,13 @@ SESSIONHANDLE = 1 STATEMENT = 2 CONFOVERLAY = 3 + RUNASYNC = 4 FIELDS = { SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle}, STATEMENT => {:type => ::Thrift::Types::STRING, :name => 'statement'}, - CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true} + CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}, + RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true} } def struct_fields; FIELDS; end Index: service/src/java/org/apache/hive/service/cli/CLIService.java =================================================================== --- service/src/java/org/apache/hive/service/cli/CLIService.java +++ service/src/java/org/apache/hive/service/cli/CLIService.java @@ -140,18 +140,33 @@ } /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) + * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) */ @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { OperationHandle opHandle = sessionManager.getSession(sessionHandle) - .executeStatement(statement, confOverlay); + .executeStatement(statement, confOverlay, false); LOG.info(sessionHandle + ": executeStatement()"); return opHandle; } /* (non-Javadoc) + * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) + */ + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { + OperationHandle opHandle = sessionManager.getSession(sessionHandle) + .executeStatement(statement, confOverlay, true); + LOG.info(sessionHandle + ": executeStatementAsync()"); + return opHandle; + } + + + /* (non-Javadoc) * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle) */ @Override Index: service/src/java/org/apache/hive/service/cli/CLIServiceClient.java =================================================================== --- service/src/java/org/apache/hive/service/cli/CLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/CLIServiceClient.java @@ -56,13 +56,22 @@ throws HiveSQLException; /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) + * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) */ @Override public abstract OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException; /* (non-Javadoc) + * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) + */ + @Override + public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException; + + /* (non-Javadoc) * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle) */ @Override Index: service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java =================================================================== --- service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java @@ -66,15 +66,27 @@ } /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) + * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) */ @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { return cliService.executeStatement(sessionHandle, statement, confOverlay); } /* (non-Javadoc) + * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, + * java.lang.String, java.util.Map) + */ + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { + return cliService.executeStatementAsync(sessionHandle, statement, confOverlay); + } + + + /* (non-Javadoc) * @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle) */ @Override Index: service/src/java/org/apache/hive/service/cli/ICLIService.java =================================================================== --- service/src/java/org/apache/hive/service/cli/ICLIService.java +++ service/src/java/org/apache/hive/service/cli/ICLIService.java @@ -43,6 +43,10 @@ Map confOverlay) throws HiveSQLException; + public abstract OperationHandle executeStatementAsync(SessionHandle sessionHandle, + String statement, Map confOverlay) + throws HiveSQLException; + public abstract OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; Index: service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java =================================================================== --- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -61,7 +63,8 @@ private TableSchema resultSchema = null; private Schema mResultSchema = null; private SerDe serde = null; - + private boolean runAsync; + private Future backgroundHandle; public SQLOperation(HiveSession parentSession, String statement, Map confOverlay) { // TODO: call setRemoteUser in ExecuteStatementOperation or higher. @@ -72,9 +75,7 @@ public void prepare() throws HiveSQLException { } - @Override - public void run() throws HiveSQLException { - setState(OperationState.RUNNING); + public void runInternal() throws HiveSQLException { String statement_trimmed = statement.trim(); String[] tokens = statement_trimmed.split("\\s"); String cmd_1 = statement_trimmed.substring(tokens[0].length()).trim(); @@ -116,31 +117,59 @@ } @Override - public void cancel() throws HiveSQLException { - setState(OperationState.CANCELED); + public void run() throws HiveSQLException { + setState(OperationState.RUNNING); + if (!shouldRunAsync()) { + runInternal(); + } else { + Runnable backgroundTask = new Runnable() { + SessionState ss = SessionState.get(); + @Override + public void run() { + SessionState.start(ss); + try { + runInternal(); + } catch (HiveSQLException e) { + e.printStackTrace(); + } + } + }; + try { + this.backgroundHandle = + getParentSession().getSessionManager().submitBackgroundOperation(backgroundTask); + } catch (RejectedExecutionException rejected) { + setState(OperationState.ERROR); + throw new HiveSQLException(rejected); + } + } + } + + private void cleanup(OperationState state) throws HiveSQLException { + setState(state); + if (shouldRunAsync()) { + if (backgroundHandle != null) { + backgroundHandle.cancel(true); + } + } if (driver != null) { driver.close(); driver.destroy(); } - SessionState session = SessionState.get(); - if (session.getTmpOutputFile() != null) { - session.getTmpOutputFile().delete(); + SessionState ss = SessionState.get(); + if (ss.getTmpOutputFile() != null) { + ss.getTmpOutputFile().delete(); } } @Override - public void close() throws HiveSQLException { - setState(OperationState.CLOSED); - if (driver != null) { - driver.close(); - driver.destroy(); - } + public void cancel() throws HiveSQLException { + cleanup(OperationState.CANCELED); + } - SessionState session = SessionState.get(); - if (session.getTmpOutputFile() != null) { - session.getTmpOutputFile().delete(); - } + @Override + public void close() throws HiveSQLException { + cleanup(OperationState.CLOSED); } @Override @@ -259,4 +288,12 @@ return serde; } + public void setRunAsync(boolean runInBackground) { + this.runAsync = runInBackground; + } + + private boolean shouldRunAsync() { + return runAsync; + } + } Index: service/src/java/org/apache/hive/service/cli/session/HiveSession.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -42,6 +42,11 @@ public void setSessionManager(SessionManager sessionManager); /** + * Get the session manager for the session + */ + public SessionManager getSessionManager(); + + /** * Set operation manager for the session * @param operationManager */ @@ -69,11 +74,12 @@ * execute operation handler * @param statement * @param confOverlay + * @param runAsync * @return * @throws HiveSQLException */ public OperationHandle executeStatement(String statement, - Map confOverlay) throws HiveSQLException; + Map confOverlay, boolean runAsync) throws HiveSQLException; /** * getTypeInfo operation handler Index: service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -18,15 +18,13 @@ package org.apache.hive.service.cli.session; -import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -54,6 +52,7 @@ import org.apache.hive.service.cli.operation.GetTypeInfoOperation; import org.apache.hive.service.cli.operation.MetadataOperation; import org.apache.hive.service.cli.operation.OperationManager; +import org.apache.hive.service.cli.operation.SQLOperation; /** * HiveSession @@ -93,7 +92,7 @@ sessionState = new SessionState(hiveConf); } - private SessionManager getSessionManager() { + public SessionManager getSessionManager() { return sessionManager; } @@ -172,12 +171,16 @@ } } - public OperationHandle executeStatement(String statement, Map confOverlay) + public OperationHandle executeStatement(String statement, Map confOverlay, + boolean runAsync) throws HiveSQLException { acquire(); try { ExecuteStatementOperation operation = getOperationManager() .newExecuteStatementOperation(getSession(), statement, confOverlay); + if (operation instanceof SQLOperation) { + ((SQLOperation) operation).setRunAsync(runAsync); + } operation.run(); OperationHandle opHandle = operation.getHandle(); opHandleSet.add(opHandle); Index: service/src/java/org/apache/hive/service/cli/session/SessionManager.java =================================================================== --- service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -20,8 +20,15 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -32,23 +39,24 @@ * */ public class SessionManager extends CompositeService { - + private static final Log LOG = LogFactory.getLog(CompositeService.class); private HiveConf hiveConf; private final Map handleToSession = new HashMap(); private OperationManager operationManager = new OperationManager(); private static final Object sessionMapLock = new Object(); + private ExecutorService backgroundOperationPool; public SessionManager() { super("SessionManager"); } @Override public synchronized void init(HiveConf hiveConf) { this.hiveConf = hiveConf; - operationManager = new OperationManager(); + int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS); + backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize); addService(operationManager); - super.init(hiveConf); } @@ -62,6 +70,15 @@ public synchronized void stop() { // TODO super.stop(); + if (backgroundOperationPool != null) { + backgroundOperationPool.shutdown(); + try { + long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT); + backgroundOperationPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException exc) { + LOG.warn("background tasks still pending.", exc); + } + } } @@ -150,4 +167,8 @@ threadLocalUserName.remove(); } + public Future submitBackgroundOperation(Runnable r) { + return backgroundOperationPool.submit(r); + } + } Index: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java =================================================================== --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -200,8 +200,10 @@ SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle()); String statement = req.getStatement(); Map confOverlay = req.getConfOverlay(); - OperationHandle operationHandle = - cliService.executeStatement(sessionHandle, statement, confOverlay); + Boolean runAsync = req.isRunAsync(); + OperationHandle operationHandle = runAsync ? + cliService.executeStatementAsync(sessionHandle, statement, confOverlay) + : cliService.executeStatement(sessionHandle, statement, confOverlay); resp.setOperationHandle(operationHandle.toTOperationHandle()); resp.setStatus(OK_STATUS); } catch (Exception e) { Index: service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java =================================================================== --- service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -122,9 +122,27 @@ public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, false); + } + + /* (non-Javadoc) + * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) + */ + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) + throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, true); + } + + private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement, + Map confOverlay, boolean isAsync) + throws HiveSQLException { try { - TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement); + TExecuteStatementReq req = + new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement); req.setConfOverlay(confOverlay); + req.setRunAsync(isAsync); TExecuteStatementResp resp = cliService.ExecuteStatement(req); checkStatus(resp.getStatus()); return new OperationHandle(resp.getOperationHandle()); Index: service/src/test/org/apache/hive/service/cli/CLIServiceTest.java =================================================================== --- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -113,4 +113,64 @@ client.closeSession(sessionHandle); } + + @Test + public void testExecuteStatement() throws Exception { + HashMap confOverlay = new HashMap(); + SessionHandle sessionHandle = client.openSession("tom", "password", + new HashMap()); + // Timeout for the poll in case of asynchronous execute + long pollTimeout = System.currentTimeMillis() + 100000; + assertNotNull(sessionHandle); + + // Change lock manager, otherwise unit-test doesn't go through + String setLockMgr = "SET hive.lock.manager=" + + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; + client.executeStatement(sessionHandle, setLockMgr, confOverlay); + + String createTable = "CREATE TABLE TEST_EXEC_ASYNC(ID STRING)"; + client.executeStatement(sessionHandle, createTable, confOverlay); + + // nonblocking execute + String select = "SELECT ID FROM TEST_EXEC_ASYNC"; + OperationHandle ophandle = + client.executeStatementAsync(sessionHandle, select, confOverlay); + + OperationState state = null; + int count = 0; + while (true) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + System.out.println("Polling timed out"); + break; + } + state = client.getOperationStatus(ophandle); + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED + || state == OperationState.ERROR) { + break; + } + Thread.sleep(1000); + } + + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // blocking execute + ophandle = client.executeStatement(sessionHandle, select, confOverlay); + // expect query to be completed now + assertEquals("Query should be finished", + OperationState.FINISHED, client.getOperationStatus(ophandle)); + + // cancellation test + ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay); + System.out.println("cancelling " + ophandle); + client.cancelOperation(ophandle); + state = client.getOperationStatus(ophandle); + System.out.println(ophandle + " after cancelling, state= " + state); + assertEquals("Query should be cancelled", OperationState.CANCELED, state); + } }