diff --git llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 8748151..d378955 100644 --- llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -5587,6 +5587,16 @@ public Builder clearAttemptNumber() { * optional int64 first_attempt_start_time = 5; */ long getFirstAttemptStartTime(); + + // optional int64 current_attempt_start_time = 6; + /** + * optional int64 current_attempt_start_time = 6; + */ + boolean hasCurrentAttemptStartTime(); + /** + * optional int64 current_attempt_start_time = 6; + */ + long getCurrentAttemptStartTime(); } /** * Protobuf type {@code FragmentRuntimeInfo} @@ -5664,6 +5674,11 @@ private FragmentRuntimeInfo( firstAttemptStartTime_ = input.readInt64(); break; } + case 48: { + bitField0_ |= 0x00000020; + currentAttemptStartTime_ = input.readInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -5784,12 +5799,29 @@ public long getFirstAttemptStartTime() { return firstAttemptStartTime_; } + // optional int64 current_attempt_start_time = 6; + public static final int CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER = 6; + private long currentAttemptStartTime_; + /** + * optional int64 current_attempt_start_time = 6; + */ + public boolean hasCurrentAttemptStartTime() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int64 current_attempt_start_time = 6; + */ + public long getCurrentAttemptStartTime() { + return currentAttemptStartTime_; + } + private void initFields() { numSelfAndUpstreamTasks_ = 0; numSelfAndUpstreamCompletedTasks_ = 0; withinDagPriority_ = 0; dagStartTime_ = 0L; firstAttemptStartTime_ = 0L; + currentAttemptStartTime_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5818,6 +5850,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000010) == 0x00000010)) { output.writeInt64(5, firstAttemptStartTime_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + output.writeInt64(6, currentAttemptStartTime_); + } getUnknownFields().writeTo(output); } @@ -5847,6 +5882,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt64Size(5, firstAttemptStartTime_); } + if (((bitField0_ & 0x00000020) == 0x00000020)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(6, currentAttemptStartTime_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5895,6 +5934,11 @@ public boolean equals(final java.lang.Object obj) { result = result && (getFirstAttemptStartTime() == other.getFirstAttemptStartTime()); } + result = result && (hasCurrentAttemptStartTime() == other.hasCurrentAttemptStartTime()); + if (hasCurrentAttemptStartTime()) { + result = result && (getCurrentAttemptStartTime() + == other.getCurrentAttemptStartTime()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -5928,6 +5972,10 @@ public int hashCode() { hash = (37 * hash) + FIRST_ATTEMPT_START_TIME_FIELD_NUMBER; hash = (53 * hash) + hashLong(getFirstAttemptStartTime()); } + if (hasCurrentAttemptStartTime()) { + hash = (37 * hash) + CURRENT_ATTEMPT_START_TIME_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getCurrentAttemptStartTime()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -6047,6 +6095,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000008); firstAttemptStartTime_ = 0L; bitField0_ = (bitField0_ & ~0x00000010); + currentAttemptStartTime_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -6095,6 +6145,10 @@ public Builder clone() { to_bitField0_ |= 0x00000010; } result.firstAttemptStartTime_ = firstAttemptStartTime_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000020; + } + result.currentAttemptStartTime_ = currentAttemptStartTime_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6126,6 +6180,9 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (other.hasFirstAttemptStartTime()) { setFirstAttemptStartTime(other.getFirstAttemptStartTime()); } + if (other.hasCurrentAttemptStartTime()) { + setCurrentAttemptStartTime(other.getCurrentAttemptStartTime()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6318,6 +6375,39 @@ public Builder clearFirstAttemptStartTime() { return this; } + // optional int64 current_attempt_start_time = 6; + private long currentAttemptStartTime_ ; + /** + * optional int64 current_attempt_start_time = 6; + */ + public boolean hasCurrentAttemptStartTime() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional int64 current_attempt_start_time = 6; + */ + public long getCurrentAttemptStartTime() { + return currentAttemptStartTime_; + } + /** + * optional int64 current_attempt_start_time = 6; + */ + public Builder setCurrentAttemptStartTime(long value) { + bitField0_ |= 0x00000020; + currentAttemptStartTime_ = value; + onChanged(); + return this; + } + /** + * optional int64 current_attempt_start_time = 6; + */ + public Builder clearCurrentAttemptStartTime() { + bitField0_ = (bitField0_ & ~0x00000020); + currentAttemptStartTime_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:FragmentRuntimeInfo) } @@ -12714,44 +12804,44 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" + "\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" + "arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" + - "\005\022\026\n\016attempt_number\030\n \001(\005\"\300\001\n\023FragmentRu", + "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu", "ntimeInfo\022#\n\033num_self_and_upstream_tasks" + "\030\001 \001(\005\022-\n%num_self_and_upstream_complete" + "d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" + "(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" + - "mpt_start_time\030\005 \001(\003\"\266\002\n\026SubmitWorkReque" + - "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" + - "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" + - "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" + - " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str" + - "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n", - "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + - "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + - "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" + - "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" + - "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" + - "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" + - "pdatedResponseProto\"X\n\031QueryCompleteRequ" + - "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" + - " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo" + - "mpleteResponseProto\"\245\001\n\035TerminateFragmen", - "tRequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_n" + - "ame\030\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n" + - "\013vertex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 " + - "\001(\005\022\026\n\016attempt_number\030\006 \001(\005\" \n\036Terminate" + - "FragmentResponseProto*2\n\020SourceStateProt" + - "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022Ll" + - "apDaemonProtocol\022?\n\nsubmitWork\022\027.SubmitW" + - "orkRequestProto\032\030.SubmitWorkResponseProt" + - "o\022W\n\022sourceStateUpdated\022\037.SourceStateUpd" + - "atedRequestProto\032 .SourceStateUpdatedRes", - "ponseProto\022H\n\rqueryComplete\022\032.QueryCompl" + - "eteRequestProto\032\033.QueryCompleteResponseP" + - "roto\022T\n\021terminateFragment\022\036.TerminateFra" + - "gmentRequestProto\032\037.TerminateFragmentRes" + - "ponseProtoBH\n&org.apache.hadoop.hive.lla" + - "p.daemon.rpcB\030LlapDaemonProtocolProtos\210\001" + - "\001\240\001\001" + "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" + + "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" + + "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" + + "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" + + "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" + + "\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030", + "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" + + "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" + + "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" + + "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" + + "SourceStateUpdatedRequestProto\022\020\n\010dag_na" + + "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" + + "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" + + "edResponseProto\"X\n\031QueryCompleteRequestP" + + "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" + + "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple", + "teResponseProto\"\245\001\n\035TerminateFragmentReq" + + "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" + + "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" + + "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" + + "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" + + "mentResponseProto*2\n\020SourceStateProto\022\017\n" + + "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" + + "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" + + "equestProto\032\030.SubmitWorkResponseProto\022W\n" + + "\022sourceStateUpdated\022\037.SourceStateUpdated", + "RequestProto\032 .SourceStateUpdatedRespons" + + "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" + + "equestProto\032\033.QueryCompleteResponseProto" + + "\022T\n\021terminateFragment\022\036.TerminateFragmen" + + "tRequestProto\032\037.TerminateFragmentRespons" + + "eProtoBH\n&org.apache.hadoop.hive.llap.da" + + "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -12793,7 +12883,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_FragmentRuntimeInfo_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FragmentRuntimeInfo_descriptor, - new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", }); + new java.lang.String[] { "NumSelfAndUpstreamTasks", "NumSelfAndUpstreamCompletedTasks", "WithinDagPriority", "DagStartTime", "FirstAttemptStartTime", "CurrentAttemptStartTime", }); internal_static_SubmitWorkRequestProto_descriptor = getDescriptor().getMessageTypes().get(6); internal_static_SubmitWorkRequestProto_fieldAccessorTable = new diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index a208bdd..d594d6a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -282,6 +282,7 @@ public static String stringifySubmitRequest(SubmitWorkRequestProto request) { sb.append(", completedTaskCount=").append(fragmentRuntimeInfo.getNumSelfAndUpstreamCompletedTasks()); sb.append(", dagStartTime=").append(fragmentRuntimeInfo.getDagStartTime()); sb.append(", firstAttemptStartTime=").append(fragmentRuntimeInfo.getFirstAttemptStartTime()); + sb.append(", currentAttemptStartTime=").append(fragmentRuntimeInfo.getCurrentAttemptStartTime()); sb.append("}"); return sb.toString(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 166dac5..2ea39b7 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -442,4 +442,9 @@ private String getTaskAttemptId(SubmitWorkRequestProto request) { public long getFirstAttemptStartTime() { return request.getFragmentRuntimeInfo().getFirstAttemptStartTime(); } + + public long getCurrentAttemptStartTime() { + return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime(); + } + } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index d83d62b..40b317d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -160,6 +160,7 @@ public synchronized FragmentRuntimeInfo getFragmentRuntimeInfo(String dagName, S builder.setDagStartTime(taskCommunicatorContext.getDagStartTime()); builder.setWithinDagPriority(priority); builder.setFirstAttemptStartTime(taskCommunicatorContext.getFirstAttemptStartTime(vertexName, fragmentNumber)); + builder.setCurrentAttemptStartTime(System.currentTimeMillis()); return builder.build(); } diff --git llap-server/src/protobuf/LlapDaemonProtocol.proto llap-server/src/protobuf/LlapDaemonProtocol.proto index e098e87..d8fd882 100644 --- llap-server/src/protobuf/LlapDaemonProtocol.proto +++ llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -66,6 +66,7 @@ message FragmentRuntimeInfo { optional int32 within_dag_priority = 3; optional int64 dag_start_time = 4; optional int64 first_attempt_start_time = 5; + optional int64 current_attempt_start_time = 6; } enum SourceStateProto {