diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 29e6315..88c9de4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1140,7 +1140,7 @@ public CommandProcessorResponse compileAndRespond(String command) { private int compileInternal(String command) { int ret; final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, - command); + command); if (compileLock == null) { return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(); } @@ -1176,8 +1176,8 @@ private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, final ReentrantLock compileLock = isParallelEnabled ? SessionState.get().getCompileLock() : globalCompileLock; long maxCompileLockWaitTime = HiveConf.getTimeVar( - this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, - TimeUnit.SECONDS); + this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, + TimeUnit.SECONDS); if (maxCompileLockWaitTime > 0) { try { if (LOG.isDebugEnabled()) { @@ -1932,4 +1932,36 @@ public void setOperationId(String opId) { this.operationId = opId; } + public List getTaskStatuses() { + if (plan == null) { + return null; + } + List> tasks = new ArrayList>(); + tasks.addAll(plan.getRootTasks()); + for (int i = 0; i < tasks.size(); i++) { + Task tsk = tasks.get(i); + if (tsk.getDependentTasks() != null) { + tasks.addAll(tsk.getDependentTasks()); + } + } + // add backup tasks if any + if (driverCxt != null) { + try { + Task runnable; + while ((runnable = driverCxt.getRunnable(maxthreads)) != null) { + if (!tasks.contains(runnable)) { + tasks.add(runnable); + } + } + } catch (HiveException e) { + console.printError("FAILED: Hive Internal Error: " + Utilities.getNameMessage(e) + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + } + } + List statuses = new ArrayList(tasks.size()); + for (Task tsk : tasks) { + statuses.add(new TaskStatus(tsk.getId(), tsk.getExternalHandle(), tsk.getTaskState())); + } + return statuses; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/TaskStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/TaskStatus.java new file mode 100644 index 0000000..873317e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/TaskStatus.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.Task.TaskState; + +public class TaskStatus { + private String taskId; + private String externalHandle; + private Task.TaskState taskState; + + public TaskStatus(String taskId, String externalHandle, Task.TaskState state) { + this.taskId = taskId; + this.externalHandle = externalHandle; + this.taskState = state; + } + + public TaskStatus() { + + } + + public TaskStatus(String taskId, Task.TaskState state) { + this(taskId, null, state); + } + + public String getTaskId() { + return taskId; + } + + public String getExternalHandle() { + return externalHandle; + } + + public String getTaskState() { + return taskState.toString(); + } + + @Override + public String toString() { + return taskId + "/" + externalHandle + "/" + taskState; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public void setExternalHandle(String externalHandle) { + this.externalHandle = externalHandle; + } + + public void setTaskState(String taskState) { + if (taskState == null) { + this.taskState = TaskState.UNKNOWN_STATE; + } else { + this.taskState = TaskState.valueOf(taskState.toUpperCase()); + } + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 0eab63e..6558992 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -94,6 +94,19 @@ DYNAMIC_PARTITIONS, // list of dynamic partitions } + public static enum TaskState { + // Task data structures have been initialized + INITIALIZED_STATE, + // Task has been queued for execution by the driver + QUEUED_STATE, + // Task is currently running + RUNNING_STATE, + // Task has completed + FINISHED_STATE, + // Task state is unkown + UNKNOWN_STATE + } + // Bean methods protected boolean rootTask; @@ -375,6 +388,30 @@ public String getId() { return id; } + public String getExternalHandle() { + return getId(); + } + + public TaskState getTaskState() { + if (done()) { + return TaskState.FINISHED_STATE; + } + + if (started()) { + return TaskState.RUNNING_STATE; + } + + if (getInitialized()) { + return TaskState.INITIALIZED_STATE; + } + + if (getQueued()) { + return TaskState.QUEUED_STATE; + } + + return TaskState.UNKNOWN_STATE; + } + public boolean isMapRedTask() { return false; } @@ -556,4 +593,6 @@ public int hashCode() { public boolean equals(Object obj) { return toString().equals(String.valueOf(obj)); } + + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index ab7fd93..0d6ed35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -428,6 +428,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); + this.jobID = rj.getJobID(); returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager()); success = (returnVal == 0); @@ -847,5 +848,10 @@ public void shutdown() { rj = null; } } + + @Override + public String getExternalHandle() { + return this.jobID; + } } diff --git a/service/if/TCLIService.thrift b/service/if/TCLIService.thrift index baf583f..fc5abdb 100644 --- a/service/if/TCLIService.thrift +++ b/service/if/TCLIService.thrift @@ -977,6 +977,15 @@ struct TGetOperationStatusResp { // Error message 5: optional string errorMessage + + // List of statuses of sub tasks + 6: optional string taskStatus + + // When was the operation started + 7: optional i64 operationStarted + // When was the operation completed + 8: optional i64 operationCompleted + } diff --git a/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp index b852379..9047bed 100644 --- a/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -7612,6 +7612,21 @@ void TGetOperationStatusResp::__set_errorMessage(const std::string& val) { __isset.errorMessage = true; } +void TGetOperationStatusResp::__set_taskStatus(const std::string& val) { + this->taskStatus = val; +__isset.taskStatus = true; +} + +void TGetOperationStatusResp::__set_operationStarted(const int64_t val) { + this->operationStarted = val; +__isset.operationStarted = true; +} + +void TGetOperationStatusResp::__set_operationCompleted(const int64_t val) { + this->operationCompleted = val; +__isset.operationCompleted = true; +} + uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -7676,6 +7691,30 @@ uint32_t TGetOperationStatusResp::read(::apache::thrift::protocol::TProtocol* ip xfer += iprot->skip(ftype); } break; + case 6: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->taskStatus); + this->__isset.taskStatus = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 7: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->operationStarted); + this->__isset.operationStarted = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 8: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->operationCompleted); + this->__isset.operationCompleted = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -7719,6 +7758,21 @@ uint32_t TGetOperationStatusResp::write(::apache::thrift::protocol::TProtocol* o xfer += oprot->writeString(this->errorMessage); xfer += oprot->writeFieldEnd(); } + if (this->__isset.taskStatus) { + xfer += oprot->writeFieldBegin("taskStatus", ::apache::thrift::protocol::T_STRING, 6); + xfer += oprot->writeString(this->taskStatus); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.operationStarted) { + xfer += oprot->writeFieldBegin("operationStarted", ::apache::thrift::protocol::T_I64, 7); + xfer += oprot->writeI64(this->operationStarted); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.operationCompleted) { + xfer += oprot->writeFieldBegin("operationCompleted", ::apache::thrift::protocol::T_I64, 8); + xfer += oprot->writeI64(this->operationCompleted); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -7731,6 +7785,9 @@ void swap(TGetOperationStatusResp &a, TGetOperationStatusResp &b) { swap(a.sqlState, b.sqlState); swap(a.errorCode, b.errorCode); swap(a.errorMessage, b.errorMessage); + swap(a.taskStatus, b.taskStatus); + swap(a.operationStarted, b.operationStarted); + swap(a.operationCompleted, b.operationCompleted); swap(a.__isset, b.__isset); } @@ -7740,6 +7797,9 @@ TGetOperationStatusResp::TGetOperationStatusResp(const TGetOperationStatusResp& sqlState = other263.sqlState; errorCode = other263.errorCode; errorMessage = other263.errorMessage; + taskStatus = other263.taskStatus; + operationStarted = other263.operationStarted; + operationCompleted = other263.operationCompleted; __isset = other263.__isset; } TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationStatusResp& other264) { @@ -7748,6 +7808,9 @@ TGetOperationStatusResp& TGetOperationStatusResp::operator=(const TGetOperationS sqlState = other264.sqlState; errorCode = other264.errorCode; errorMessage = other264.errorMessage; + taskStatus = other264.taskStatus; + operationStarted = other264.operationStarted; + operationCompleted = other264.operationCompleted; __isset = other264.__isset; return *this; } @@ -7759,6 +7822,9 @@ void TGetOperationStatusResp::printTo(std::ostream& out) const { out << ", " << "sqlState="; (__isset.sqlState ? (out << to_string(sqlState)) : (out << "")); out << ", " << "errorCode="; (__isset.errorCode ? (out << to_string(errorCode)) : (out << "")); out << ", " << "errorMessage="; (__isset.errorMessage ? (out << to_string(errorMessage)) : (out << "")); + out << ", " << "taskStatus="; (__isset.taskStatus ? (out << to_string(taskStatus)) : (out << "")); + out << ", " << "operationStarted="; (__isset.operationStarted ? (out << to_string(operationStarted)) : (out << "")); + out << ", " << "operationCompleted="; (__isset.operationCompleted ? (out << to_string(operationCompleted)) : (out << "")); out << ")"; } diff --git a/service/src/gen/thrift/gen-cpp/TCLIService_types.h b/service/src/gen/thrift/gen-cpp/TCLIService_types.h index b078c99..016552f 100644 --- a/service/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -3408,11 +3408,14 @@ inline std::ostream& operator<<(std::ostream& out, const TGetOperationStatusReq& } typedef struct _TGetOperationStatusResp__isset { - _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false) {} + _TGetOperationStatusResp__isset() : operationState(false), sqlState(false), errorCode(false), errorMessage(false), taskStatus(false), operationStarted(false), operationCompleted(false) {} bool operationState :1; bool sqlState :1; bool errorCode :1; bool errorMessage :1; + bool taskStatus :1; + bool operationStarted :1; + bool operationCompleted :1; } _TGetOperationStatusResp__isset; class TGetOperationStatusResp { @@ -3420,7 +3423,7 @@ class TGetOperationStatusResp { TGetOperationStatusResp(const TGetOperationStatusResp&); TGetOperationStatusResp& operator=(const TGetOperationStatusResp&); - TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage() { + TGetOperationStatusResp() : operationState((TOperationState::type)0), sqlState(), errorCode(0), errorMessage(), taskStatus(), operationStarted(0), operationCompleted(0) { } virtual ~TGetOperationStatusResp() throw(); @@ -3429,6 +3432,9 @@ class TGetOperationStatusResp { std::string sqlState; int32_t errorCode; std::string errorMessage; + std::string taskStatus; + int64_t operationStarted; + int64_t operationCompleted; _TGetOperationStatusResp__isset __isset; @@ -3442,6 +3448,12 @@ class TGetOperationStatusResp { void __set_errorMessage(const std::string& val); + void __set_taskStatus(const std::string& val); + + void __set_operationStarted(const int64_t val); + + void __set_operationCompleted(const int64_t val); + bool operator == (const TGetOperationStatusResp & rhs) const { if (!(status == rhs.status)) @@ -3462,6 +3474,18 @@ class TGetOperationStatusResp { return false; else if (__isset.errorMessage && !(errorMessage == rhs.errorMessage)) return false; + if (__isset.taskStatus != rhs.__isset.taskStatus) + return false; + else if (__isset.taskStatus && !(taskStatus == rhs.taskStatus)) + return false; + if (__isset.operationStarted != rhs.__isset.operationStarted) + return false; + else if (__isset.operationStarted && !(operationStarted == rhs.operationStarted)) + return false; + if (__isset.operationCompleted != rhs.__isset.operationCompleted) + return false; + else if (__isset.operationCompleted && !(operationCompleted == rhs.operationCompleted)) + return false; return true; } bool operator != (const TGetOperationStatusResp &rhs) const { diff --git a/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TGetOperationStatusResp.java b/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TGetOperationStatusResp.java index 99c2409..5cbc006 100644 --- a/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TGetOperationStatusResp.java +++ b/service/src/gen/thrift/gen-javabean/org/apache/hive/service/cli/thrift/TGetOperationStatusResp.java @@ -43,6 +43,9 @@ private static final org.apache.thrift.protocol.TField SQL_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("sqlState", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField ERROR_CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorCode", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField TASK_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("taskStatus", org.apache.thrift.protocol.TType.STRING, (short)6); + private static final org.apache.thrift.protocol.TField OPERATION_STARTED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationStarted", org.apache.thrift.protocol.TType.I64, (short)7); + private static final org.apache.thrift.protocol.TField OPERATION_COMPLETED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationCompleted", org.apache.thrift.protocol.TType.I64, (short)8); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -55,6 +58,9 @@ private String sqlState; // optional private int errorCode; // optional private String errorMessage; // optional + private String taskStatus; // optional + private long operationStarted; // optional + private long operationCompleted; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -66,7 +72,10 @@ OPERATION_STATE((short)2, "operationState"), SQL_STATE((short)3, "sqlState"), ERROR_CODE((short)4, "errorCode"), - ERROR_MESSAGE((short)5, "errorMessage"); + ERROR_MESSAGE((short)5, "errorMessage"), + TASK_STATUS((short)6, "taskStatus"), + OPERATION_STARTED((short)7, "operationStarted"), + OPERATION_COMPLETED((short)8, "operationCompleted"); private static final Map byName = new HashMap(); @@ -91,6 +100,12 @@ public static _Fields findByThriftId(int fieldId) { return ERROR_CODE; case 5: // ERROR_MESSAGE return ERROR_MESSAGE; + case 6: // TASK_STATUS + return TASK_STATUS; + case 7: // OPERATION_STARTED + return OPERATION_STARTED; + case 8: // OPERATION_COMPLETED + return OPERATION_COMPLETED; default: return null; } @@ -132,8 +147,10 @@ public String getFieldName() { // isset id assignments private static final int __ERRORCODE_ISSET_ID = 0; + private static final int __OPERATIONSTARTED_ISSET_ID = 1; + private static final int __OPERATIONCOMPLETED_ISSET_ID = 2; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE}; + private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -147,6 +164,12 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TASK_STATUS, new org.apache.thrift.meta_data.FieldMetaData("taskStatus", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.OPERATION_STARTED, new org.apache.thrift.meta_data.FieldMetaData("operationStarted", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.OPERATION_COMPLETED, new org.apache.thrift.meta_data.FieldMetaData("operationCompleted", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusResp.class, metaDataMap); } @@ -179,6 +202,11 @@ public TGetOperationStatusResp(TGetOperationStatusResp other) { if (other.isSetErrorMessage()) { this.errorMessage = other.errorMessage; } + if (other.isSetTaskStatus()) { + this.taskStatus = other.taskStatus; + } + this.operationStarted = other.operationStarted; + this.operationCompleted = other.operationCompleted; } public TGetOperationStatusResp deepCopy() { @@ -193,6 +221,11 @@ public void clear() { setErrorCodeIsSet(false); this.errorCode = 0; this.errorMessage = null; + this.taskStatus = null; + setOperationStartedIsSet(false); + this.operationStarted = 0; + setOperationCompletedIsSet(false); + this.operationCompleted = 0; } public TStatus getStatus() { @@ -317,6 +350,73 @@ public void setErrorMessageIsSet(boolean value) { } } + public String getTaskStatus() { + return this.taskStatus; + } + + public void setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + } + + public void unsetTaskStatus() { + this.taskStatus = null; + } + + /** Returns true if field taskStatus is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskStatus() { + return this.taskStatus != null; + } + + public void setTaskStatusIsSet(boolean value) { + if (!value) { + this.taskStatus = null; + } + } + + public long getOperationStarted() { + return this.operationStarted; + } + + public void setOperationStarted(long operationStarted) { + this.operationStarted = operationStarted; + setOperationStartedIsSet(true); + } + + public void unsetOperationStarted() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID); + } + + /** Returns true if field operationStarted is set (has been assigned a value) and false otherwise */ + public boolean isSetOperationStarted() { + return EncodingUtils.testBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID); + } + + public void setOperationStartedIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID, value); + } + + public long getOperationCompleted() { + return this.operationCompleted; + } + + public void setOperationCompleted(long operationCompleted) { + this.operationCompleted = operationCompleted; + setOperationCompletedIsSet(true); + } + + public void unsetOperationCompleted() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID); + } + + /** Returns true if field operationCompleted is set (has been assigned a value) and false otherwise */ + public boolean isSetOperationCompleted() { + return EncodingUtils.testBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID); + } + + public void setOperationCompletedIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case STATUS: @@ -359,6 +459,30 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TASK_STATUS: + if (value == null) { + unsetTaskStatus(); + } else { + setTaskStatus((String)value); + } + break; + + case OPERATION_STARTED: + if (value == null) { + unsetOperationStarted(); + } else { + setOperationStarted((Long)value); + } + break; + + case OPERATION_COMPLETED: + if (value == null) { + unsetOperationCompleted(); + } else { + setOperationCompleted((Long)value); + } + break; + } } @@ -379,6 +503,15 @@ public Object getFieldValue(_Fields field) { case ERROR_MESSAGE: return getErrorMessage(); + case TASK_STATUS: + return getTaskStatus(); + + case OPERATION_STARTED: + return getOperationStarted(); + + case OPERATION_COMPLETED: + return getOperationCompleted(); + } throw new IllegalStateException(); } @@ -400,6 +533,12 @@ public boolean isSet(_Fields field) { return isSetErrorCode(); case ERROR_MESSAGE: return isSetErrorMessage(); + case TASK_STATUS: + return isSetTaskStatus(); + case OPERATION_STARTED: + return isSetOperationStarted(); + case OPERATION_COMPLETED: + return isSetOperationCompleted(); } throw new IllegalStateException(); } @@ -462,6 +601,33 @@ public boolean equals(TGetOperationStatusResp that) { return false; } + boolean this_present_taskStatus = true && this.isSetTaskStatus(); + boolean that_present_taskStatus = true && that.isSetTaskStatus(); + if (this_present_taskStatus || that_present_taskStatus) { + if (!(this_present_taskStatus && that_present_taskStatus)) + return false; + if (!this.taskStatus.equals(that.taskStatus)) + return false; + } + + boolean this_present_operationStarted = true && this.isSetOperationStarted(); + boolean that_present_operationStarted = true && that.isSetOperationStarted(); + if (this_present_operationStarted || that_present_operationStarted) { + if (!(this_present_operationStarted && that_present_operationStarted)) + return false; + if (this.operationStarted != that.operationStarted) + return false; + } + + boolean this_present_operationCompleted = true && this.isSetOperationCompleted(); + boolean that_present_operationCompleted = true && that.isSetOperationCompleted(); + if (this_present_operationCompleted || that_present_operationCompleted) { + if (!(this_present_operationCompleted && that_present_operationCompleted)) + return false; + if (this.operationCompleted != that.operationCompleted) + return false; + } + return true; } @@ -494,6 +660,21 @@ public int hashCode() { if (present_errorMessage) list.add(errorMessage); + boolean present_taskStatus = true && (isSetTaskStatus()); + list.add(present_taskStatus); + if (present_taskStatus) + list.add(taskStatus); + + boolean present_operationStarted = true && (isSetOperationStarted()); + list.add(present_operationStarted); + if (present_operationStarted) + list.add(operationStarted); + + boolean present_operationCompleted = true && (isSetOperationCompleted()); + list.add(present_operationCompleted); + if (present_operationCompleted) + list.add(operationCompleted); + return list.hashCode(); } @@ -555,6 +736,36 @@ public int compareTo(TGetOperationStatusResp other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetTaskStatus()).compareTo(other.isSetTaskStatus()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTaskStatus()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskStatus, other.taskStatus); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetOperationStarted()).compareTo(other.isSetOperationStarted()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOperationStarted()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationStarted, other.operationStarted); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetOperationCompleted()).compareTo(other.isSetOperationCompleted()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOperationCompleted()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationCompleted, other.operationCompleted); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -618,6 +829,28 @@ public String toString() { } first = false; } + if (isSetTaskStatus()) { + if (!first) sb.append(", "); + sb.append("taskStatus:"); + if (this.taskStatus == null) { + sb.append("null"); + } else { + sb.append(this.taskStatus); + } + first = false; + } + if (isSetOperationStarted()) { + if (!first) sb.append(", "); + sb.append("operationStarted:"); + sb.append(this.operationStarted); + first = false; + } + if (isSetOperationCompleted()) { + if (!first) sb.append(", "); + sb.append("operationCompleted:"); + sb.append(this.operationCompleted); + first = false; + } sb.append(")"); return sb.toString(); } @@ -711,6 +944,30 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TGetOperationStatus org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // TASK_STATUS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.taskStatus = iprot.readString(); + struct.setTaskStatusIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 7: // OPERATION_STARTED + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.operationStarted = iprot.readI64(); + struct.setOperationStartedIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 8: // OPERATION_COMPLETED + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.operationCompleted = iprot.readI64(); + struct.setOperationCompletedIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -755,6 +1012,23 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TGetOperationStatu oprot.writeFieldEnd(); } } + if (struct.taskStatus != null) { + if (struct.isSetTaskStatus()) { + oprot.writeFieldBegin(TASK_STATUS_FIELD_DESC); + oprot.writeString(struct.taskStatus); + oprot.writeFieldEnd(); + } + } + if (struct.isSetOperationStarted()) { + oprot.writeFieldBegin(OPERATION_STARTED_FIELD_DESC); + oprot.writeI64(struct.operationStarted); + oprot.writeFieldEnd(); + } + if (struct.isSetOperationCompleted()) { + oprot.writeFieldBegin(OPERATION_COMPLETED_FIELD_DESC); + oprot.writeI64(struct.operationCompleted); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -786,7 +1060,16 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetErrorMessage()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetTaskStatus()) { + optionals.set(4); + } + if (struct.isSetOperationStarted()) { + optionals.set(5); + } + if (struct.isSetOperationCompleted()) { + optionals.set(6); + } + oprot.writeBitSet(optionals, 7); if (struct.isSetOperationState()) { oprot.writeI32(struct.operationState.getValue()); } @@ -799,6 +1082,15 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetErrorMessage()) { oprot.writeString(struct.errorMessage); } + if (struct.isSetTaskStatus()) { + oprot.writeString(struct.taskStatus); + } + if (struct.isSetOperationStarted()) { + oprot.writeI64(struct.operationStarted); + } + if (struct.isSetOperationCompleted()) { + oprot.writeI64(struct.operationCompleted); + } } @Override @@ -807,7 +1099,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.status = new TStatus(); struct.status.read(iprot); struct.setStatusIsSet(true); - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(7); if (incoming.get(0)) { struct.operationState = org.apache.hive.service.cli.thrift.TOperationState.findByValue(iprot.readI32()); struct.setOperationStateIsSet(true); @@ -824,6 +1116,18 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.errorMessage = iprot.readString(); struct.setErrorMessageIsSet(true); } + if (incoming.get(4)) { + struct.taskStatus = iprot.readString(); + struct.setTaskStatusIsSet(true); + } + if (incoming.get(5)) { + struct.operationStarted = iprot.readI64(); + struct.setOperationStartedIsSet(true); + } + if (incoming.get(6)) { + struct.operationCompleted = iprot.readI64(); + struct.setOperationCompletedIsSet(true); + } } } diff --git a/service/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service/src/gen/thrift/gen-py/TCLIService/ttypes.py index ef5f5f5..c691781 100644 --- a/service/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -5635,6 +5635,9 @@ class TGetOperationStatusResp: - sqlState - errorCode - errorMessage + - taskStatus + - operationStarted + - operationCompleted """ thrift_spec = ( @@ -5644,14 +5647,20 @@ class TGetOperationStatusResp: (3, TType.STRING, 'sqlState', None, None, ), # 3 (4, TType.I32, 'errorCode', None, None, ), # 4 (5, TType.STRING, 'errorMessage', None, None, ), # 5 + (6, TType.STRING, 'taskStatus', None, None, ), # 6 + (7, TType.I64, 'operationStarted', None, None, ), # 7 + (8, TType.I64, 'operationCompleted', None, None, ), # 8 ) - def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None,): + def __init__(self, status=None, operationState=None, sqlState=None, errorCode=None, errorMessage=None, taskStatus=None, operationStarted=None, operationCompleted=None,): self.status = status self.operationState = operationState self.sqlState = sqlState self.errorCode = errorCode self.errorMessage = errorMessage + self.taskStatus = taskStatus + self.operationStarted = operationStarted + self.operationCompleted = operationCompleted def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -5688,6 +5697,21 @@ def read(self, iprot): self.errorMessage = iprot.readString() else: iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.taskStatus = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.I64: + self.operationStarted = iprot.readI64() + else: + iprot.skip(ftype) + elif fid == 8: + if ftype == TType.I64: + self.operationCompleted = iprot.readI64() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -5718,6 +5742,18 @@ def write(self, oprot): oprot.writeFieldBegin('errorMessage', TType.STRING, 5) oprot.writeString(self.errorMessage) oprot.writeFieldEnd() + if self.taskStatus is not None: + oprot.writeFieldBegin('taskStatus', TType.STRING, 6) + oprot.writeString(self.taskStatus) + oprot.writeFieldEnd() + if self.operationStarted is not None: + oprot.writeFieldBegin('operationStarted', TType.I64, 7) + oprot.writeI64(self.operationStarted) + oprot.writeFieldEnd() + if self.operationCompleted is not None: + oprot.writeFieldBegin('operationCompleted', TType.I64, 8) + oprot.writeI64(self.operationCompleted) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -5734,6 +5770,9 @@ def __hash__(self): value = (value * 31) ^ hash(self.sqlState) value = (value * 31) ^ hash(self.errorCode) value = (value * 31) ^ hash(self.errorMessage) + value = (value * 31) ^ hash(self.taskStatus) + value = (value * 31) ^ hash(self.operationStarted) + value = (value * 31) ^ hash(self.operationCompleted) return value def __repr__(self): diff --git a/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index f004ec4..07ed97c 100644 --- a/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -1471,13 +1471,19 @@ class TGetOperationStatusResp SQLSTATE = 3 ERRORCODE = 4 ERRORMESSAGE = 5 + TASKSTATUS = 6 + OPERATIONSTARTED = 7 + OPERATIONCOMPLETED = 8 FIELDS = { STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus}, OPERATIONSTATE => {:type => ::Thrift::Types::I32, :name => 'operationState', :optional => true, :enum_class => ::TOperationState}, SQLSTATE => {:type => ::Thrift::Types::STRING, :name => 'sqlState', :optional => true}, ERRORCODE => {:type => ::Thrift::Types::I32, :name => 'errorCode', :optional => true}, - ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true} + ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}, + TASKSTATUS => {:type => ::Thrift::Types::STRING, :name => 'taskStatus', :optional => true}, + OPERATIONSTARTED => {:type => ::Thrift::Types::I64, :name => 'operationStarted', :optional => true}, + OPERATIONCOMPLETED => {:type => ::Thrift::Types::I64, :name => 'operationCompleted', :optional => true} } def struct_fields; FIELDS; end diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java index e45b828..5e24d38 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java +++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java @@ -25,10 +25,16 @@ public class OperationStatus { private final OperationState state; + private final String taskStatus; + private final long operationStarted; + private final long operationCompleted; private final HiveSQLException operationException; - public OperationStatus(OperationState state, HiveSQLException operationException) { + public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, HiveSQLException operationException) { this.state = state; + this.taskStatus = taskStatus; + this.operationStarted = operationStarted; + this.operationCompleted = operationCompleted; this.operationException = operationException; } @@ -36,6 +42,18 @@ public OperationState getState() { return state; } + public String getTaskStatus() { + return taskStatus; + } + + public long getOperationStarted() { + return operationStarted; + } + + public long getOperationCompleted() { + return operationCompleted; + } + public HiveSQLException getOperationException() { return operationException; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 8868ec1..9c4d645 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -46,6 +46,7 @@ protected GetCatalogsOperation(HiveSession parentSession) { @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); try { if (isAuthV2Enabled()) { authorizeMetaGets(HiveOperationType.GET_CATALOGS, null); @@ -54,6 +55,8 @@ public void runInternal() throws HiveSQLException { } catch (HiveSQLException e) { setState(OperationState.ERROR); throw e; + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index 8ecdc2e..53711c2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -128,6 +128,7 @@ protected GetColumnsOperation(HiveSession parentSession, String catalogName, Str @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); String schemaPattern = convertSchemaPattern(schemaName); @@ -201,6 +202,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index 6df1e8a..cdc3272 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -78,6 +78,7 @@ public GetFunctionsOperation(HiveSession parentSession, @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); if (isAuthV2Enabled()) { // get databases for schema pattern IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); @@ -87,6 +88,7 @@ public void runInternal() throws HiveSQLException { matchingDbs = metastoreClient.getDatabases(schemaPattern); } catch (TException e) { setState(OperationState.ERROR); + markOperationCompletedTime(); throw new HiveSQLException(e); } // authorize this call on the schema objects @@ -120,6 +122,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index e56686a..86f0b2b 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -62,6 +62,7 @@ protected GetSchemasOperation(HiveSession parentSession, @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); if (isAuthV2Enabled()) { String cmdStr = "catalog : " + catalogName + ", schemaPattern : " + schemaName; authorizeMetaGets(HiveOperationType.GET_SCHEMAS, null, cmdStr); @@ -76,6 +77,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index a09b39a..dc3a84f 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -54,6 +54,7 @@ protected GetTableTypesOperation(HiveSession parentSession) { @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); if (isAuthV2Enabled()) { authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null); } @@ -65,6 +66,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index 740b851..91f2d2d 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -90,6 +90,7 @@ protected GetTablesOperation(HiveSession parentSession, @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); try { IMetaStoreClient metastoreClient = getParentSession().getMetaStoreClient(); String schemaPattern = convertSchemaPattern(schemaName); @@ -117,6 +118,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index 2a0fec2..654e86c 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -83,6 +83,7 @@ protected GetTypeInfoOperation(HiveSession parentSession) { @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); if (isAuthV2Enabled()) { authorizeMetaGets(HiveOperationType.GET_TYPEINFO, null); } @@ -114,6 +115,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException(e); + } finally { + markOperationCompletedTime(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index 04dc6e3..613d9f3 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -107,6 +107,7 @@ private void tearDownSessionIO() { @Override public void runInternal() throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); try { String command = getStatement().trim(); String[] tokens = statement.split("\\s"); @@ -140,6 +141,8 @@ public void runInternal() throws HiveSQLException { } catch (Exception e) { setState(OperationState.ERROR); throw new HiveSQLException("Error running query: " + e.toString(), e); + } finally { + markOperationCompletedTime(); } setState(OperationState.FINISHED); } diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index d2b3f9c..a622a17 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -75,6 +75,9 @@ private long operationTimeout; private volatile long lastAccessTime; + protected long operationStart; + protected long operationComplete; + protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -132,7 +135,13 @@ public OperationType getType() { } public OperationStatus getStatus() { - return new OperationStatus(state, operationException); + String taskStatus = null; + try { + taskStatus = getTaskStatus(); + } catch (HiveSQLException sqlException) { + LOG.error("Error getting task status for " + opHandle.toString(), sqlException); + } + return new OperationStatus(state, taskStatus, operationStart, operationComplete, operationException); } public boolean hasResultSet() { @@ -339,6 +348,10 @@ public RowSet getNextRowSet() throws HiveSQLException { return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS); } + public String getTaskStatus() throws HiveSQLException { + return null; + } + /** * Verify if the given fetch orientation is part of the default orientation types. * @param orientation @@ -407,4 +420,20 @@ protected void setMetrics(OperationState state) { } } } + + public long getOperationComplete() { + return operationComplete; + } + + public long getOperationStart() { + return operationStart; + } + + protected void markOperationStartTime() { + operationStart = System.currentTimeMillis(); + } + + protected void markOperationCompletedTime() { + operationComplete = System.currentTimeMillis(); + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 2eaab4a..24158f7 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -108,6 +108,7 @@ private void setupSessionIO(SessionState sessionState) { */ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { setState(OperationState.RUNNING); + markOperationStartTime(); try { driver = new Driver(sqlOperationConf, getParentSession().getUserName()); @@ -155,9 +156,11 @@ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { } } catch (HiveSQLException e) { setState(OperationState.ERROR); + markOperationCompletedTime(); throw e; } catch (Throwable e) { setState(OperationState.ERROR); + markOperationCompletedTime(); throw new HiveSQLException("Error running query: " + e.toString(), e); } } @@ -191,6 +194,8 @@ private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException { } catch (Throwable e) { setState(OperationState.ERROR); throw new HiveSQLException("Error running query: " + e.toString(), e); + } finally { + markOperationCompletedTime(); } setState(OperationState.FINISHED); } @@ -266,6 +271,8 @@ public Object run() throws HiveSQLException { setState(OperationState.ERROR); throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected); + } finally { + markOperationCompletedTime(); } } } @@ -317,6 +324,7 @@ private void cleanup(OperationState state) throws HiveSQLException { @Override public void cancel() throws HiveSQLException { + markOperationCompletedTime(); cleanup(OperationState.CANCELED); } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 1af4539..7c4ca05 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -307,7 +307,8 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveS if (opState == OperationState.ERROR) { opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode()); } - return new OperationStatus(opState, opException); + return new OperationStatus(opState, resp.getTaskStatus(), resp.getOperationStarted(), + resp.getOperationCompleted(), opException); } catch (HiveSQLException e) { throw e; } catch (Exception e) { diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index e78181a..c4f5b26 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.Serializable; @@ -169,6 +170,9 @@ public void testExecuteStatement() throws Exception { // Blocking execute queryString = "SELECT ID+1 FROM TEST_EXEC"; opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + + OperationStatus opStatus = client.getOperationStatus(opHandle); + checkOperationTimes(opHandle, opStatus); // Expect query to be completed now assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); @@ -266,6 +270,10 @@ public void testExecuteStatementAsync() throws Exception { opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); System.out.println("Cancelling " + opHandle); client.cancelOperation(opHandle); + + OperationStatus operationStatus = client.getOperationStatus(opHandle); + checkOperationTimes(opHandle, operationStatus); + state = client.getOperationStatus(opHandle).getState(); System.out.println(opHandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); @@ -489,7 +497,7 @@ private SessionHandle openSession(Map confOverlay) SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2. String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname - + " = false"; + + " = false"; client.executeStatement(sessionHandle, queryString, confOverlay); return sessionHandle; } @@ -620,4 +628,16 @@ public void testConfOverlay() throws Exception { client.closeOperation(opHandle); client.closeSession(sessionHandle); } + + private void checkOperationTimes(OperationHandle operationHandle, OperationStatus status) { + OperationState state = status.getState(); + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + System.out.println("##OP " + operationHandle.getHandleIdentifier() + " STATE:" + status.getState() + +" START:" + status.getOperationStarted() + + " END:" + status.getOperationCompleted()); + assertFalse(status.getOperationCompleted() == 0); + assertTrue(status.getOperationCompleted() - status.getOperationStarted() >= 0); + } + } }