diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java index de66d9efb1cace9d32174e3020920d5e4002dc85..ac9b306a68307755a42b2e8d4ebc848bb128554f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java @@ -37,6 +37,7 @@ public static void setUpBeforeClass() throws Exception { service = new EmbeddedThriftBinaryCLIService(); HiveConf conf = new HiveConf(); + conf.setBoolean("datanucleus.schema.autoCreateTables", true); conf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); service.init(conf); client = new ThriftCLIServiceClient(service); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java index 9765b9dbe1ae097c108437c85b11b7bcee8f49da..411e17fbe747489bdb8b2bc7c4870d94596be666 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java @@ -23,15 +23,12 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.operation.ExecuteStatementOperation; import org.apache.hive.service.cli.operation.SQLOperationDisplay; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.server.HiveServer2; -import org.apache.hive.service.servlet.QueryProfileServlet; import org.apache.hive.tmpl.QueryProfileTmpl; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import java.io.StringWriter; @@ -155,8 +152,8 @@ private void verifyDDL(SQLOperationDisplay display, String stmt, String handle, Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0); Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0); - Assert.assertEquals(qDisplay1.getTaskInfos().size(), 1); - QueryDisplay.TaskInfo tInfo1 = qDisplay1.getTaskInfos().get(0); + Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1); + QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0); Assert.assertEquals(tInfo1.getTaskId(), "Stage-0"); Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL); Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index b50c5a2c4e396975f3323b16426445f869ff7b89..30ab1bbbf54bdbaec65e2a48057607f6be65a123 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -496,7 +496,7 @@ public void run() { schema = getSchema(sem, conf); plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, - SessionState.get().getHiveOperation(), schema); + SessionState.get().getHiveOperation(), schema, queryDisplay); conf.setQueryString(queryStr); @@ -1186,7 +1186,7 @@ public CommandProcessorResponse compileAndRespond(String command) { private int compileInternal(String command) { int ret; final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled, - command); + command); if (compileLock == null) { return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode(); } @@ -1229,8 +1229,8 @@ private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, final ReentrantLock compileLock = isParallelEnabled ? SessionState.get().getCompileLock() : globalCompileLock; long maxCompileLockWaitTime = HiveConf.getTimeVar( - this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, - TimeUnit.SECONDS); + this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT, + TimeUnit.SECONDS); if (maxCompileLockWaitTime > 0) { try { if (LOG.isDebugEnabled()) { @@ -1573,7 +1573,6 @@ public int execute() throws CommandNeedRetryException { // Launch upto maxthreads tasks Task task; while ((task = driverCxt.getRunnable(maxthreads)) != null) { - queryDisplay.addTask(task); TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); if (!runner.isRunning()) { break; @@ -1586,7 +1585,7 @@ public int execute() throws CommandNeedRetryException { continue; } hookContext.addCompleteTask(tskRun); - queryDisplay.setTaskCompleted(tskRun.getTask().getId(), tskRun.getTaskResult()); + queryDisplay.setTaskResult(tskRun.getTask().getId(), tskRun.getTaskResult()); Task tsk = tskRun.getTask(); TaskResult result = tskRun.getTaskResult(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java index c87c825f95d2ff49bc62f892eb1cea5abf3c65c6..467dab66e454d895742e96d4ac5db452fea00551 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java @@ -22,11 +22,12 @@ import org.apache.hadoop.hive.ql.exec.TaskResult; import org.apache.hadoop.hive.ql.plan.api.StageType; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.io.Serializable; +import java.util.*; + +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.annotate.JsonWriteNullProperties; +import org.codehaus.jackson.annotate.JsonIgnore; /** * Some limited query information to save for WebUI. @@ -41,39 +42,56 @@ private String errorMessage; private String queryId; - private final Map> hmsTimingMap = new HashMap(); - private final Map> perfLogStartMap = new HashMap(); - private final Map> perfLogEndMap = new HashMap(); + private final Map> hmsTimingMap = new HashMap>(); + private final Map> perfLogStartMap = new HashMap>(); + private final Map> perfLogEndMap = new HashMap>(); + + private final LinkedHashMap tasks = new LinkedHashMap(); - private final LinkedHashMap tasks = new LinkedHashMap(); + public synchronized void updateTaskStatus(Task tTask) { + if (!tasks.containsKey(tTask.getId())) { + tasks.put(tTask.getId(), new TaskDisplay(tTask)); + } + tasks.get(tTask.getId()).updateStatus(tTask); + } //Inner classes - public static enum Phase { + public enum Phase { COMPILATION, EXECUTION, } - public static class TaskInfo { + @JsonWriteNullProperties(false) + @JsonIgnoreProperties(ignoreUnknown = true) + public static class TaskDisplay { + private Integer returnVal; //if set, determines that task is complete. private String errorMsg; - private long endTime; - final long beginTime; - final String taskId; - final StageType taskType; - final String name; - final boolean requireLock; - final boolean retryIfFail; + private Long beginTime; + private Long endTime; + + private String taskId; + private String taskExternalHandle; + + public Task.TaskState taskState; + private StageType taskType; + private String name; + private boolean requireLock; + private boolean retryIfFail; + // required for jackson + public TaskDisplay() { - public TaskInfo (Task task) { - beginTime = System.currentTimeMillis(); + } + public TaskDisplay(Task task) { taskId = task.getId(); + taskExternalHandle = task.getExternalHandle(); taskType = task.getType(); name = task.getName(); requireLock = task.requireLock(); retryIfFail = task.ifRetryCmdWhenFail(); } - + @JsonIgnore public synchronized String getStatus() { if (returnVal == null) { return "Running"; @@ -84,67 +102,82 @@ public synchronized String getStatus() { } } - public synchronized long getElapsedTime() { - if (endTime == 0) { + public synchronized Long getElapsedTime() { + if (endTime == null) { + if (beginTime == null) { + return null; + } return System.currentTimeMillis() - beginTime; } else { return endTime - beginTime; } } + public synchronized Integer getReturnValue() { + return returnVal; + } + public synchronized String getErrorMsg() { return errorMsg; } - public synchronized long getEndTime() { - return endTime; + public synchronized Long getBeginTime() { + return beginTime; } - //Following methods do not need to be synchronized, because they are final fields. - public long getBeginTime() { - return beginTime; + public synchronized Long getEndTime() { + return endTime; } - public String getTaskId() { + public synchronized String getTaskId() { return taskId; } - public StageType getTaskType() { + public synchronized StageType getTaskType() { return taskType; } - public String getName() { + public synchronized String getName() { return name; } - - public boolean isRequireLock() { + @JsonIgnore + public synchronized boolean isRequireLock() { return requireLock; } - - public boolean isRetryIfFail() { + @JsonIgnore + public synchronized boolean isRetryIfFail() { return retryIfFail; } - } - public synchronized void addTask(Task task) { - tasks.put(task.getId(), new TaskInfo(task)); - } + public synchronized String getExternalHandle() { + return taskExternalHandle; + } - public synchronized void setTaskCompleted(String taskId, TaskResult result) { - TaskInfo taskInfo = tasks.get(taskId); - if (taskInfo != null) { - taskInfo.returnVal = result.getExitVal(); + public synchronized void updateStatus(Task tTask) { + this.taskState = tTask.getTaskState(); + switch(taskState) { + case RUNNING: + beginTime = System.currentTimeMillis(); + break; + case FINISHED: + endTime = System.currentTimeMillis(); + break; + } + } + } + public synchronized void setTaskResult(String taskId, TaskResult result) { + TaskDisplay taskDisplay = tasks.get(taskId); + if (taskDisplay != null) { + taskDisplay.returnVal = result.getExitVal(); if (result.getTaskError() != null) { - taskInfo.errorMsg = result.getTaskError().toString(); + taskDisplay.errorMsg = result.getTaskError().toString(); } - taskInfo.endTime = System.currentTimeMillis(); } } - - public synchronized List getTaskInfos() { - List taskInfos = new ArrayList(); - taskInfos.addAll(tasks.values()); - return taskInfos; + public synchronized List getTaskDisplays() { + List taskDisplays = new ArrayList(); + taskDisplays.addAll(tasks.values()); + return taskDisplays; } public synchronized void setQueryStr(String queryStr) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index 4933b34710afebde923b548572ffd7f98b7eb728..ef0923d555ba662b4ed30ef45a3d72760cdfad52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -114,11 +114,25 @@ public QueryPlan() { public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, HiveOperation operation, Schema resultSchema) { + this(queryString, sem, startTime, queryId, operation, resultSchema, null); + } + public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, + HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) { this.queryString = queryString; rootTasks = new ArrayList>(sem.getAllRootTasks()); reducerTimeStatsPerJobList = new ArrayList(); fetchTask = sem.getFetchTask(); + if (queryDisplay != null) { + if (fetchTask != null) { + fetchTask.setQueryDisplay(queryDisplay); + } + if (rootTasks!= null) { + for (Task t : rootTasks) { + t.setQueryDisplay(queryDisplay); + } + } + } // Note that inputs and outputs can be changed when the query gets executed inputs = sem.getAllInputs(); outputs = sem.getAllOutputs(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e199e5ee65404d2e9a38d20fa9d2ff72754962e9..6c677f5bbae024b503594238e59f9fbf6ba283cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -30,9 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.CompilationOpContext; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.*; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -52,10 +50,6 @@ private static final long serialVersionUID = 1L; public transient HashMap taskCounters; public transient TaskHandle taskHandle; - protected transient boolean started; - protected transient boolean initialized; - protected transient boolean isdone; - protected transient boolean queued; protected transient HiveConf conf; protected transient LogHelper console; protected transient QueryPlan queryPlan; @@ -81,18 +75,32 @@ // created in case the mapjoin failed. public static final int MAPJOIN_ONLY_NOBACKUP = 7; public static final int CONVERTED_SORTMERGEJOIN = 8; - + public QueryDisplay queryDisplay = null; // Descendants tasks who subscribe feeds from this task protected transient List> feedSubscribers; protected String id; protected T work; - + private TaskState taskState = TaskState.CREATED; private transient boolean fetchSource; - public static enum FeedType { + public enum FeedType { DYNAMIC_PARTITIONS, // list of dynamic partitions } + public enum TaskState { + // Task data structures have been initialized + INITIALIZED, + // Task has been queued for execution by the driver + QUEUED, + // Task is currently running + RUNNING, + // Task has completed + FINISHED, + // Task is just created + CREATED, + // Task state is unkown + UNKNOWN + } // Bean methods @@ -108,10 +116,6 @@ private Throwable exception; public Task() { - isdone = false; - started = false; - initialized = false; - queued = false; this.taskCounters = new HashMap(); taskTag = Task.NO_TAG; } @@ -123,13 +127,25 @@ public TaskHandle getTaskHandle() { public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext, CompilationOpContext opContext) { this.queryPlan = queryPlan; - isdone = false; - started = false; setInitialized(); this.conf = conf; this.driverContext = driverContext; console = new LogHelper(LOG); } + public void setQueryDisplay(QueryDisplay queryDisplay) { + this.queryDisplay = queryDisplay; + } + + private void updateStatusInQueryDisplay() { + if (queryDisplay != null) { + queryDisplay.updateTaskStatus(this); + } + } + + private void setState(TaskState state) { + this.taskState = state; + updateStatusInQueryDisplay(); + } protected Hive getHive() { try { @@ -323,37 +339,36 @@ public void removeDependentTask(Task dependent) { } } } - public void setStarted() { - this.started = true; + setState(TaskState.RUNNING); } public boolean started() { - return started; + return taskState == TaskState.RUNNING; } public boolean done() { - return isdone; + return taskState == TaskState.FINISHED; } public void setDone() { - isdone = true; + setState(TaskState.FINISHED); } public void setQueued() { - queued = true; + setState(TaskState.QUEUED); } public boolean getQueued() { - return queued; + return taskState == TaskState.QUEUED; } public void setInitialized() { - initialized = true; + setState(TaskState.INITIALIZED); } public boolean getInitialized() { - return initialized; + return taskState == TaskState.INITIALIZED; } public boolean isRunnable() { @@ -391,6 +406,14 @@ public String getId() { return id; } + public String getExternalHandle() { + return null; + } + + public TaskState getTaskState() { + return taskState; + } + public boolean isMapRedTask() { return false; } @@ -572,4 +595,6 @@ public int hashCode() { public boolean equals(Object obj) { return toString().equals(String.valueOf(obj)); } + + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index ce020a52721b439c4a9a81ee75e9829f2da00221..d164859219896d88c42a69e56f621cb08012f633 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -430,6 +430,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); + this.jobID = rj.getJobID(); returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); @@ -849,5 +850,10 @@ public void shutdown() { rj = null; } } + + @Override + public String getExternalHandle() { + return this.jobID; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 5bc3d9e732330af7890d7379334ed42be116ccbe..310356c526149088b2c1fc5dc4c1da3cbf166595 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -363,25 +363,25 @@ static void configureDebugVariablesForChildJVM(Map environmentVa @Override public boolean mapStarted() { boolean b = super.mapStarted(); - return runningViaChild ? isdone : b; + return runningViaChild ? done() : b; } @Override public boolean reduceStarted() { boolean b = super.reduceStarted(); - return runningViaChild ? isdone : b; + return runningViaChild ? done() : b; } @Override public boolean mapDone() { boolean b = super.mapDone(); - return runningViaChild ? isdone : b; + return runningViaChild ? done() : b; } @Override public boolean reduceDone() { boolean b = super.reduceDone(); - return runningViaChild ? isdone : b; + return runningViaChild ? done() : b; } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java index 45cd533270a550ae07510fe3974ddceaed30af51..687f5516517994cd972e8557904f68cd8a15afd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java @@ -109,7 +109,7 @@ }; /** - * TaskInfo. + * TaskDisplay. * */ public static class TaskInfo extends Info { diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index 0aa9d13464bf284a3048092372299efb8e1d6bcc..aa28b6e21e2b4b1dccf82a6b20d1584d5d3261ba 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -977,6 +977,15 @@ struct TGetOperationStatusResp { // Error message 5: optional string errorMessage + + // List of statuses of sub tasks + 6: optional string taskStatus + + // When was the operation started + 7: optional i64 operationStarted + // When was the operation completed + 8: optional i64 operationCompleted + } diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java index a7a8ebc5c663961e6f4ebdb9df78fa3d53e31d98..30492800ff95573cdadf939f66debee00e4ac5a8 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TGetOperationStatusResp.java @@ -43,6 +43,9 @@ private static final org.apache.thrift.protocol.TField SQL_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("sqlState", org.apache.thrift.protocol.TType.STRING, (short)3); private static final org.apache.thrift.protocol.TField ERROR_CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorCode", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField TASK_STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("taskStatus", org.apache.thrift.protocol.TType.STRING, (short)6); + private static final org.apache.thrift.protocol.TField OPERATION_STARTED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationStarted", org.apache.thrift.protocol.TType.I64, (short)7); + private static final org.apache.thrift.protocol.TField OPERATION_COMPLETED_FIELD_DESC = new org.apache.thrift.protocol.TField("operationCompleted", org.apache.thrift.protocol.TType.I64, (short)8); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -55,6 +58,9 @@ private String sqlState; // optional private int errorCode; // optional private String errorMessage; // optional + private String taskStatus; // optional + private long operationStarted; // optional + private long operationCompleted; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -66,7 +72,10 @@ OPERATION_STATE((short)2, "operationState"), SQL_STATE((short)3, "sqlState"), ERROR_CODE((short)4, "errorCode"), - ERROR_MESSAGE((short)5, "errorMessage"); + ERROR_MESSAGE((short)5, "errorMessage"), + TASK_STATUS((short)6, "taskStatus"), + OPERATION_STARTED((short)7, "operationStarted"), + OPERATION_COMPLETED((short)8, "operationCompleted"); private static final Map byName = new HashMap(); @@ -91,6 +100,12 @@ public static _Fields findByThriftId(int fieldId) { return ERROR_CODE; case 5: // ERROR_MESSAGE return ERROR_MESSAGE; + case 6: // TASK_STATUS + return TASK_STATUS; + case 7: // OPERATION_STARTED + return OPERATION_STARTED; + case 8: // OPERATION_COMPLETED + return OPERATION_COMPLETED; default: return null; } @@ -132,8 +147,10 @@ public String getFieldName() { // isset id assignments private static final int __ERRORCODE_ISSET_ID = 0; + private static final int __OPERATIONSTARTED_ISSET_ID = 1; + private static final int __OPERATIONCOMPLETED_ISSET_ID = 2; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE}; + private static final _Fields optionals[] = {_Fields.OPERATION_STATE,_Fields.SQL_STATE,_Fields.ERROR_CODE,_Fields.ERROR_MESSAGE,_Fields.TASK_STATUS,_Fields.OPERATION_STARTED,_Fields.OPERATION_COMPLETED}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -147,6 +164,12 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TASK_STATUS, new org.apache.thrift.meta_data.FieldMetaData("taskStatus", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.OPERATION_STARTED, new org.apache.thrift.meta_data.FieldMetaData("operationStarted", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.OPERATION_COMPLETED, new org.apache.thrift.meta_data.FieldMetaData("operationCompleted", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TGetOperationStatusResp.class, metaDataMap); } @@ -179,6 +202,11 @@ public TGetOperationStatusResp(TGetOperationStatusResp other) { if (other.isSetErrorMessage()) { this.errorMessage = other.errorMessage; } + if (other.isSetTaskStatus()) { + this.taskStatus = other.taskStatus; + } + this.operationStarted = other.operationStarted; + this.operationCompleted = other.operationCompleted; } public TGetOperationStatusResp deepCopy() { @@ -193,6 +221,11 @@ public void clear() { setErrorCodeIsSet(false); this.errorCode = 0; this.errorMessage = null; + this.taskStatus = null; + setOperationStartedIsSet(false); + this.operationStarted = 0; + setOperationCompletedIsSet(false); + this.operationCompleted = 0; } public TStatus getStatus() { @@ -317,6 +350,73 @@ public void setErrorMessageIsSet(boolean value) { } } + public String getTaskStatus() { + return this.taskStatus; + } + + public void setTaskStatus(String taskStatus) { + this.taskStatus = taskStatus; + } + + public void unsetTaskStatus() { + this.taskStatus = null; + } + + /** Returns true if field taskStatus is set (has been assigned a value) and false otherwise */ + public boolean isSetTaskStatus() { + return this.taskStatus != null; + } + + public void setTaskStatusIsSet(boolean value) { + if (!value) { + this.taskStatus = null; + } + } + + public long getOperationStarted() { + return this.operationStarted; + } + + public void setOperationStarted(long operationStarted) { + this.operationStarted = operationStarted; + setOperationStartedIsSet(true); + } + + public void unsetOperationStarted() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID); + } + + /** Returns true if field operationStarted is set (has been assigned a value) and false otherwise */ + public boolean isSetOperationStarted() { + return EncodingUtils.testBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID); + } + + public void setOperationStartedIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONSTARTED_ISSET_ID, value); + } + + public long getOperationCompleted() { + return this.operationCompleted; + } + + public void setOperationCompleted(long operationCompleted) { + this.operationCompleted = operationCompleted; + setOperationCompletedIsSet(true); + } + + public void unsetOperationCompleted() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID); + } + + /** Returns true if field operationCompleted is set (has been assigned a value) and false otherwise */ + public boolean isSetOperationCompleted() { + return EncodingUtils.testBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID); + } + + public void setOperationCompletedIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OPERATIONCOMPLETED_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case STATUS: @@ -359,6 +459,30 @@ public void setFieldValue(_Fields field, Object value) { } break; + case TASK_STATUS: + if (value == null) { + unsetTaskStatus(); + } else { + setTaskStatus((String)value); + } + break; + + case OPERATION_STARTED: + if (value == null) { + unsetOperationStarted(); + } else { + setOperationStarted((Long)value); + } + break; + + case OPERATION_COMPLETED: + if (value == null) { + unsetOperationCompleted(); + } else { + setOperationCompleted((Long)value); + } + break; + } } @@ -379,6 +503,15 @@ public Object getFieldValue(_Fields field) { case ERROR_MESSAGE: return getErrorMessage(); + case TASK_STATUS: + return getTaskStatus(); + + case OPERATION_STARTED: + return getOperationStarted(); + + case OPERATION_COMPLETED: + return getOperationCompleted(); + } throw new IllegalStateException(); } @@ -400,6 +533,12 @@ public boolean isSet(_Fields field) { return isSetErrorCode(); case ERROR_MESSAGE: return isSetErrorMessage(); + case TASK_STATUS: + return isSetTaskStatus(); + case OPERATION_STARTED: + return isSetOperationStarted(); + case OPERATION_COMPLETED: + return isSetOperationCompleted(); } throw new IllegalStateException(); } @@ -462,6 +601,33 @@ public boolean equals(TGetOperationStatusResp that) { return false; } + boolean this_present_taskStatus = true && this.isSetTaskStatus(); + boolean that_present_taskStatus = true && that.isSetTaskStatus(); + if (this_present_taskStatus || that_present_taskStatus) { + if (!(this_present_taskStatus && that_present_taskStatus)) + return false; + if (!this.taskStatus.equals(that.taskStatus)) + return false; + } + + boolean this_present_operationStarted = true && this.isSetOperationStarted(); + boolean that_present_operationStarted = true && that.isSetOperationStarted(); + if (this_present_operationStarted || that_present_operationStarted) { + if (!(this_present_operationStarted && that_present_operationStarted)) + return false; + if (this.operationStarted != that.operationStarted) + return false; + } + + boolean this_present_operationCompleted = true && this.isSetOperationCompleted(); + boolean that_present_operationCompleted = true && that.isSetOperationCompleted(); + if (this_present_operationCompleted || that_present_operationCompleted) { + if (!(this_present_operationCompleted && that_present_operationCompleted)) + return false; + if (this.operationCompleted != that.operationCompleted) + return false; + } + return true; } @@ -494,6 +660,21 @@ public int hashCode() { if (present_errorMessage) list.add(errorMessage); + boolean present_taskStatus = true && (isSetTaskStatus()); + list.add(present_taskStatus); + if (present_taskStatus) + list.add(taskStatus); + + boolean present_operationStarted = true && (isSetOperationStarted()); + list.add(present_operationStarted); + if (present_operationStarted) + list.add(operationStarted); + + boolean present_operationCompleted = true && (isSetOperationCompleted()); + list.add(present_operationCompleted); + if (present_operationCompleted) + list.add(operationCompleted); + return list.hashCode(); } @@ -555,6 +736,36 @@ public int compareTo(TGetOperationStatusResp other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetTaskStatus()).compareTo(other.isSetTaskStatus()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTaskStatus()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.taskStatus, other.taskStatus); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetOperationStarted()).compareTo(other.isSetOperationStarted()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOperationStarted()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationStarted, other.operationStarted); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetOperationCompleted()).compareTo(other.isSetOperationCompleted()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOperationCompleted()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.operationCompleted, other.operationCompleted); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -618,6 +829,28 @@ public String toString() { } first = false; } + if (isSetTaskStatus()) { + if (!first) sb.append(", "); + sb.append("taskStatus:"); + if (this.taskStatus == null) { + sb.append("null"); + } else { + sb.append(this.taskStatus); + } + first = false; + } + if (isSetOperationStarted()) { + if (!first) sb.append(", "); + sb.append("operationStarted:"); + sb.append(this.operationStarted); + first = false; + } + if (isSetOperationCompleted()) { + if (!first) sb.append(", "); + sb.append("operationCompleted:"); + sb.append(this.operationCompleted); + first = false; + } sb.append(")"); return sb.toString(); } @@ -711,6 +944,30 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TGetOperationStatus org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // TASK_STATUS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.taskStatus = iprot.readString(); + struct.setTaskStatusIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 7: // OPERATION_STARTED + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.operationStarted = iprot.readI64(); + struct.setOperationStartedIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 8: // OPERATION_COMPLETED + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.operationCompleted = iprot.readI64(); + struct.setOperationCompletedIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -755,6 +1012,23 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TGetOperationStatu oprot.writeFieldEnd(); } } + if (struct.taskStatus != null) { + if (struct.isSetTaskStatus()) { + oprot.writeFieldBegin(TASK_STATUS_FIELD_DESC); + oprot.writeString(struct.taskStatus); + oprot.writeFieldEnd(); + } + } + if (struct.isSetOperationStarted()) { + oprot.writeFieldBegin(OPERATION_STARTED_FIELD_DESC); + oprot.writeI64(struct.operationStarted); + oprot.writeFieldEnd(); + } + if (struct.isSetOperationCompleted()) { + oprot.writeFieldBegin(OPERATION_COMPLETED_FIELD_DESC); + oprot.writeI64(struct.operationCompleted); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -786,7 +1060,16 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetErrorMessage()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetTaskStatus()) { + optionals.set(4); + } + if (struct.isSetOperationStarted()) { + optionals.set(5); + } + if (struct.isSetOperationCompleted()) { + optionals.set(6); + } + oprot.writeBitSet(optionals, 7); if (struct.isSetOperationState()) { oprot.writeI32(struct.operationState.getValue()); } @@ -799,6 +1082,15 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatus if (struct.isSetErrorMessage()) { oprot.writeString(struct.errorMessage); } + if (struct.isSetTaskStatus()) { + oprot.writeString(struct.taskStatus); + } + if (struct.isSetOperationStarted()) { + oprot.writeI64(struct.operationStarted); + } + if (struct.isSetOperationCompleted()) { + oprot.writeI64(struct.operationCompleted); + } } @Override @@ -807,7 +1099,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.status = new TStatus(); struct.status.read(iprot); struct.setStatusIsSet(true); - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(7); if (incoming.get(0)) { struct.operationState = org.apache.hive.service.rpc.thrift.TOperationState.findByValue(iprot.readI32()); struct.setOperationStateIsSet(true); @@ -824,6 +1116,18 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TGetOperationStatusR struct.errorMessage = iprot.readString(); struct.setErrorMessageIsSet(true); } + if (incoming.get(4)) { + struct.taskStatus = iprot.readString(); + struct.setTaskStatusIsSet(true); + } + if (incoming.get(5)) { + struct.operationStarted = iprot.readI64(); + struct.setOperationStartedIsSet(true); + } + if (incoming.get(6)) { + struct.operationCompleted = iprot.readI64(); + struct.setOperationCompletedIsSet(true); + } } } diff --git a/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon b/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon index c51368947ffe5873c727fc7e74aa25e748707370..b6e57c970565d226818e5376916d0f82b2cc6f9b 100644 --- a/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon +++ b/service/src/jamon/org/apache/hive/tmpl/QueryProfileTmpl.jamon @@ -176,16 +176,16 @@ org.apache.hive.service.cli.operation.SQLOperationDisplay; Retry If Fail - <%if sod.getQueryDisplay() != null && sod.getQueryDisplay().getTaskInfos() != null %> - <%for QueryDisplay.TaskInfo taskInfo : sod.getQueryDisplay().getTaskInfos() %> + <%if sod.getQueryDisplay() != null && sod.getQueryDisplay().getTaskDisplays() != null %> + <%for QueryDisplay.TaskDisplay taskDisplay : sod.getQueryDisplay().getTaskDisplays() %> - <% taskInfo.getTaskId() + ":" + taskInfo.getTaskType() %> - <% taskInfo.getStatus() %> - <% new Date(taskInfo.getBeginTime()) %> - <% taskInfo.getEndTime() == 0 ? "" : new Date(taskInfo.getEndTime()) %> - <% taskInfo.getElapsedTime()/1000 %> (s) - <% taskInfo.isRequireLock() %> - <% taskInfo.isRetryIfFail() %> + <% taskDisplay.getTaskId() + ":" + taskDisplay.getTaskType() %> + <% taskDisplay.getStatus() %> + <% new Date(taskDisplay.getBeginTime()) %> + <% taskDisplay.getEndTime() == 0 ? "" : new Date(taskDisplay.getEndTime()) %> + <% taskDisplay.getElapsedTime()/1000 %> (s) + <% taskDisplay.isRequireLock() %> + <% taskDisplay.isRetryIfFail() %> diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java index e45b828193daf46e26c1587f7e0b9ea1a43f4930..5e24d38dfbcb3948ac184897d3fb61da24d9688a 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java +++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java @@ -25,10 +25,16 @@ public class OperationStatus { private final OperationState state; + private final String taskStatus; + private final long operationStarted; + private final long operationCompleted; private final HiveSQLException operationException; - public OperationStatus(OperationState state, HiveSQLException operationException) { + public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, HiveSQLException operationException) { this.state = state; + this.taskStatus = taskStatus; + this.operationStarted = operationStarted; + this.operationCompleted = operationCompleted; this.operationException = operationException; } @@ -36,6 +42,18 @@ public OperationState getState() { return state; } + public String getTaskStatus() { + return taskStatus; + } + + public long getOperationStarted() { + return operationStarted; + } + + public long getOperationCompleted() { + return operationCompleted; + } + public HiveSQLException getOperationException() { return operationException; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 22f725c948494865e176a8d18543caf462a77c19..d9a273b1b95adf7a6a68fa013042da2408904d64 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -77,6 +77,9 @@ private volatile long lastAccessTime; private final long beginTime; + protected long operationStart; + protected long operationComplete; + protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET = EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST); @@ -137,7 +140,13 @@ public OperationType getType() { } public OperationStatus getStatus() { - return new OperationStatus(state, operationException); + String taskStatus = null; + try { + taskStatus = getTaskStatus(); + } catch (HiveSQLException sqlException) { + LOG.error("Error getting task status for " + opHandle.toString(), sqlException); + } + return new OperationStatus(state, taskStatus, operationStart, operationComplete, operationException); } public boolean hasResultSet() { @@ -346,6 +355,10 @@ public RowSet getNextRowSet() throws HiveSQLException { return getNextRowSet(FetchOrientation.FETCH_NEXT, DEFAULT_FETCH_MAX_ROWS); } + public String getTaskStatus() throws HiveSQLException { + return null; + } + /** * Verify if the given fetch orientation is part of the default orientation types. * @param orientation @@ -431,5 +444,31 @@ protected OperationState getState() { } protected void onNewState(OperationState state, OperationState prevState) { + switch(state) { + case RUNNING: + markOperationStartTime(); + break; + case ERROR: + case FINISHED: + case CANCELED: + markOperationCompletedTime(); + break; + } + } + + public long getOperationComplete() { + return operationComplete; + } + + public long getOperationStart() { + return operationStart; + } + + protected void markOperationStartTime() { + operationStart = System.currentTimeMillis(); + } + + protected void markOperationCompletedTime() { + operationComplete = System.currentTimeMillis(); } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 100dc6a5ceb10bff1d6591dd4b92062f4d787b98..04d816a72afcadbff26f5eb5d68003a7344e31c9 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -18,17 +18,10 @@ package org.apache.hive.service.cli.operation; -import java.io.IOException; -import java.io.PrintStream; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; +import java.io.*; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.QueryDisplay; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -69,6 +63,9 @@ import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.server.ThreadWithGarbageCleanup; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; /** * SQLOperation. @@ -128,7 +125,6 @@ private void setupSessionIO(SessionState sessionState) { */ public void prepare(HiveConf sqlOperationConf) throws HiveSQLException { setState(OperationState.RUNNING); - try { driver = new Driver(sqlOperationConf, getParentSession().getUserName()); sqlOpDisplay.setQueryDisplay(driver.getQueryDisplay()); @@ -387,6 +383,38 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H } } + @Override + public String getTaskStatus() throws HiveSQLException { + if (driver != null) { + List statuses = driver.getQueryDisplay().getTaskDisplays(); + if (statuses != null) { + ByteArrayOutputStream out = null; + try { + ObjectMapper mapper = new ObjectMapper(); + out = new ByteArrayOutputStream(); + mapper.writeValue(out, statuses); + return out.toString("UTF-8"); + } catch (JsonGenerationException e) { + throw new HiveSQLException(e); + } catch (JsonMappingException e) { + throw new HiveSQLException(e); + } catch (IOException e) { + throw new HiveSQLException(e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + throw new HiveSQLException(e); + } + } + } + } + } + // Driver not initialized + return null; + } + private RowSet decode(List rows, RowSet rowSet) throws Exception { if (driver.isFetchingTable()) { return prepareFromRow(rows, rowSet); @@ -508,6 +536,7 @@ public SQLOperationDisplay getSQLOperationDisplay() { @Override protected void onNewState(OperationState state, OperationState prevState) { + super.onNewState(state, prevState); currentSQLStateScope = setMetrics(currentSQLStateScope, MetricsConstant.SQL_OPERATION_PREFIX, MetricsConstant.COMPLETED_SQL_OPERATION_PREFIX, state); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 8dff26467260c365d5869d5b3e3bae15c3acd92d..f8594a0c4057e6c89d3e4e67a27ab7da82136530 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -679,6 +679,7 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th new OperationHandle(req.getOperationHandle())); resp.setOperationState(operationStatus.getState().toTOperationState()); HiveSQLException opException = operationStatus.getOperationException(); + resp.setTaskStatus(operationStatus.getTaskStatus()); if (opException != null) { resp.setSqlState(opException.getSQLState()); resp.setErrorCode(opException.getErrorCode()); diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 5f01165d0dfd131a2599b90a8e1c4d4970650b7a..ccce6dc57d2d654e5822b05de3e5e5e4bbe38ddd 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -361,7 +361,8 @@ public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveS if (opState == OperationState.ERROR) { opException = new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode()); } - return new OperationStatus(opState, opException); + return new OperationStatus(opState, resp.getTaskStatus(), resp.getOperationStarted(), + resp.getOperationCompleted(), opException); } catch (HiveSQLException e) { throw e; } catch (Exception e) { diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index e78181a15993d99f1cab5a061c08bb21823d2171..ef56b13c4c41f04a97fe1ad88499050f2a42cac0 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -18,11 +18,9 @@ package org.apache.hive.service.cli; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; +import java.io.ByteArrayInputStream; import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -36,15 +34,19 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryDisplay; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -169,6 +171,9 @@ public void testExecuteStatement() throws Exception { // Blocking execute queryString = "SELECT ID+1 FROM TEST_EXEC"; opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); + + OperationStatus opStatus = client.getOperationStatus(opHandle); + checkOperationTimes(opHandle, opStatus); // Expect query to be completed now assertEquals("Query should be finished", OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); @@ -266,6 +271,10 @@ public void testExecuteStatementAsync() throws Exception { opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay); System.out.println("Cancelling " + opHandle); client.cancelOperation(opHandle); + + OperationStatus operationStatus = client.getOperationStatus(opHandle); + checkOperationTimes(opHandle, operationStatus); + state = client.getOperationStatus(opHandle).getState(); System.out.println(opHandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); @@ -489,7 +498,7 @@ private SessionHandle openSession(Map confOverlay) SessionState.get().setIsHiveServerQuery(true); // Pretend we are in HS2. String queryString = "SET " + HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname - + " = false"; + + " = false"; client.executeStatement(sessionHandle, queryString, confOverlay); return sessionHandle; } @@ -620,4 +629,88 @@ public void testConfOverlay() throws Exception { client.closeOperation(opHandle); client.closeSession(sessionHandle); } + + @Test + public void testTaskStatus() throws Exception { + HashMap confOverlay = new HashMap(); + String tableName = "TEST_EXEC_ASYNC"; + String columnDefinitions = "(ID STRING)"; + + // Open a session and set up the test data + SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); + assertNotNull(sessionHandle); + // nonblocking execute + String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC"; + OperationHandle ophandle = + client.executeStatementAsync(sessionHandle, select, confOverlay); + + OperationStatus status = null; + int count = 0; + while (true) { + status = client.getOperationStatus(ophandle); + checkOperationTimes(ophandle, status); + OperationState state = status.getState(); + System.out.println("Polling: " + ophandle + " count=" + (++count) + + " state=" + state); + + String jsonTaskStatus = status.getTaskStatus(); + assertNotNull(jsonTaskStatus); + ObjectMapper mapper = new ObjectMapper(); + ByteArrayInputStream in = new ByteArrayInputStream(jsonTaskStatus.getBytes("UTF-8")); + List taskStatuses = + mapper.readValue(in, new TypeReference>(){}); + checkTaskStatuses(taskStatuses); + System.out.println("task statuses: " + jsonTaskStatus); // TaskDisplay doesn't have a toString, using json + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED + || state == OperationState.ERROR) { + break; + } + Thread.sleep(1000); + } + } + + private void checkTaskStatuses(List taskDisplays) { + assertNotNull(taskDisplays); + for (QueryDisplay.TaskDisplay taskDisplay: taskDisplays) { + switch (taskDisplay.taskState) { + case INITIALIZED: + case QUEUED: + assertNull(taskDisplay.getBeginTime()); + assertNull(taskDisplay.getEndTime()); + assertNull(taskDisplay.getElapsedTime()); + assertNull(taskDisplay.getErrorMsg()); + assertNull(taskDisplay.getReturnValue()); + break; + case RUNNING: + assertNotNull(taskDisplay.getBeginTime()); + assertNull(taskDisplay.getEndTime()); + assertNotNull(taskDisplay.getElapsedTime()); + assertNull(taskDisplay.getErrorMsg()); + assertNull(taskDisplay.getReturnValue()); + break; + case FINISHED: + assertNotNull(taskDisplay.getBeginTime()); + assertNotNull(taskDisplay.getEndTime()); + assertNotNull(taskDisplay.getElapsedTime()); + break; + case UNKNOWN: + default: + fail("unknown task status: " + taskDisplay); + } + } + } + + + private void checkOperationTimes(OperationHandle operationHandle, OperationStatus status) { + OperationState state = status.getState(); + if (OperationState.CANCELED == state || state == OperationState.CLOSED + || state == OperationState.FINISHED || state == OperationState.ERROR) { + System.out.println("##OP " + operationHandle.getHandleIdentifier() + " STATE:" + status.getState() + +" START:" + status.getOperationStarted() + + " END:" + status.getOperationCompleted()); + assertFalse(status.getOperationCompleted() == 0); + assertTrue(status.getOperationCompleted() - status.getOperationStarted() >= 0); + } + } }