diff --git llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index ece31ed..c19cf63 100644 --- llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: LlapDaemonProtocol.proto +// source: llap-common/src/protobuf/LlapDaemonProtocol.proto package org.apache.hadoop.hive.llap.daemon.rpc; @@ -3454,6 +3454,16 @@ public Builder clearMergedInputDescriptor() { * */ int getVertexParallelism(); + + // optional bool is_external_submission = 14 [default = false]; + /** + * optional bool is_external_submission = 14 [default = false]; + */ + boolean hasIsExternalSubmission(); + /** + * optional bool is_external_submission = 14 [default = false]; + */ + boolean getIsExternalSubmission(); } /** * Protobuf type {@code SignableVertexSpec} @@ -3600,6 +3610,11 @@ private SignableVertexSpec( vertexParallelism_ = input.readInt32(); break; } + case 112: { + bitField0_ |= 0x00000400; + isExternalSubmission_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -4096,6 +4111,22 @@ public int getVertexParallelism() { return vertexParallelism_; } + // optional bool is_external_submission = 14 [default = false]; + public static final int IS_EXTERNAL_SUBMISSION_FIELD_NUMBER = 14; + private boolean isExternalSubmission_; + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public boolean hasIsExternalSubmission() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public boolean getIsExternalSubmission() { + return 
isExternalSubmission_; + } + private void initFields() { user_ = ""; signatureKeyId_ = 0L; @@ -4110,6 +4141,7 @@ private void initFields() { outputSpecs_ = java.util.Collections.emptyList(); groupedInputSpecs_ = java.util.Collections.emptyList(); vertexParallelism_ = 0; + isExternalSubmission_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4162,6 +4194,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeInt32(13, vertexParallelism_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeBool(14, isExternalSubmission_); + } getUnknownFields().writeTo(output); } @@ -4223,6 +4258,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt32Size(13, vertexParallelism_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(14, isExternalSubmission_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4302,6 +4341,11 @@ public boolean equals(final java.lang.Object obj) { result = result && (getVertexParallelism() == other.getVertexParallelism()); } + result = result && (hasIsExternalSubmission() == other.hasIsExternalSubmission()); + if (hasIsExternalSubmission()) { + result = result && (getIsExternalSubmission() + == other.getIsExternalSubmission()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4367,6 +4411,10 @@ public int hashCode() { hash = (37 * hash) + VERTEX_PARALLELISM_FIELD_NUMBER; hash = (53 * hash) + getVertexParallelism(); } + if (hasIsExternalSubmission()) { + hash = (37 * hash) + IS_EXTERNAL_SUBMISSION_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getIsExternalSubmission()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4531,6 +4579,8 @@ public Builder clear() { } 
vertexParallelism_ = 0; bitField0_ = (bitField0_ & ~0x00001000); + isExternalSubmission_ = false; + bitField0_ = (bitField0_ & ~0x00002000); return this; } @@ -4634,6 +4684,10 @@ public Builder clone() { to_bitField0_ |= 0x00000200; } result.vertexParallelism_ = vertexParallelism_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00000400; + } + result.isExternalSubmission_ = isExternalSubmission_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4768,6 +4822,9 @@ public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtoc if (other.hasVertexParallelism()) { setVertexParallelism(other.getVertexParallelism()); } + if (other.hasIsExternalSubmission()) { + setIsExternalSubmission(other.getIsExternalSubmission()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6282,6 +6339,39 @@ public Builder clearVertexParallelism() { return this; } + // optional bool is_external_submission = 14 [default = false]; + private boolean isExternalSubmission_ ; + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public boolean hasIsExternalSubmission() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public boolean getIsExternalSubmission() { + return isExternalSubmission_; + } + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public Builder setIsExternalSubmission(boolean value) { + bitField0_ |= 0x00002000; + isExternalSubmission_ = value; + onChanged(); + return this; + } + /** + * optional bool is_external_submission = 14 [default = false]; + */ + public Builder clearIsExternalSubmission() { + bitField0_ = (bitField0_ & ~0x00002000); + isExternalSubmission_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SignableVertexSpec) } @@ -17411,83 +17501,85 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) 
{ descriptor; static { java.lang.String[] descriptorData = { - "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" + - "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" + - "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" + - "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" + - "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" + - "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" + - "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" + - "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" + - "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" + - "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d", - "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" + - "\245\003\n\022SignableVertexSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016" + - "signatureKeyId\030\002 \001(\003\022/\n\020query_identifier" + - "\030\003 \001(\0132\025.QueryIdentifierProto\022\025\n\rhive_qu" + - "ery_id\030\004 \001(\t\022\020\n\010dag_name\030\005 \001(\t\022\023\n\013vertex" + - "_name\030\006 \001(\t\022\024\n\014vertex_index\030\007 \001(\005\022\030\n\020tok" + - "en_identifier\030\010 \001(\t\0224\n\024processor_descrip" + - "tor\030\t \001(\0132\026.EntityDescriptorProto\022!\n\013inp" + - "ut_specs\030\n \003(\0132\014.IOSpecProto\022\"\n\014output_s" + - "pecs\030\013 \003(\0132\014.IOSpecProto\0221\n\023grouped_inpu", - "t_specs\030\014 \003(\0132\024.GroupInputSpecProto\022\032\n\022v" + - "ertex_parallelism\030\r \001(\005\"K\n\016VertexOrBinar" + - "y\022#\n\006vertex\030\001 \001(\0132\023.SignableVertexSpec\022\024" + - "\n\014vertexBinary\030\002 \001(\014\"\344\001\n\023FragmentRuntime" + - "Info\022#\n\033num_self_and_upstream_tasks\030\001 \001(" + - "\005\022-\n%num_self_and_upstream_completed_tas" + - "ks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001(\005\022\026\n" + - 
"\016dag_start_time\030\004 \001(\003\022 \n\030first_attempt_s" + - "tart_time\030\005 \001(\003\022\"\n\032current_attempt_start" + - "_time\030\006 \001(\003\"d\n\024QueryIdentifierProto\022\035\n\025a", - "pplication_id_string\030\001 \001(\t\022\021\n\tdag_index\030" + - "\002 \001(\005\022\032\n\022app_attempt_number\030\003 \001(\005\"l\n\013Not" + - "TezEvent\022\037\n\027input_event_proto_bytes\030\001 \002(" + - "\014\022\023\n\013vertex_name\030\002 \002(\t\022\027\n\017dest_input_nam" + - "e\030\003 \002(\t\022\016\n\006key_id\030\004 \001(\005\"\330\002\n\026SubmitWorkRe" + - "questProto\022\"\n\twork_spec\030\001 \001(\0132\017.VertexOr" + - "Binary\022\033\n\023work_spec_signature\030\002 \001(\014\022\027\n\017f" + - "ragment_number\030\003 \001(\005\022\026\n\016attempt_number\030\004" + - " \001(\005\022\033\n\023container_id_string\030\005 \001(\t\022\017\n\007am_" + - "host\030\006 \001(\t\022\017\n\007am_port\030\007 \001(\005\022\032\n\022credentia", - "ls_binary\030\010 \001(\014\0223\n\025fragment_runtime_info" + - "\030\t \001(\0132\024.FragmentRuntimeInfo\022\033\n\023initial_" + - "event_bytes\030\n \001(\014\022\037\n\027initial_event_signa" + - "ture\030\013 \001(\014\"b\n\027SubmitWorkResponseProto\022/\n" + - "\020submission_state\030\001 \001(\0162\025.SubmissionStat" + - "eProto\022\026\n\016unique_node_id\030\002 \001(\t\"\205\001\n\036Sourc" + - "eStateUpdatedRequestProto\022/\n\020query_ident" + - "ifier\030\001 \001(\0132\025.QueryIdentifierProto\022\020\n\010sr" + - "c_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162\021.SourceStat" + - "eProto\"!\n\037SourceStateUpdatedResponseProt", - "o\"e\n\031QueryCompleteRequestProto\022/\n\020query_" + - "identifier\030\001 \001(\0132\025.QueryIdentifierProto\022" + - "\027\n\014delete_delay\030\002 \001(\003:\0010\"\034\n\032QueryComplet" + - "eResponseProto\"t\n\035TerminateFragmentReque" + - "stProto\022/\n\020query_identifier\030\001 \001(\0132\025.Quer" + - 
"yIdentifierProto\022\"\n\032fragment_identifier_" + - "string\030\002 \001(\t\" \n\036TerminateFragmentRespons" + - "eProto\"&\n\024GetTokenRequestProto\022\016\n\006app_id" + - "\030\001 \001(\t\"&\n\025GetTokenResponseProto\022\r\n\005token" + - "\030\001 \001(\014\"A\n\033LlapOutputSocketInitMessage\022\023\n", - "\013fragment_id\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020Sou" + - "rceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNN" + - "ING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEPTE" + - "D\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n" + - "\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Subm" + - "itWorkRequestProto\032\030.SubmitWorkResponseP" + - "roto\022W\n\022sourceStateUpdated\022\037.SourceState" + - "UpdatedRequestProto\032 .SourceStateUpdated" + - "ResponseProto\022H\n\rqueryComplete\022\032.QueryCo" + - "mpleteRequestProto\032\033.QueryCompleteRespon", - "seProto\022T\n\021terminateFragment\022\036.Terminate" + - "FragmentRequestProto\032\037.TerminateFragment" + - "ResponseProto2]\n\026LlapManagementProtocol\022" + - "C\n\022getDelegationToken\022\025.GetTokenRequestP" + - "roto\032\026.GetTokenResponseProtoBH\n&org.apac" + - "he.hadoop.hive.llap.daemon.rpcB\030LlapDaem" + - "onProtocolProtos\210\001\001\240\001\001" + "\n1llap-common/src/protobuf/LlapDaemonPro" + + "tocol.proto\"9\n\020UserPayloadProto\022\024\n\014user_" + + "payload\030\001 \001(\014\022\017\n\007version\030\002 \001(\005\"j\n\025Entity" + + "DescriptorProto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014u" + + "ser_payload\030\002 \001(\0132\021.UserPayloadProto\022\024\n\014" + + "history_text\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025con" + + "nected_vertex_name\030\001 \001(\t\022-\n\rio_descripto" + + "r\030\002 \001(\0132\026.EntityDescriptorProto\022\033\n\023physi" + + "cal_edge_count\030\003 \001(\005\"z\n\023GroupInputSpecPr" + + "oto\022\022\n\ngroup_name\030\001 
\001(\t\022\026\n\016group_vertice", + "s\030\002 \003(\t\0227\n\027merged_input_descriptor\030\003 \001(\013" + + "2\026.EntityDescriptorProto\"\314\003\n\022SignableVer" + + "texSpec\022\014\n\004user\030\001 \001(\t\022\026\n\016signatureKeyId\030" + + "\002 \001(\003\022/\n\020query_identifier\030\003 \001(\0132\025.QueryI" + + "dentifierProto\022\025\n\rhive_query_id\030\004 \001(\t\022\020\n" + + "\010dag_name\030\005 \001(\t\022\023\n\013vertex_name\030\006 \001(\t\022\024\n\014" + + "vertex_index\030\007 \001(\005\022\030\n\020token_identifier\030\010" + + " \001(\t\0224\n\024processor_descriptor\030\t \001(\0132\026.Ent" + + "ityDescriptorProto\022!\n\013input_specs\030\n \003(\0132" + + "\014.IOSpecProto\022\"\n\014output_specs\030\013 \003(\0132\014.IO", + "SpecProto\0221\n\023grouped_input_specs\030\014 \003(\0132\024" + + ".GroupInputSpecProto\022\032\n\022vertex_paralleli" + + "sm\030\r \001(\005\022%\n\026is_external_submission\030\016 \001(\010" + + ":\005false\"K\n\016VertexOrBinary\022#\n\006vertex\030\001 \001(" + + "\0132\023.SignableVertexSpec\022\024\n\014vertexBinary\030\002" + + " \001(\014\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_self" + + "_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_an" + + "d_upstream_completed_tasks\030\002 \001(\005\022\033\n\023with" + + "in_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time\030" + + "\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003\022", + "\"\n\032current_attempt_start_time\030\006 \001(\003\"d\n\024Q" + + "ueryIdentifierProto\022\035\n\025application_id_st" + + "ring\030\001 \001(\t\022\021\n\tdag_index\030\002 \001(\005\022\032\n\022app_att" + + "empt_number\030\003 \001(\005\"l\n\013NotTezEvent\022\037\n\027inpu" + + "t_event_proto_bytes\030\001 \002(\014\022\023\n\013vertex_name" + + "\030\002 \002(\t\022\027\n\017dest_input_name\030\003 \002(\t\022\016\n\006key_i" + + "d\030\004 \001(\005\"\330\002\n\026SubmitWorkRequestProto\022\"\n\two" + + 
"rk_spec\030\001 \001(\0132\017.VertexOrBinary\022\033\n\023work_s" + + "pec_signature\030\002 \001(\014\022\027\n\017fragment_number\030\003" + + " \001(\005\022\026\n\016attempt_number\030\004 \001(\005\022\033\n\023containe", + "r_id_string\030\005 \001(\t\022\017\n\007am_host\030\006 \001(\t\022\017\n\007am" + + "_port\030\007 \001(\005\022\032\n\022credentials_binary\030\010 \001(\014\022" + + "3\n\025fragment_runtime_info\030\t \001(\0132\024.Fragmen" + + "tRuntimeInfo\022\033\n\023initial_event_bytes\030\n \001(" + + "\014\022\037\n\027initial_event_signature\030\013 \001(\014\"b\n\027Su" + + "bmitWorkResponseProto\022/\n\020submission_stat" + + "e\030\001 \001(\0162\025.SubmissionStateProto\022\026\n\016unique" + + "_node_id\030\002 \001(\t\"\205\001\n\036SourceStateUpdatedReq" + + "uestProto\022/\n\020query_identifier\030\001 \001(\0132\025.Qu" + + "eryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005", + "state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Source" + + "StateUpdatedResponseProto\"e\n\031QueryComple" + + "teRequestProto\022/\n\020query_identifier\030\001 \001(\013" + + "2\025.QueryIdentifierProto\022\027\n\014delete_delay\030" + + "\002 \001(\003:\0010\"\034\n\032QueryCompleteResponseProto\"t" + + "\n\035TerminateFragmentRequestProto\022/\n\020query" + + "_identifier\030\001 \001(\0132\025.QueryIdentifierProto" + + "\022\"\n\032fragment_identifier_string\030\002 \001(\t\" \n\036" + + "TerminateFragmentResponseProto\"&\n\024GetTok" + + "enRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025GetTok", + "enResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033LlapOu" + + "tputSocketInitMessage\022\023\n\013fragment_id\030\001 \002" + + "(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStateProto\022\017\n" + + "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Submiss" + + "ionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECTED\020" + + 
"\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonProto" + + "col\022?\n\nsubmitWork\022\027.SubmitWorkRequestPro" + + "to\032\030.SubmitWorkResponseProto\022W\n\022sourceSt" + + "ateUpdated\022\037.SourceStateUpdatedRequestPr" + + "oto\032 .SourceStateUpdatedResponseProto\022H\n", + "\rqueryComplete\022\032.QueryCompleteRequestPro" + + "to\032\033.QueryCompleteResponseProto\022T\n\021termi" + + "nateFragment\022\036.TerminateFragmentRequestP" + + "roto\032\037.TerminateFragmentResponseProto2]\n" + + "\026LlapManagementProtocol\022C\n\022getDelegation" + + "Token\022\025.GetTokenRequestProto\032\026.GetTokenR" + + "esponseProtoBH\n&org.apache.hadoop.hive.l" + + "lap.daemon.rpcB\030LlapDaemonProtocolProtos" + + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -17523,7 +17615,7 @@ private BlockingStub(com.google.protobuf.BlockingRpcChannel channel) { internal_static_SignableVertexSpec_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SignableVertexSpec_descriptor, - new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", }); + new java.lang.String[] { "User", "SignatureKeyId", "QueryIdentifier", "HiveQueryId", "DagName", "VertexName", "VertexIndex", "TokenIdentifier", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "IsExternalSubmission", }); internal_static_VertexOrBinary_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_VertexOrBinary_fieldAccessorTable = new diff --git llap-common/src/protobuf/LlapDaemonProtocol.proto llap-common/src/protobuf/LlapDaemonProtocol.proto index 3a3a2b8..e0c0070 100644 --- 
llap-common/src/protobuf/LlapDaemonProtocol.proto +++ llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -67,6 +67,7 @@ message SignableVertexSpec repeated GroupInputSpecProto grouped_input_specs = 12; optional int32 vertex_parallelism = 13; // An internal field required for Tez. + optional bool is_external_submission = 14 [default = false]; } // Union diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index ce2f457..a6d9d54 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -60,6 +60,7 @@ private final LlapNodeId amNodeId; private final String appTokenIdentifier; private final Token appToken; + private final boolean isExternalQuery; // Map of states for different vertices. private final Set knownFragments = @@ -77,7 +78,8 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag String[] localDirsBase, FileSystem localFs, String tokenUserName, String tokenAppId, final LlapNodeId amNodeId, String tokenIdentifier, - Token appToken) { + Token appToken, + boolean isExternalQuery) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagIdString = dagIdString; @@ -93,6 +95,7 @@ public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dag this.amNodeId = amNodeId; this.appTokenIdentifier = tokenIdentifier; this.appToken = appToken; + this.isExternalQuery = isExternalQuery; final InetSocketAddress address = NetUtils.createSocketAddrForHost(amNodeId.getHostname(), amNodeId.getPort()); SecurityUtil.setTokenService(appToken, address); @@ -146,6 +149,10 @@ public void unregisterFragment(QueryFragmentInfo fragmentInfo) { return Lists.newArrayList(knownFragments); } + public boolean isExternalQuery() { + return isExternalQuery; + } + private synchronized void createLocalDirs() 
throws IOException { if (localDirs == null) { localDirs = new String[localDirsBase.length]; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index daeb555..464b678 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -89,7 +89,7 @@ private final Lock lock = new ReentrantLock(); - private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); + private final ConcurrentMap dagSpecificLocks = new ConcurrentHashMap<>(); // Tracks various maps for dagCompletions. This is setup here since stateChange messages // may be processed by a thread which ends up executing before a task. @@ -169,7 +169,8 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString, dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, - tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken); + tokenInfo.userName, tokenInfo.appId, amNodeId, vertex.getTokenIdentifier(), appToken, + vertex.getIsExternalSubmission()); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; @@ -188,9 +189,11 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId if (LOG.isDebugEnabled()) { LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); } - ShuffleHandler.get() - .registerDag(appIdString, dagIdentifier, appToken, - user, queryInfo.getLocalDirs()); + if (!vertex.getIsExternalSubmission()) { + ShuffleHandler.get() + .registerDag(appIdString, dagIdentifier, appToken, + user, queryInfo.getLocalDirs()); + } return queryInfo.registerFragment( vertexName, fragmentNumber, attemptNumber, vertex, 
fragmentIdString); @@ -212,6 +215,9 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { LOG.info("Ignoring fragmentComplete message for unknown query: {}", qId); } else { queryInfo.unregisterFragment(fragmentInfo); + + // Try marking the query as complete if this is an external submission + handleFragmentCompleteExternalQuery(queryInfo); } } @@ -237,46 +243,39 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { * @param deleteDelay */ QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, - boolean isInternal) throws IOException { + boolean isExternalQuery) throws IOException { if (deleteDelay == -1) { deleteDelay = defaultDeleteDelaySeconds; } ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.writeLock().lock(); try { - QueryInfo queryInfo = isInternal + // If isExternalQuery -> the call is from within the daemon, so no permission check required + // to get access to the queryInfo instance. + QueryInfo queryInfo = isExternalQuery ? queryInfoMap.get(queryIdentifier) : checkPermissionsAndGetQuery(queryIdentifier); - rememberCompletedDag(queryIdentifier); - LOG.info("Processing queryComplete for queryIdentifier={} with deleteDelay={} seconds", queryIdentifier, - deleteDelay); - queryInfoMap.remove(queryIdentifier); if (queryInfo == null) { // Should not happen. LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier); return null; } - String[] localDirs = queryInfo.getLocalDirsNoCreate(); - if (localDirs != null) { - for (String localDir : localDirs) { - cleanupDir(localDir, deleteDelay); - ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier()); - } - } - if (routeBasedLoggingEnabled) { - // Inform the routing purgePolicy. - // Send out a fake log message at the ERROR level with the MDC for this query setup. With an - // LLAP custom appender this message will not be logged.
- final String dagId = queryInfo.getDagIdString(); - final String queryId = queryInfo.getHiveQueryIdString(); - MDC.put("dagId", dagId); - MDC.put("queryId", queryId); - try { - LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." + - " Query complete: " + queryInfo.getHiveQueryIdString() + ", " + - queryInfo.getDagIdString()); - } finally { - MDC.clear(); + LOG.info( + "Processing queryComplete for queryIdentifier={}, isExternalQuery={}, with deleteDelay={} seconds", + queryIdentifier, isExternalQuery, + deleteDelay); + + queryInfoMap.remove(queryIdentifier); + if (!isExternalQuery) { + rememberCompletedDag(queryIdentifier); + cleanupLocalDirs(queryInfo, deleteDelay); + handleLogOnQueryCompletion(queryInfo.getHiveQueryIdString(), queryInfo.getDagIdString()); + } else { + // If there are no pending fragments, queue some of the cleanup for a later point - locks, log rolling. + if (queryInfo.getRegisteredFragments().size() == 0) { + executorService + .schedule(new ExternalQueryCleanerCallable(queryInfo.getHiveQueryIdString(), + queryInfo.getDagIdString(), queryInfo.getQueryIdentifier()), 1, TimeUnit.MINUTES); } } @@ -286,7 +285,9 @@ QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, // should not be allowed after a query complete is received.
sourceCompletionMap.remove(queryIdentifier); String savedQueryId = queryIdentifierToHiveQueryId.remove(queryIdentifier); - dagSpecificLocks.remove(queryIdentifier); + if (!isExternalQuery) { + removeQuerySpecificLock(queryIdentifier); + } if (savedQueryId != null) { ObjectCacheFactory.removeLlapQueryCache(savedQueryId); } @@ -297,6 +298,37 @@ QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, } + private void cleanupLocalDirs(QueryInfo queryInfo, long deleteDelay) { + String[] localDirs = queryInfo.getLocalDirsNoCreate(); + if (localDirs != null) { + for (String localDir : localDirs) { + cleanupDir(localDir, deleteDelay); + ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier()); + } + } + } + + private void handleLogOnQueryCompletion(String queryIdString, String dagIdString) { + if (routeBasedLoggingEnabled) { + // Inform the routing purgePolicy. + // Send out a fake log message at the ERROR level with the MDC for this query setup. With an + // LLAP custom appender this message will not be logged. + MDC.put("dagId", dagIdString); + MDC.put("queryId", queryIdString); + try { + LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." 
+ + " Query complete: " + queryIdString + ", " + + dagIdString); + } finally { + MDC.clear(); + } + } + } + + private void removeQuerySpecificLock(QueryIdentifier queryIdentifier) { + dagSpecificLocks.remove(queryIdentifier); + } + public void rememberCompletedDag(QueryIdentifier queryIdentifier) { if (completedDagMap.add(queryIdentifier)) { @@ -326,10 +358,10 @@ void registerSourceStateChange(QueryIdentifier queryIdentifier, String sourceNam } - private ReadWriteLock getDagLock(QueryIdentifier queryIdentifier) { + private ReentrantReadWriteLock getDagLock(QueryIdentifier queryIdentifier) { lock.lock(); try { - ReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier); + ReentrantReadWriteLock dagLock = dagSpecificLocks.get(queryIdentifier); if (dagLock == null) { dagLock = new ReentrantReadWriteLock(); dagSpecificLocks.put(queryIdentifier, dagLock); @@ -403,6 +435,44 @@ protected Void callInternal() { } } + private class ExternalQueryCleanerCallable extends CallableWithNdc { + + private final String queryIdString; + private final String dagIdString; + private final QueryIdentifier queryIdentifier; + + public ExternalQueryCleanerCallable(String queryIdString, String dagIdString, + QueryIdentifier queryIdentifier) { + this.queryIdString = queryIdString; + this.dagIdString = dagIdString; + this.queryIdentifier = queryIdentifier; + } + + @Override + protected Void callInternal() { + ReentrantReadWriteLock dagLock = getDagLock(queryIdentifier); + boolean locked = dagLock.writeLock().tryLock(); + if (!locked) { + // Something else holds the lock at the moment. Don't bother cleaning up. + return null; + } + try { + // See if there are additional knownFragments. If there are, more fragments came in + // after this cleanup was scheduled, and there's nothing to be done. + QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); + if (queryInfo != null) { + // QueryInfo will only exist if more work came in, after this was scheduled. 
+ return null; + } + handleLogOnQueryCompletion(queryIdString, dagIdString); + removeQuerySpecificLock(queryIdentifier); + } finally { + dagLock.writeLock().unlock(); + } + return null; + } + } + private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IOException { QueryInfo queryInfo = queryInfoMap.get(queryId); if (queryInfo == null) return null; @@ -414,4 +484,29 @@ private QueryInfo checkPermissionsAndGetQuery(QueryIdentifier queryId) throws IO public boolean checkPermissionsForQuery(QueryIdentifier queryId) throws IOException { return checkPermissionsAndGetQuery(queryId) != null; } + + + private void handleFragmentCompleteExternalQuery(QueryInfo queryInfo) { + if (queryInfo.isExternalQuery() && queryInfo.getRegisteredFragments().size() == 0) { + ReentrantReadWriteLock dagLock = getDagLock(queryInfo.getQueryIdentifier()); + if (dagLock == null) { + LOG.warn("Ignoring fragment completion for unknown query: {}", + queryInfo.getQueryIdentifier()); + } + boolean locked = dagLock.writeLock().tryLock(); + if (!locked) { + // Some other operation in progress using the same lock. + // A subsequent fragmentComplete is expected to come in. 
+ return; + } + try { + queryComplete(queryInfo.getQueryIdentifier(), -1, true); + } catch (IOException e) { + LOG.error("Failed to process query complete for external submission: {}", + queryInfo.getQueryIdentifier()); + } finally { + dagLock.writeLock().unlock(); + } + } + } } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 27c426c..e3edf79 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -96,7 +96,7 @@ public static QueryInfo createQueryInfo() { new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new ConcurrentHashMap(), - new String[0], null, "fakeUser", null, nodeId, null, null); + new String[0], null, "fakeUser", null, nodeId, null, null, false); return queryInfo; } diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java index 868eec7..168809e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java @@ -440,6 +440,7 @@ private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpe .setDagIndex(taskSpec.getDagIdentifier()).setAppAttemptNumber(0).build(); final SignableVertexSpec.Builder svsb = Converters.constructSignableVertexSpec( taskSpec, queryIdentifierProto, applicationId.toString(), queryUser, queryIdString); + svsb.setIsExternalSubmission(true); if (signer == null) { SignedMessage result = new SignedMessage(); result.message = serializeVertexSpec(svsb);