diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 73bc620..7243648 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -55,6 +55,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.SQLTimeoutException; import java.sql.SQLWarning; import java.sql.Statement; import java.sql.Timestamp; @@ -2384,7 +2385,7 @@ public void run() { try { System.out.println("Executing query: "); stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " + - "t2.under_col as u2 from " + tableName + "t1 join " + tableName + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); fail("Expecting SQLException"); } catch (SQLException e) { @@ -2399,7 +2400,7 @@ public void run() { @Override public void run() { try { - Thread.sleep(1000); + Thread.sleep(10000); System.out.println("Cancelling query: "); stmt.cancel(); } catch (Exception e) { @@ -2414,6 +2415,44 @@ public void run() { stmt.close(); } + @Test + public void testQueryTimeout() throws Exception { + String udfName = SleepUDF.class.getName(); + Statement stmt1 = con.createStatement(); + stmt1.execute("create temporary function sleepUDF as '" + udfName + "'"); + stmt1.close(); + Statement stmt = con.createStatement(); + // Test a query where timeout kicks in + // Set query timeout to 15 seconds + stmt.setQueryTimeout(15); + System.err.println("Executing query: "); + try { + // Sleep UDF sleeps for 100ms for each select call + // The test table has 500 rows, so that should be sufficient time + stmt.executeQuery("select sleepUDF(t1.under_col) as u0, t1.under_col as u1, " + + "t2.under_col as u2 from " + tableName + " t1 join " + tableName + + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLTimeoutException"); + } catch (SQLTimeoutException e) { + assertNotNull(e); + System.err.println(e.toString()); + } catch (SQLException e) { + fail("Expecting SQLTimeoutException, but got SQLException: " + e); + e.printStackTrace(); + } + + // Test a query where timeout does not kick in. Set it to 25s + stmt.setQueryTimeout(25); + try { + stmt.executeQuery("show tables"); + } catch (SQLException e) { + fail("Unexpected SQLException: " + e); + e.printStackTrace(); + } + + stmt.close(); + } + /** * Test the non-null value of the Yarn ATS GUID. * We spawn 2 threads - one running the query and diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java index 4d763d2..c9e6a13 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java @@ -70,7 +70,7 @@ protected synchronized void release(boolean userAccess) { Map confOverlay = new HashMap(); String hql = "drop table if exists table_not_exists"; Mockito.when(operationManager.newExecuteStatementOperation(same(session), eq(hql), - (Map)Mockito.any(), eq(true))).thenReturn(operation); + (Map)Mockito.any(), eq(true), eq(0))).thenReturn(operation); try { diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index 3cc6b74..38ccc78 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -43,6 +43,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTimeoutException; import java.sql.SQLWarning; import java.util.ArrayList; import java.util.HashMap; @@ -111,6 +112,8 @@ */ private boolean isExecuteStatementFailed = false; + private int queryTimeout = 0; + public HiveStatement(HiveConnection connection, TCLIService.Iface client, TSessionHandle sessHandle) { this(connection, client, sessHandle, false, DEFAULT_FETCH_SIZE); @@ -300,7 +303,7 @@ private void runAsyncOnServer(String sql) throws SQLException { */ execReq.setRunAsync(true); execReq.setConfOverlay(sessConf); - + execReq.setQueryTimeout(queryTimeout); try { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); Utils.verifySuccessWithInfo(execResp.getStatus()); @@ -323,8 +326,8 @@ void waitForOperationToComplete() throws SQLException { while (!isOperationComplete) { try { /** - * For an async SQLOperation, GetOperationStatus will use the long polling approach - * It will essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires + * For an async SQLOperation, GetOperationStatus will use the long polling approach It will + * essentially return after the HIVE_SERVER2_LONG_POLLING_TIMEOUT (a server config) expires */ statusResp = client.GetOperationStatus(statusReq); Utils.verifySuccessWithInfo(statusResp.getStatus()); @@ -338,10 +341,12 @@ void waitForOperationToComplete() throws SQLException { case CANCELED_STATE: // 01000 -> warning throw new SQLException("Query was cancelled", "01000"); + case TIMEDOUT_STATE: + throw new SQLTimeoutException("Query timed out after " + queryTimeout + " seconds"); case ERROR_STATE: // Get the error details from the underlying exception - throw new SQLException(statusResp.getErrorMessage(), - statusResp.getSqlState(), statusResp.getErrorCode()); + throw new SQLException(statusResp.getErrorMessage(), statusResp.getSqlState(), + statusResp.getErrorCode()); case UKNOWN_STATE: throw new SQLException("Unknown query", "HY000"); case INITIALIZED_STATE: @@ -787,10 +792,7 @@ public void setPoolable(boolean poolable) throws SQLException { @Override public void setQueryTimeout(int seconds) throws SQLException { - // 0 is supported which means "no limit" - if (seconds != 0) { - throw new SQLException("Query timeout seconds must be 0"); - } + this.queryTimeout = seconds; } /* diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 32d2cb2..6a610cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1725,20 +1725,31 @@ public int execute() throws CommandNeedRetryException { } LOG.info("Completed executing command(queryId=" + queryId + "); Time taken: " + duration + " seconds"); } - plan.setDone(); - if (SessionState.get() != null) { - try { - SessionState.get().getHiveHistory().logPlanProgress(plan); - } catch (Exception e) { - // ignore - } + releasePlan(plan); + + if (console != null) { + console.printInfo("OK"); } - console.printInfo("OK"); return (0); } + private synchronized void releasePlan(QueryPlan plan) { + // Plan maybe null if Driver.close is called in another thread for the same Driver object + if (plan != null) { + plan.setDone(); + if (SessionState.get() != null) { + try { + SessionState.get().getHiveHistory().logPlanProgress(plan); + } catch (Exception e) { + // Log and ignore + LOG.warn("Could not log query plan progress", e); + } + } + } + } + private void setQueryDisplays(List> tasks) { if (tasks != null) { for (Task task : tasks) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java index 0234fd9..6582cdd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryImpl.java @@ -315,9 +315,11 @@ public void progressTask(String queryId, Task task) { @Override public void logPlanProgress(QueryPlan plan) throws IOException { - Map ctrmap = ctrMapFactory.get(); - ctrmap.put("plan", plan.toString()); - log(RecordTypes.Counters, ctrmap); + if (plan != null) { + Map ctrmap = ctrMapFactory.get(); + ctrmap.put("plan", plan.toString()); + log(RecordTypes.Counters, ctrmap); + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java index 6d0f14a..18216f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java @@ -166,7 +166,7 @@ synchronized void write(String msg) { return readResults(maxRows); } - void remove() { + synchronized void remove() { try { if (in != null) { in.close(); @@ -174,8 +174,10 @@ void remove() { if (out != null) { out.close(); } - FileUtils.forceDelete(file); - isRemoved = true; + if (!isRemoved) { + FileUtils.forceDelete(file); + isRemoved = true; + } } catch (Exception e) { LOG.error("Failed to remove corresponding log file of operation: " + operationName, e); } diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index 92bcf77..9879b1b 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -458,6 +458,9 @@ enum TOperationState { // The operation is in an pending state PENDING_STATE, + + // The operation is in an timedout state + TIMEDOUT_STATE, } // A string identifier. This is interpreted literally. @@ -697,6 +700,9 @@ struct TExecuteStatementReq { // Execute asynchronously when runAsync is true 4: optional bool runAsync = false + + // The number of seconds after which the query will timeout on the server + 5: optional i64 queryTimeout = 0 } struct TExecuteStatementResp { diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 66f5e8c..5229230 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -109,7 +109,8 @@ int _kTOperationStateValues[] = { TOperationState::CLOSED_STATE, TOperationState::ERROR_STATE, TOperationState::UKNOWN_STATE, - TOperationState::PENDING_STATE + TOperationState::PENDING_STATE, + TOperationState::TIMEDOUT_STATE }; const char* _kTOperationStateNames[] = { "INITIALIZED_STATE", @@ -119,9 +120,10 @@ const char* _kTOperationStateNames[] = { "CLOSED_STATE", "ERROR_STATE", "UKNOWN_STATE", - "PENDING_STATE" + "PENDING_STATE", + "TIMEDOUT_STATE" }; -const std::map _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(8, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map _TOperationState_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(9, _kTOperationStateValues, _kTOperationStateNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); int _kTOperationTypeValues[] = { TOperationType::EXECUTE_STATEMENT, @@ -5575,6 +5577,11 @@ void TExecuteStatementReq::__set_runAsync(const bool val) { __isset.runAsync = true; } +void TExecuteStatementReq::__set_queryTimeout(const int64_t val) { + this->queryTimeout = val; +__isset.queryTimeout = true; +} + uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -5645,6 +5652,14 @@ uint32_t TExecuteStatementReq::read(::apache::thrift::protocol::TProtocol* iprot xfer += iprot->skip(ftype); } break; + case 5: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->queryTimeout); + this->__isset.queryTimeout = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -5693,6 +5708,11 @@ uint32_t TExecuteStatementReq::write(::apache::thrift::protocol::TProtocol* opro xfer += oprot->writeBool(this->runAsync); xfer += oprot->writeFieldEnd(); } + if (this->__isset.queryTimeout) { + xfer += oprot->writeFieldBegin("queryTimeout", ::apache::thrift::protocol::T_I64, 5); + xfer += oprot->writeI64(this->queryTimeout); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -5704,6 +5724,7 @@ void swap(TExecuteStatementReq &a, TExecuteStatementReq &b) { swap(a.statement, b.statement); swap(a.confOverlay, b.confOverlay); swap(a.runAsync, b.runAsync); + swap(a.queryTimeout, b.queryTimeout); swap(a.__isset, b.__isset); } @@ -5712,6 +5733,7 @@ TExecuteStatementReq::TExecuteStatementReq(const TExecuteStatementReq& other222) statement = other222.statement; confOverlay = other222.confOverlay; runAsync = other222.runAsync; + queryTimeout = other222.queryTimeout; __isset = other222.__isset; } TExecuteStatementReq& TExecuteStatementReq::operator=(const TExecuteStatementReq& other223) { @@ -5719,6 +5741,7 @@ TExecuteStatementReq& TExecuteStatementReq::operator=(const TExecuteStatementReq statement = other223.statement; confOverlay = other223.confOverlay; runAsync = other223.runAsync; + queryTimeout = other223.queryTimeout; __isset = other223.__isset; return *this; } @@ -5729,6 +5752,7 @@ void TExecuteStatementReq::printTo(std::ostream& out) const { out << ", " << "statement=" << to_string(statement); out << ", " << "confOverlay="; (__isset.confOverlay ? (out << to_string(confOverlay)) : (out << "")); out << ", " << "runAsync="; (__isset.runAsync ? (out << to_string(runAsync)) : (out << "")); + out << ", " << "queryTimeout="; (__isset.queryTimeout ? (out << to_string(queryTimeout)) : (out << "")); out << ")"; } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index 9f937ca..838bf17 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -84,7 +84,8 @@ struct TOperationState { CLOSED_STATE = 4, ERROR_STATE = 5, UKNOWN_STATE = 6, - PENDING_STATE = 7 + PENDING_STATE = 7, + TIMEDOUT_STATE = 8 }; }; @@ -2501,9 +2502,10 @@ inline std::ostream& operator<<(std::ostream& out, const TGetInfoResp& obj) } typedef struct _TExecuteStatementReq__isset { - _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true) {} + _TExecuteStatementReq__isset() : confOverlay(false), runAsync(true), queryTimeout(true) {} bool confOverlay :1; bool runAsync :1; + bool queryTimeout :1; } _TExecuteStatementReq__isset; class TExecuteStatementReq { @@ -2511,7 +2513,7 @@ class TExecuteStatementReq { TExecuteStatementReq(const TExecuteStatementReq&); TExecuteStatementReq& operator=(const TExecuteStatementReq&); - TExecuteStatementReq() : statement(), runAsync(false) { + TExecuteStatementReq() : statement(), runAsync(false), queryTimeout(0LL) { } virtual ~TExecuteStatementReq() throw(); @@ -2519,6 +2521,7 @@ class TExecuteStatementReq { std::string statement; std::map confOverlay; bool runAsync; + int64_t queryTimeout; _TExecuteStatementReq__isset __isset; @@ -2530,6 +2533,8 @@ class TExecuteStatementReq { void __set_runAsync(const bool val); + void __set_queryTimeout(const int64_t val); + bool operator == (const TExecuteStatementReq & rhs) const { if (!(sessionHandle == rhs.sessionHandle)) @@ -2544,6 +2549,10 @@ class TExecuteStatementReq { return false; else if (__isset.runAsync && !(runAsync == rhs.runAsync)) return false; + if (__isset.queryTimeout != rhs.__isset.queryTimeout) + return false; + else if (__isset.queryTimeout && !(queryTimeout == rhs.queryTimeout)) + return false; return true; } bool operator != (const TExecuteStatementReq &rhs) const { diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java index 2eb4d09..1f73cec 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TExecuteStatementReq.java @@ -42,6 +42,7 @@ private static final org.apache.thrift.protocol.TField STATEMENT_FIELD_DESC = new org.apache.thrift.protocol.TField("statement", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField CONF_OVERLAY_FIELD_DESC = new org.apache.thrift.protocol.TField("confOverlay", org.apache.thrift.protocol.TType.MAP, (short)3); private static final org.apache.thrift.protocol.TField RUN_ASYNC_FIELD_DESC = new org.apache.thrift.protocol.TField("runAsync", org.apache.thrift.protocol.TType.BOOL, (short)4); + private static final org.apache.thrift.protocol.TField QUERY_TIMEOUT_FIELD_DESC = new org.apache.thrift.protocol.TField("queryTimeout", org.apache.thrift.protocol.TType.I64, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -53,13 +54,15 @@ private String statement; // required private Map confOverlay; // optional private boolean runAsync; // optional + private long queryTimeout; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { SESSION_HANDLE((short)1, "sessionHandle"), STATEMENT((short)2, "statement"), CONF_OVERLAY((short)3, "confOverlay"), - RUN_ASYNC((short)4, "runAsync"); + RUN_ASYNC((short)4, "runAsync"), + QUERY_TIMEOUT((short)5, "queryTimeout"); private static final Map byName = new HashMap(); @@ -82,6 +85,8 @@ public static _Fields findByThriftId(int fieldId) { return CONF_OVERLAY; case 4: // RUN_ASYNC return RUN_ASYNC; + case 5: // QUERY_TIMEOUT + return QUERY_TIMEOUT; default: return null; } @@ -123,8 +128,9 @@ public String getFieldName() { // isset id assignments private static final int __RUNASYNC_ISSET_ID = 0; + private static final int __QUERYTIMEOUT_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC}; + private static final _Fields optionals[] = {_Fields.CONF_OVERLAY,_Fields.RUN_ASYNC,_Fields.QUERY_TIMEOUT}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -138,6 +144,8 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); tmpMap.put(_Fields.RUN_ASYNC, new org.apache.thrift.meta_data.FieldMetaData("runAsync", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.QUERY_TIMEOUT, new org.apache.thrift.meta_data.FieldMetaData("queryTimeout", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExecuteStatementReq.class, metaDataMap); } @@ -145,6 +153,8 @@ public String getFieldName() { public TExecuteStatementReq() { this.runAsync = false; + this.queryTimeout = 0L; + } public TExecuteStatementReq( @@ -172,6 +182,7 @@ public TExecuteStatementReq(TExecuteStatementReq other) { this.confOverlay = __this__confOverlay; } this.runAsync = other.runAsync; + this.queryTimeout = other.queryTimeout; } public TExecuteStatementReq deepCopy() { @@ -185,6 +196,8 @@ public void clear() { this.confOverlay = null; this.runAsync = false; + this.queryTimeout = 0L; + } public TSessionHandle getSessionHandle() { @@ -289,6 +302,28 @@ public void setRunAsyncIsSet(boolean value) { __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RUNASYNC_ISSET_ID, value); } + public long getQueryTimeout() { + return this.queryTimeout; + } + + public void setQueryTimeout(long queryTimeout) { + this.queryTimeout = queryTimeout; + setQueryTimeoutIsSet(true); + } + + public void unsetQueryTimeout() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID); + } + + /** Returns true if field queryTimeout is set (has been assigned a value) and false otherwise */ + public boolean isSetQueryTimeout() { + return EncodingUtils.testBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID); + } + + public void setQueryTimeoutIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __QUERYTIMEOUT_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case SESSION_HANDLE: @@ -323,6 +358,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case QUERY_TIMEOUT: + if (value == null) { + unsetQueryTimeout(); + } else { + setQueryTimeout((Long)value); + } + break; + } } @@ -340,6 +383,9 @@ public Object getFieldValue(_Fields field) { case RUN_ASYNC: return isRunAsync(); + case QUERY_TIMEOUT: + return getQueryTimeout(); + } throw new IllegalStateException(); } @@ -359,6 +405,8 @@ public boolean isSet(_Fields field) { return isSetConfOverlay(); case RUN_ASYNC: return isSetRunAsync(); + case QUERY_TIMEOUT: + return isSetQueryTimeout(); } throw new IllegalStateException(); } @@ -412,6 +460,15 @@ public boolean equals(TExecuteStatementReq that) { return false; } + boolean this_present_queryTimeout = true && this.isSetQueryTimeout(); + boolean that_present_queryTimeout = true && that.isSetQueryTimeout(); + if (this_present_queryTimeout || that_present_queryTimeout) { + if (!(this_present_queryTimeout && that_present_queryTimeout)) + return false; + if (this.queryTimeout != that.queryTimeout) + return false; + } + return true; } @@ -439,6 +496,11 @@ public int hashCode() { if (present_runAsync) list.add(runAsync); + boolean present_queryTimeout = true && (isSetQueryTimeout()); + list.add(present_queryTimeout); + if (present_queryTimeout) + list.add(queryTimeout); + return list.hashCode(); } @@ -490,6 +552,16 @@ public int compareTo(TExecuteStatementReq other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetQueryTimeout()).compareTo(other.isSetQueryTimeout()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetQueryTimeout()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.queryTimeout, other.queryTimeout); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -541,6 +613,12 @@ public String toString() { sb.append(this.runAsync); first = false; } + if (isSetQueryTimeout()) { + if (!first) sb.append(", "); + sb.append("queryTimeout:"); + sb.append(this.queryTimeout); + first = false; + } sb.append(")"); return sb.toString(); } @@ -642,6 +720,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TExecuteStatementRe org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // QUERY_TIMEOUT + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.queryTimeout = iprot.readI64(); + struct.setQueryTimeoutIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -685,6 +771,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TExecuteStatementR oprot.writeBool(struct.runAsync); oprot.writeFieldEnd(); } + if (struct.isSetQueryTimeout()) { + oprot.writeFieldBegin(QUERY_TIMEOUT_FIELD_DESC); + oprot.writeI64(struct.queryTimeout); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -711,7 +802,10 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementRe if (struct.isSetRunAsync()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetQueryTimeout()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetConfOverlay()) { { oprot.writeI32(struct.confOverlay.size()); @@ -725,6 +819,9 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementRe if (struct.isSetRunAsync()) { oprot.writeBool(struct.runAsync); } + if (struct.isSetQueryTimeout()) { + oprot.writeI64(struct.queryTimeout); + } } @Override @@ -735,7 +832,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementReq struct.setSessionHandleIsSet(true); struct.statement = iprot.readString(); struct.setStatementIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { { org.apache.thrift.protocol.TMap _map168 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); @@ -755,6 +852,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExecuteStatementReq struct.runAsync = iprot.readBool(); struct.setRunAsyncIsSet(true); } + if (incoming.get(2)) { + struct.queryTimeout = iprot.readI64(); + struct.setQueryTimeoutIsSet(true); + } } } diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java index 3fa49b0..4390b4b 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOperationState.java @@ -19,7 +19,8 @@ CLOSED_STATE(4), ERROR_STATE(5), UKNOWN_STATE(6), - PENDING_STATE(7); + PENDING_STATE(7), + TIMEDOUT_STATE(8); private final int value; @@ -56,6 +57,8 @@ public static TOperationState findByValue(int value) { return UKNOWN_STATE; case 7: return PENDING_STATE; + case 8: + return TIMEDOUT_STATE; default: return null; } diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php index 7f1f99f..9ed7403 100644 --- a/service-rpc/src/gen/thrift/gen-php/Types.php +++ b/service-rpc/src/gen/thrift/gen-php/Types.php @@ -109,6 +109,7 @@ final class TOperationState { const ERROR_STATE = 5; const UKNOWN_STATE = 6; const PENDING_STATE = 7; + const TIMEDOUT_STATE = 8; static public $__names = array( 0 => 'INITIALIZED_STATE', 1 => 'RUNNING_STATE', @@ -118,6 +119,7 @@ final class TOperationState { 5 => 'ERROR_STATE', 6 => 'UKNOWN_STATE', 7 => 'PENDING_STATE', + 8 => 'TIMEDOUT_STATE', ); } @@ -5446,6 +5448,10 @@ class TExecuteStatementReq { * @var bool */ public $runAsync = false; + /** + * @var int + */ + public $queryTimeout = 0; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -5475,6 +5481,10 @@ class TExecuteStatementReq { 'var' => 'runAsync', 'type' => TType::BOOL, ), + 5 => array( + 'var' => 'queryTimeout', + 'type' => TType::I64, + ), ); } if (is_array($vals)) { @@ -5490,6 +5500,9 @@ class TExecuteStatementReq { if (isset($vals['runAsync'])) { $this->runAsync = $vals['runAsync']; } + if (isset($vals['queryTimeout'])) { + $this->queryTimeout = $vals['queryTimeout']; + } } } @@ -5554,6 +5567,13 @@ class TExecuteStatementReq { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::I64) { + $xfer += $input->readI64($this->queryTimeout); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -5603,6 +5623,11 @@ class TExecuteStatementReq { $xfer += $output->writeBool($this->runAsync); $xfer += $output->writeFieldEnd(); } + if ($this->queryTimeout !== null) { + $xfer += $output->writeFieldBegin('queryTimeout', TType::I64, 5); + $xfer += $output->writeI64($this->queryTimeout); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index 3bb20b8..44e5462 100644 --- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -154,6 +154,7 @@ class TOperationState: ERROR_STATE = 5 UKNOWN_STATE = 6 PENDING_STATE = 7 + TIMEDOUT_STATE = 8 _VALUES_TO_NAMES = { 0: "INITIALIZED_STATE", @@ -164,6 +165,7 @@ class TOperationState: 5: "ERROR_STATE", 6: "UKNOWN_STATE", 7: "PENDING_STATE", + 8: "TIMEDOUT_STATE", } _NAMES_TO_VALUES = { @@ -175,6 +177,7 @@ class TOperationState: "ERROR_STATE": 5, "UKNOWN_STATE": 6, "PENDING_STATE": 7, + "TIMEDOUT_STATE": 8, } class TOperationType: @@ -4162,6 +4165,7 @@ class TExecuteStatementReq: - statement - confOverlay - runAsync + - queryTimeout """ thrift_spec = ( @@ -4170,13 +4174,15 @@ class TExecuteStatementReq: (2, TType.STRING, 'statement', None, None, ), # 2 (3, TType.MAP, 'confOverlay', (TType.STRING,None,TType.STRING,None), None, ), # 3 (4, TType.BOOL, 'runAsync', None, False, ), # 4 + (5, TType.I64, 'queryTimeout', None, 0, ), # 5 ) - def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4],): + def __init__(self, sessionHandle=None, statement=None, confOverlay=None, runAsync=thrift_spec[4][4], queryTimeout=thrift_spec[5][4],): self.sessionHandle = sessionHandle self.statement = statement self.confOverlay = confOverlay self.runAsync = runAsync + self.queryTimeout = queryTimeout def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -4214,6 +4220,11 @@ def read(self, iprot): self.runAsync = iprot.readBool() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I64: + self.queryTimeout = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -4244,6 +4255,10 @@ def write(self, oprot): oprot.writeFieldBegin('runAsync', TType.BOOL, 4) oprot.writeBool(self.runAsync) oprot.writeFieldEnd() + if self.queryTimeout is not None: + oprot.writeFieldBegin('queryTimeout', TType.I64, 5) + oprot.writeI64(self.queryTimeout) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -4261,6 +4276,7 @@ def __hash__(self): value = (value * 31) ^ hash(self.statement) value = (value * 31) ^ hash(self.confOverlay) value = (value * 31) ^ hash(self.runAsync) + value = (value * 31) ^ hash(self.queryTimeout) return value def __repr__(self): diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 7208bae..b39ec1e 100644 --- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -65,8 +65,9 @@ module TOperationState ERROR_STATE = 5 UKNOWN_STATE = 6 PENDING_STATE = 7 - VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE", 7 => "PENDING_STATE"} - VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE, PENDING_STATE]).freeze + TIMEDOUT_STATE = 8 + VALUE_MAP = {0 => "INITIALIZED_STATE", 1 => "RUNNING_STATE", 2 => "FINISHED_STATE", 3 => "CANCELED_STATE", 4 => "CLOSED_STATE", 5 => "ERROR_STATE", 6 => "UKNOWN_STATE", 7 => "PENDING_STATE", 8 => "TIMEDOUT_STATE"} + VALID_VALUES = Set.new([INITIALIZED_STATE, RUNNING_STATE, FINISHED_STATE, CANCELED_STATE, CLOSED_STATE, ERROR_STATE, UKNOWN_STATE, PENDING_STATE, TIMEDOUT_STATE]).freeze end module TOperationType @@ -1135,12 +1136,14 @@ class TExecuteStatementReq STATEMENT = 2 CONFOVERLAY = 3 RUNASYNC = 4 + QUERYTIMEOUT = 5 FIELDS = { SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle}, STATEMENT => {:type => ::Thrift::Types::STRING, :name => 'statement'}, CONFOVERLAY => {:type => ::Thrift::Types::MAP, :name => 'confOverlay', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}, - RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true} + RUNASYNC => {:type => ::Thrift::Types::BOOL, :name => 'runAsync', :default => false, :optional => true}, + QUERYTIMEOUT => {:type => ::Thrift::Types::I64, :name => 'queryTimeout', :default => 0, :optional => true} } def struct_fields; FIELDS; end diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 4a83e38..ed52b4a 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -248,33 +248,55 @@ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType return infoValue; } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) + /** + * Execute statement on the server. This is a blocking call. */ @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, - Map confOverlay) - throws HiveSQLException { - OperationHandle opHandle = sessionManager.getSession(sessionHandle) - .executeStatement(statement, confOverlay); + Map confOverlay) throws HiveSQLException { + OperationHandle opHandle = + sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay); LOG.debug(sessionHandle + ": executeStatement()"); return opHandle; } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) + /** + * Execute statement on the server with a timeout. This is a blocking call. + */ + @Override + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + OperationHandle opHandle = + sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay, + queryTimeout); + LOG.debug(sessionHandle + ": executeStatement()"); + return opHandle; + } + + /** + * Execute statement asynchronously on the server. This is a non-blocking call */ @Override public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { - OperationHandle opHandle = sessionManager.getSession(sessionHandle) - .executeStatementAsync(statement, confOverlay); + OperationHandle opHandle = + sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay); LOG.debug(sessionHandle + ": executeStatementAsync()"); return opHandle; } + /** + * Execute statement asynchronously on the server with a timeout. This is a non-blocking call + */ + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + OperationHandle opHandle = + sessionManager.getSession(sessionHandle).executeStatementAsync(statement, confOverlay, + queryTimeout); + LOG.debug(sessionHandle + ": executeStatementAsync()"); + return opHandle; + } /* (non-Javadoc) * @see org.apache.hive.service.cli.ICLIService#getTypeInfo(org.apache.hive.service.cli.SessionHandle) diff --git a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java index 79e0024..86e9bb1 100644 --- a/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/EmbeddedCLIServiceClient.java @@ -67,26 +67,29 @@ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType return cliService.getInfo(sessionHandle, getInfoType); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#executeStatement(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { return cliService.executeStatement(sessionHandle, statement, confOverlay); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.CLIServiceClient#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, - * java.lang.String, java.util.Map) - */ + @Override + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout); + } + @Override public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, Map confOverlay) throws HiveSQLException { return cliService.executeStatementAsync(sessionHandle, statement, confOverlay); } + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout); + } /* (non-Javadoc) * @see org.apache.hive.service.cli.CLIServiceClient#getTypeInfo(org.apache.hive.service.cli.SessionHandle) diff --git a/service/src/java/org/apache/hive/service/cli/ICLIService.java b/service/src/java/org/apache/hive/service/cli/ICLIService.java index e4aef96..fef772d 100644 --- a/service/src/java/org/apache/hive/service/cli/ICLIService.java +++ b/service/src/java/org/apache/hive/service/cli/ICLIService.java @@ -39,12 +39,16 @@ GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) throws HiveSQLException; OperationHandle executeStatement(SessionHandle sessionHandle, String statement, - Map confOverlay) - throws HiveSQLException; + Map confOverlay) throws HiveSQLException; - OperationHandle executeStatementAsync(SessionHandle sessionHandle, - String statement, Map confOverlay) - throws HiveSQLException; + OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException; + + OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException; + + OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException; OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException; @@ -105,6 +109,4 @@ OperationHandle getCrossReference(SessionHandle sessionHandle, String primaryCatalog, String primarySchema, String primaryTable, String foreignCatalog, String foreignSchema, String foreignTable) throws HiveSQLException; - - } diff --git a/service/src/java/org/apache/hive/service/cli/OperationState.java b/service/src/java/org/apache/hive/service/cli/OperationState.java index 6a67a1d..ae1ff5e 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationState.java +++ b/service/src/java/org/apache/hive/service/cli/OperationState.java @@ -32,7 +32,8 @@ CLOSED(TOperationState.CLOSED_STATE, true), ERROR(TOperationState.ERROR_STATE, true), UNKNOWN(TOperationState.UKNOWN_STATE, false), - PENDING(TOperationState.PENDING_STATE, false); + PENDING(TOperationState.PENDING_STATE, false), + TIMEDOUT(TOperationState.TIMEDOUT_STATE, true); private final TOperationState tOperationState; private final boolean terminal; @@ -57,6 +58,7 @@ public static void validateTransition(OperationState oldState, case RUNNING: case CANCELED: case CLOSED: + case TIMEDOUT: return; } break; @@ -67,6 +69,7 @@ public static void validateTransition(OperationState oldState, case CANCELED: case ERROR: case CLOSED: + case TIMEDOUT: return; } break; @@ -76,11 +79,13 @@ public static void validateTransition(OperationState oldState, case CANCELED: case ERROR: case CLOSED: + case TIMEDOUT: return; } break; case FINISHED: case CANCELED: + case TIMEDOUT: case ERROR: if (OperationState.CLOSED.equals(newState)) { return; diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java index b3d9b52..ff46ed8 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java @@ -39,9 +39,9 @@ public String getStatement() { return statement; } - public static ExecuteStatementOperation newExecuteStatementOperation( - HiveSession parentSession, String statement, Map confOverlay, boolean runAsync) - throws HiveSQLException { + public static ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { String[] tokens = statement.trim().split("\\s+"); CommandProcessor processor = null; try { @@ -50,7 +50,8 @@ public static ExecuteStatementOperation newExecuteStatementOperation( throw new HiveSQLException(e.getMessage(), e.getSQLState(), e); } if (processor == null) { - return new SQLOperation(parentSession, statement, confOverlay, runAsync); + // runAsync, queryTimeout makes sense only for a SQLOperation + return new SQLOperation(parentSession, statement, confOverlay, runAsync, queryTimeout); } return new HiveCommandOperation(parentSession, statement, processor, confOverlay); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index f18dc67..8f08c2e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -229,4 +229,9 @@ private void resetResultReader() { resultReader = null; } } + + @Override + public void cancel(OperationState stateAfterCancel) throws HiveSQLException { + throw new UnsupportedOperationException("HiveCommandOperation.cancel()"); + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java index 77228fa..fd6e428 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -109,7 +109,7 @@ private String convertPattern(String pattern, boolean datanucleusFormat) { pattern = replaceAll(pattern, "^_", "."); return pattern; } - + private String replaceAll(String input, final String pattern, final String replace) { while (true) { String replaced = input.replaceAll(pattern, replace); @@ -145,4 +145,9 @@ protected void authorizeMetaGets(HiveOperationType opType, List confOverlay, boolean runAsync) - throws HiveSQLException { - ExecuteStatementOperation executeStatementOperation = ExecuteStatementOperation - .newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync); + String statement, Map confOverlay, boolean runAsync, long queryTimeout) + throws HiveSQLException { + ExecuteStatementOperation executeStatementOperation = + ExecuteStatementOperation.newExecuteStatementOperation(parentSession, statement, + confOverlay, runAsync, queryTimeout); addOperation(executeStatementOperation); return executeStatementOperation; } @@ -250,20 +249,20 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) return getOperation(opHandle).getStatus(); } + /** + * Cancel the running operation unless it is already in a terminal state + * @param opHandle + * @throws HiveSQLException + */ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException { Operation operation = getOperation(opHandle); OperationState opState = operation.getStatus().getState(); - if (opState == OperationState.CANCELED || - opState == OperationState.CLOSED || - opState == OperationState.FINISHED || - opState == OperationState.ERROR || - opState == OperationState.UNKNOWN) { + if (opState.isTerminal()) { // Cancel should be a no-op in either cases LOG.debug(opHandle + ": Operation is already aborted in state - " + opState); - } - else { + } else { LOG.debug(opHandle + ": Attempting to cancel from state - " + opState); - operation.cancel(); + operation.cancel(OperationState.CANCELED); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 01dd48c..67e0e52 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -18,12 +18,24 @@ package org.apache.hive.service.cli.operation; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.codec.binary.Base64; @@ -84,9 +96,10 @@ private SerDe serde = null; private boolean fetchStarted = false; private volatile MetricsScope currentSQLStateScope; - - //Display for WebUI. + // Display for WebUI. private SQLOperationDisplay sqlOpDisplay; + private long queryTimeout; + private ScheduledExecutorService timeoutExecutor; /** * A map to track query count running by each user @@ -94,10 +107,11 @@ private static Map userQueries = new HashMap(); private static final String ACTIVE_SQL_USER = MetricsConstant.SQL_OPERATION_PREFIX + "active_user"; - public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { + public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, + boolean runInBackground, long queryTimeout) { // TODO: call setRemoteUser in ExecuteStatementOperation or higher. super(parentSession, statement, confOverlay, runInBackground); + this.queryTimeout = queryTimeout; setupSessionIO(parentSession.getSessionState()); try { sqlOpDisplay = new SQLOperationDisplay(this); @@ -121,7 +135,7 @@ private void setupSessionIO(SessionState sessionState) { } } - /*** + /** * Compile the query and extract metadata * @param sqlOperationConf * @throws HiveSQLException @@ -130,6 +144,29 @@ public void prepare(QueryState queryState) throws HiveSQLException { setState(OperationState.RUNNING); try { driver = new Driver(queryState, getParentSession().getUserName()); + + // Start the timer thread for cancelling the query when query timeout is reached + // queryTimeout == 0 means no timeout + if (queryTimeout > 0) { + timeoutExecutor = new ScheduledThreadPoolExecutor(1); + Runnable timeoutTask = new Runnable() { + @Override + public void run() { + try { + LOG.info("Query timed out after: " + queryTimeout + + " seconds. Cancelling the execution now."); + SQLOperation.this.cancel(OperationState.TIMEDOUT); + } catch (HiveSQLException e) { + LOG.error("Error cancelling the query after timeout: " + queryTimeout + " seconds", e); + } finally { + // Stop + timeoutExecutor.shutdown(); + } + } + }; + timeoutExecutor.schedule(timeoutTask, queryTimeout, TimeUnit.SECONDS); + } + sqlOpDisplay.setQueryDisplay(driver.getQueryDisplay()); // set the operation handle information in Driver, so that thrift API users @@ -184,6 +221,13 @@ public void prepare(QueryState queryState) throws HiveSQLException { private void runQuery() throws HiveSQLException { try { + OperationState opState = getStatus().getState(); + // Operation may have been cancelled by another thread + if (opState.isTerminal()) { + LOG.info("Not running the query. Operation is already in terminal state: " + opState + + ", perhaps cancelled due to query timeout or by another thread."); + return; + } // In Hive server mode, we are not able to retry in the FetchTask // case, when calling fetch queries since execute() has returned. // For now, we disable the test attempts. @@ -193,14 +237,16 @@ private void runQuery() throws HiveSQLException { throw toSQLException("Error while processing statement", response); } } catch (HiveSQLException e) { - // If the operation was cancelled by another thread, - // Driver#run will return a non-zero response code. - // We will simply return if the operation state is CANCELED, - // otherwise throw an exception - if (getStatus().getState() == OperationState.CANCELED) { + /** + * If the operation was cancelled by another thread, or the execution timed out, Driver#run + * may return a non-zero response code. We will simply return if the operation state is + * CANCELED, TIMEDOUT or CLOSED, otherwise throw an exception + */ + if ((getStatus().getState() == OperationState.CANCELED) + || (getStatus().getState() == OperationState.TIMEDOUT) + || (getStatus().getState() == OperationState.CLOSED)) { return; - } - else { + } else { setState(OperationState.ERROR); throw e; } @@ -312,8 +358,22 @@ private void registerCurrentOperationLog() { } } - private void cleanup(OperationState state) throws HiveSQLException { + private synchronized void cleanup(OperationState state) throws HiveSQLException { setState(state); + if (driver != null) { + driver.close(); + driver.destroy(); + } + driver = null; + + SessionState ss = SessionState.get(); + if (ss == null) { + LOG.warn("Operation seems to be in invalid state, SessionState is null"); + } else { + ss.deleteTmpOutputFile(); + ss.deleteTmpErrOutputFile(); + } + if (shouldRunAsync()) { Future backgroundHandle = getBackgroundHandle(); if (backgroundHandle != null) { @@ -321,20 +381,16 @@ private void cleanup(OperationState state) throws HiveSQLException { } } - if (driver != null) { - driver.close(); - driver.destroy(); + // Shutdown the timeout thread if any, while closing this operation + if ((timeoutExecutor != null) && (state != OperationState.TIMEDOUT) && (state.isTerminal())) { + timeoutExecutor.shutdownNow(); } - driver = null; - - SessionState ss = SessionState.get(); - ss.deleteTmpOutputFile(); - ss.deleteTmpErrOutputFile(); } @Override - public void cancel() throws HiveSQLException { - cleanup(OperationState.CANCELED); + public void cancel(OperationState stateAfterCancel) throws HiveSQLException { + cleanup(stateAfterCancel); + cleanupOperationLog(); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java index 9ea643b..78ff388 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java @@ -56,18 +56,38 @@ * @return * @throws HiveSQLException */ - OperationHandle executeStatement(String statement, - Map confOverlay) throws HiveSQLException; + OperationHandle executeStatement(String statement, Map confOverlay) throws HiveSQLException; /** * execute operation handler * @param statement * @param confOverlay + * @param queryTimeout * @return * @throws HiveSQLException */ - OperationHandle executeStatementAsync(String statement, - Map confOverlay) throws HiveSQLException; + OperationHandle executeStatement(String statement, Map confOverlay, + long queryTimeout) throws HiveSQLException; + + /** + * execute operation handler + * @param statement + * @param confOverlay + * @return + * @throws HiveSQLException + */ + OperationHandle executeStatementAsync(String statement, Map confOverlay) throws HiveSQLException; + + /** + * execute operation handler + * @param statement + * @param confOverlay + * @param queryTimeout + * @return + * @throws HiveSQLException + */ + OperationHandle executeStatementAsync(String statement, Map confOverlay, + long queryTimeout) throws HiveSQLException; /** * getTypeInfo operation handler diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 0cfec7a..a0015eb 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -204,7 +204,7 @@ protected int processCmd(String cmd) { OperationHandle opHandle = null; try { //execute in sync mode - opHandle = executeStatementInternal(cmd_trimed, null, false); + opHandle = executeStatementInternal(cmd_trimed, null, false, 0); } catch (HiveSQLException e) { LOG.warn("Failed to execute command in global .hiverc file.", e); return -1; @@ -426,33 +426,43 @@ public GetInfoValue getInfo(GetInfoType getInfoType) } @Override - public OperationHandle executeStatement(String statement, Map confOverlay) - throws HiveSQLException { - return executeStatementInternal(statement, confOverlay, false); + public OperationHandle executeStatement(String statement, Map confOverlay) throws HiveSQLException { + return executeStatementInternal(statement, confOverlay, false, 0); } @Override - public OperationHandle executeStatementAsync(String statement, Map confOverlay) - throws HiveSQLException { - return executeStatementInternal(statement, confOverlay, true); + public OperationHandle executeStatement(String statement, Map confOverlay, + long queryTimeout) throws HiveSQLException { + return executeStatementInternal(statement, confOverlay, false, queryTimeout); } - private OperationHandle executeStatementInternal(String statement, Map confOverlay, - boolean runAsync) - throws HiveSQLException { + @Override + public OperationHandle executeStatementAsync(String statement, Map confOverlay) throws HiveSQLException { + return executeStatementInternal(statement, confOverlay, true, 0); + } + + @Override + public OperationHandle executeStatementAsync(String statement, Map confOverlay, + long queryTimeout) throws HiveSQLException { + return executeStatementInternal(statement, confOverlay, true, queryTimeout); + } + + private OperationHandle executeStatementInternal(String statement, + Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { acquire(true); OperationManager operationManager = getOperationManager(); - ExecuteStatementOperation operation = operationManager - .newExecuteStatementOperation(getSession(), statement, confOverlay, runAsync); + ExecuteStatementOperation operation = + operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay, + runAsync, queryTimeout); OperationHandle opHandle = operation.getHandle(); try { operation.run(); addOpHandle(opHandle); return opHandle; } catch (HiveSQLException e) { - // Refering to SQLOperation.java,there is no chance that a HiveSQLException throws and the asyn - // background operation submits to thread pool successfully at the same time. So, Cleanup + // Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the + // async background operation submits to thread pool successfully at the same time. So, Cleanup // opHandle directly when got HiveSQLException operationManager.closeOperation(opHandle); throw e; diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java index b2e0e9e..933750b 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java @@ -126,20 +126,30 @@ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType getInfoType } @Override - public OperationHandle executeStatement(SessionHandle sessionHandle, - String statement, - Map confOverlay) throws HiveSQLException { + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { return cliService.executeStatement(sessionHandle, statement, confOverlay); } @Override - public OperationHandle executeStatementAsync(SessionHandle sessionHandle, - String statement, - Map confOverlay) throws HiveSQLException { + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return cliService.executeStatement(sessionHandle, statement, confOverlay, queryTimeout); + } + + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay) throws HiveSQLException { return cliService.executeStatementAsync(sessionHandle, statement, confOverlay); } @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return cliService.executeStatementAsync(sessionHandle, statement, confOverlay, queryTimeout); + } + + @Override public OperationHandle getTypeInfo(SessionHandle sessionHandle) throws HiveSQLException { return cliService.getTypeInfo(sessionHandle); } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 6ede1d7..5464e58 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -493,15 +493,17 @@ public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws T String statement = req.getStatement(); Map confOverlay = req.getConfOverlay(); Boolean runAsync = req.isRunAsync(); - OperationHandle operationHandle = runAsync ? - cliService.executeStatementAsync(sessionHandle, statement, confOverlay) - : cliService.executeStatement(sessionHandle, statement, confOverlay); - resp.setOperationHandle(operationHandle.toTOperationHandle()); - resp.setStatus(OK_STATUS); + long queryTimeout = req.getQueryTimeout(); + OperationHandle operationHandle = + runAsync ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay, + queryTimeout) : cliService.executeStatement(sessionHandle, statement, confOverlay, + queryTimeout); + resp.setOperationHandle(operationHandle.toTOperationHandle()); + resp.setStatus(OK_STATUS); } catch (Exception e) { // Note: it's rather important that this (and other methods) catch Exception, not Throwable; - // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used - // to also catch all errors; and now it allows OOMs only to propagate. + // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used + // to also catch all errors; and now it allows OOMs only to propagate. LOG.warn("Error executing statement: ", e); resp.setStatus(HiveSQLException.toTStatus(e)); } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 098aea6..82ac42d 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -166,34 +166,38 @@ public GetInfoValue getInfo(SessionHandle sessionHandle, GetInfoType infoType) } } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatement(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) - */ @Override public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, - Map confOverlay) - throws HiveSQLException { - return executeStatementInternal(sessionHandle, statement, confOverlay, false); + Map confOverlay) throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, false, 0); + } + + @Override + public OperationHandle executeStatement(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, false, queryTimeout); } - /* (non-Javadoc) - * @see org.apache.hive.service.cli.ICLIService#executeStatementAsync(org.apache.hive.service.cli.SessionHandle, java.lang.String, java.util.Map) - */ @Override public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, - Map confOverlay) - throws HiveSQLException { - return executeStatementInternal(sessionHandle, statement, confOverlay, true); + Map confOverlay) throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, true, 0); + } + + @Override + public OperationHandle executeStatementAsync(SessionHandle sessionHandle, String statement, + Map confOverlay, long queryTimeout) throws HiveSQLException { + return executeStatementInternal(sessionHandle, statement, confOverlay, true, queryTimeout); } private OperationHandle executeStatementInternal(SessionHandle sessionHandle, String statement, - Map confOverlay, boolean isAsync) - throws HiveSQLException { + Map confOverlay, boolean isAsync, long queryTimeout) throws HiveSQLException { try { TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.toTSessionHandle(), statement); req.setConfOverlay(confOverlay); req.setRunAsync(isAsync); + req.setQueryTimeout(queryTimeout); TExecuteStatementResp resp = cliService.ExecuteStatement(req); checkStatus(resp.getStatus()); TProtocolVersion protocol = sessionHandle.getProtocolVersion(); diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java index 1740079..abb1ecf 100644 --- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java @@ -178,8 +178,7 @@ public void testExecuteStatement() throws Exception { // Execute another query queryString = "SELECT ID+1 FROM TEST_EXEC_THRIFT"; - OperationHandle opHandle = client.executeStatement(sessHandle, - queryString, opConf); + OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf); assertNotNull(opHandle); OperationStatus opStatus = client.getOperationStatus(opHandle); @@ -229,8 +228,7 @@ public void testExecuteStatementAsync() throws Exception { // Execute another query queryString = "SELECT ID+1 FROM TEST_EXEC_ASYNC_THRIFT"; System.out.println("Will attempt to execute: " + queryString); - opHandle = client.executeStatementAsync(sessHandle, - queryString, opConf); + opHandle = client.executeStatementAsync(sessHandle, queryString, opConf); assertNotNull(opHandle); // Poll on the operation status till the query is completed diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java index a1ef1fc..ab20c4c 100644 --- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java +++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java @@ -200,8 +200,7 @@ public void testExecuteStatement() throws Exception { // Execute another query queryString = "SELECT ID+1 FROM TEST_EXEC_THRIFT"; - OperationHandle opHandle = client.executeStatement(sessHandle, - queryString, opConf); + OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf); assertNotNull(opHandle); OperationStatus opStatus = client.getOperationStatus(opHandle);