diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index af009b8..3bc5974 100644 --- a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -90,6 +90,97 @@ private SourceStateProto(int index, int value) { // @@protoc_insertion_point(enum_scope:SourceStateProto) } + /** + * Protobuf enum {@code SubmissionStateProto} + */ + public enum SubmissionStateProto + implements com.google.protobuf.ProtocolMessageEnum { + /** + * ACCEPTED = 1; + */ + ACCEPTED(0, 1), + /** + * REJECTED = 2; + */ + REJECTED(1, 2), + /** + * EVICTED_OTHER = 3; + */ + EVICTED_OTHER(2, 3), + ; + + /** + * ACCEPTED = 1; + */ + public static final int ACCEPTED_VALUE = 1; + /** + * REJECTED = 2; + */ + public static final int REJECTED_VALUE = 2; + /** + * EVICTED_OTHER = 3; + */ + public static final int EVICTED_OTHER_VALUE = 3; + + + public final int getNumber() { return value; } + + public static SubmissionStateProto valueOf(int value) { + switch (value) { + case 1: return ACCEPTED; + case 2: return REJECTED; + case 3: return EVICTED_OTHER; + default: return null; + } + } + + public static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto> + internalGetValueMap() { + return internalValueMap; + } + private static com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto> + internalValueMap = + new com.google.protobuf.Internal.EnumLiteMap<SubmissionStateProto>() { + public SubmissionStateProto findValueByNumber(int number) { + return SubmissionStateProto.valueOf(number); + } + }; + + public final com.google.protobuf.Descriptors.EnumValueDescriptor + getValueDescriptor() { + return getDescriptor().getValues().get(index); + } + public final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptorForType() { + return getDescriptor(); + } + public static final com.google.protobuf.Descriptors.EnumDescriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.getDescriptor().getEnumTypes().get(1); + } + + private static final SubmissionStateProto[] VALUES = values(); + + public static SubmissionStateProto valueOf( + com.google.protobuf.Descriptors.EnumValueDescriptor desc) { + if (desc.getType() != getDescriptor()) { + throw new java.lang.IllegalArgumentException( + "EnumValueDescriptor is not for this type."); + } + return VALUES[desc.getIndex()]; + } + + private final int index; + private final int value; + + private SubmissionStateProto(int index, int value) { + this.index = index; + this.value = value; + } + + // @@protoc_insertion_point(enum_scope:SubmissionStateProto) + } + public interface UserPayloadProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -8265,6 +8356,31 @@ public Builder clearFragmentRuntimeInfo() { public interface SubmitWorkResponseProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { + + // optional .SubmissionStateProto submission_state = 1; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + boolean hasSubmissionState(); + /** + * optional .SubmissionStateProto submission_state = 1; + */ + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState(); + + // optional string msg = 2; + /** + * optional string msg = 2; + */ + boolean hasMsg(); + /** + * optional string msg = 2; + */ +
java.lang.String getMsg(); + /** + * optional string msg = 2; + */ + com.google.protobuf.ByteString + getMsgBytes(); } /** * Protobuf type {@code SubmitWorkResponseProto} @@ -8299,6 +8415,7 @@ private SubmitWorkResponseProto( com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); + int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { @@ -8316,6 +8433,22 @@ private SubmitWorkResponseProto( } break; } + case 8: { + int rawValue = input.readEnum(); + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + submissionState_ = value; + } + break; + } + case 18: { + bitField0_ |= 0x00000002; + msg_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -8355,7 +8488,69 @@ public SubmitWorkResponseProto parsePartialFrom( return PARSER; } + private int bitField0_; + // optional .SubmissionStateProto submission_state = 1; + public static final int SUBMISSION_STATE_FIELD_NUMBER = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public boolean hasSubmissionState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() { + return submissionState_; + } + + // optional string msg = 2; + public static final int MSG_FIELD_NUMBER = 2; + private java.lang.Object msg_; + /** + * optional string msg = 2; + */ + public boolean hasMsg() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string msg = 2; + */ + public java.lang.String getMsg() { + java.lang.Object ref = msg_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + msg_ = s; + } + return s; + } + } + /** + * optional string msg = 2; + */ + public com.google.protobuf.ByteString + getMsgBytes() { + java.lang.Object ref = msg_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + msg_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + msg_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8369,6 +8564,12 @@ public final boolean isInitialized() { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeEnum(1, submissionState_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getMsgBytes()); + } getUnknownFields().writeTo(output); } @@ -8378,6 +8579,14 @@ public int getSerializedSize() { if 
(size != -1) return size; size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeEnumSize(1, submissionState_.getNumber()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getMsgBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -8401,6 +8610,16 @@ public boolean equals(final java.lang.Object obj) { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto) obj; boolean result = true; + result = result && (hasSubmissionState() == other.hasSubmissionState()); + if (hasSubmissionState()) { + result = result && + (getSubmissionState() == other.getSubmissionState()); + } + result = result && (hasMsg() == other.hasMsg()); + if (hasMsg()) { + result = result && getMsg() + .equals(other.getMsg()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8414,6 +8633,14 @@ public int hashCode() { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasSubmissionState()) { + hash = (37 * hash) + SUBMISSION_STATE_FIELD_NUMBER; + hash = (53 * hash) + hashEnum(getSubmissionState()); + } + if (hasMsg()) { + hash = (37 * hash) + MSG_FIELD_NUMBER; + hash = (53 * hash) + getMsg().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8523,6 +8750,10 @@ private static Builder create() { public Builder clear() { super.clear(); + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + bitField0_ = (bitField0_ & ~0x00000001); + msg_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -8549,6 +8780,17 @@ public Builder clone() { public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto buildPartial() { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.submissionState_ = submissionState_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.msg_ = msg_; + result.bitField0_ = to_bitField0_; onBuilt(); return result; } @@ -8564,6 +8806,14 @@ public Builder mergeFrom(com.google.protobuf.Message other) { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto.getDefaultInstance()) return this; + if (other.hasSubmissionState()) { + setSubmissionState(other.getSubmissionState()); + } + if (other.hasMsg()) { + bitField0_ |= 0x00000002; + msg_ = other.msg_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -8589,6 +8839,117 @@ public Builder mergeFrom( } return this; } + private int bitField0_; + + // optional .SubmissionStateProto submission_state = 1; + private org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto submissionState_ = 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public boolean hasSubmissionState() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto getSubmissionState() { + return submissionState_; + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public Builder setSubmissionState(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + submissionState_ = value; + onChanged(); + return this; + } + /** + * optional .SubmissionStateProto submission_state = 1; + */ + public Builder clearSubmissionState() { + bitField0_ = (bitField0_ & ~0x00000001); + submissionState_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto.ACCEPTED; + onChanged(); + return this; + } + + // optional string msg = 2; + private java.lang.Object msg_ = ""; + /** + * optional string msg = 2; + */ + public boolean hasMsg() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional string msg = 2; + */ + public java.lang.String getMsg() { + java.lang.Object ref = msg_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + msg_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string msg = 2; + */ + public com.google.protobuf.ByteString + getMsgBytes() { + java.lang.Object ref = msg_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + msg_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string msg = 2; + */ + public Builder setMsg( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + msg_ = value; + onChanged(); + return this; + } + /** + * optional string msg = 2; + */ + public Builder clearMsg() { + bitField0_ = (bitField0_ & ~0x00000002); + msg_ = getDefaultInstance().getMsg(); + onChanged(); + return this; + } + /** + * optional string msg = 2; + */ + public Builder setMsgBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + msg_ = value; + onChanged(); + return this; + } // @@protoc_insertion_point(builder_scope:SubmitWorkResponseProto) } @@ -13565,33 +13926,36 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" + "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + - "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" + - "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" + - "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" + - "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" + - "pdatedResponseProto\"X\n\031QueryCompleteRequ" + - "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" + - " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo", - 
"mpleteResponseProto\"g\n\035TerminateFragment" + - "RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" + - "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" + - " \001(\t\" \n\036TerminateFragmentResponseProto\"\026" + - "\n\024GetTokenRequestProto\"&\n\025GetTokenRespon" + - "seProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStatePro" + - "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022L" + - "lapDaemonProtocol\022?\n\nsubmitWork\022\027.Submit" + - "WorkRequestProto\032\030.SubmitWorkResponsePro" + - "to\022W\n\022sourceStateUpdated\022\037.SourceStateUp", - "datedRequestProto\032 .SourceStateUpdatedRe" + - "sponseProto\022H\n\rqueryComplete\022\032.QueryComp" + - "leteRequestProto\032\033.QueryCompleteResponse" + - "Proto\022T\n\021terminateFragment\022\036.TerminateFr" + - "agmentRequestProto\032\037.TerminateFragmentRe" + - "sponseProto2]\n\026LlapManagementProtocol\022C\n" + - "\022getDelegationToken\022\025.GetTokenRequestPro" + - "to\032\026.GetTokenResponseProtoBH\n&org.apache" + - ".hadoop.hive.llap.daemon.rpcB\030LlapDaemon" + - "ProtocolProtos\210\001\001\240\001\001" + "ntRuntimeInfo\"W\n\027SubmitWorkResponseProto" + + "\022/\n\020submission_state\030\001 \001(\0162\025.SubmissionS" + + "tateProto\022\013\n\003msg\030\002 \001(\t\"f\n\036SourceStateUpd" + + "atedRequestProto\022\020\n\010dag_name\030\001 \001(\t\022\020\n\010sr" + + "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" + + "eProto\"!\n\037SourceStateUpdatedResponseProt" + + "o\"X\n\031QueryCompleteRequestProto\022\020\n\010query_", + "id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\027\n\014delete_del" + + "ay\030\003 \001(\003:\0010\"\034\n\032QueryCompleteResponseProt" + + "o\"g\n\035TerminateFragmentRequestProto\022\020\n\010qu" + + "ery_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\"\n\032fragme" + + "nt_identifier_string\030\007 \001(\t\" \n\036TerminateF" + + "ragmentResponseProto\"\026\n\024GetTokenRequestP" + + "roto\"&\n\025GetTokenResponseProto\022\r\n\005token\030\001" + + " \001(\014*2\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020" + + "\001\022\r\n\tS_RUNNING\020\002*E\n\024SubmissionStateProto" + + "\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_", + "OTHER\020\0032\316\002\n\022LlapDaemonProtocol\022?\n\nsubmit" + + "Work\022\027.SubmitWorkRequestProto\032\030.SubmitWo" + + "rkResponseProto\022W\n\022sourceStateUpdated\022\037." 
+ + "SourceStateUpdatedRequestProto\032 .SourceS" + + "tateUpdatedResponseProto\022H\n\rqueryComplet" + + "e\022\032.QueryCompleteRequestProto\032\033.QueryCom" + + "pleteResponseProto\022T\n\021terminateFragment\022" + + "\036.TerminateFragmentRequestProto\032\037.Termin" + + "ateFragmentResponseProto2]\n\026LlapManageme" + + "ntProtocol\022C\n\022getDelegationToken\022\025.GetTo", + "kenRequestProto\032\026.GetTokenResponseProtoB" + + "H\n&org.apache.hadoop.hive.llap.daemon.rp" + + "cB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -13645,7 +14009,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_SubmitWorkResponseProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkResponseProto_descriptor, - new java.lang.String[] { }); + new java.lang.String[] { "SubmissionState", "Msg", }); internal_static_SourceStateUpdatedRequestProto_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_SourceStateUpdatedRequestProto_fieldAccessorTable = new diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java index f3ce33b..fc29371 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java @@ -16,19 +16,22 @@ import java.io.IOException; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; public interface ContainerRunner { - void submitWork(SubmitWorkRequestProto request) throws IOException; + SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException; - void sourceStateUpdated(SourceStateUpdatedRequestProto request); + SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request); - void queryComplete(QueryCompleteRequestProto request); + QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request); - void terminateFragment(TerminateFragmentRequestProto request); + TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2139bb0..5b8ba97 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -22,13 +22,11 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -39,9 +37,14 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; @@ -145,7 +148,7 @@ protected void serviceStop() throws Exception { } @Override - public void submitWork(SubmitWorkRequestProto request) throws IOException { + public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), @@ -157,6 +160,8 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { // TODO Reduce the length of this string. Way too verbose at the moment. String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); NDC.push(ndcContextString); + Scheduler.SubmissionState submissionState; + SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder(); try { Map<String, String> env = new HashMap<>(); // TODO What else is required in this environment map.
@@ -191,7 +196,9 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - LOG.debug("Registering request with the ShuffleHandler"); + if (LOG.isDebugEnabled()) { + LOG.debug("Registering request with the ShuffleHandler"); + } ShuffleHandler.get() .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, request.getUser(), localDirs); @@ -200,18 +207,29 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, this); - try { - executorService.schedule(callable); - } catch (RejectedExecutionException e) { + submissionState = executorService.schedule(callable); + + if (LOG.isInfoEnabled()) { + LOG.info("Work SubmissionState: " + submissionState.toString()); + } + + if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) { - // Stop tracking the fragment and re-throw the error. + // Stop tracking the fragment and return the rejection to the caller. fragmentComplete(fragmentInfo); - throw e; + return responseBuilder + .setSubmissionState(SubmissionStateProto.valueOf(submissionState.name())) + .setMsg(submissionState.getMessage()) + .build(); } metrics.incrExecutorTotalRequestsHandled(); metrics.incrExecutorNumQueuedRequests(); } finally { NDC.pop(); } + + responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name())) + .setMsg(submissionState.getMessage()); + return responseBuilder.build(); } private static class LlapExecutionContext extends ExecutionContextImpl @@ -230,14 +248,15 @@ public void initializeHook(TezProcessor source) { } @Override - public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { + public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { LOG.info("Processing state update: " + stringifySourceStateUpdateRequest(request)); queryTracker.registerSourceStateChange(request.getDagName(), request.getSrcName(), request.getState()); + return SourceStateUpdatedResponseProto.getDefaultInstance(); } @Override - public void queryComplete(QueryCompleteRequestProto request) { + public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { LOG.info("Processing queryComplete notification for {}", request.getDagName()); List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); @@ -248,12 +267,14 @@ public void queryComplete(QueryCompleteRequestProto request) { fragmentInfo.getFragmentIdentifierString()); executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); } + return QueryCompleteResponseProto.getDefaultInstance(); } @Override - public void terminateFragment(TerminateFragmentRequestProto request) { + public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); executorService.killFragment(request.getFragmentIdentifierString()); + return TerminateFragmentResponseProto.getDefaultInstance(); } private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7ce8ba0..fe37319 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -32,16 +32,20 @@ import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; -import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.daemon.services.impl.LlapWebServices; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; +import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; @@ -311,25 +315,25 @@ public static void main(String[] args) throws Exception { } @Override - public void submitWork(SubmitWorkRequestProto request) throws + public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { numSubmissions.incrementAndGet(); - containerRunner.submitWork(request); + return containerRunner.submitWork(request); } @Override - public void sourceStateUpdated(SourceStateUpdatedRequestProto request) { - containerRunner.sourceStateUpdated(request); + public SourceStateUpdatedResponseProto sourceStateUpdated(SourceStateUpdatedRequestProto request) { + return containerRunner.sourceStateUpdated(request); } @Override - public void queryComplete(QueryCompleteRequestProto request) { - containerRunner.queryComplete(request); + public QueryCompleteResponseProto queryComplete(QueryCompleteRequestProto request) { + return containerRunner.queryComplete(request); } @Override - public void terminateFragment(TerminateFragmentRequestProto request) { - containerRunner.terminateFragment(request); + public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) { + return containerRunner.terminateFragment(request); } @VisibleForTesting diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java index db0b752..3c5984f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java @@ -19,20 +19,14 @@ import java.security.PrivilegedAction; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.annotations.VisibleForTesting; -import 
com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import com.google.protobuf.BlockingService; -import com.google.protobuf.ByteString; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; +import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenResponseProto; @@ -44,20 +38,27 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider; +import org.apache.hadoop.hive.llap.security.LlapSecurityHelper; +import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; +import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.hive.llap.security.LlapSecurityHelper; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; -import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.hive.llap.daemon.ContainerRunner; -import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; -import org.apache.hadoop.hive.llap.daemon.LlapManagementProtocolBlockingPB; -import org.apache.hadoop.hive.llap.security.LlapDaemonPolicyProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.protobuf.BlockingService; +import com.google.protobuf.ByteString; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; public class LlapDaemonProtocolServerImpl extends AbstractService implements LlapDaemonProtocolBlockingPB, LlapManagementProtocolBlockingPB { @@ -93,33 +94,29 @@ public SubmitWorkResponseProto submitWork(RpcController controller, SubmitWorkRequestProto request) throws ServiceException { try { - containerRunner.submitWork(request); + return containerRunner.submitWork(request); } catch (IOException e) { throw new ServiceException(e); } - return SubmitWorkResponseProto.getDefaultInstance(); } @Override public SourceStateUpdatedResponseProto sourceStateUpdated(RpcController controller, SourceStateUpdatedRequestProto request) throws ServiceException { - containerRunner.sourceStateUpdated(request); - return 
SourceStateUpdatedResponseProto.getDefaultInstance(); + return containerRunner.sourceStateUpdated(request); } @Override public QueryCompleteResponseProto queryComplete(RpcController controller, QueryCompleteRequestProto request) throws ServiceException { - containerRunner.queryComplete(request); - return QueryCompleteResponseProto.getDefaultInstance(); + return containerRunner.queryComplete(request); } @Override public TerminateFragmentResponseProto terminateFragment( RpcController controller, TerminateFragmentRequestProto request) throws ServiceException { - containerRunner.terminateFragment(request); - return TerminateFragmentResponseProto.getDefaultInstance(); + return containerRunner.terminateFragment(request); } @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index 1d35b10..e7d4ff1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -18,19 +18,39 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.util.Set; -import java.util.concurrent.RejectedExecutionException; /** * Task scheduler interface */ public interface Scheduler<T> { + enum SubmissionState { + ACCEPTED, // request accepted + REJECTED, // request rejected because the wait queue is full + EVICTED_OTHER; // request accepted after evicting another lower-priority task + + String message; + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + + @Override + public String toString() { + return message == null ? super.toString() : super.toString() + " Message: " + message; + } + } + /** - * Schedule the task or throw RejectedExecutionException if queues are full + * Schedule the task and report the outcome through the returned SubmissionState * @param t - task to schedule - * @throws RejectedExecutionException + * @return SubmissionState */ - void schedule(T t) throws RejectedExecutionException; + SubmissionState schedule(T t); /** * Attempt to kill the fragment with the specified fragmentId diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 5e2c6dd..712e11c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -70,7 +70,7 @@ * run to completion immediately (canFinish = false) are added to pre-emption queue. *

* When all the executor threads are occupied and wait queue is full, the task scheduler will - * throw RejectedExecutionException. + * return a SubmissionState.REJECTED response. *

* Task executor service can be shut down which will terminate all running tasks and reject all * new tasks. Shutting down of the task executor service can be done gracefully or immediately. @@ -316,9 +316,9 @@ public void onFailure(Throwable t) { } @Override - public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { + public SubmissionState schedule(TaskRunnerCallable task) { TaskWrapper taskWrapper = new TaskWrapper(task, this); - + SubmissionState result; TaskWrapper evictedTask; synchronized (lock) { // If the queue does not have capacity, it does not throw a Rejection. Instead it will // evict and return the lowest-priority task, which may be the task being offered. // TODO HIVE-11687. It's possible for a bunch of tasks to come in around the same time, without the // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks. // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) evictedTask = waitQueue.offer(taskWrapper); - if (evictedTask != taskWrapper) { + + // A null evictedTask means the offer was accepted outright. + // An evictedTask different from taskWrapper means this task was accepted and it + // evicted another, lower-priority task. + if (evictedTask == null || evictedTask != taskWrapper) { knownTasks.put(taskWrapper.getRequestId(), taskWrapper); taskWrapper.setIsInWaitQueue(true); if (isDebugEnabled) { LOG.debug("{} added to wait queue. Current wait queue size={}", task.getRequestId(), waitQueue.size()); } + + result = evictedTask == null ? SubmissionState.ACCEPTED : SubmissionState.EVICTED_OTHER; + + if (result.equals(SubmissionState.EVICTED_OTHER)) { + result.setMessage(evictedTask.getRequestId()); + } else { + result.setMessage(task.getRequestId()); + } } else { if (isInfoEnabled) { LOG.info("wait queue full, size={}. {} not added", waitQueue.size(), task.getRequestId()); } evictedTask.getTaskRunnerCallable().killTask(); - throw new RejectedExecutionException("Wait queue full"); + + result = SubmissionState.REJECTED; + result.setMessage("Wait queue full"); + return result; } } @@ -371,6 +386,8 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException synchronized (lock) { lock.notify(); } + + return result; } @Override diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto index 07721df..cf8d22d 100644 --- a/llap-server/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto @@ -87,7 +87,15 @@ message SubmitWorkRequestProto { optional FragmentRuntimeInfo fragment_runtime_info = 10; } +enum SubmissionStateProto { + ACCEPTED = 1; + REJECTED = 2; + EVICTED_OTHER = 3; +} + message SubmitWorkResponseProto { + optional SubmissionStateProto submission_state = 1; + optional string msg = 2; } message SourceStateUpdatedRequestProto { diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index cb2d0e9..5491064 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.util.HashMap; import java.util.Map; @@ -30,7 +29,6 @@ import
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -124,28 +122,17 @@ public void testWaitQueuePreemption() throws InterruptedException { // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots) // This currently serves to allow the task to be removed from the waitQueue. r1.awaitStart(); - try { - taskExecutorService.schedule(r2); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection with space available in queue"); - } - try { - taskExecutorService.schedule(r3); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection with space available in queue"); - } + Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); - try { - taskExecutorService.schedule(r4); - fail("Expecting a Rejection for non finishable task with a full queue"); - } catch (RejectedExecutionException e) { - } + submissionState = taskExecutorService.schedule(r3); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); - try { - taskExecutorService.schedule(r5); - } catch (RejectedExecutionException e) { - fail("Unexpected rejection for a finishable task"); - } + submissionState = taskExecutorService.schedule(r4); + assertEquals(Scheduler.SubmissionState.REJECTED, submissionState); + + submissionState = taskExecutorService.schedule(r5); + assertEquals(Scheduler.SubmissionState.EVICTED_OTHER, submissionState); // Ensure the correct task was preempted. assertEquals(true, r3.wasPreempted());
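
Note: for reference, the sketch below shows one way a caller of the revised submitWork contract could consume SubmitWorkResponseProto. It is illustrative only and not part of the patch: the SubmitWorkExample class and its fail-fast handling of REJECTED are assumptions; the proto types and the ContainerRunner signature are taken from the diff above.

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;

    // Hypothetical caller of the new response-based contract.
    public class SubmitWorkExample {
      public static void submit(ContainerRunner runner, SubmitWorkRequestProto request)
          throws IOException {
        SubmitWorkResponseProto response = runner.submitWork(request);
        // submission_state is optional in proto2; check presence before reading.
        if (!response.hasSubmissionState()) {
          return; // daemon predating this change: no state reported
        }
        switch (response.getSubmissionState()) {
          case ACCEPTED:
            // Fragment queued or running; nothing more to do.
            break;
          case EVICTED_OTHER:
            // Accepted, but another fragment was evicted; per the patch, msg carries a request id.
            System.out.println("Submission evicted another task: " + response.getMsg());
            break;
          case REJECTED:
            // Wait queue full; the caller decides whether to retry or fail the fragment.
            throw new IOException("Fragment rejected: " + response.getMsg());
        }
      }
    }

An AM-side caller could equally translate REJECTED into a retry with backoff; the patch deliberately leaves that policy to the client.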
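A minimal serialization round-trip of the two new optional fields, assuming only the generated classes above (the literal msg value here is made up):

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;

    // Hypothetical check that the new optional fields survive a wire round-trip.
    public class SubmitWorkResponseRoundTrip {
      public static void main(String[] args) throws Exception {
        SubmitWorkResponseProto original = SubmitWorkResponseProto.newBuilder()
            .setSubmissionState(SubmissionStateProto.EVICTED_OTHER)
            .setMsg("evicted_fragment_id") // made-up message payload
            .build();
        SubmitWorkResponseProto parsed =
            SubmitWorkResponseProto.parseFrom(original.toByteArray());
        // Both fields are optional (proto2), so presence is worth checking too.
        if (!parsed.hasSubmissionState()
            || parsed.getSubmissionState() != SubmissionStateProto.EVICTED_OTHER
            || !"evicted_fragment_id".equals(parsed.getMsg())) {
          throw new AssertionError("round trip failed");
        }
        System.out.println("round trip ok: " + parsed.getSubmissionState());
      }
    }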